diff --git a/nomad/cli/client/update_database.py b/nomad/cli/client/update_database.py
index 6fb8ba7cc3c7ea0390b9508df3157fd3cc0765ae..1703abed8448a05e05c8b6386721b4f6fd93e86b 100644
--- a/nomad/cli/client/update_database.py
+++ b/nomad/cli/client/update_database.py
@@ -1,17 +1,39 @@
+# Copyright 2019 Alvin Noe Ladines, Markus Scheidgen
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Automatically synchronizes nomad with a given external database. It creates a list of
+paths to mainfiles in nomad and compares it with the paths in the external database.
+The paths missing in nomad are then downloaded from the external database and
+subsequently uploaded to nomad. Downloaded files are saved in '/nomad/fairdi/external'
+by default.
+"""
+
 import requests
 import re
 import subprocess
 from urllib.parse import urlparse
 import os
-from .client import client
-from nomad import config
-from nomad.cli.client import upload as nomad_upload
 import datetime
 import click
 import tarfile
 import threading
 import time
 
+from .client import client
+from nomad import config
+from nomad.cli.client import upload as nomad_upload
+
 
 class DbUpdater:
     def __init__(self, *args, **kwargs):
@@ -109,7 +131,8 @@ class DbUpdater:
         return ok
 
     def _to_string(self, path):
-        return path.strip('/').replace('/', '_').replace(':', '_')
+        path = path.strip('/')
+        return re.sub(r'[^\w\d-]', '_', path)
 
     def _read_from_file(self, filename):
         print('Reading from file %s' % filename)
@@ -168,7 +191,7 @@ class DbUpdater:
     def _gen_nomad_list(self):
         print('Generating NOMAD list')
         if self.db_name.lower() == 'aflowlib':
-            servers = ['LIB1_LIB', 'LIB2_LIB', 'LIB3_LIB']
+            servers = ['LIB%d_LIB' % n for n in range(1, 10)]
            paths = []
             for s in servers:
                 if s in self.root_url:
@@ -247,20 +270,33 @@
                 size += os.path.getsize(fn)
         return size
 
+    def _download_size(self, dirname):
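+        # a directory left over from an earlier run may be incomplete;
+        # for aflowlib, count it as complete only if the final relaxation
+        # output (vasprun.xml.relax2.xz) is present, otherwise delete it
+        # and report 0.0 so that get_files downloads it again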
+        complete = False
+        files = os.listdir(dirname)
+        if self.db_name.lower() == 'aflowlib':
+            if 'vasprun.xml.relax2.xz' in files:
+                complete = True
+        else:
+            complete = True
+        size = 0.0
+        if not complete:
+            self._cleanup([os.path.join(dirname, f) for f in files])
+        else:
+            for f in files:
+                size += os.path.getsize(os.path.join(dirname, f))
+        return size
+
     def get_files(self, path):
         dirname = urlparse(path).path
         dirname = self._to_string(dirname)
         dirname = os.path.join(self._local_path, dirname)
         if os.path.isdir(dirname):
-            size = 0.0
-            files = os.listdir(dirname)
-            complete = False
-            for f in files:
-                if 'vasprun' in f:
-                    complete = True
-                size += os.path.getsize(os.path.join(dirname, f))
-            if not complete:
-                self._cleanup([os.path.join(dirname, f) for f in files])
+            size = self._download_size(dirname)
+            if size == 0.0:
                 size = self._download(path, dirname)
         else:
             os.mkdir(dirname)
@@ -268,11 +304,28 @@
         return dirname, size
 
     def _tar_files(self, dirs, tarname):
-        with tarfile.open(tarname, 'w') as f:
+        if os.path.isfile(tarname):
+            return
+        try:
+            f = tarfile.open(tarname, 'w')
             for d in dirs:
                 files = os.listdir(d)
                 for fn in files:
                     f.add(os.path.join(d, fn))
+            f.close()
+        except Exception as e:
+            if os.path.isfile(tarname):
+                os.remove(tarname)
+            print('Error writing tar file %s. %s' % (tarname, e))
+
+    def _make_name(self, dirs):
+        # name will be built from the first and last entries
+        d1 = self._to_string(os.path.relpath(dirs[0], self._local_path))
+        d2 = self._to_string(os.path.relpath(dirs[-1], self._local_path))
+        tarname = '%s-%s' % (d1, d2)
+        uploadname = 'AFLOWLIB_%s' % tarname
+        tarname = os.path.join(self._local_path, tarname + '.tar')
+        return tarname, uploadname
 
     def _cleanup(self, ilist):
         if isinstance(ilist, str):
@@ -288,6 +341,23 @@
                 return False
         return True
 
+    def _get_status_upload(self, uploadname):
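+        # look up the upload by name in nomad so that a restarted update
+        # can skip tar files that were already uploaded or published
+        # instead of processing them a second time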
+        res = self.client.uploads.get_uploads(name=uploadname, state='all').response().result
+        entries = res.results
+        status = None
+        upload_id = None
+        for entry in entries:
+            if entry['name'] == uploadname:
+                status = 'uploaded'
+                if entry['published']:
+                    status = 'published'
+                upload_id = entry['upload_id']
+                break
+        return status, upload_id
+
     def get_payload(self, uid):
         timenow = datetime.datetime.utcnow()
         if self.db_name == 'aflowlib':
@@ -301,11 +371,23 @@
                     'http://aflowlib.org',
                     'http://www.sciencedirect.com/science/article/pii/S0927025612000687'],
                 coauthors=[
-                    148, 149, 150, 151, 152, 146, 145, 138, 137,
-                    136, 135, 134, 147, 125],
+                    'f409d859-2639-4f82-b198-85e1c7c62f8b',
+                    '580f7036-97b8-42b1-a9e6-815058bcac72',
+                    '4c8d767d-335f-4ccd-9459-d0152b2026bc',
+                    '7effd16a-a65c-4d95-b692-652488d94146',
+                    'd59b3610-6335-4ad8-aca0-24a905de3a25',
+                    '3df68fed-6ca0-4bf9-b2b1-d6be71e18f72',
+                    'f7b540eb-a266-4379-9611-911ed8e3630e',
+                    '7b3fe468-0011-4ba8-bd53-1ee65acda114',
+                    '48e7a028-1a41-440d-9986-38540f5079c9',
+                    'd63d07e6-ccc8-4eac-82af-d841627b6c53',
+                    '9c308f66-eed1-4996-b33e-af78fb4944c7',
+                    'd2621bc7-c45a-4d35-9dc1-5c05fa8326cb',
+                    'ecba0e68-65ee-4b40-8fbf-a42714b1072b',
+                    '81b96683-7170-49d7-8c4e-e9f34906b3ea'],
                 shared_with=[],
                 _upload_time=timenow,
-                _uploader=125))
+                _uploader='81b96683-7170-49d7-8c4e-e9f34906b3ea'))
 
     def publish(self, uids=None):
         print('Publishing')
@@ -321,22 +403,7 @@
                 local_path=os.path.abspath(file_path),
                 name=name).response().result
         return res.upload_id
 
-    def register_upload(self, donelist, plist, pn):
-        data = []
-        for i in plist:
-            if i in donelist:
-                self.is_updated_list[i] = True
-            else:
-                data.append(self.update_list[i])
-        self._write_to_file(data, self.outfile + '_%d' % pn)
-
-    def aggregate_procs(self):
-        data = []
-        for i in range(self.parallel):
-            data += self._read_from_file(self.outfile + '_%d' % i)
-        self._write_to_file(data, self.outfile + '_updated')
-
     def download_proc(self, plist, pn):
         size = 0.0
         max_zip_size = config.max_upload_size
@@ -349,18 +416,21 @@
             dirs.append(d)
             done.append(plist[i])
             if size > max_zip_size or i == (len(plist) - 1):
-                tstamp = datetime.datetime.now().strftime('_%y%m%d%H%M%S%f')
-                tname = self._to_string(dirs[0].lstrip(self._local_path))
-                tname = os.path.join(self._local_path, tname + '%s.tar' % tstamp)
-                self._tar_files(dirs, tname)
-                uid = nomad_upload.upload_file(tname, name='AFLOWLIB_%s' % tstamp, offline=True)
+                if len(dirs) == 0:
+                    continue
+                tarname, uploadname = self._make_name(dirs)
+                status, uid = self._get_status_upload(uploadname)
+                if status == 'published':
+                    continue
+                if status != 'uploaded':
+                    self._tar_files(dirs, tarname)
+                    uid = nomad_upload.upload_file(tarname, name=uploadname, offline=True)
                 if self.do_publish:
                     self.publish([uid])
                 self.uids.append(uid)
-                self.register_upload(done, plist, pn)
                 if self.cleanup:
                     self._cleanup(dirs)
-                    self._cleanup(tname)
+                    self._cleanup(tarname)
                 size = 0.0
                 dirs = []
@@ -374,13 +444,15 @@
                 continue
             plist[cur % self.parallel].append(i)
             cur += 1
-        procs = []
-        for i in range(self.parallel):
-            p = threading.Thread(target=self.download_proc, args=(plist[i], i,))
-            procs.append(p)
-        [p.start() for p in procs]
-        [p.join() for p in procs]
-        self.aggregate_procs()
+        if self.parallel > 1:
+            procs = []
+            for i in range(self.parallel):
+                p = threading.Thread(target=self.download_proc, args=(plist[i], i,))
+                procs.append(p)
+            [p.start() for p in procs]
+            [p.join() for p in procs]
+        else:
+            self.download_proc(plist[0], 0)
         print('Time for download and upload (s)', time.time() - s)
 
     def prep_list(self):
@@ -396,11 +468,15 @@
                 self.update_list.append(l[0])
                 self.is_updated_list.append(l[1])
         else:
-            procs = []
-            procs.append(threading.Thread(target=self.get_db_list))
-            procs.append(threading.Thread(target=self.get_nomad_list))
-            [p.start() for p in procs]
-            [p.join() for p in procs]
+            if self.parallel > 1:
+                procs = []
+                procs.append(threading.Thread(target=self.get_db_list))
+                procs.append(threading.Thread(target=self.get_nomad_list))
+                [p.start() for p in procs]
+                [p.join() for p in procs]
+            else:
+                self.get_db_list()
+                self.get_nomad_list()
         self.compare_lists()
 
     def update(self):
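
Usage note (not part of the patch): a minimal sketch of driving the updated class,
assuming DbUpdater takes its configuration as keyword arguments as its
__init__(self, *args, **kwargs) signature suggests; the keyword names below, in
particular local_path and the root_url value, are assumptions:

    from nomad.cli.client.update_database import DbUpdater

    db = DbUpdater(
        db_name='aflowlib',       # external database to mirror
        root_url='http://aflowlib.duke.edu/AFLOWDATA/LIB3_LIB',  # assumed entry point
        local_path='/nomad/fairdi/external',  # download target (module default)
        parallel=2,               # two download/upload threads
        cleanup=True,             # remove downloaded dirs and tar files after upload
        do_publish=False)         # upload only; publish() can be called later
    db.update()                   # build db/nomad lists, compare, then sync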