Commit 77655401 authored by Markus Scheidgen

Merge branch 'v0.7.5' of gitlab.mpcdf.mpg.de:nomad-lab/nomad-FAIR into v0.7.5

parents 2a29ba58 73d935b9
# Copyright 2019 Alvin Noe Ladines, Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Automatically synchronizes nomad it with a given database. It creates a list of paths
to mainfiles in nomad and compares it with paths in the external database. The missing
paths in nomad will then be downloaded from the external database and subsequently
uploaded to nomad. The downloaded files are by default saved in '/nomad/fairdi/external'.
"""
import requests
import re
import subprocess
from urllib.parse import urlparse
import os
import datetime
import click
import tarfile
import threading
import time
from .client import client
from nomad import config
from nomad.cli.client import upload as nomad_upload
class DbUpdater:
def __init__(self, *args, **kwargs):
@@ -109,7 +131,8 @@ class DbUpdater:
return ok
    def _to_string(self, path):
        path = path.strip('/')
        return re.sub(r'[^\w\d-]', '_', path)
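    # for illustration (hypothetical input): a path like
    # '/AFLOWDATA/LIB1_LIB/Ag1Ca1:PBE' becomes 'AFLOWDATA_LIB1_LIB_Ag1Ca1_PBE';
    # every character that is not a word character or dash is replaced by '_'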
def _read_from_file(self, filename):
print('Reading from file %s' % filename)
@@ -168,7 +191,7 @@ class DbUpdater:
def _gen_nomad_list(self):
print('Generating NOMAD list')
if self.db_name.lower() == 'aflowlib':
            servers = ['LIB%d_LIB' % n for n in range(1, 10)]
paths = []
for s in servers:
if s in self.root_url:
@@ -247,20 +270,29 @@ class DbUpdater:
size += os.path.getsize(fn)
return size
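    # _download_size below returns 0.0 both for an empty directory and for an
    # incomplete download (which it also cleans up), so get_files can treat a
    # 0.0 result as "needs (re-)download"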
def _download_size(self, dirname):
complete = False
files = os.listdir(dirname)
if self.db_name.lower() == 'aflowlib':
if 'vasprun.xml.relax2.xz' in files:
complete = True
else:
complete = True
size = 0.0
if not complete:
self._cleanup([os.path.join(dirname, f) for f in files])
else:
for f in files:
size += os.path.getsize(os.path.join(dirname, f))
return size
def get_files(self, path):
dirname = urlparse(path).path
dirname = self._to_string(dirname)
dirname = os.path.join(self._local_path, dirname)
if os.path.isdir(dirname):
            size = self._download_size(dirname)
if size == 0.0:
size = self._download(path, dirname)
else:
os.mkdir(dirname)
@@ -268,11 +300,27 @@ class DbUpdater:
return dirname, size
    def _tar_files(self, dirs, tarname):
        if os.path.isfile(tarname):
            return
        try:
            f = tarfile.open(tarname, 'w')
            for d in dirs:
                files = os.listdir(d)
                for fn in files:
                    f.add(os.path.join(d, fn))
            f.close()
        except Exception as e:
            os.remove(tarname)
            print('Error writing tar file %s. %s' % (tarname, e))
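    # if writing the tar fails, the partial file is removed so that a later run
    # retries instead of uploading a truncated archive; an already existing tar
    # is reused as-is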
def _make_name(self, dirs):
        # the name is built from the first and last entries; os.path.relpath
        # strips the local prefix (str.lstrip would strip a character set, not
        # a prefix)
        d1 = self._to_string(os.path.relpath(dirs[0], self._local_path))
        d2 = self._to_string(os.path.relpath(dirs[-1], self._local_path))
tarname = '%s-%s' % (d1, d2)
uploadname = 'AFLOWLIB_%s' % tarname
tarname = os.path.join(self._local_path, tarname + '.tar')
return tarname, uploadname
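    # for illustration (hypothetical directory names): dirs ranging from
    # 'AFLOWDATA_LIB1_LIB_Ag1Ca1_PBE' to 'AFLOWDATA_LIB3_LIB_Zr1_PBE' under
    # self._local_path yield the upload name
    # 'AFLOWLIB_AFLOWDATA_LIB1_LIB_Ag1Ca1_PBE-AFLOWDATA_LIB3_LIB_Zr1_PBE'
    # and the matching '<local_path>/...tar' path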
def _cleanup(self, ilist):
if isinstance(ilist, str):
@@ -288,6 +336,20 @@ class DbUpdater:
return False
return True
def _get_status_upload(self, uploadname):
res = self.client.uploads.get_uploads(name=uploadname, state='all').response().result
entries = res.results
status = None
upload_id = None
for entry in entries:
if entry['name'] == uploadname:
status = 'uploaded'
if entry['published']:
status = 'published'
upload_id = entry['upload_id']
break
return status, upload_id
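    # returns (None, None) when no upload with the given name exists,
    # ('uploaded', upload_id) for an unpublished upload and
    # ('published', upload_id) once published; download_proc uses this to skip
    # batches that already made it into nomad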
def get_payload(self, uid):
timenow = datetime.datetime.utcnow()
if self.db_name == 'aflowlib':
@@ -301,11 +363,23 @@ class DbUpdater:
'http://aflowlib.org',
'http://www.sciencedirect.com/science/article/pii/S0927025612000687'],
coauthors=[
'f409d859-2639-4f82-b198-85e1c7c62f8b',
'580f7036-97b8-42b1-a9e6-815058bcac72',
'4c8d767d-335f-4ccd-9459-d0152b2026bc',
'7effd16a-a65c-4d95-b692-652488d94146',
'd59b3610-6335-4ad8-aca0-24a905de3a25',
'3df68fed-6ca0-4bf9-b2b1-d6be71e18f72',
'f7b540eb-a266-4379-9611-911ed8e3630e',
'7b3fe468-0011-4ba8-bd53-1ee65acda114',
'48e7a028-1a41-440d-9986-38540f5079c9',
'd63d07e6-ccc8-4eac-82af-d841627b6c53',
'9c308f66-eed1-4996-b33e-af78fb4944c7',
'd2621bc7-c45a-4d35-9dc1-5c05fa8326cb',
'ecba0e68-65ee-4b40-8fbf-a42714b1072b',
'81b96683-7170-49d7-8c4e-e9f34906b3ea'],
shared_with=[],
_upload_time=timenow,
            _uploader='81b96683-7170-49d7-8c4e-e9f34906b3ea'))
def publish(self, uids=None):
print('Publishing')
@@ -321,21 +395,6 @@ class DbUpdater:
local_path=os.path.abspath(file_path), name=name).response().result
return res.upload_id
def download_proc(self, plist, pn):
size = 0.0
max_zip_size = config.max_upload_size
@@ -349,18 +408,21 @@ class DbUpdater:
dirs.append(d)
done.append(plist[i])
if size > max_zip_size or i == (len(plist) - 1):
if len(dirs) == 0:
continue
tarname, uploadname = self._make_name(dirs)
status, uid = self._get_status_upload(uploadname)
if status == 'published':
continue
if status != 'uploaded':
self._tar_files(dirs, tarname)
uid = nomad_upload.upload_file(tarname, name=uploadname, offline=True)
if self.do_publish:
self.publish([uid])
self.uids.append(uid)
if self.cleanup:
self._cleanup(dirs)
self._cleanup(tarname)
size = 0.0
dirs = []
@@ -374,13 +436,15 @@ class DbUpdater:
continue
plist[cur % self.parallel].append(i)
cur += 1
if self.parallel > 1:
procs = []
for i in range(self.parallel):
p = threading.Thread(target=self.download_proc, args=(plist[i], i,))
procs.append(p)
[p.start() for p in procs]
[p.join() for p in procs]
else:
self.download_proc(plist[0], 0)
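        # the branch above uses threads rather than processes: the work is
        # dominated by network and disk I/O, so the GIL is not a bottleneck,
        # and with parallel == 1 everything runs in the calling thread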
print('Time for download and upload (s)', time.time() - s)
def prep_list(self):
@@ -396,11 +460,15 @@ class DbUpdater:
self.update_list.append(l[0])
self.is_updated_list.append(l[1])
else:
if self.parallel > 1:
procs = []
procs.append(threading.Thread(target=self.get_db_list))
procs.append(threading.Thread(target=self.get_nomad_list))
[p.start() for p in procs]
[p.join() for p in procs]
else:
self.get_db_list()
self.get_nomad_list()
self.compare_lists()
def update(self):
@@ -55,7 +55,6 @@ def setup():
Will create client instances for the databases and has to be called before they
can be used.
"""
setup_logging()
setup_mongo()
setup_elastic()
@@ -461,6 +461,9 @@ class NomadCeleryRequest(Request):
def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
if isinstance(exc_info.exception, WorkerLostError):
infrastructure.setup()
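            # a lost worker usually means the process died unexpectedly (the
            # OOM killer is one assumed cause); re-initialize infrastructure
            # connections before logging and failing the task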
utils.get_logger(__name__).error(
'detected WorkerLostError', exc_info=exc_info.exception)
self._fail(
'task failed due to worker lost: %s' % str(exc_info.exception),
exc_info=exc_info)