Commit 40aae561 authored by Martin Reinecke's avatar Martin Reinecke
Browse files

different multiprocessing

parent 641e1576
...@@ -53,9 +53,7 @@ if __name__ == "__main__": ...@@ -53,9 +53,7 @@ if __name__ == "__main__":
# Probing the uncertainty |\label{code:wf_uncertainty_probing}| # Probing the uncertainty |\label{code:wf_uncertainty_probing}|
class Proby(ift.DiagonalProberMixin, ift.Prober): pass class Proby(ift.DiagonalProberMixin, ift.Prober): pass
proby = Proby(signal_space, probe_count=10) proby = Proby(signal_space, probe_count=10,ncpu=1)
# class Proby(ift.DiagonalProberMixin, ift.ParallelProber): pass
# proby = Proby(signal_space, probe_count=10,ncpu=2)
proby(lambda z: fft(wiener_curvature.inverse_times(fft.inverse_times(z)))) #|\label{code:wf_variance_fft_wrap}| proby(lambda z: fft(wiener_curvature.inverse_times(fft.inverse_times(z)))) #|\label{code:wf_variance_fft_wrap}|
sm = ift.FFTSmoothingOperator(signal_space, sigma=0.03) sm = ift.FFTSmoothingOperator(signal_space, sigma=0.03)
......
...@@ -72,6 +72,10 @@ class Prober(object): ...@@ -72,6 +72,10 @@ class Prober(object):
# ---Probing methods--- # ---Probing methods---
def gen_parallel_probe(self,callee):
for i in range(self.probe_count):
yield (callee, self.get_probe(i))
def probing_run(self, callee): def probing_run(self, callee):
""" controls the generation, evaluation and finalization of probes """ """ controls the generation, evaluation and finalization of probes """
self.reset() self.reset()
...@@ -81,21 +85,16 @@ class Prober(object): ...@@ -81,21 +85,16 @@ class Prober(object):
pre_result = self.process_probe(callee, current_probe, index) pre_result = self.process_probe(callee, current_probe, index)
self.finish_probe(current_probe, pre_result) self.finish_probe(current_probe, pre_result)
else: else:
probes = [None]*self.probe_count from multiprocess import Pool
callee = [callee]*self.probe_count pool = Pool(self._ncpu)
index = np.arange(self.probe_count) for i in pool.imap_unordered(self.evaluate_probe_parallel,
for ii in range(self.probe_count): self.gen_parallel_probe(callee)):
probes[ii] = self.get_probe(index[ii]) self.finish_probe(i[0],i[1])
from pathos.multiprocessing import ProcessingPool as Pool def evaluate_probe_parallel(self, argtuple):
pool = Pool(ncpus=self._ncpu) callee = argtuple[0]
pre_results = pool.map(self.evaluate_probe_parallel, callee, probe = argtuple[1]
probes) return (probe, callee(probe[1]))
for ii in xrange(self.probe_count):
self.finish_probe(probes[ii], pre_results[ii])
def evaluate_probe_parallel(self, callee, probe):
return callee(probe[1])
def reset(self): def reset(self):
pass pass
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment