Commit a43a3b04 authored by Ultima's avatar Ultima
Browse files

Improved .diag method in diagonal_operator.

Implemented interface for slicing methods in slicing distributors. fftw_distributor is now a special case.
Implemented flatten routines for d2o.
parent afca20a4
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
""" """
from __future__ import division from __future__ import division
from nifty import * # version 0.8.0 from nifty import * # version 0.8.0
about.warnings.off()
# some signal space; e.g., a two-dimensional regular grid # some signal space; e.g., a two-dimensional regular grid
x_space = rg_space([128, 128]) # define signal space x_space = rg_space([128, 128]) # define signal space
...@@ -61,10 +61,10 @@ d = R(s) + n # compute data ...@@ -61,10 +61,10 @@ d = R(s) + n # compute data
j = R.adjoint_times(N.inverse_times(d)) # define information source j = R.adjoint_times(N.inverse_times(d)) # define information source
D = propagator_operator(S=S, N=N, R=R) # define information propagator D = propagator_operator(S=S, N=N, R=R) # define information propagator
m = D(j, W=S, tol=1E-3, note=True) # reconstruct map m = D(j, W=S, tol=1E-1, note=True) # reconstruct map
s.plot(title="signal", save = 'plot_s.png') # plot signal #s.plot(title="signal", save = 'plot_s.png') # plot signal
d_ = field(x_space, val=d.val, target=k_space) #d_ = field(x_space, val=d.val, target=k_space)
d_.plot(title="data", vmin=s.min(), vmax=s.max(), save = 'plot_d.png') # plot data #d_.plot(title="data", vmin=s.min(), vmax=s.max(), save = 'plot_d.png') # plot data
m.plot(title="reconstructed map", vmin=s.min(), vmax=s.max(), save = 'plot_m.png') # plot map #m.plot(title="reconstructed map", vmin=s.min(), vmax=s.max(), save = 'plot_m.png') # plot map
...@@ -51,6 +51,7 @@ except(ImportError): ...@@ -51,6 +51,7 @@ except(ImportError):
found['h5py_parallel'] = False found['h5py_parallel'] = False
class distributed_data_object(object): class distributed_data_object(object):
""" """
...@@ -342,14 +343,22 @@ class distributed_data_object(object): ...@@ -342,14 +343,22 @@ class distributed_data_object(object):
def __div__(self, other): def __div__(self, other):
return self.__builtin_helper__(self.get_local_data().__div__, other) return self.__builtin_helper__(self.get_local_data().__div__, other)
def __truediv__(self, other):
    ## Python-3-style true division (the `/` operator once
    ## `from __future__ import division` is active); here it is
    ## identical to the classic __div__ and simply delegates.
    return self.__div__(other)
def __rdiv__(self, other): def __rdiv__(self, other):
return self.__builtin_helper__(self.get_local_data().__rdiv__, other) return self.__builtin_helper__(self.get_local_data().__rdiv__, other)
def __rtruediv__(self, other):
    ## reflected true division (`other / self`); identical to the
    ## classic reflected division, so delegate to __rdiv__.
    return self.__rdiv__(other)
def __idiv__(self, other): def __idiv__(self, other):
return self.__builtin_helper__(self.get_local_data().__idiv__, return self.__builtin_helper__(self.get_local_data().__idiv__,
other, other,
inplace = True) inplace = True)
def __itruediv__(self, other):
    ## in-place true division (the `/=` operator under future
    ## division); delegates to the classic in-place division.
    ## NOTE: the original method was named `__itruediv` (missing the
    ## trailing underscores), so Python never dispatched `/=` to it
    ## and silently fell back to __truediv__ + rebinding.
    return self.__idiv__(other)
def __floordiv__(self, other): def __floordiv__(self, other):
return self.__builtin_helper__(self.get_local_data().__floordiv__, return self.__builtin_helper__(self.get_local_data().__floordiv__,
other) other)
...@@ -702,6 +711,17 @@ class distributed_data_object(object): ...@@ -702,6 +711,17 @@ class distributed_data_object(object):
self.distributor.inject(self.data, to_slices, data, from_slices) self.distributor.inject(self.data, to_slices, data, from_slices)
def flatten(self, inplace = False):
    """
    Return a one-dimensional distributed_data_object holding this
    object's data.

    Parameters
    ----------
    inplace : bool
        If True, the distributor is asked to flatten without copying,
        so the returned d2o's local data may share memory with this
        object's local data where possible.

    Returns
    -------
    distributed_data_object
        A new d2o with global shape ``(np.prod(self.shape),)``.
    """
    ## the flat global shape of the result
    flat_shape = (np.prod(self.shape),)
    ## empty d2o with the flat shape but otherwise identical setup
    temp_d2o = self.copy_empty(global_shape = flat_shape)
    ## let the distributor flatten (and, if necessary, redistribute)
    ## the local data portion
    flat_data = self.distributor.flatten(self.data, inplace = inplace)
    temp_d2o.set_local_data(data = flat_data)
    ## NOTE(review): the original code did `self = temp_d2o` in the
    ## inplace branch, which only rebinds the local name and never
    ## mutates the caller's object; that misleading no-op has been
    ## removed — both paths return the new d2o.
    return temp_d2o
def _get_distributor(self, distribution_strategy): def _get_distributor(self, distribution_strategy):
''' '''
...@@ -714,6 +734,7 @@ class distributed_data_object(object): ...@@ -714,6 +734,7 @@ class distributed_data_object(object):
distributor_dict={ distributor_dict={
'fftw': _fftw_distributor, 'fftw': _fftw_distributor,
'equal': _equal_distributor,
'not': _not_distributor 'not': _not_distributor
} }
if not distributor_dict.has_key(distribution_strategy): if not distributor_dict.has_key(distribution_strategy):
...@@ -814,8 +835,8 @@ class distributed_data_object(object): ...@@ -814,8 +835,8 @@ class distributed_data_object(object):
class _fftw_distributor(object): class _slicing_distributor(object):
def __init__(self, global_data=None, global_shape=None, dtype=None, def __init__(self, slicer, global_data=None, global_shape=None, dtype=None,
comm=MPI.COMM_WORLD, alias=None, path=None): comm=MPI.COMM_WORLD, alias=None, path=None):
if alias != None: if alias != None:
...@@ -876,9 +897,14 @@ class _fftw_distributor(object): ...@@ -876,9 +897,14 @@ class _fftw_distributor(object):
self.mpi_dtype = self._my_dtype_converter.to_mpi(self.dtype) self.mpi_dtype = self._my_dtype_converter.to_mpi(self.dtype)
self._local_size = pyfftw.local_size(self.global_shape) #self._local_size = pyfftw.local_size(self.global_shape)
self.local_start = self._local_size[2] #self.local_start = self._local_size[2]
self.local_end = self.local_start + self._local_size[1] #self.local_end = self.local_start + self._local_size[1]
self.slicer = lambda global_shape: slicer(global_shape, comm = comm)
self._local_size = self.slicer(self.global_shape)
self.local_start = self._local_size[0]
self.local_end = self._local_size[1]
self.local_length = self.local_end-self.local_start self.local_length = self.local_end-self.local_start
self.local_shape = (self.local_length,) + tuple(self.global_shape[1:]) self.local_shape = (self.local_length,) + tuple(self.global_shape[1:])
self.local_dim = np.product(self.local_shape) self.local_dim = np.product(self.local_shape)
...@@ -1022,36 +1048,112 @@ class _fftw_distributor(object): ...@@ -1022,36 +1048,112 @@ class _fftw_distributor(object):
if from_slices == None: if from_slices == None:
update_slice = (slice(o[r], o[r]+l),) update_slice = (slice(o[r], o[r]+l),)
else: else:
## Determine the part of the source array, which is relevant
## for the target rank
if (from_slices[0].step > 0) or (from_slices[0].step is None):
## f_relative_start: index of start of source data in
## source array
f_lower_end = from_slices[0].start
if f_lower_end is None:
f_lower_end = 0
## f_start: index of start of specific source data in
## source array
f_start = f_lower_end + o[r]
## f_stop: index of stop of specific source data
f_stop = f_start + l
elif from_slices[0].step < 0:
## f_relative_start: index of start of source data in
## source array
f_upper_end = from_slices[0].start
if f_upper_end is None:
f_upper_end = data_update.shape[0] - 1
## f_start: index of start of specific source data in
## source array
f_start = f_upper_end - o[r]
## f_stop: index of stop of specific source data
f_stop = f_start - l
f_step = from_slices[0].step
if f_step == None:
f_step = 1
f_direction = np.sign(f_step)
f_relative_start = from_slices[0].start
if f_relative_start != None:
f_start = f_relative_start + f_direction*o[r]
else: else:
f_start = None raise ValueError(about._errors.cstring(\
f_relative_start = 0 "ERROR: step size == 0!"))
f_stop = f_relative_start + f_direction*(o[r]+l*np.abs(f_step)) update_slice = (slice(f_start,
if f_stop < 0: f_stop,
f_stop = None from_slices[0].step),)
## combine the slicing for the first dimension
update_slice = (slice(f_start,
f_stop,
f_step),
)
## add the rest of the from_slicing
update_slice += from_slices[1:] update_slice += from_slices[1:]
data[local_slice] = np.array(data_update[update_slice],\ data[local_slice] = np.array(data_update[update_slice],\
copy=False).astype(self.dtype) copy=False).astype(self.dtype)
# ## TODO: Fallunterscheidung, ob direction positiv oder negativ!!
# if from_slices[0].step > 0:
# f_relative_start = from_slices[0].start
# else:
# f_relative_start = from_slices[0].stop + 1
#
# if f_relative_start is None:
# f_relative_start = 0
#
# local_start = f_relative_start + o[r]
# print ('rank', rank,
# 'f_relative_start', f_relative_start,
# 'local_start', local_start,
# 'o[r]', o[r])
#
#
# update_slice = self._backshift_and_decycle(
# slice_object = from_slices[0],
# shifted_start = local_start,
# shifted_stop = local_start+l,
# global_length = data_update.shape[0])
#
# print ('rank', rank, update_slice)
# f_step = from_slices[0].step
# if f_step == None:
# f_step = 1
#
# f_direction = np.sign(f_step)
#
# f_relative_start = from_slices[0].start
#
# ## Case 1: f_direction is positive
# if f_direction > 0:
# if f_relative_start != None:
# f_start = f_relative_start + o[r]
#
#
#
#
# if f_relative_start != None:
# f_start = f_relative_start + f_direction*o[r]
# else:
# f_start = None
# f_relative_start = self.local_start + l - 1
#
#
# f_stop = f_relative_start + f_direction*(o[r]+l*np.abs(f_step))
# print (rank,
# 'f_start', f_start,
# 'offset', self.local_start,
# 'f_relative_start', f_relative_start,
# 'f_stop', f_stop)
# if f_stop < 0:
# f_stop = None
#
#
# ## combine the slicing for the first dimension
# update_slice = (slice(f_start,
# f_stop,
# f_step),
# )
# ## add the rest of the from_slicing
# update_slice += from_slices[1:]
#
# data[local_slice] = np.array(data_update[update_slice],\
# copy=False).astype(self.dtype)
#
else: else:
## Scatterv the relevant part from the source_rank to the others ## Scatterv the relevant part from the source_rank to the others
## and plug it into data[local_slice] ## and plug it into data[local_slice]
...@@ -1231,8 +1333,11 @@ class _fftw_distributor(object): ...@@ -1231,8 +1333,11 @@ class _fftw_distributor(object):
(global_length-shifted_stop)%step #stepsize compensation (global_length-shifted_stop)%step #stepsize compensation
else: else:
local_start = slice_object.start - shifted_start local_start = slice_object.start - shifted_start
## if the local_start is negative, pull it up to zero ## if the local_start is negative, immediately return the
local_start = 0 if local_start < 0 else local_start ## values for an empty slice
if local_start < 0:
return 0, 0
## if the local_start is greater than the local length, pull ## if the local_start is greater than the local length, pull
## it down ## it down
if local_start > local_length-1: if local_start > local_length-1:
...@@ -1244,11 +1349,11 @@ class _fftw_distributor(object): ...@@ -1244,11 +1349,11 @@ class _fftw_distributor(object):
local_stop = None local_stop = None
else: else:
local_stop = slice_object.stop - shifted_start local_stop = slice_object.stop - shifted_start
## if local_stop is negative, pull it up to zero ## if local_stop is negative, pull it up to None
local_stop = 0 if local_stop < 0 else local_stop local_stop = None if local_stop < 0 else local_stop
## Note: if start or stop are greater than the array length, ## Note: if start or stop are greater than the array length,
## numpy will automatically cut the index value down into the ## numpy will automatically cut the index value down into the
## array's range ## array's range
return local_start, local_stop return local_start, local_stop
def inject(self, data, to_slices, data_update, from_slices, comm=None, def inject(self, data, to_slices, data_update, from_slices, comm=None,
...@@ -1285,7 +1390,7 @@ class _fftw_distributor(object): ...@@ -1285,7 +1390,7 @@ class _fftw_distributor(object):
try: try:
(data_object, matching_dimensions) = \ (data_object, matching_dimensions) = \
self._reshape_foreign_data(data_object) self._reshape_foreign_data(data_object)
## if the shape-casting fails, try to fix things via locall data ## if the shape-casting fails, try to fix things via local data
## matching ## matching
except(ValueError): except(ValueError):
## Check if all the local shapes match the supplied data ## Check if all the local shapes match the supplied data
...@@ -1386,6 +1491,38 @@ class _fftw_distributor(object): ...@@ -1386,6 +1491,38 @@ class _fftw_distributor(object):
root=target_rank) root=target_rank)
return _gathered_data return _gathered_data
def flatten(self, data, inplace = False):
## it can be the case, that the portion af data changes due to
## the flattening. E.g. this is the case if some nodes had no data at
## all, but the higher dimensions are large.
## Check if the amount of data changes because of the flattening
size_now = np.prod(self.local_shape)
(start_then, end_then) = self.slicer((np.prod(self.global_shape),))
size_then = end_then - start_then
if size_now == size_then:
if inplace == True:
return data.ravel()
else:
return data.flatten()
else:
about.warnings.cprint(
"WARNING: Local datasets on the nodes must be changed "+
"and will be exchanged! "+
"The data will be completely gathered!")
if inplace == True:
about.warnings.cprint(
"WARNING: Inplace flattening is not possible when "+
"data-exchange is necessary!")
## TODO: Improve this by making something smarter than a gather_all
full_data = self.consolidate_data(data).ravel()
## extract the local portion
new_data = full_data[slice(start_then, end_then)]
return new_data
print (size_now, size_then)
if found['h5py']: if found['h5py']:
def save_data(self, data, alias, path=None, overwriteQ=True, comm=None): def save_data(self, data, alias, path=None, overwriteQ=True, comm=None):
if comm == None: if comm == None:
...@@ -1442,9 +1579,56 @@ class _fftw_distributor(object): ...@@ -1442,9 +1579,56 @@ class _fftw_distributor(object):
raise ImportError(about._errors.cstring("ERROR: h5py was not imported")) raise ImportError(about._errors.cstring("ERROR: h5py was not imported"))
def _fftw_slicer(global_shape, comm):
    ## Ask pyfftw for this node's share of the first axis.
    ## Presumably the returned tuple carries the local length at
    ## index 1 and the local start offset at index 2 (matches the
    ## original code's usage) — confirm against pyfftw's MPI docs.
    sizes = pyfftw.local_size(global_shape, comm = comm)
    begin = sizes[2]
    return (begin, begin + sizes[1])
class _fftw_distributor(_slicing_distributor):
    ## Slicing distributor whose domain decomposition along the first
    ## axis is delegated to pyfftw (via _fftw_slicer), so that d2o
    ## data layout matches the layout FFTW expects for transforms.
    def __init__(self, global_data=None, global_shape=None, dtype=None,
                 comm=MPI.COMM_WORLD, alias=None, path=None):
        ## All functionality lives in the generic slicing distributor;
        ## only the slicer is fixed to the fftw-based one here.
        super(_fftw_distributor, self).__init__(slicer = _fftw_slicer,
                                                global_data = global_data,
                                                global_shape = global_shape,
                                                dtype = dtype,
                                                comm = comm,
                                                alias = alias,
                                                path = path)
def _equal_slicer(global_shape, comm=MPI.COMM_WORLD):
    """
    Split the first axis of `global_shape` as evenly as possible over
    the nodes of `comm` and return this node's (start, end) indices.
    """
    rank = comm.rank
    size = comm.size
    total_rows = global_shape[0]
    ## every node receives at least `base` rows; the first `extras`
    ## nodes each receive one additional row
    base, extras = divmod(total_rows, size)
    ## offset accounts for the extra row of every lower-ranked node
    start = rank * base + min(rank, extras)
    end = start + base + (1 if rank < extras else 0)
    return (start, end)
class _equal_distributor(_slicing_distributor):
    ## Slicing distributor that splits the first axis as evenly as
    ## possible over the MPI nodes (via _equal_slicer), independent
    ## of any FFTW layout requirements.
    def __init__(self, global_data=None, global_shape=None, dtype=None,
                 comm=MPI.COMM_WORLD, alias=None, path=None):
        ## All functionality lives in the generic slicing distributor;
        ## only the slicer is fixed to the equal-split one here.
        super(_equal_distributor, self).__init__(slicer = _equal_slicer,
                                                 global_data = global_data,
                                                 global_shape = global_shape,
                                                 dtype = dtype,
                                                 comm = comm,
                                                 alias = alias,
                                                 path = path)
class _not_distributor(object): class _not_distributor(object):
def __init__(self, global_data=None, global_shape=None, dtype=None, *args, **kwargs): def __init__(self, global_data=None, global_shape=None, dtype=None, *args, **kwargs):
if dtype != None: if dtype != None:
...@@ -1492,7 +1676,13 @@ class _not_distributor(object): ...@@ -1492,7 +1676,13 @@ class _not_distributor(object):
def extract_local_data(self, data_object): def extract_local_data(self, data_object):
return data_object.get_full_data() return data_object.get_full_data()
def flatten(self, data, inplace = False):
    ## Single-node case: plain numpy reshaping suffices, no
    ## communication is needed.
    if inplace:
        ## reshape returns a view onto the original buffer
        return data.reshape(data.size)
    ## flatten returns an independent one-dimensional copy
    return data.flatten()
def save_data(self, *args, **kwargs): def save_data(self, *args, **kwargs):
raise AttributeError(about._errors.cstring( raise AttributeError(about._errors.cstring(
"ERROR: save_data not implemented")) "ERROR: save_data not implemented"))
......
...@@ -1611,13 +1611,18 @@ class diagonal_operator(operator): ...@@ -1611,13 +1611,18 @@ class diagonal_operator(operator):
""" """
diag = super(diagonal_operator, self).diag(bare=bare, if (domain is None) or (domain == self.domain):
if(not self.domain.discrete)and(bare):
diag = self.domain.calc_weight(self.val, power=-1)
else:
diag = self.val
else:
diag = super(diagonal_operator, self).diag(bare=bare,
domain=domain, domain=domain,
nrun=1, nrun=1,
random='pm1', random='pm1',
varQ=False, varQ=False,
**kwargs) **kwargs)
if varQ == True: if varQ == True:
return (diag, diag.domain.cast(1)) return (diag, diag.domain.cast(1))
else: else:
...@@ -1712,13 +1717,20 @@ class diagonal_operator(operator): ...@@ -1712,13 +1717,20 @@ class diagonal_operator(operator):
entries; e.g., as variance in case of an covariance operator. entries; e.g., as variance in case of an covariance operator.
""" """
inverse_diag = super(diagonal_operator, self).inverse_diag(bare=bare,
if (domain is None) or (domain == self.domain):
inverse_val = 1./self.val
if(not self.domain.discrete)and(bare):
inverse_diag = self.domain.calc_weight(inverse_val, power=-1)
else:
inverse_diag = inverse_val
else:
inverse_diag = super(diagonal_operator, self).inverse_diag(bare=bare,
domain=domain, domain=domain,
nrun=1, nrun=1,
random='pm1', random='pm1',
varQ=False, varQ=False,
**kwargs) **kwargs)
if varQ == True: if varQ == True:
return (inverse_diag, inverse_diag.domain.cast(1)) return (inverse_diag, inverse_diag.domain.cast(1))
else: else:
...@@ -3034,7 +3046,8 @@ class response_operator(operator): ...@@ -3034,7 +3046,8 @@ class response_operator(operator):
Whether to consider the arguments as densities or not. Whether to consider the arguments as densities or not.
Mandatory for the correct incorporation of volume weights. Mandatory for the correct incorporation of volume weights.
""" """
def __init__(self,domain,sigma=0,mask=1,assign=None,den=False,target=None): def __init__(self, domain, sigma=0, mask=1, assign=None, den=False,
target=None):
""" """
Sets the standard properties and `density`, `sigma`, `mask` and `assignment(s)`. Sets the standard properties and `density`, `sigma`, `mask` and `assignment(s)`.
......
...@@ -774,11 +774,20 @@ class rg_space(point_space): ...@@ -774,11 +774,20 @@ class rg_space(point_space):
## Case 2: normal distribution with zero-mean and a given standard ## Case 2: normal distribution with zero-mean and a given standard
## deviation or variance ## deviation or variance
elif arg[0] == 'gau': elif arg[0] == 'gau':
var = arg[3]
if np.isscalar(var) == True or var is None:
processed_var = var
else:
try:
processed_var = sample.distributor.extract_local_data(var)
except(AttributeError):
processed_var = var
gen = lambda s: random.gau(datatype=self.datatype, gen = lambda s: random.gau(datatype=self.datatype,
shape = s, shape = s,
mean = arg[1], mean = arg[1],
dev = arg[2], dev = arg[2],
var = arg[3]) var = processed_var)
sample.apply_generator(gen) sample.apply_generator(gen)
if hermitianizeQ == True: if hermitianizeQ == True:
...@@ -2255,9 +2264,14 @@ class utilities(object): ...@@ -2255,9 +2264,14 @@ class utilities(object):
## flip in every direction ## flip in every direction
for i in xrange(dimensions): for i in xrange(dimensions):
slice_picker = slice_primitive[:] slice_picker = slice_primitive[:]
slice_picker[i] = slice(1, None) slice_picker[i] = slice(1, None,None)
slice_inverter = slice_primitive[:] slice_inverter = slice_primitive[:]
slice_inverter[i] = slice(None, 0, -1) slice_inverter[i] = slice(None, 0, -1)
y[slice_picker] = y[slice_inverter]
try:
y.inject(to_slices=slice_picker, data=y,
from_slices=slice_inverter)
except(AttributeError):
y[slice_picker] = y[slice_inverter]
return y return y
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment