Commit 33a90fc9 authored by Marco Selig's avatar Marco Selig
Browse files

probging restructured for shared memory usage.

parent 8f7d9547
......@@ -122,6 +122,8 @@ import pylab as pl
from matplotlib.colors import LogNorm as ln
from matplotlib.ticker import LogFormatter as lf
from multiprocessing import Pool as mp
from multiprocessing import Value as mv
from multiprocessing import Array as ma
## third party libraries
import gfft as gf
import healpy as hp
......@@ -7980,7 +7982,7 @@ class operator(object):
##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
def tr(self,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,loop=False,**kwargs):
def tr(self,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=1,var=False,loop=False,**kwargs):
"""
Computes the trace of the operator
......@@ -8002,7 +8004,7 @@ class operator(object):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
var : bool, *optional*
Indicates whether to additionally return the probing variance
or not (default: False).
......@@ -8026,7 +8028,7 @@ class operator(object):
domain = self.domain
return trace_probing(self,function=self.times,domain=domain,target=target,random=random,ncpu=ncpu,nrun=nrun,nper=nper,var=var,**kwargs)(loop=loop)
def inverse_tr(self,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,loop=False,**kwargs):
def inverse_tr(self,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=1,var=False,loop=False,**kwargs):
"""
Computes the trace of the inverse operator
......@@ -8048,7 +8050,7 @@ class operator(object):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: Nonoe)
number of tasks performed by one process (default: 1)
var : bool, *optional*
Indicates whether to additionally return the probing variance
or not (default: False).
......@@ -8074,7 +8076,7 @@ class operator(object):
##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
def diag(self,bare=False,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,save=False,path="tmp",prefix="",loop=False,**kwargs):
def diag(self,bare=False,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=1,var=False,save=False,path="tmp",prefix="",loop=False,**kwargs):
"""
Computes the diagonal of the operator via probing.
......@@ -8100,7 +8102,7 @@ class operator(object):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
var : bool, *optional*
Indicates whether to additionally return the probing variance
or not (default: False).
......@@ -8154,7 +8156,7 @@ class operator(object):
else:
return diag
def inverse_diag(self,bare=False,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,save=False,path="tmp",prefix="",loop=False,**kwargs):
def inverse_diag(self,bare=False,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=1,var=False,save=False,path="tmp",prefix="",loop=False,**kwargs):
"""
Computes the diagonal of the inverse operator via probing.
......@@ -8180,7 +8182,7 @@ class operator(object):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
var : bool, *optional*
Indicates whether to additionally return the probing variance
or not (default: False).
......@@ -8286,7 +8288,7 @@ class operator(object):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
save : bool, *optional*
whether all individual probing results are saved or not
(default: False)
......@@ -8354,7 +8356,7 @@ class operator(object):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: 8)
number of tasks performed by one process (default: 1)
save : bool, *optional*
whether all individual probing results are saved or not
(default: False)
......@@ -8418,7 +8420,7 @@ class operator(object):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
save : bool, *optional*
whether all individual probing results are saved or not
(default: False)
......@@ -8482,7 +8484,7 @@ class operator(object):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
save : bool, *optional*
whether all individual probing results are saved or not
(default: False)
......@@ -8753,7 +8755,7 @@ class diagonal_operator(operator):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
var : bool, *optional*
Indicates whether to additionally return the probing variance
or not (default: False).
......@@ -8810,7 +8812,7 @@ class diagonal_operator(operator):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
var : bool, *optional*
Indicates whether to additionally return the probing variance
or not (default: False).
......@@ -8876,7 +8878,7 @@ class diagonal_operator(operator):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
var : bool, *optional*
Indicates whether to additionally return the probing variance
or not (default: False).
......@@ -8959,7 +8961,7 @@ class diagonal_operator(operator):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
var : bool, *optional*
Indicates whether to additionally return the probing variance
or not (default: False).
......@@ -9768,7 +9770,7 @@ class projection_operator(operator):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
save : bool, *optional*
whether all individual probing results are saved or not
(default: False)
......@@ -9946,7 +9948,7 @@ class vecvec_operator(operator):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
var : bool, *optional*
Indicates whether to additionally return the probing variance
or not (default: False).
......@@ -10005,7 +10007,7 @@ class vecvec_operator(operator):
nrun : int, *optional*
total number of probes (default: 8)
nper : int, *optional*
number of tasks performed by one process (default: None)
number of tasks performed by one process (default: 1)
var : bool, *optional*
Indicates whether to additionally return the probing variance
or not (default: False).
......@@ -10357,6 +10359,1060 @@ class response_operator(operator):
###=============================================================================
#
#class probing(object):
# """
# .. __ __
# .. / / /__/
# .. ______ _____ ______ / /___ __ __ ___ ____ __
# .. / _ | / __/ / _ | / _ | / / / _ | / _ /
# .. / /_/ / / / / /_/ / / /_/ / / / / / / / / /_/ /
# .. / ____/ /__/ \______/ \______/ /__/ /__/ /__/ \____ / class
# .. /__/ /______/
#
# NIFTY class for probing (using multiprocessing)
#
# This is the base NIFTY probing class from which other probing classes
# (e.g. diagonal probing) are derived.
#
# When called, a probing class instance evaluates an operator or a
# function using random fields, whose components are random variables
# with mean 0 and variance 1. When an instance is called it returns the
# mean value of f(probe), where probe is a random field with mean 0 and
# variance 1. The mean is calculated as 1/N Sum[ f(probe_i) ].
#
# Parameters
# ----------
# op : operator
# The operator specified by `op` is the operator to be probed.
# If no operator is given, then probing will be done by applying
# `function` to the probes. (default: None)
# function : function, *optional*
# If no operator has been specified as `op`, then specification of
# `function` is non optional. This is the function, that is applied
# to the probes. (default: `op.times`)
# domain : space, *optional*
# If no operator has been specified as `op`, then specification of
# `domain` is non optional. This is the space that the probes live
# in. (default: `op.domain`)
# target : domain, *optional*
# `target` is the codomain of `domain`
# (default: `op.domain.get_codomain()`)
# random : string, *optional*
# the distribution from which the probes are drawn. `random` can be
# either "pm1" or "gau". "pm1" is a uniform distribution over {+1,-1}
# or {+1,+i,-1,-i}, respectively. "gau" is a normal distribution with
# zero-mean and unit-variance (default: "pm1")
# ncpu : int, *optional*
# the number of cpus to be used from parallel probing. (default: 2)
# nrun : int, *optional*
# the number of probes to be evaluated. If `nrun<ncpu**2`, it will be
# set to `ncpu**2`. (default: 8)
# nper : int, *optional*
# this number specifies how many probes will be evaluated by one
# worker. Afterwards a new worker will be created to evaluate a chunk
# of `nper` probes.
# If for example `nper=nrun/ncpu`, then every worker will be created
# for every cpu. This can lead to the case, that all workers but one
# are already finished, but have to wait for the last worker that
# might still have a considerable amount of evaluations left. This is
# obviously not very effective.
# If on the other hand `nper=1`, then for each evaluation a worker will
# be created. In this case all cpus will work until nrun probes have
# been evaluated.
# It is recommended to leave `nper` as the default value. (default: 8)
# var : bool, *optional*
# If `var` is True, then the variance of the sampled function will
# also be returned. The result is then a tuple with the mean in the
# zeroth entry and the variance in the first entry. (default: False)
#
#
# See Also
# --------
# diagonal_probing : A probing class to get the diagonal of an operator
# trace_probing : A probing class to get the trace of an operator
#
#
# Attributes
# ----------
# function : function
# the function, that is applied to the probes
# domain : space
# the space, where the probes live in
# target : space
# the codomain of `domain`
# random : string
# the random number generator used to create the probes
# (either "pm1" or "gau")
# ncpu : int
# the number of cpus used for probing
# nrun : int
# the number of probes to be evaluated, when the instance is called
# nper : int
# number of probes, that will be evaluated by one worker
# var : bool
# whether the variance will be additionally returned, when the
# instance is called
# quargs : dict
# Keyword arguments passed to `function` in each call.
#
# """
# def __init__(self,op=None,function=None,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,**quargs):
# """
# initializes a probing instance
#
# Parameters
# ----------
# op : operator
# The operator specified by `op` is the operator to be probed.
# If no operator is given, then probing will be done by applying
# `function` to the probes. (default: None)
# function : function, *optional*
# If no operator has been specified as `op`, then specification of
# `function` is non optional. This is the function, that is applied
# to the probes. (default: `op.times`)
# domain : space, *optional*
# If no operator has been specified as `op`, then specification of
# `domain` is non optional. This is the space that the probes live
# in. (default: `op.domain`)
# target : domain, *optional*
# `target` is the codomain of `domain`
# (default: `op.domain.get_codomain()`)
# random : string, *optional*
# the distribution from which the probes are drawn. `random` can be
# either "pm1" or "gau". "pm1" is a uniform distribution over {+1,-1}
# or {+1,+i,-1,-i}, respectively. "gau" is a normal distribution with
# zero-mean and unit-variance (default: "pm1")
# ncpu : int, *optional*
# the number of cpus to be used from parallel probing. (default: 2)
# nrun : int, *optional*
# the number of probes to be evaluated. If `nrun<ncpu**2`, it will be
# set to `ncpu**2`. (default: 8)
# nper : int, *optional*
# this number specifies how many probes will be evaluated by one
# worker. Afterwards a new worker will be created to evaluate a chunk
# of `nper` probes.
# If for example `nper=nrun/ncpu`, then every worker will be created
# for every cpu. This can lead to the case, that all workers but one
# are already finished, but have to wait for the last worker that
# might still have a considerable amount of evaluations left. This is
# obviously not very effective.
# If on the other hand `nper=1`, then for each evaluation a worker will
# be created. In this case all cpus will work until nrun probes have
# been evaluated.
# It is recommended to leave `nper` as the default value. (default: 8)
# var : bool, *optional*
# If `var` is True, then the variance of the sampled function will
# also be returned. The result is then a tuple with the mean in the
# zeroth entry and the variance in the first entry. (default: False)
#
# """
# if(op is None):
# ## check whether callable
# if(function is None)or(not hasattr(function,"__call__")):
# raise TypeError(about._errors.cstring("ERROR: invalid input."))
# ## check given domain
# if(domain is None)or(not isinstance(domain,space)):
# raise TypeError(about._errors.cstring("ERROR: invalid input."))
# else:
# if(not isinstance(op,operator)):
# raise TypeError(about._errors.cstring("ERROR: invalid input."))
# ## check whether callable
# if(function is None)or(not hasattr(function,"__call__")):
# function = op.times
# elif(op==function):
# function = op.times
# ## check whether correctly bound
# if(op!=function.im_self):
# raise NameError(about._errors.cstring("ERROR: invalid input."))
# ## check given domain
# if(domain is None)or(not isinstance(domain,space)):
# if(function in [op.inverse_times,op.adjoint_times]):
# domain = op.target
# else:
# domain = op.domain
# else:
# if(function in [op.inverse_times,op.adjoint_times]):
# op.target.check_codomain(domain) ## a bit pointless
# else:
# op.domain.check_codomain(domain) ## a bit pointless
#
# self.function = function
# self.domain = domain
#
# if(target is None):
# target = domain.get_codomain()
# ## check codomain
# self.domain.check_codomain(target) ## a bit pointless
# self.target = target
#
# if(random not in ["pm1","gau"]):
# raise ValueError(about._errors.cstring("ERROR: unsupported random key '"+str(random)+"'."))
# self.random = random
#
# self.ncpu = int(max(1,ncpu))
# self.nrun = int(max(self.ncpu**2,nrun))
# if(nper is None):
# self.nper = None
# else:
# self.nper = int(max(1,min(self.nrun//self.ncpu,nper)))
#
# self.var = bool(var)
#
# self.quargs = quargs
#
# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# def configure(self,**kwargs):
# """
# changes the attributes of the instance
#
# Parameters
# ----------
# random : string, *optional*
# the random number generator used to create the probes (default: "pm1")
# ncpu : int, *optional*
# the number of cpus to be used for parallel probing. (default: 2)
# nrun : int, *optional*
# the number of probes to be evaluated. If `nrun<ncpu**2`, it will be
# set to `ncpu**2`. (default: 8)
# nper : int, *optional*
# number of probes, that will be evaluated by one worker (default: 8)
# var : bool, *optional*
# whether the variance will be additionally returned (default: False)
#
# """
# if("random" in kwargs):
# if(kwargs.get("random") not in ["pm1","gau"]):
# raise ValueError(about._errors.cstring("ERROR: unsupported random key '"+str(kwargs.get("random"))+"'."))
# self.random = kwargs.get("random")
#
# if("ncpu" in kwargs):
# self.ncpu = int(max(1,kwargs.get("ncpu")))
# if("nrun" in kwargs):
# self.nrun = int(max(self.ncpu**2,kwargs.get("nrun")))
# if("nper" in kwargs):
# if(kwargs.get("nper") is None):
# self.nper = None
# else:
# self.nper = int(max(1,min(self.nrun//self.ncpu,kwargs.get("nper"))))
#
# if("var" in kwargs):
# self.var = bool(kwargs.get("var"))
#
# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# def gen_probe(self):
# """
# Generates a single probe
#
# Returns
# -------
# probe : field
# a random field living in `domain` with mean 0 and variance 1 in
# each component
#
# """
# return field(self.domain,val=None,target=self.target,random=self.random)
#
# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# def probing(self,idnum,probe):
# """
# Computes a single probing result given one probe
#
# Parameters
# ----------
# probe : field
# the field on which `function` will be applied
# idnum : int
# the identification number of the probing
#
# Returns
# -------
# result : array-like
# the result of applying `function` to `probe`. The exact type
# depends on the function.
#
# """
# f = self.function(probe,**self.quargs)
# if(isinstance(f,field)):
# return f.val
# else:
# return f
#
# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# def evaluate(self,results):
# """
# evaluates the probing results
#
# Parameters
# ----------
# results : list
# the list containing the results of the individual probings.
# The type of the list elements depends on the function.
#
# Returns
# -------
# final : array-like
# the final probing result. 1/N Sum[ probing(probe_i) ]
# var : array-like
# the variance of the final probing result.
# (N(N-1))^(-1) Sum[ ( probing(probe_i) - final)^2 ]
# If the variance is returned, the return will be a tuple with
# `final` in the zeroth entry and `var` in the first entry.
#
# """
# if(len(results)==0):
# about.warnings.cprint("WARNING: probing failed.")
# return None
# elif(self.var):
# return np.mean(np.array(results),axis=0,dtype=None,out=None),np.var(np.array(results),axis=0,dtype=None,out=None,ddof=0)/(len(results)-1)
# else:
# return np.mean(np.array(results),axis=0,dtype=None,out=None)
#
# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# def _progress(self,idnum): ## > prints progress status by in upto 10 dots
# tenths = 1+(10*idnum//self.nrun)
# about.infos.cflush(("\b")*10+('.')*tenths+(' ')*(10-tenths))
#
# def _single_probing(self,zipped): ## > performs one probing operation
# ## generate probe
# np.random.seed(zipped[0])
# probe = self.gen_probe()
# ## do the actual probing
# self._progress(zipped[1])
# return self.probing(zipped[1],probe)
#
# def _serial_probing(self,zipped): ## > performs the probing operation serially
# try:
# return self._single_probing(zipped)
# except:
# ## kill pool
# os.kill()
#
# def _parallel_probing(self): ## > performs the probing operations in parallel
# ## define random seed
# seed = np.random.randint(10**8,high=None,size=self.nrun)
# ## build pool
# if(about.infos.status):
# so.write(about.infos.cstring("INFO: multiprocessing "+(' ')*10))
# so.flush()
# pool = mp(processes=self.ncpu,initializer=None,initargs=(),maxtasksperchild=self.nper)
# try:
# ## retrieve results
# results = pool.map(self._serial_probing,zip(seed,np.arange(self.nrun,dtype=np.int)),chunksize=None)#,callback=None).get(timeout=None) ## map_async replaced
# ## close and join pool
# about.infos.cflush(" done.")
# pool.close()
# pool.join()
# except:
# ## terminate and join pool
# pool.terminate()
# pool.join()
# raise Exception(about._errors.cstring("ERROR: unknown. NOTE: pool terminated.")) ## traceback by looping
# ## cleanup
# results = [rr for rr in results if(rr is not None)]
# if(len(results)<self.nrun):
# about.infos.cflush(" ( %u probe(s) failed, effectiveness == %.1f%% )\n"%(self.nrun-len(results),100*len(results)/self.nrun))
# else:
# about.infos.cflush("\n")
# ## evaluate
# return self.evaluate(results)
#
# def _nonparallel_probing(self): ## > performs the probing operations one after another
# ## define random seed
# seed = np.random.randint(10**8,high=None,size=self.nrun)
# ## retrieve results
# if(about.infos.status):
# so.write(about.infos.cstring("INFO: looping "+(' ')*10))
# so.flush()
# results = map(self._single_probing,zip(seed,np.arange(self.nrun,dtype=np.int)))
# about.infos.cflush(" done.")
# ## cleanup
# results = [rr for rr in results if(rr is not None)]
# if(len(results)<self.nrun):
# about.infos.cflush(" ( %u probe(s) failed, effectiveness == %.1f%% )\n"%(self.nrun-len(results),100*len(results)/self.nrun))
# else:
# about.infos.cflush("\n")
# ## evaluate
# return self.evaluate(results)
#
# def __call__(self,loop=False,**kwargs):
# """
#
# Starts the probing process.
# All keyword arguments that can be given to `configure` can also be
# given to `__call__` and have the same effect.
#
# Parameters
# ----------
# loop : bool, *optional*
# if `loop` is True, then multiprocessing will be disabled and
# all probes are evaluated by a single worker (default: False)
#
# Returns
# -------
# results : see **Returns** in `evaluate`
#
# other parameters
# ----------------
# kwargs : see **Parameters** in `configure`
#
# """
# self.configure(**kwargs)
# if(not about.multiprocessing.status)or(loop):
# return self._nonparallel_probing()
# else:
# return self._parallel_probing()
#
# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# def __repr__(self):
# return "<nifty.probing>"
#
###=============================================================================
#
#
#
###-----------------------------------------------------------------------------
#
#class trace_probing(probing):
# """
# .. __
# .. / /_
# .. / _/ _____ ____ __ _______ _______
# .. / / / __/ / _ / / ____/ / __ /
# .. / /_ / / / /_/ / / /____ / /____/
# .. \___/ /__/ \______| \______/ \______/ probing class
#
# NIFTY subclass for trace probing (using multiprocessing)
#
# When called, a trace_probing class instance samples the trace of an
# operator or a function using random fields, whose components are random
# variables with mean 0 and variance 1. When an instance is called it
# returns the mean value of the scalar product of probe and f(probe),
# where probe is a random field with mean 0 and variance 1.
# The mean is calculated as 1/N Sum[ probe_i.dot(f(probe_i)) ].
#
# Parameters
# ----------
# op : operator
# The operator specified by `op` is the operator to be probed.
# If no operator is given, then probing will be done by applying
# `function` to the probes. (default: None)
# function : function, *optional*
# If no operator has been specified as `op`, then specification of
# `function` is non optional. This is the function, that is applied
# to the probes. (default: `op.times`)
# domain : space, *optional*
# If no operator has been specified as `op`, then specification of
# `domain` is non optional. This is the space that the probes live
# in. (default: `op.domain`)
# target : domain, *optional*
# `target` is the codomain of `domain`
# (default: `op.domain.get_codomain()`)
# random : string, *optional*
# the distribution from which the probes are drawn. `random` can be
# either "pm1" or "gau". "pm1" is a uniform distribution over {+1,-1}
# or {+1,+i,-1,-i}, respectively. "gau" is a normal distribution with
# zero-mean and unit-variance (default: "pm1")
# ncpu : int, *optional*
# the number of cpus to be used from parallel probing. (default: 2)
# nrun : int, *optional*
# the number of probes to be evaluated. If `nrun<ncpu**2`, it will be
# set to `ncpu**2`. (default: 8)
# nper : int, *optional*