From 33a90fc9a2430177105b2e1b96a47a5da604c808 Mon Sep 17 00:00:00 2001 From: Marco Selig <mselig@ncg-02.MPA-Garching.MPG.DE> Date: Thu, 24 Oct 2013 09:02:53 +0200 Subject: [PATCH] probging restructured for shared memory usage. --- nifty_core.py | 1539 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 1409 insertions(+), 130 deletions(-) diff --git a/nifty_core.py b/nifty_core.py index a4833b582..308d89d68 100644 --- a/nifty_core.py +++ b/nifty_core.py @@ -122,6 +122,8 @@ import pylab as pl from matplotlib.colors import LogNorm as ln from matplotlib.ticker import LogFormatter as lf from multiprocessing import Pool as mp +from multiprocessing import Value as mv +from multiprocessing import Array as ma ## third party libraries import gfft as gf import healpy as hp @@ -7980,7 +7982,7 @@ class operator(object): ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - def tr(self,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,loop=False,**kwargs): + def tr(self,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=1,var=False,loop=False,**kwargs): """ Computes the trace of the operator @@ -8002,7 +8004,7 @@ class operator(object): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). @@ -8026,7 +8028,7 @@ class operator(object): domain = self.domain return trace_probing(self,function=self.times,domain=domain,target=target,random=random,ncpu=ncpu,nrun=nrun,nper=nper,var=var,**kwargs)(loop=loop) - def inverse_tr(self,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,loop=False,**kwargs): + def inverse_tr(self,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=1,var=False,loop=False,**kwargs): """ Computes the trace of the inverse operator @@ -8048,7 +8050,7 @@ class operator(object): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: Nonoe) + number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). @@ -8074,7 +8076,7 @@ class operator(object): ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - def diag(self,bare=False,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,save=False,path="tmp",prefix="",loop=False,**kwargs): + def diag(self,bare=False,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=1,var=False,save=False,path="tmp",prefix="",loop=False,**kwargs): """ Computes the diagonal of the operator via probing. @@ -8100,7 +8102,7 @@ class operator(object): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). 
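An illustrative aside, not part of the patch: the two imports added above, `Value as mv` and `Array as ma`, provide the process-shared scalar and buffer objects that the restructured probing accumulates into. A minimal, self-contained sketch of that accumulation pattern, under assumed names (`init_share`, `add_probe`, `sum_`, `num_`) that do not appear in nifty_core.py, could look like this:

    import numpy as np
    from multiprocessing import Pool, Value, Array

    def init_share(sum_, num_):
        ## executed once per worker process: keep handles to the shared objects
        global shared_sum, shared_num
        shared_sum, shared_num = sum_, num_

    def add_probe(seed):
        ## stand-in for one probing evaluation; adds its result to the shared sum
        np.random.seed(seed)
        result = np.random.randn(4)
        shared_sum.acquire()
        for ii in range(4):
            shared_sum[ii] += result[ii]
        shared_sum.release()
        shared_num.acquire()
        shared_num.value += 1
        shared_num.release()

    if __name__ == "__main__":
        sum_ = Array('d', 4, lock=True)  ## shared, zero-initialised double buffer
        num_ = Value('I', 0, lock=True)  ## shared counter of successful probes
        pool = Pool(processes=2, initializer=init_share, initargs=(sum_, num_))
        pool.map(add_probe, range(8))
        pool.close()
        pool.join()
        mean = np.array(sum_[:]) / num_.value  ## 1/N Sum[ probing(probe_i) ]
        print(mean)

Because the shared objects are created with `lock=True`, each update is serialised, so partial sums written by different workers cannot interleave; this is the same role the `_share` class and its `_init_share` pool initializer play further down in the patch.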
@@ -8154,7 +8156,7 @@ class operator(object): else: return diag - def inverse_diag(self,bare=False,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,save=False,path="tmp",prefix="",loop=False,**kwargs): + def inverse_diag(self,bare=False,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=1,var=False,save=False,path="tmp",prefix="",loop=False,**kwargs): """ Computes the diagonal of the inverse operator via probing. @@ -8180,7 +8182,7 @@ class operator(object): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). @@ -8286,7 +8288,7 @@ class operator(object): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) save : bool, *optional* whether all individual probing results are saved or not (default: False) @@ -8354,7 +8356,7 @@ class operator(object): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: 8) + number of tasks performed by one process (default: 1) save : bool, *optional* whether all individual probing results are saved or not (default: False) @@ -8418,7 +8420,7 @@ class operator(object): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) save : bool, *optional* whether all individual probing results are saved or not (default: False) @@ -8482,7 +8484,7 @@ class operator(object): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) save : bool, *optional* whether all individual probing results are saved or not (default: False) @@ -8753,7 +8755,7 @@ class diagonal_operator(operator): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). @@ -8810,7 +8812,7 @@ class diagonal_operator(operator): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). @@ -8876,7 +8878,7 @@ class diagonal_operator(operator): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). 
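Another illustrative aside, not part of the patch: the hunks above and below change the documented `nper` default from None to 1. In the probing classes `nper` is handed to the pool as `maxtasksperchild`, so the new default retires each worker process after a single probe instead of keeping it alive for the whole run. A small stand-alone sketch of that knob, with a hypothetical `probe_once` and arbitrary parameter values:

    from multiprocessing import Pool

    def probe_once(seed):
        ## stand-in for a single probing evaluation
        return (seed * 2654435761) % 97

    if __name__ == "__main__":
        ncpu, nrun, nper = 2, 8, 1
        ## nper -> maxtasksperchild: with 1, a worker process is replaced after
        ## every probe; with None, the same ncpu workers serve all nrun probes
        pool = Pool(processes=ncpu, maxtasksperchild=nper)
        results = pool.map(probe_once, range(nrun))
        pool.close()
        pool.join()
        print(results)

Recycling workers after every task costs some process start-up overhead but returns the memory acquired during a probe to the operating system between tasks, which is presumably the motivation for the new default in this shared-memory restructuring.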
@@ -8959,7 +8961,7 @@ class diagonal_operator(operator): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). @@ -9768,7 +9770,7 @@ class projection_operator(operator): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) save : bool, *optional* whether all individual probing results are saved or not (default: False) @@ -9946,7 +9948,7 @@ class vecvec_operator(operator): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). @@ -10005,7 +10007,7 @@ class vecvec_operator(operator): nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* - number of tasks performed by one process (default: None) + number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). @@ -10357,6 +10359,1060 @@ class response_operator(operator): +###============================================================================= +# +#class probing(object): +# """ +# .. __ __ +# .. / / /__/ +# .. ______ _____ ______ / /___ __ __ ___ ____ __ +# .. / _ | / __/ / _ | / _ | / / / _ | / _ / +# .. / /_/ / / / / /_/ / / /_/ / / / / / / / / /_/ / +# .. / ____/ /__/ \______/ \______/ /__/ /__/ /__/ \____ / class +# .. /__/ /______/ +# +# NIFTY class for probing (using multiprocessing) +# +# This is the base NIFTY probing class from which other probing classes +# (e.g. diagonal probing) are derived. +# +# When called, a probing class instance evaluates an operator or a +# function using random fields, whose components are random variables +# with mean 0 and variance 1. When an instance is called it returns the +# mean value of f(probe), where probe is a random field with mean 0 and +# variance 1. The mean is calculated as 1/N Sum[ f(probe_i) ]. +# +# Parameters +# ---------- +# op : operator +# The operator specified by `op` is the operator to be probed. +# If no operator is given, then probing will be done by applying +# `function` to the probes. (default: None) +# function : function, *optional* +# If no operator has been specified as `op`, then specification of +# `function` is non optional. This is the function, that is applied +# to the probes. (default: `op.times`) +# domain : space, *optional* +# If no operator has been specified as `op`, then specification of +# `domain` is non optional. This is the space that the probes live +# in. (default: `op.domain`) +# target : domain, *optional* +# `target` is the codomain of `domain` +# (default: `op.domain.get_codomain()`) +# random : string, *optional* +# the distribution from which the probes are drawn. `random` can be +# either "pm1" or "gau". "pm1" is a uniform distribution over {+1,-1} +# or {+1,+i,-1,-i}, respectively. "gau" is a normal distribution with +# zero-mean and unit-variance (default: "pm1") +# ncpu : int, *optional* +# the number of cpus to be used from parallel probing. 
(default: 2) +# nrun : int, *optional* +# the number of probes to be evaluated. If `nrun<ncpu**2`, it will be +# set to `ncpu**2`. (default: 8) +# nper : int, *optional* +# this number specifies how many probes will be evaluated by one +# worker. Afterwards a new worker will be created to evaluate a chunk +# of `nper` probes. +# If for example `nper=nrun/ncpu`, then every worker will be created +# for every cpu. This can lead to the case, that all workers but one +# are already finished, but have to wait for the last worker that +# might still have a considerable amount of evaluations left. This is +# obviously not very effective. +# If on the other hand `nper=1`, then for each evaluation a worker will +# be created. In this case all cpus will work until nrun probes have +# been evaluated. +# It is recommended to leave `nper` as the default value. (default: 8) +# var : bool, *optional* +# If `var` is True, then the variance of the sampled function will +# also be returned. The result is then a tuple with the mean in the +# zeroth entry and the variance in the first entry. (default: False) +# +# +# See Also +# -------- +# diagonal_probing : A probing class to get the diagonal of an operator +# trace_probing : A probing class to get the trace of an operator +# +# +# Attributes +# ---------- +# function : function +# the function, that is applied to the probes +# domain : space +# the space, where the probes live in +# target : space +# the codomain of `domain` +# random : string +# the random number generator used to create the probes +# (either "pm1" or "gau") +# ncpu : int +# the number of cpus used for probing +# nrun : int +# the number of probes to be evaluated, when the instance is called +# nper : int +# number of probes, that will be evaluated by one worker +# var : bool +# whether the variance will be additionally returned, when the +# instance is called +# quargs : dict +# Keyword arguments passed to `function` in each call. +# +# """ +# def __init__(self,op=None,function=None,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,**quargs): +# """ +# initializes a probing instance +# +# Parameters +# ---------- +# op : operator +# The operator specified by `op` is the operator to be probed. +# If no operator is given, then probing will be done by applying +# `function` to the probes. (default: None) +# function : function, *optional* +# If no operator has been specified as `op`, then specification of +# `function` is non optional. This is the function, that is applied +# to the probes. (default: `op.times`) +# domain : space, *optional* +# If no operator has been specified as `op`, then specification of +# `domain` is non optional. This is the space that the probes live +# in. (default: `op.domain`) +# target : domain, *optional* +# `target` is the codomain of `domain` +# (default: `op.domain.get_codomain()`) +# random : string, *optional* +# the distribution from which the probes are drawn. `random` can be +# either "pm1" or "gau". "pm1" is a uniform distribution over {+1,-1} +# or {+1,+i,-1,-i}, respectively. "gau" is a normal distribution with +# zero-mean and unit-variance (default: "pm1") +# ncpu : int, *optional* +# the number of cpus to be used from parallel probing. (default: 2) +# nrun : int, *optional* +# the number of probes to be evaluated. If `nrun<ncpu**2`, it will be +# set to `ncpu**2`. (default: 8) +# nper : int, *optional* +# this number specifies how many probes will be evaluated by one +# worker. 
Afterwards a new worker will be created to evaluate a chunk +# of `nper` probes. +# If for example `nper=nrun/ncpu`, then every worker will be created +# for every cpu. This can lead to the case, that all workers but one +# are already finished, but have to wait for the last worker that +# might still have a considerable amount of evaluations left. This is +# obviously not very effective. +# If on the other hand `nper=1`, then for each evaluation a worker will +# be created. In this case all cpus will work until nrun probes have +# been evaluated. +# It is recommended to leave `nper` as the default value. (default: 8) +# var : bool, *optional* +# If `var` is True, then the variance of the sampled function will +# also be returned. The result is then a tuple with the mean in the +# zeroth entry and the variance in the first entry. (default: False) +# +# """ +# if(op is None): +# ## check whether callable +# if(function is None)or(not hasattr(function,"__call__")): +# raise TypeError(about._errors.cstring("ERROR: invalid input.")) +# ## check given domain +# if(domain is None)or(not isinstance(domain,space)): +# raise TypeError(about._errors.cstring("ERROR: invalid input.")) +# else: +# if(not isinstance(op,operator)): +# raise TypeError(about._errors.cstring("ERROR: invalid input.")) +# ## check whether callable +# if(function is None)or(not hasattr(function,"__call__")): +# function = op.times +# elif(op==function): +# function = op.times +# ## check whether correctly bound +# if(op!=function.im_self): +# raise NameError(about._errors.cstring("ERROR: invalid input.")) +# ## check given domain +# if(domain is None)or(not isinstance(domain,space)): +# if(function in [op.inverse_times,op.adjoint_times]): +# domain = op.target +# else: +# domain = op.domain +# else: +# if(function in [op.inverse_times,op.adjoint_times]): +# op.target.check_codomain(domain) ## a bit pointless +# else: +# op.domain.check_codomain(domain) ## a bit pointless +# +# self.function = function +# self.domain = domain +# +# if(target is None): +# target = domain.get_codomain() +# ## check codomain +# self.domain.check_codomain(target) ## a bit pointless +# self.target = target +# +# if(random not in ["pm1","gau"]): +# raise ValueError(about._errors.cstring("ERROR: unsupported random key '"+str(random)+"'.")) +# self.random = random +# +# self.ncpu = int(max(1,ncpu)) +# self.nrun = int(max(self.ncpu**2,nrun)) +# if(nper is None): +# self.nper = None +# else: +# self.nper = int(max(1,min(self.nrun//self.ncpu,nper))) +# +# self.var = bool(var) +# +# self.quargs = quargs +# +# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +# +# def configure(self,**kwargs): +# """ +# changes the attributes of the instance +# +# Parameters +# ---------- +# random : string, *optional* +# the random number generator used to create the probes (default: "pm1") +# ncpu : int, *optional* +# the number of cpus to be used for parallel probing. (default: 2) +# nrun : int, *optional* +# the number of probes to be evaluated. If `nrun<ncpu**2`, it will be +# set to `ncpu**2`. 
(default: 8) +# nper : int, *optional* +# number of probes, that will be evaluated by one worker (default: 8) +# var : bool, *optional* +# whether the variance will be additionally returned (default: False) +# +# """ +# if("random" in kwargs): +# if(kwargs.get("random") not in ["pm1","gau"]): +# raise ValueError(about._errors.cstring("ERROR: unsupported random key '"+str(kwargs.get("random"))+"'.")) +# self.random = kwargs.get("random") +# +# if("ncpu" in kwargs): +# self.ncpu = int(max(1,kwargs.get("ncpu"))) +# if("nrun" in kwargs): +# self.nrun = int(max(self.ncpu**2,kwargs.get("nrun"))) +# if("nper" in kwargs): +# if(kwargs.get("nper") is None): +# self.nper = None +# else: +# self.nper = int(max(1,min(self.nrun//self.ncpu,kwargs.get("nper")))) +# +# if("var" in kwargs): +# self.var = bool(kwargs.get("var")) +# +# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +# +# def gen_probe(self): +# """ +# Generates a single probe +# +# Returns +# ------- +# probe : field +# a random field living in `domain` with mean 0 and variance 1 in +# each component +# +# """ +# return field(self.domain,val=None,target=self.target,random=self.random) +# +# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +# +# def probing(self,idnum,probe): +# """ +# Computes a single probing result given one probe +# +# Parameters +# ---------- +# probe : field +# the field on which `function` will be applied +# idnum : int +# the identification number of the probing +# +# Returns +# ------- +# result : array-like +# the result of applying `function` to `probe`. The exact type +# depends on the function. +# +# """ +# f = self.function(probe,**self.quargs) +# if(isinstance(f,field)): +# return f.val +# else: +# return f +# +# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +# +# def evaluate(self,results): +# """ +# evaluates the probing results +# +# Parameters +# ---------- +# results : list +# the list containing the results of the individual probings. +# The type of the list elements depends on the function. +# +# Returns +# ------- +# final : array-like +# the final probing result. 1/N Sum[ probing(probe_i) ] +# var : array-like +# the variance of the final probing result. +# (N(N-1))^(-1) Sum[ ( probing(probe_i) - final)^2 ] +# If the variance is returned, the return will be a tuple with +# `final` in the zeroth entry and `var` in the first entry. 
+# +# """ +# if(len(results)==0): +# about.warnings.cprint("WARNING: probing failed.") +# return None +# elif(self.var): +# return np.mean(np.array(results),axis=0,dtype=None,out=None),np.var(np.array(results),axis=0,dtype=None,out=None,ddof=0)/(len(results)-1) +# else: +# return np.mean(np.array(results),axis=0,dtype=None,out=None) +# +# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +# +# def _progress(self,idnum): ## > prints progress status by in upto 10 dots +# tenths = 1+(10*idnum//self.nrun) +# about.infos.cflush(("\b")*10+('.')*tenths+(' ')*(10-tenths)) +# +# def _single_probing(self,zipped): ## > performs one probing operation +# ## generate probe +# np.random.seed(zipped[0]) +# probe = self.gen_probe() +# ## do the actual probing +# self._progress(zipped[1]) +# return self.probing(zipped[1],probe) +# +# def _serial_probing(self,zipped): ## > performs the probing operation serially +# try: +# return self._single_probing(zipped) +# except: +# ## kill pool +# os.kill() +# +# def _parallel_probing(self): ## > performs the probing operations in parallel +# ## define random seed +# seed = np.random.randint(10**8,high=None,size=self.nrun) +# ## build pool +# if(about.infos.status): +# so.write(about.infos.cstring("INFO: multiprocessing "+(' ')*10)) +# so.flush() +# pool = mp(processes=self.ncpu,initializer=None,initargs=(),maxtasksperchild=self.nper) +# try: +# ## retrieve results +# results = pool.map(self._serial_probing,zip(seed,np.arange(self.nrun,dtype=np.int)),chunksize=None)#,callback=None).get(timeout=None) ## map_async replaced +# ## close and join pool +# about.infos.cflush(" done.") +# pool.close() +# pool.join() +# except: +# ## terminate and join pool +# pool.terminate() +# pool.join() +# raise Exception(about._errors.cstring("ERROR: unknown. NOTE: pool terminated.")) ## traceback by looping +# ## cleanup +# results = [rr for rr in results if(rr is not None)] +# if(len(results)<self.nrun): +# about.infos.cflush(" ( %u probe(s) failed, effectiveness == %.1f%% )\n"%(self.nrun-len(results),100*len(results)/self.nrun)) +# else: +# about.infos.cflush("\n") +# ## evaluate +# return self.evaluate(results) +# +# def _nonparallel_probing(self): ## > performs the probing operations one after another +# ## define random seed +# seed = np.random.randint(10**8,high=None,size=self.nrun) +# ## retrieve results +# if(about.infos.status): +# so.write(about.infos.cstring("INFO: looping "+(' ')*10)) +# so.flush() +# results = map(self._single_probing,zip(seed,np.arange(self.nrun,dtype=np.int))) +# about.infos.cflush(" done.") +# ## cleanup +# results = [rr for rr in results if(rr is not None)] +# if(len(results)<self.nrun): +# about.infos.cflush(" ( %u probe(s) failed, effectiveness == %.1f%% )\n"%(self.nrun-len(results),100*len(results)/self.nrun)) +# else: +# about.infos.cflush("\n") +# ## evaluate +# return self.evaluate(results) +# +# def __call__(self,loop=False,**kwargs): +# """ +# +# Starts the probing process. +# All keyword arguments that can be given to `configure` can also be +# given to `__call__` and have the same effect. 
+# +# Parameters +# ---------- +# loop : bool, *optional* +# if `loop` is True, then multiprocessing will be disabled and +# all probes are evaluated by a single worker (default: False) +# +# Returns +# ------- +# results : see **Returns** in `evaluate` +# +# other parameters +# ---------------- +# kwargs : see **Parameters** in `configure` +# +# """ +# self.configure(**kwargs) +# if(not about.multiprocessing.status)or(loop): +# return self._nonparallel_probing() +# else: +# return self._parallel_probing() +# +# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +# +# def __repr__(self): +# return "<nifty.probing>" +# +###============================================================================= +# +# +# +###----------------------------------------------------------------------------- +# +#class trace_probing(probing): +# """ +# .. __ +# .. / /_ +# .. / _/ _____ ____ __ _______ _______ +# .. / / / __/ / _ / / ____/ / __ / +# .. / /_ / / / /_/ / / /____ / /____/ +# .. \___/ /__/ \______| \______/ \______/ probing class +# +# NIFTY subclass for trace probing (using multiprocessing) +# +# When called, a trace_probing class instance samples the trace of an +# operator or a function using random fields, whose components are random +# variables with mean 0 and variance 1. When an instance is called it +# returns the mean value of the scalar product of probe and f(probe), +# where probe is a random field with mean 0 and variance 1. +# The mean is calculated as 1/N Sum[ probe_i.dot(f(probe_i)) ]. +# +# Parameters +# ---------- +# op : operator +# The operator specified by `op` is the operator to be probed. +# If no operator is given, then probing will be done by applying +# `function` to the probes. (default: None) +# function : function, *optional* +# If no operator has been specified as `op`, then specification of +# `function` is non optional. This is the function, that is applied +# to the probes. (default: `op.times`) +# domain : space, *optional* +# If no operator has been specified as `op`, then specification of +# `domain` is non optional. This is the space that the probes live +# in. (default: `op.domain`) +# target : domain, *optional* +# `target` is the codomain of `domain` +# (default: `op.domain.get_codomain()`) +# random : string, *optional* +# the distribution from which the probes are drawn. `random` can be +# either "pm1" or "gau". "pm1" is a uniform distribution over {+1,-1} +# or {+1,+i,-1,-i}, respectively. "gau" is a normal distribution with +# zero-mean and unit-variance (default: "pm1") +# ncpu : int, *optional* +# the number of cpus to be used from parallel probing. (default: 2) +# nrun : int, *optional* +# the number of probes to be evaluated. If `nrun<ncpu**2`, it will be +# set to `ncpu**2`. (default: 8) +# nper : int, *optional* +# this number specifies how many probes will be evaluated by one +# worker. Afterwards a new worker will be created to evaluate a chunk +# of `nper` probes. +# If for example `nper=nrun/ncpu`, then every worker will be created +# for every cpu. This can lead to the case, that all workers but one +# are already finished, but have to wait for the last worker that +# might still have a considerable amount of evaluations left. This is +# obviously not very effective. +# If on the other hand `nper=1`, then for each evaluation a worker will +# be created. In this case all cpus will work until nrun probes have +# been evaluated. +# It is recommended to leave `nper` as the default value. 
(default: 8) +# var : bool, *optional* +# If `var` is True, then the variance of the sampled function will +# also be returned. The result is then a tuple with the mean in the +# zeroth entry and the variance in the first entry. (default: False) +# +# +# See Also +# -------- +# probing : The base probing class +# diagonal_probing : A probing class to get the diagonal of an operator +# operator.tr : the trace function uses trace probing in the case of non +# diagonal operators +# +# +# Attributes +# ---------- +# function : function +# the function, that is applied to the probes +# domain : space +# the space, where the probes live in +# target : space +# the codomain of `domain` +# random : string +# the random number generator used to create the probes +# (either "pm1" or "gau") +# ncpu : int +# the number of cpus used for probing +# nrun : int +# the number of probes to be evaluated, when the instance is called +# nper : int +# number of probes, that will be evaluated by one worker +# var : bool +# whether the variance will be additionally returned, when the +# instance is called +# quargs : dict +# Keyword arguments passed to `function` in each call. +# +# """ +# def __init__(self,op,function=None,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,**quargs): +# """ +# initializes a trace probing instance +# +# Parameters +# ---------- +# op : operator +# The operator specified by `op` is the operator to be probed. +# If no operator is given, then probing will be done by applying +# `function` to the probes. (default: None) +# function : function, *optional* +# If no operator has been specified as `op`, then specification of +# `function` is non optional. This is the function, that is applied +# to the probes. (default: `op.times`) +# domain : space, *optional* +# If no operator has been specified as `op`, then specification of +# `domain` is non optional. This is the space that the probes live +# in. (default: `op.domain`) +# target : domain, *optional* +# `target` is the codomain of `domain` +# (default: `op.domain.get_codomain()`) +# random : string, *optional* +# the distribution from which the probes are drawn. `random` can be +# either "pm1" or "gau". "pm1" is a uniform distribution over {+1,-1} +# or {+1,+i,-1,-i}, respectively. "gau" is a normal distribution with +# zero-mean and unit-variance (default: "pm1") +# ncpu : int, *optional* +# the number of cpus to be used from parallel probing. (default: 2) +# nrun : int, *optional* +# the number of probes to be evaluated. If `nrun<ncpu**2`, it will be +# set to `ncpu**2`. (default: 8) +# nper : int, *optional* +# this number specifies how many probes will be evaluated by one +# worker. Afterwards a new worker will be created to evaluate a chunk +# of `nper` probes. +# If for example `nper=nrun/ncpu`, then every worker will be created +# for every cpu. This can lead to the case, that all workers but one +# are already finished, but have to wait for the last worker that +# might still have a considerable amount of evaluations left. This is +# obviously not very effective. +# If on the other hand `nper=1`, then for each evaluation a worker will +# be created. In this case all cpus will work until nrun probes have +# been evaluated. +# It is recommended to leave `nper` as the default value. (default: 8) +# var : bool, *optional* +# If `var` is True, then the variance of the sampled function will +# also be returned. The result is then a tuple with the mean in the +# zeroth entry and the variance in the first entry. 
(default: False) +# +# """ +# if(not isinstance(op,operator)): +# raise TypeError(about._errors.cstring("ERROR: invalid input.")) +# elif(op.nrow()!=op.ncol()): +# raise ValueError(about._errors.cstring("ERROR: trace ill-defined for "+str(op.nrow())+" x "+str(op.ncol())+" matrices.")) +# +# ## check whether callable +# if(function is None)or(not hasattr(function,"__call__")): +# function = op.times +# elif(op==function): +# function = op.times +# ## check whether correctly bound +# if(op!=function.im_self): +# raise NameError(about._errors.cstring("ERROR: invalid input.")) +# self.function = function +# +# ## check given domain +# if(domain is None)or(not isinstance(domain,space)): +# if(self.function in [op.inverse_times,op.adjoint_times]): +# domain = op.target +# else: +# domain = op.domain +# elif(not op.domain.check_codomain(domain))or(not op.target.check_codomain(domain)): ## restrictive +# raise ValueError(about._errors.cstring("ERROR: incompatible domains.")) +# self.domain = domain +# +# if(target is None): +# target = domain.get_codomain() +# ## check codomain +# self.domain.check_codomain(target) ## a bit pointless +# self.target = target +# +# ## check degrees of freedom +# if(op.domain.dof()>self.domain.dof()): +# about.infos.cprint("INFO: variant numbers of degrees of freedom ( "+str(op.domain.dof())+" / "+str(self.domain.dof())+" ).") +# +# if(random not in ["pm1","gau"]): +# raise ValueError(about._errors.cstring("ERROR: unsupported random key '"+str(random)+"'.")) +# self.random = random +# +# self.ncpu = int(max(1,ncpu)) +# self.nrun = int(max(self.ncpu**2,nrun)) +# if(nper is None): +# self.nper = None +# else: +# self.nper = int(max(1,min(self.nrun//self.ncpu,nper))) +# +# self.var = bool(var) +# +# self.quargs = quargs +# +# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +# +# def probing(self,idnum,probe): +# """ +# Computes a single probing result given one probe +# +# Parameters +# ---------- +# probe : field +# the field on which `function` will be applied +# idnum : int +# the identification number of the probing +# +# Returns +# ------- +# result : float +# the result of `probe.dot(function(probe))` +# """ +# f = self.function(probe,**self.quargs) +# if(f is None): +# return None +# else: +# return self.domain.calc_dot(probe.val,f.val) ## discrete inner product +# +# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +# +# def __repr__(self): +# return "<nifty.trace_probing>" +# +###----------------------------------------------------------------------------- +# +# +# +###----------------------------------------------------------------------------- +# +#class diagonal_probing(probing): +# """ +# .. __ __ __ +# .. / / /__/ / / +# .. ____/ / __ ____ __ ____ __ ______ __ ___ ____ __ / / +# .. / _ / / / / _ / / _ / / _ | / _ | / _ / / / +# .. / /_/ / / / / /_/ / / /_/ / / /_/ / / / / / / /_/ / / /_ +# .. \______| /__/ \______| \___ / \______/ /__/ /__/ \______| \___/ probing class +# .. /______/ +# +# NIFTY subclass for diagonal probing (using multiprocessing) +# +# When called, a diagonal_probing class instance samples the diagonal of +# an operator or a function using random fields, whose components are +# random variables with mean 0 and variance 1. When an instance is called +# it returns the mean value of probe*f(probe), where probe is a random +# field with mean 0 and variance 1. 
+# The mean is calculated as 1/N Sum[ probe_i*f(probe_i) ] +# ('*' denoting component wise multiplication) +# +# Parameters +# ---------- +# op : operator +# The operator specified by `op` is the operator to be probed. +# If no operator is given, then probing will be done by applying +# `function` to the probes. (default: None) +# function : function, *optional* +# If no operator has been specified as `op`, then specification of +# `function` is non optional. This is the function, that is applied +# to the probes. (default: `op.times`) +# domain : space, *optional* +# If no operator has been specified as `op`, then specification of +# `domain` is non optional. This is the space that the probes live +# in. (default: `op.domain`) +# target : domain, *optional* +# `target` is the codomain of `domain` +# (default: `op.domain.get_codomain()`) +# random : string, *optional* +# the distribution from which the probes are drawn. `random` can be +# either "pm1" or "gau". "pm1" is a uniform distribution over {+1,-1} +# or {+1,+i,-1,-i}, respectively. "gau" is a normal distribution with +# zero-mean and unit-variance (default: "pm1") +# ncpu : int, *optional* +# the number of cpus to be used for parallel probing. (default: 2) +# nrun : int, *optional* +# the number of probes to be evaluated. If `nrun<ncpu**2`, it will be +# set to `ncpu**2`. (default: 8) +# nper : int, *optional* +# this number specifies how many probes will be evaluated by one +# worker. Afterwards a new worker will be created to evaluate a chunk +# of `nper` probes. +# If for example `nper=nrun/ncpu`, then every worker will be created +# for every cpu. This can lead to the case, that all workers but one +# are already finished, but have to wait for the last worker that +# might still have a considerable amount of evaluations left. This is +# obviously not very effective. +# If on the other hand `nper=1`, then for each evaluation a worker will +# be created. In this case all cpus will work until nrun probes have +# been evaluated. +# It is recommended to leave `nper` as the default value. (default: 8) +# var : bool, *optional* +# If `var` is True, then the variance of the sampled function will +# also be returned. The result is then a tuple with the mean in the +# zeroth entry and the variance in the first entry. (default: False) +# save : bool, *optional* +# If `save` is True, then the probing results will be written to the +# hard disk instead of being saved in the RAM. This is recommended +# for high dimensional fields whose probes would otherwise fill up +# the memory. (default: False) +# path : string, *optional* +# the path, where the probing results are saved, if `save` is True. +# (default: "tmp") +# prefix : string, *optional* +# a prefix for the saved probing results. The saved results will be +# named using that prefix and an 8-digit number +# (e.g. "<prefix>00000001.npy"). 
(default: "") +# +# +# See Also +# -------- +# trace_probing : A probing class to get the trace of an operator +# probing : The base probing class +# operator.diag : The diag function uses diagonal probing in the case of +# non diagonal operators +# operator.hat : The hat function uses diagonal probing in the case of +# non diagonal operators +# +# +# Attributes +# ---------- +# function : function +# the function, that is applied to the probes +# domain : space +# the space, where the probes live in +# target : space +# the codomain of `domain` +# random : string +# the random number generator used to create the probes +# (either "pm1" or "gau") +# ncpu : int +# the number of cpus used for probing +# nrun : int +# the number of probes to be evaluated, when the instance is called +# nper : int +# number of probes, that will be evaluated by one worker +# var : bool +# whether the variance will be additionally returned, when the +# instance is called +# save : {string, None} +# the path and prefix for saved probe files. None in the case where +# the probing results are stored in the RAM. +# quargs : dict +# Keyword arguments passed to `function` in each call. +# +# """ +# def __init__(self,op,function=None,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,save=False,path="tmp",prefix="",**quargs): +# """ +# initializes a diagonal probing instance +# +# Parameters +# ---------- +# op : operator +# The operator specified by `op` is the operator to be probed. +# If no operator is given, then probing will be done by applying +# `function` to the probes. (default: None) +# function : function, *optional* +# If no operator has been specified as `op`, then specification of +# `function` is non optional. This is the function, that is applied +# to the probes. (default: `op.times`) +# domain : space, *optional* +# If no operator has been specified as `op`, then specification of +# `domain` is non optional. This is the space that the probes live +# in. (default: `op.domain`) +# target : domain, *optional* +# `target` is the codomain of `domain` +# (default: `op.domain.get_codomain()`) +# random : string, *optional* +# the distribution from which the probes are drawn. `random` can be +# either "pm1" or "gau". "pm1" is a uniform distribution over {+1,-1} +# or {+1,+i,-1,-i}, respectively. "gau" is a normal distribution with +# zero-mean and unit-variance (default: "pm1") +# ncpu : int, *optional* +# the number of cpus to be used for parallel probing. (default: 2) +# nrun : int, *optional* +# the number of probes to be evaluated. If `nrun<ncpu**2`, it will be +# set to `ncpu**2`. (default: 8) +# nper : int, *optional* +# this number specifies how many probes will be evaluated by one +# worker. Afterwards a new worker will be created to evaluate a chunk +# of `nper` probes. +# If for example `nper=nrun/ncpu`, then every worker will be created +# for every cpu. This can lead to the case, that all workers but one +# are already finished, but have to wait for the last worker that +# might still have a considerable amount of evaluations left. This is +# obviously not very effective. +# If on the other hand `nper=1`, then for each evaluation a worker will +# be created. In this case all cpus will work until nrun probes have +# been evaluated. +# It is recommended to leave `nper` as the default value. (default: 8) +# var : bool, *optional* +# If `var` is True, then the variance of the sampled function will +# also be returned. 
The result is then a tuple with the mean in the +# zeroth entry and the variance in the first entry. (default: False) +# save : bool, *optional* +# If `save` is True, then the probing results will be written to the +# hard disk instead of being saved in the RAM. This is recommended +# for high dimensional fields whose probes would otherwise fill up +# the memory. (default: False) +# path : string, *optional* +# the path, where the probing results are saved, if `save` is True. +# (default: "tmp") +# prefix : string, *optional* +# a prefix for the saved probing results. The saved results will be +# named using that prefix and an 8-digit number +# (e.g. "<prefix>00000001.npy"). (default: "") +# +# """ +# +# if(not isinstance(op,operator)): +# raise TypeError(about._errors.cstring("ERROR: invalid input.")) +# elif(op.nrow()!=op.ncol()): +# raise ValueError(about._errors.cstring("ERROR: diagonal ill-defined for "+str(op.nrow())+" x "+str(op.ncol())+" matrices.")) +# +# ## check whether callable +# if(function is None)or(not hasattr(function,"__call__")): +# function = op.times +# elif(op==function): +# function = op.times +# ## check whether correctly bound +# if(op!=function.im_self): +# raise NameError(about._errors.cstring("ERROR: invalid input.")) +# self.function = function +# +# ## check given domain +# if(domain is None)or(not isinstance(domain,space)): +# if(self.function in [op.inverse_times,op.adjoint_times]): +# domain = op.target +# else: +# domain = op.domain +# elif(not op.domain.check_codomain(domain))or(not op.target.check_codomain(domain)): ## restrictive +# raise ValueError(about._errors.cstring("ERROR: incompatible domains.")) +# self.domain = domain +# +# if(target is None): +# target = domain.get_codomain() +# ## check codomain +# self.domain.check_codomain(target) ## a bit pointless +# self.target = target +# +# ## check degrees of freedom +# if(self.domain.dof()>op.domain.dof()): +# about.infos.cprint("INFO: variant numbers of degrees of freedom ( "+str(self.domain.dof())+" / "+str(op.domain.dof())+" ).") +# +# if(random not in ["pm1","gau"]): +# raise ValueError(about._errors.cstring("ERROR: unsupported random key '"+str(random)+"'.")) +# self.random = random +# +# self.ncpu = int(max(1,ncpu)) +# self.nrun = int(max(self.ncpu**2,nrun)) +# if(nper is None): +# self.nper = None +# else: +# self.nper = int(max(1,min(self.nrun//self.ncpu,nper))) +# +# self.var = bool(var) +# +# if(save): +# path = os.path.expanduser(str(path)) +# if(not os.path.exists(path)): +# os.makedirs(path) +# self.save = os.path.join(path,str(prefix)) ## (back)slash inserted if needed +# else: +# self.save = None +# +# self.quargs = quargs +# +# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +# +# def configure(self,**kwargs): +# """ +# changes the attributes of the instance +# +# Parameters +# ---------- +# random : string, *optional* +# the random number generator used to create the probes +# (default: "pm1") +# ncpu : int, *optional* +# the number of cpus to be used for parallel probing +# (default: 2) +# nrun : int, *optional* +# the number of probes to be evaluated. If `nrun<ncpu**2`, it will +# be set to `ncpu**2`. 
(default: 8) +# nper : int, *optional* +# number of probes, that will be evaluated by one worker +# (default: 8) +# var : bool, *optional* +# whether the variance will be additionally returned +# (default: False) +# save : bool, *optional* +# whether the individual probing results will be saved to the HDD +# (default: False) +# path : string, *optional* +# the path, where the probing results are saved (default: "tmp") +# prefix : string, *optional* +# a prefix for the saved probing results (default: "") +# +# """ +# if("random" in kwargs): +# if(kwargs.get("random") not in ["pm1","gau"]): +# raise ValueError(about._errors.cstring("ERROR: unsupported random key '"+str(kwargs.get("random"))+"'.")) +# self.random = kwargs.get("random") +# +# if("ncpu" in kwargs): +# self.ncpu = int(max(1,kwargs.get("ncpu"))) +# if("nrun" in kwargs): +# self.nrun = int(max(self.ncpu**2,kwargs.get("nrun"))) +# if("nper" in kwargs): +# if(kwargs.get("nper") is None): +# self.nper = None +# else: +# self.nper = int(max(1,min(self.nrun//self.ncpu,kwargs.get("nper")))) +# +# if("var" in kwargs): +# self.var = bool(kwargs.get("var")) +# +# if("save" in kwargs): +# if(kwargs.get("save")): +# if("path" in kwargs): +# path = kwargs.get("path") +# else: +# if(self.save is not None): +# about.warnings.cprint("WARNING: save path set to default.") +# path = "tmp" +# if("prefix" in kwargs): +# prefix = kwargs.get("prefix") +# else: +# if(self.save is not None): +# about.warnings.cprint("WARNING: save prefix set to default.") +# prefix = "" +# path = os.path.expanduser(str(path)) +# if(not os.path.exists(path)): +# os.makedirs(path) +# self.save = os.path.join(path,str(prefix)) ## (back)slash inserted if needed +# else: +# self.save = None +# +# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +# +# def probing(self,idnum,probe): +# +# """ +# Computes a single probing result given one probe +# +# Parameters +# ---------- +# probe : field +# the field on which `function` will be applied +# idnum : int +# the identification number of the probing +# +# Returns +# ------- +# result : ndarray +# the result of `probe*(function(probe))` +# """ +# f = self.function(probe,**self.quargs) +# if(f is None): +# return None +# else: +# if(self.save is None): +# return np.conjugate(probe.val)*f.val +# else: +# result = np.conjugate(probe.val)*f.val +# np.save(self.save+"%08u"%idnum,result) +# return idnum +# +# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +# +# def evaluate(self,results): +# """ +# evaluates the probing results +# +# Parameters +# ---------- +# results : list +# the list of ndarrays containing the results of the individual +# probings. +# +# Returns +# ------- +# final : ndarray +# the final probing result. 1/N Sum[ probe_i*f(probe_i) ] +# var : ndarray +# the variance of the final probing result. +# (N(N-1))^(-1) Sum[ (probe_i*f(probe_i) - final)^2 ] +# If the variance is returned, the return will be a tuple with +# final in the zeroth entry and var in the first entry. 
+# +# """ +# num = len(results) +# if(num==0): +# about.warnings.cprint("WARNING: probing failed.") +# return None +# elif(self.save is None): +# if(self.var): +# return np.mean(np.array(results),axis=0,dtype=None,out=None),np.var(np.array(results),axis=0,dtype=None,out=None,ddof=0)/(num-1) +# else: +# return np.mean(np.array(results),axis=0,dtype=None,out=None) +# else: +# final = np.copy(np.load(self.save+"%08u.npy"%results[0],mmap_mode=None)) +# for ii in xrange(1,num): +# final += np.load(self.save+"%08u.npy"%results[ii],mmap_mode=None) +# if(self.var): +# var = np.zeros(self.domain.dim(split=True),dtype=self.domain.datatype,order='C') +# for ii in xrange(num): +# var += (final-np.load(self.save+"%08u.npy"%results[ii],mmap_mode=None))**2 +# return final/num,var/(num*(num-1)) +# else: +# return final/num +# +# ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +# +# def __repr__(self): +# return "<nifty.diagonal_probing>" +# +###----------------------------------------------------------------------------- + +##----------------------------------------------------------------------------- + +class _share(object): + + __init__ = None + + @staticmethod + def _init_share(_sum,_num,_var): + _share.sum = _sum + _share.num = _num + _share.var = _var + +##----------------------------------------------------------------------------- + ##============================================================================= class probing(object): @@ -10419,7 +11475,7 @@ class probing(object): If on the other hand `nper=1`, then for each evaluation a worker will be created. In this case all cpus will work until nrun probes have been evaluated. - It is recommended to leave `nper` as the default value. (default: 8) + It is recommended to leave `nper` as the default value. (default: 1) var : bool, *optional* If `var` is True, then the variance of the sampled function will also be returned. The result is then a tuple with the mean in the @@ -10456,7 +11512,7 @@ class probing(object): Keyword arguments passed to `function` in each call. """ - def __init__(self,op=None,function=None,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,**quargs): + def __init__(self,op=None,function=None,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=1,var=False,**quargs): """ initializes a probing instance @@ -10499,7 +11555,7 @@ class probing(object): If on the other hand `nper=1`, then for each evaluation a worker will be created. In this case all cpus will work until nrun probes have been evaluated. - It is recommended to leave `nper` as the default value. (default: 8) + It is recommended to leave `nper` as the default value. (default: 1) var : bool, *optional* If `var` is True, then the variance of the sampled function will also be returned. 
The result is then a tuple with the mean in the @@ -10524,7 +11580,7 @@ class probing(object): ## check whether correctly bound if(op!=function.im_self): raise NameError(about._errors.cstring("ERROR: invalid input.")) - ## check given domain + ## check given shape and domain if(domain is None)or(not isinstance(domain,space)): if(function in [op.inverse_times,op.adjoint_times]): domain = op.target @@ -10533,16 +11589,21 @@ class probing(object): else: if(function in [op.inverse_times,op.adjoint_times]): op.target.check_codomain(domain) ## a bit pointless + if(target is None)or(not isinstance(target,space)): + target = op.target else: op.domain.check_codomain(domain) ## a bit pointless + if(target is None)or(not isinstance(target,space)): + target = op.domain self.function = function self.domain = domain + ## check codomain if(target is None): target = domain.get_codomain() - ## check codomain - self.domain.check_codomain(target) ## a bit pointless + else: + self.domain.check_codomain(target) ## a bit pointless self.target = target if(random not in ["pm1","gau"]): @@ -10576,7 +11637,7 @@ class probing(object): the number of probes to be evaluated. If `nrun<ncpu**2`, it will be set to `ncpu**2`. (default: 8) nper : int, *optional* - number of probes, that will be evaluated by one worker (default: 8) + number of probes, that will be evaluated by one worker (default: 1) var : bool, *optional* whether the variance will be additionally returned (default: False) @@ -10642,39 +11703,59 @@ class probing(object): ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - def evaluate(self,results): + def evaluate(self,sum_,num_,var_): """ - evaluates the probing results + Evaluates the probing results. Parameters ---------- - results : list - the list containing the results of the individual probings. - The type of the list elements depends on the function. + sum_ : numpy.array + Sum of all probing results. + num_ : int + Number of successful probings (not returning ``None``). + var_ : numpy.array + Sum of all squared probing results Returns ------- - final : array-like - the final probing result. 1/N Sum[ probing(probe_i) ] + final : numpy.array + The final probing result; 1/N Sum[ probing(probe_i) ] var : array-like - the variance of the final probing result. - (N(N-1))^(-1) Sum[ ( probing(probe_i) - final)^2 ] - If the variance is returned, the return will be a tuple with - `final` in the zeroth entry and `var` in the first entry. + The variance of the final probing result; + 1/(N(N-1)) Sum[( probing(probe_i) - final )^2]; + if the variance is returned, the return will be a tuple of + (`final`,`var`). 
""" - if(len(results)==0): - about.warnings.cprint("WARNING: probing failed.") - return None - elif(self.var): - return np.mean(np.array(results),axis=0,dtype=None,out=None),np.var(np.array(results),axis=0,dtype=None,out=None,ddof=0)/(len(results)-1) + if(num_<self.nrun): + about.infos.cflush(" ( %u probe(s) failed, effectiveness == %.1f%% )\n"%(self.nrun-num_,100*num_/self.nrun)) + if(num_==0): + about.warnings.cprint("WARNING: probing failed.") + return None else: - return np.mean(np.array(results),axis=0,dtype=None,out=None) + about.infos.cflush("\n") + + if(sum_.size==1): + sum_ = sum_.flatten(order='C')[0] + var_ = var_.flatten(order='C')[0] + if(np.iscomplexobj(sum_))and(np.all(np.imag(sum_)==0)): + sum_ = np.real(sum_) + + final = sum_*(1/num_) + if(self.var): + if(num_==1): + about.warnings.cprint("WARNING: infinite variance.") + return final,None + else: + var = var_*(1/(num_*(num_-1)))-np.real(np.conjugate(final)*final)*(1/(num_-1)) + return final,var + else: + return final ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - def _progress(self,idnum): ## > prints progress status by in upto 10 dots - tenths = 1+(10*idnum//self.nrun) + def _progress(self,irun): ## > prints progress status by in upto 10 dots + tenths = 1+(10*irun//self.nrun) about.infos.cflush(("\b")*10+('.')*tenths+(' ')*(10-tenths)) def _single_probing(self,zipped): ## > performs one probing operation @@ -10682,27 +11763,63 @@ class probing(object): np.random.seed(zipped[0]) probe = self.gen_probe() ## do the actual probing - self._progress(zipped[1]) return self.probing(zipped[1],probe) def _serial_probing(self,zipped): ## > performs the probing operation serially try: - return self._single_probing(zipped) + result = np.array(self._single_probing(zipped)).flatten(order='C') except: ## kill pool os.kill() + else: + if(result is not None): + if(np.iscomplexobj(result)): + _share.sum[0].acquire(block=True,timeout=None) + _share.sum[0][:] += np.real(result) + _share.sum[0].release() + _share.sum[1].acquire(block=True,timeout=None) + _share.sum[1][:] += np.imag(result) + _share.sum[1].release() + else: + _share.sum.acquire(block=True,timeout=None) + _share.sum[:] += result + _share.sum.release() + _share.num.acquire(block=True,timeout=None) + _share.num.value += 1 + _share.num.release() + if(self.var): + _share.var.acquire(block=True,timeout=None) + _share.var[:] += np.real(np.conjugate(result)*result) + _share.var.release() + self._progress(_share.num.value) def _parallel_probing(self): ## > performs the probing operations in parallel ## define random seed seed = np.random.randint(10**8,high=None,size=self.nrun) + ## get output shape + result = self.probing(0,field(self.domain,val=None,target=self.target)) + if(np.isscalar(result))or(np.size(result)==1): + shape = np.ones(1,dtype=np.int,order='C') + else: + shape = np.array(np.array(result).shape) + ## define shared objects + if(np.iscomplexobj(result)): + _sum = (ma('d',np.zeros(np.prod(shape,axis=0,dtype=np.int,out=None),dtype=np.float64,order='C'),lock=True),ma('d',np.zeros(np.prod(shape,axis=0,dtype=np.int,out=None),dtype=np.float64,order='C'),lock=True)) ## tuple(real,imag) + else: + _sum = ma('d',np.zeros(np.prod(shape,axis=0,dtype=np.int,out=None),dtype=np.float64,order='C'),lock=True) + _num = mv('I',0,lock=True) + if(self.var): + _var = ma('d',np.zeros(np.prod(shape,axis=0,dtype=np.int,out=None),dtype=np.float64,order='C'),lock=True) + else: + _var = None ## build pool if(about.infos.status): so.write(about.infos.cstring("INFO: 
multiprocessing "+(' ')*10)) so.flush() - pool = mp(processes=self.ncpu,initializer=None,initargs=(),maxtasksperchild=self.nper) + pool = mp(processes=self.ncpu,initializer=_share._init_share,initargs=(_sum,_num,_var),maxtasksperchild=self.nper) try: - ## retrieve results - results = pool.map(self._serial_probing,zip(seed,np.arange(self.nrun,dtype=np.int)),chunksize=None)#,callback=None).get(timeout=None) ## map_async replaced + ## pooling + pool.map_async(self._serial_probing,zip(seed,np.arange(self.nrun,dtype=np.int)),chunksize=None,callback=None).get(timeout=None) ## close and join pool about.infos.cflush(" done.") pool.close() @@ -10712,34 +11829,36 @@ class probing(object): pool.terminate() pool.join() raise Exception(about._errors.cstring("ERROR: unknown. NOTE: pool terminated.")) ## traceback by looping - ## cleanup - results = [rr for rr in results if(rr is not None)] - if(len(results)<self.nrun): - about.infos.cflush(" ( %u probe(s) failed, effectiveness == %.1f%% )\n"%(self.nrun-len(results),100*len(results)/self.nrun)) - else: - about.infos.cflush("\n") ## evaluate - return self.evaluate(results) + if(np.iscomplexobj(result)): + _sum = (np.array(_sum[0][:])+np.array(_sum[1][:])*1j).reshape(shape) ## comlpex array + else: + _sum = np.array(_sum[:]).reshape(shape) + if(self.var): + _var = np.array(_var[:]).reshape(shape) + return self.evaluate(_sum,_num.value,_var) def _nonparallel_probing(self): ## > performs the probing operations one after another - ## define random seed - seed = np.random.randint(10**8,high=None,size=self.nrun) - ## retrieve results + ## looping if(about.infos.status): so.write(about.infos.cstring("INFO: looping "+(' ')*10)) so.flush() - results = map(self._single_probing,zip(seed,np.arange(self.nrun,dtype=np.int))) + _sum = 0 + _num = 0 + _var = 0 + for ii in xrange(self.nrun): + result = self._single_probing((np.random.randint(10**8,high=None,size=None),ii)) ## tuple(seed,idnum) + if(result is not None): + _sum += result ## result: {scalar, np.array} + _num += 1 + if(self.var): + _var += np.real(np.conjugate(result)*result) + self._progress(_num) about.infos.cflush(" done.") - ## cleanup - results = [rr for rr in results if(rr is not None)] - if(len(results)<self.nrun): - about.infos.cflush(" ( %u probe(s) failed, effectiveness == %.1f%% )\n"%(self.nrun-len(results),100*len(results)/self.nrun)) - else: - about.infos.cflush("\n") ## evaluate - return self.evaluate(results) + return self.evaluate(_sum,_num,_var) - def __call__(self,loop=False,**kwargs): + def __call__(self,loop=False,**kwargs): ## FIXME: doc """ Starts the probing process. @@ -10835,7 +11954,7 @@ class trace_probing(probing): If on the other hand `nper=1`, then for each evaluation a worker will be created. In this case all cpus will work until nrun probes have been evaluated. - It is recommended to leave `nper` as the default value. (default: 8) + It is recommended to leave `nper` as the default value. (default: 1) var : bool, *optional* If `var` is True, then the variance of the sampled function will also be returned. The result is then a tuple with the mean in the @@ -10874,7 +11993,7 @@ class trace_probing(probing): Keyword arguments passed to `function` in each call. 
""" - def __init__(self,op,function=None,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,**quargs): + def __init__(self,op,function=None,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=1,var=False,**quargs): """ initializes a trace probing instance @@ -10917,7 +12036,7 @@ class trace_probing(probing): If on the other hand `nper=1`, then for each evaluation a worker will be created. In this case all cpus will work until nrun probes have been evaluated. - It is recommended to leave `nper` as the default value. (default: 8) + It is recommended to leave `nper` as the default value. (default: 1) var : bool, *optional* If `var` is True, then the variance of the sampled function will also be returned. The result is then a tuple with the mean in the @@ -10945,14 +12064,24 @@ class trace_probing(probing): domain = op.target else: domain = op.domain - elif(not op.domain.check_codomain(domain))or(not op.target.check_codomain(domain)): ## restrictive - raise ValueError(about._errors.cstring("ERROR: incompatible domains.")) + else: + if(function in [op.inverse_times,op.adjoint_times]): + if(not op.target.check_codomain(domain)): ## restrictive + raise ValueError(about._errors.cstring("ERROR: incompatible domains.")) + if(target is None)or(not isinstance(target,space)): + target = op.target + else: + if(not op.domain.check_codomain(domain)): ## restrictive + raise ValueError(about._errors.cstring("ERROR: incompatible domains.")) + if(target is None)or(not isinstance(target,space)): + target = op.domain self.domain = domain + ## check codomain if(target is None): target = domain.get_codomain() - ## check codomain - self.domain.check_codomain(target) ## a bit pointless + else: + self.domain.check_codomain(target) ## a bit pointless self.target = target ## check degrees of freedom @@ -10992,7 +12121,7 @@ class trace_probing(probing): result : float the result of `probe.dot(function(probe))` """ - f = self.function(probe,**self.quargs) + f = self.function(probe,**self.quargs) ## field if(f is None): return None else: @@ -11000,6 +12129,123 @@ class trace_probing(probing): ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + def evaluate(self,sum_,num_,var_): + """ + Evaluates the probing results. + + Parameters + ---------- + sum_ : scalar + Sum of all probing results. + num_ : int + Number of successful probings (not returning ``None``). + var_ : scalar + Sum of all squared probing results + + Returns + ------- + final : scalar + The final probing result; 1/N Sum[ probing(probe_i) ] + var : scalar + The variance of the final probing result; + 1/(N(N-1)) Sum[( probing(probe_i) - final )^2]; + if the variance is returned, the return will be a tuple of + (`final`,`var`). 
+ + """ + if(num_<self.nrun): + about.infos.cflush(" ( %u probe(s) failed, effectiveness == %.1f%% )\n"%(self.nrun-num_,100*num_/self.nrun)) + if(num_==0): + about.warnings.cprint("WARNING: probing failed.") + return None + else: + about.infos.cflush("\n") + + if(self.domain.datatype in [np.complex64,np.complex128]): + sum_ = np.real(sum_) + + final = sum_/num_ + if(self.var): + if(num_==1): + about.warnings.cprint("WARNING: infinite variance.") + return final,None + else: + var = var_/(num_*(num_-1))-np.real(np.conjugate(final)*final)/(num_-1) + return final,var + else: + return final + + ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + + def _serial_probing(self,zipped): ## > performs the probing operation serially + try: + result = self._single_probing(zipped) + except: + ## kill pool + os.kill() + else: + if(result is not None): + if(np.iscomplexobj(result)): + _share.sum[0].acquire(block=True,timeout=None) + _share.sum[0].value += np.real(result) + _share.sum[0].release() + _share.sum[1].acquire(block=True,timeout=None) + _share.sum[1].value += np.imag(result) + _share.sum[1].release() + else: + _share.sum.acquire(block=True,timeout=None) + _share.sum.value += result + _share.sum.release() + _share.num.acquire(block=True,timeout=None) + _share.num.value += 1 + _share.num.release() + if(self.var): + _share.var.acquire(block=True,timeout=None) + _share.var.value += np.real(np.conjugate(result)*result) + _share.var.release() + self._progress(_share.num.value) + + def _parallel_probing(self): ## > performs the probing operations in parallel + ## define random seed + seed = np.random.randint(10**8,high=None,size=self.nrun) + ## define shared objects + if(self.domain.datatype in [np.complex64,np.complex128]): + _sum = (mv('d',0,lock=True),mv('d',0,lock=True)) ## tuple(real,imag) + else: + _sum = mv('d',0,lock=True) + _num = mv('I',0,lock=True) + if(self.var): + _var = mv('d',0,lock=True) + else: + _var = None + ## build pool + if(about.infos.status): + so.write(about.infos.cstring("INFO: multiprocessing "+(' ')*10)) + so.flush() + pool = mp(processes=self.ncpu,initializer=_share._init_share,initargs=(_sum,_num,_var),maxtasksperchild=self.nper) + try: + ## pooling + pool.map_async(self._serial_probing,zip(seed,np.arange(self.nrun,dtype=np.int)),chunksize=None,callback=None).get(timeout=None) + ## close and join pool + about.infos.cflush(" done.") + pool.close() + pool.join() + except: + ## terminate and join pool + pool.terminate() + pool.join() + raise Exception(about._errors.cstring("ERROR: unknown. NOTE: pool terminated.")) ## traceback by looping + ## evaluate + if(self.domain.datatype in [np.complex64,np.complex128]): + _sum = np.complex(_sum[0].value,_sum[1].value) + else: + _sum = _sum.value + if(self.var): + _var = _var.value + return self.evaluate(_sum,_num.value,_var) + + ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + def __repr__(self): return "<nifty.trace_probing>" @@ -11068,7 +12314,7 @@ class diagonal_probing(probing): If on the other hand `nper=1`, then for each evaluation a worker will be created. In this case all cpus will work until nrun probes have been evaluated. - It is recommended to leave `nper` as the default value. (default: 8) + It is recommended to leave `nper` as the default value. (default: 1) var : bool, *optional* If `var` is True, then the variance of the sampled function will also be returned. 
The result is then a tuple with the mean in the @@ -11124,7 +12370,7 @@ class diagonal_probing(probing): Keyword arguments passed to `function` in each call. """ - def __init__(self,op,function=None,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=None,var=False,save=False,path="tmp",prefix="",**quargs): + def __init__(self,op,function=None,domain=None,target=None,random="pm1",ncpu=2,nrun=8,nper=1,var=False,save=False,path="tmp",prefix="",**quargs): """ initializes a diagonal probing instance @@ -11208,14 +12454,24 @@ class diagonal_probing(probing): domain = op.target else: domain = op.domain - elif(not op.domain.check_codomain(domain))or(not op.target.check_codomain(domain)): ## restrictive - raise ValueError(about._errors.cstring("ERROR: incompatible domains.")) + else: + if(function in [op.inverse_times,op.adjoint_times]): + if(not op.target.check_codomain(domain)): ## restrictive + raise ValueError(about._errors.cstring("ERROR: incompatible domains.")) + if(target is None)or(not isinstance(target,space)): + target = op.target + else: + if(not op.domain.check_codomain(domain)): ## restrictive + raise ValueError(about._errors.cstring("ERROR: incompatible domains.")) + if(target is None)or(not isinstance(target,space)): + target = op.domain self.domain = domain + ## check codomain if(target is None): target = domain.get_codomain() - ## check codomain - self.domain.check_codomain(target) ## a bit pointless + else: + self.domain.check_codomain(target) ## a bit pointless self.target = target ## check degrees of freedom @@ -11264,7 +12520,7 @@ class diagonal_probing(probing): be set to `ncpu**2`. (default: 8) nper : int, *optional* number of probes, that will be evaluated by one worker - (default: 8) + (default: 1) var : bool, *optional* whether the variance will be additionally returned (default: False) @@ -11335,60 +12591,83 @@ class diagonal_probing(probing): result : ndarray the result of `probe*(function(probe))` """ - f = self.function(probe,**self.quargs) + f = self.function(probe,**self.quargs) ## field if(f is None): return None else: - if(self.save is None): - return np.conjugate(probe.val)*f.val - else: - result = np.conjugate(probe.val)*f.val + result = np.conjugate(probe.val)*f.val + if(self.save is not None): np.save(self.save+"%08u"%idnum,result) - return idnum + return result ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - def evaluate(self,results): - """ - evaluates the probing results - - Parameters - ---------- - results : list - the list of ndarrays containing the results of the individual - probings. - - Returns - ------- - final : ndarray - the final probing result. 1/N Sum[ probe_i*f(probe_i) ] - var : ndarray - the variance of the final probing result. - (N(N-1))^(-1) Sum[ (probe_i*f(probe_i) - final)^2 ] - If the variance is returned, the return will be a tuple with - final in the zeroth entry and var in the first entry. 
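+    ## accumulate the flattened probe result in the shared-memory sums set
+    ## up by _share._init_share instead of returning it through the pool;
+    ## _share.num counts the probes that succeeded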
+ def _serial_probing(self,zipped): ## > performs the probing operation serially + try: + result = np.array(self._single_probing(zipped)).flatten(order='C') + except: + ## kill pool + os.kill() + else: + if(result is not None): + if(np.iscomplexobj(result)): + _share.sum[0].acquire(block=True,timeout=None) + _share.sum[0][:] += np.real(result) + _share.sum[0].release() + _share.sum[1].acquire(block=True,timeout=None) + _share.sum[1][:] += np.imag(result) + _share.sum[1].release() + else: + _share.sum.acquire(block=True,timeout=None) + _share.sum[:] += result + _share.sum.release() + _share.num.acquire(block=True,timeout=None) + _share.num.value += 1 + _share.num.release() + if(self.var): + _share.var.acquire(block=True,timeout=None) + _share.var[:] += np.real(np.conjugate(result)*result) + _share.var.release() + self._progress(_share.num.value) - """ - num = len(results) - if(num==0): - about.warnings.cprint("WARNING: probing failed.") - return None - elif(self.save is None): - if(self.var): - return np.mean(np.array(results),axis=0,dtype=None,out=None),np.var(np.array(results),axis=0,dtype=None,out=None,ddof=0)/(num-1) - else: - return np.mean(np.array(results),axis=0,dtype=None,out=None) - else: - final = np.copy(np.load(self.save+"%08u.npy"%results[0],mmap_mode=None)) - for ii in xrange(1,num): - final += np.load(self.save+"%08u.npy"%results[ii],mmap_mode=None) - if(self.var): - var = np.zeros(self.domain.dim(split=True),dtype=self.domain.datatype,order='C') - for ii in xrange(num): - var += (final-np.load(self.save+"%08u.npy"%results[ii],mmap_mode=None))**2 - return final/num,var/(num*(num-1)) - else: - return final/num + def _parallel_probing(self): ## > performs the probing operations in parallel + ## define random seed + seed = np.random.randint(10**8,high=None,size=self.nrun) + ## define shared objects + if(self.domain.datatype in [np.complex64,np.complex128]): + _sum = (ma('d',np.zeros(self.domain.dim(split=False),dtype=np.float64,order='C'),lock=True),ma('d',np.zeros(self.domain.dim(split=False),dtype=np.float64,order='C'),lock=True)) ## tuple(real,imag) + else: + _sum = ma('d',np.zeros(self.domain.dim(split=False),dtype=np.float64,order='C'),lock=True) + _num = mv('I',0,lock=True) + if(self.var): + _var = ma('d',np.zeros(self.domain.dim(split=False),dtype=np.float64,order='C'),lock=True) + else: + _var = None + ## build pool + if(about.infos.status): + so.write(about.infos.cstring("INFO: multiprocessing "+(' ')*10)) + so.flush() + pool = mp(processes=self.ncpu,initializer=_share._init_share,initargs=(_sum,_num,_var),maxtasksperchild=self.nper) + try: + ## pooling + pool.map_async(self._serial_probing,zip(seed,np.arange(self.nrun,dtype=np.int)),chunksize=None,callback=None).get(timeout=None) + ## close and join pool + about.infos.cflush(" done.") + pool.close() + pool.join() + except: + ## terminate and join pool + pool.terminate() + pool.join() + raise Exception(about._errors.cstring("ERROR: unknown. NOTE: pool terminated.")) ## traceback by looping + ## evaluate + if(self.domain.datatype in [np.complex64,np.complex128]): + _sum = (np.array(_sum[0][:])+np.array(_sum[1][:])*1j).reshape(self.domain.dim(split=True)) ## comlpex array + else: + _sum = np.array(_sum[:]).reshape(self.domain.dim(split=True)) + if(self.var): + _var = np.array(_var[:]).reshape(self.domain.dim(split=True)) + return self.evaluate(_sum,_num.value,_var) ##+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -- GitLab
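The restructuring above stops collecting per-probe results in a list and instead lets every pool worker add its result into shared-memory accumulators. The following standalone sketch is illustrative only, not NIFTY code: the names init_worker and run_probe, the toy 4-component result, and the use of 2 processes are assumptions, but the accumulation under locks and the final mean/variance expression mirror what _serial_probing, _parallel_probing and evaluate do in the patch.

import numpy as np
from multiprocessing import Pool, Value, Array

_acc = {}  ## worker-local handles to the shared accumulators

def init_worker(sum_, num_, var_):
    ## pool initializer: runs once per worker and keeps the shared objects
    _acc['sum'], _acc['num'], _acc['var'] = sum_, num_, var_

def run_probe(seed):
    ## stand-in for one probing evaluation; accumulates instead of returning
    np.random.seed(seed)
    result = np.random.randn(4)  ## toy (real-valued) probing result
    with _acc['sum'].get_lock():
        view = np.frombuffer(_acc['sum'].get_obj())
        view += result
    with _acc['var'].get_lock():
        view = np.frombuffer(_acc['var'].get_obj())
        view += result * result
    with _acc['num'].get_lock():
        _acc['num'].value += 1

if __name__ == "__main__":
    nrun, dim = 8, 4
    sum_ = Array('d', dim, lock=True)  ## running sum of results
    var_ = Array('d', dim, lock=True)  ## running sum of squared results
    num_ = Value('I', 0, lock=True)    ## number of successful probes
    pool = Pool(processes=2, initializer=init_worker, initargs=(sum_, num_, var_))
    pool.map_async(run_probe, range(nrun)).get()
    pool.close()
    pool.join()
    num = num_.value
    final = np.array(sum_[:]) / num
    ## unbiased variance of the mean, as evaluate() computes it:
    var = np.array(var_[:]) / (num * (num - 1)) - final ** 2 / (num - 1)
    print(final, var)

Only the three small accumulators are shared between processes; the per-probe fields never travel back through the pool, which is the point of the shared-memory restructuring.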
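For the scalar case handled by trace_probing.evaluate, the accumulator expression var_/(N(N-1)) - |final|^2/(N-1) is algebraically the same quantity the removed list-based code obtained from np.var(results, ddof=0)/(N-1). A short numpy check (illustrative data only):

import numpy as np

x = np.random.randn(8)                      ## pretend per-probe scalar results
N = x.size
final = x.sum() / N                         ## sum_/num_
var_stream = (x * x).sum() / (N * (N - 1)) - final ** 2 / (N - 1)
var_list = np.var(x, ddof=0) / (N - 1)      ## formula of the removed evaluate()
assert np.allclose(var_stream, var_list)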