Commit ba1f78e7 authored by Marcel Henrik Schubert
Browse files

fixed multiprocessing

parent e30e3fec
......@@ -815,6 +815,21 @@ def _main(args):
datapath = os.path.join(args['datapath'], args['workset'], args['part'])
os.makedirs(savepath, exist_ok=True)
#must use Manager queue here, or will not work
manager = mp.Manager()
#create two queues to split work into two parts
q = [manager.Queue(), manager.Queue()]
if not args['test']:
ncpus = mp.cpu_count()
if ncpus < 80:
print('failed to get 80 cpus - only got {}'.format(ncpus))
else:
ncpus = 4
print(' got {} cpus for calculations'.format(ncpus))
#load linebytes
if not os.path.exists(os.path.join(datapath, 'linebytes.json')) or args['rerun']:
......@@ -831,7 +846,7 @@ def _main(args):
lineBytes = check_done(lineBytes, savepath)
#chunkify list
lineBytes = chunkify(lineBytes, chunksize=1000)
lineBytes = chunkify(lineBytes, chunksize=ncpus-1)
#lineBytes = [[el,] for el in lineBytes]
#lineBytes =[lineBytes[1]]
......@@ -839,18 +854,9 @@ def _main(args):
py = psutil.Process(pid)
print('create managed queue...')
sys.stdout.flush()
#must use Manager queue here, or will not work
manager = mp.Manager()
#create two queues to split work into two parts
q = [manager.Queue(), manager.Queue()]
if not args['test']:
ncpus = mp.cpu_count()
if ncpus < 80:
print('failed to get 80 cpus - only got '.format(ncpus))
else:
ncpus = 4
pool = mp.Pool(ncpus, maxtasksperchild=1)
#pool = mp.Pool(mp.cpu_count(), maxtasksperchild=1)
pool = mp.Pool(ncpus, maxtasksperchild=4)
#pool = mp.Pool(mp.cpu_count()-1, maxtasksperchild=1)
print('create listener for saving of data...')
sys.stdout.flush()
#put listeners to work first
......@@ -865,6 +871,10 @@ def _main(args):
#create jobs
print('create jobs')
sys.stdout.flush()
print(len(lineBytes))
print(type(lineBytes))
print(type(lineBytes[0]))
runs = 0
for chunk in lineBytes:
job = pool.apply_async(process_wrapper,(chunk, q, datapath, args['file'], range_CHAR,
range_WORD,range_TAG, range_DEP, range_POS,
......@@ -874,22 +884,24 @@ def _main(args):
print('''Process memory used by parent after
async: {}\nVirtual Memory used by parent after async: {}\n\n'''.format(py.memory_info()[0]*9.31*10e-10,py.memory_info()[1]*9.31*10e-10)
)
if q[1].qsize() > 1000:
if q[1].qsize() > 1000000:
print('sleeping because queue is too long...')
sys.stdout.flush()
time.sleep(10)
sys.stdout.flush()
gc.collect()
runs +=1
print("made {} tasks to pool".format(runs))
# collect results from the workers through the pool result queue
print('collect results from job cycle...')
sys.stdout.flush()
for i in range(0, len(jobs)):
tmp = jobs.pop(0)
tmp.get()
del tmp
print('sleep after cycle...')
sys.stdout.flush()
# collect results from the workers through the pool result queue
print('collect results from job cycle...')
sys.stdout.flush()
for i in range(0, len(jobs)):
tmp = jobs.pop(0)
tmp.get()
del tmp
print('sleep after cycle...')
sys.stdout.flush()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment