Commit a522e65a authored by Alvin Noe Ladines

Improvement on managing uploads during update

parent ff0eba27
2 merge requests: !115 V0.8.0 beta, !113 V0.8.0
Pipeline #68582 passed
@@ -30,7 +30,7 @@ import nomad_meta_info
 from nomad.files import UploadFiles, Restricted
 from nomad import utils, search, config
-from nomad.archive_library.query import ArchiveFileDBs
+from nomad.archive_library.utils import get_dbs
 from .auth import authenticate, create_authorization_predicate
 from .api import api
@@ -312,8 +312,7 @@ class ArchiveQueryResource(Resource):
             upload_id = entry['upload_id']
             calc_id = entry['calc_id']
             if msgdbs is None or msgdbs.upload_id != upload_id:
-                msgdbs = ArchiveFileDBs(upload_id).get_dbs()
+                msgdbs = get_dbs(upload_id)
             for msgdb in msgdbs:
                 data.append(msgdb.query({calc_id: qschema}))
@@ -24,13 +24,10 @@ for c in metainfo.calcs:
 """
 import requests
-import json
-from nomad.app.api.common import query_api_url
+import os.path
+from nomad import config
 from nomad.archive_library.metainfo import ArchiveMetainfo
-from nomad.archive_library.filedb import ArchiveFileDB
-from nomad.files import UploadFiles
-from nomad.app.api.auth import create_authorization_predicate


 class ArchiveQuery:
@@ -71,8 +68,7 @@ class ArchiveQuery:
             in_dict[name] = value

     def _api_query(self):
-        url = query_api_url(self._archive_path, self._query_path)
+        url = os.path.join(config.api_url(False), self._archive_path, self._query_path)
         data = self._query_params
         if not isinstance(self._archive_schema, list):
             data['results'] = [self._archive_schema]
@@ -83,8 +79,7 @@ class ArchiveQuery:
         if self._scroll_id is not None:
             self._set_value('scroll_id', self._scroll_id, data)
-        response = requests.post(
-            url, headers=self._authentication, content_type='application/json', data=json.dumps(data))
+        response = requests.post(url, headers=self._authentication, json=data)
         if response.status_code != 200:
            raise Exception('Query returned %s' % response.status_code)
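A note on this hunk: requests.post() has no content_type parameter, so the old keyword was silently ignored. Passing json=data lets requests serialize the payload and set the Content-Type: application/json header itself, which also drops the json import. A minimal sketch of the same pattern, with url, auth_headers, and the payload as placeholder values:

    import requests

    url = 'http://localhost/api/archive/query'  # placeholder endpoint
    auth_headers = {}                           # placeholder for self._authentication
    data = {'results': ['section_run']}         # illustrative query payload

    # json= serializes the body and sets Content-Type: application/json,
    # replacing the manual json.dumps(...) call.
    response = requests.post(url, headers=auth_headers, json=data)
    if response.status_code != 200:
        raise Exception('Query returned %s' % response.status_code)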
@@ -117,19 +112,3 @@ class ArchiveQuery:
         if self._archive_data:
             metainfo = ArchiveMetainfo(archive_data=self._archive_data, archive_schema=self._archive_schema)
         return metainfo
-
-
-class ArchiveFileDBs:
-    def __init__(self, upload_id):
-        self.upload_id = upload_id
-
-    def get_dbs(self):
-        upload_files = UploadFiles.get(
-            self.upload_id, create_authorization_predicate(self.upload_id))
-        if upload_files is None:
-            return []
-        files = upload_files.archive_file_msg('X')
-        msgdbs = [ArchiveFileDB(f) for f in files if f is not None]
-        return msgdbs
+from nomad.files import UploadFiles
+from nomad.app.api.auth import create_authorization_predicate
+from nomad.archive_library.filedb import ArchiveFileDB
+
+
+def get_dbs(upload_id):
+    upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))
+    if upload_files is None:
+        return []
+    files = upload_files.archive_file_msg('X')
+    msgdbs = [ArchiveFileDB(f) for f in files if f is not None]
+    return msgdbs
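The function above is a new file, the nomad.archive_library.utils module (the import in the first hunk confirms the path), and replaces the removed ArchiveFileDBs class one-to-one. A minimal usage sketch, with the upload id, calc id, and query schema shape as assumed placeholder values:

    from nomad.archive_library.utils import get_dbs

    # One ArchiveFileDB per message-pack archive file of the upload,
    # or [] when the upload does not exist or access is denied.
    msgdbs = get_dbs('some_upload_id')  # placeholder upload id
    for msgdb in msgdbs:
        result = msgdb.query({'some_calc_id': {'section_run': '*'}})  # assumed schema shape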
+# Copyright 2019 Alvin Noe Ladines, Markus Scheidgen
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Automatically synchronizes nomad with a given database. It creates a list of paths
+to mainfiles in nomad and compares it with paths in the external database. The missing
+paths in nomad will then be downloaded from the external database and subsequently
+uploaded to nomad. The downloaded files are by default saved in '/nomad/fairdi/external'.
+"""
 import requests
 import re
 import subprocess
 from urllib.parse import urlparse
 import os
-from .client import client
-from nomad import config
-from nomad.cli.client import upload as nomad_upload
 import datetime
 import click
 import tarfile
 import threading
 import time
+from .client import client
+from nomad import config
+from nomad.cli.client import upload as nomad_upload


 class DbUpdater:
     def __init__(self, *args, **kwargs):
@@ -109,7 +131,8 @@ class DbUpdater:
         return ok

     def _to_string(self, path):
-        return path.strip('/').replace('/', '_').replace(':', '_')
+        path = path.strip('/')
+        return re.sub(r'[^\w\d-]', '_', path)

     def _read_from_file(self, filename):
         print('Reading from file %s' % filename)
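The rewritten _to_string is strictly broader than the old chained replace calls: instead of only mapping '/' and ':' to underscores, the regex maps every character outside [A-Za-z0-9_-] to an underscore, so dots and other punctuation in database paths are sanitized too. A standalone sketch of the behavior (the sample path is illustrative):

    import re

    def to_string(path):
        # Anything that is not a word character, digit, or hyphen becomes '_'.
        return re.sub(r'[^\w\d-]', '_', path.strip('/'))

    print(to_string('/AFLOWDATA/LIB1_LIB/Ag:PAW/relax.1/'))
    # -> AFLOWDATA_LIB1_LIB_Ag_PAW_relax_1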
@@ -168,7 +191,7 @@ class DbUpdater:
     def _gen_nomad_list(self):
         print('Generating NOMAD list')
         if self.db_name.lower() == 'aflowlib':
-            servers = ['LIB1_LIB', 'LIB2_LIB', 'LIB3_LIB']
+            servers = ['LIB%d_LIB' % n for n in range(1, 10)]
             paths = []
             for s in servers:
                 if s in self.root_url:
@@ -247,20 +270,29 @@ class DbUpdater:
             size += os.path.getsize(fn)
         return size

+    def _download_size(self, dirname):
+        complete = False
+        files = os.listdir(dirname)
+        if self.db_name.lower() == 'aflowlib':
+            if 'vasprun.xml.relax2.xz' in files:
+                complete = True
+        else:
+            complete = True
+        size = 0.0
+        if not complete:
+            self._cleanup([os.path.join(dirname, f) for f in files])
+        else:
+            for f in files:
+                size += os.path.getsize(os.path.join(dirname, f))
+        return size
+
     def get_files(self, path):
         dirname = urlparse(path).path
         dirname = self._to_string(dirname)
         dirname = os.path.join(self._local_path, dirname)
         if os.path.isdir(dirname):
-            size = 0.0
-            files = os.listdir(dirname)
-            complete = False
-            for f in files:
-                if 'vasprun' in f:
-                    complete = True
-                size += os.path.getsize(os.path.join(dirname, f))
-            if not complete:
-                self._cleanup([os.path.join(dirname, f) for f in files])
+            size = self._download_size(dirname)
+            if size == 0.0:
                 size = self._download(path, dirname)
         else:
             os.mkdir(dirname)
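The new _download_size centralizes the completeness check that get_files previously did inline, and tightens it: for aflowlib a directory only counts as complete when the final vasprun.xml.relax2.xz is present (the old check accepted any file containing 'vasprun'). Incomplete directories are wiped and reported as size 0.0, which get_files takes as the signal to re-download. A standalone sketch of the check, with the cleanup step omitted:

    import os

    def download_size(dirname, db_name='aflowlib'):
        # 0.0 means "incomplete, re-download"; the caller relies on this sentinel.
        files = os.listdir(dirname)
        complete = db_name.lower() != 'aflowlib' or 'vasprun.xml.relax2.xz' in files
        if not complete:
            return 0.0  # the real method also deletes the partial files here
        return float(sum(os.path.getsize(os.path.join(dirname, f)) for f in files))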
@@ -268,11 +300,27 @@ class DbUpdater:
         return dirname, size

     def _tar_files(self, dirs, tarname):
-        with tarfile.open(tarname, 'w') as f:
+        if os.path.isfile(tarname):
+            return
+        try:
+            f = tarfile.open(tarname, 'w')
             for d in dirs:
                 files = os.listdir(d)
                 for fn in files:
                     f.add(os.path.join(d, fn))
+            f.close()
+        except Exception as e:
+            os.remove(tarname)
+            print('Error writing tar file %s. %s' % (tarname, e))
+
+    def _make_name(self, dirs):
+        # name will be first and last entries
+        d1 = self._to_string(dirs[0].lstrip(self._local_path))
+        d2 = self._to_string(dirs[-1].lstrip(self._local_path))
+        tarname = '%s-%s' % (d1, d2)
+        uploadname = 'AFLOWLIB_%s' % tarname
+        tarname = os.path.join(self._local_path, tarname + '.tar')
+        return tarname, uploadname

     def _cleanup(self, ilist):
         if isinstance(ilist, str):
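Two things change here: _tar_files becomes idempotent (an existing tar is reused, a failed write is deleted), and the tar and upload names are no longer timestamped. _make_name derives them from the first and last directory of the batch, so a re-run produces the same names and _get_status_upload (next hunk) can recognize work already done. A sketch of the naming scheme; note it uses os.path.basename where the commit uses str.lstrip (which strips a character set, not a prefix), and the paths are placeholders:

    import os

    local_path = '/nomad/fairdi/external'  # default download location per the docstring

    def make_name(dirs):
        # Deterministic batch name from the first and last directory.
        d1, d2 = os.path.basename(dirs[0]), os.path.basename(dirs[-1])
        name = '%s-%s' % (d1, d2)
        return os.path.join(local_path, name + '.tar'), 'AFLOWLIB_%s' % name

    print(make_name(['/nomad/fairdi/external/LIB1_LIB_Ag',
                     '/nomad/fairdi/external/LIB1_LIB_Zr']))
    # -> ('/nomad/fairdi/external/LIB1_LIB_Ag-LIB1_LIB_Zr.tar',
    #     'AFLOWLIB_LIB1_LIB_Ag-LIB1_LIB_Zr')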
@@ -288,6 +336,20 @@ class DbUpdater:
                 return False
         return True

+    def _get_status_upload(self, uploadname):
+        res = self.client.uploads.get_uploads(name=uploadname, state='all').response().result
+        entries = res.results
+        status = None
+        upload_id = None
+        for entry in entries:
+            if entry['name'] == uploadname:
+                status = 'uploaded'
+                if entry['published']:
+                    status = 'published'
+                upload_id = entry['upload_id']
+                break
+        return status, upload_id
+
     def get_payload(self, uid):
         timenow = datetime.datetime.utcnow()
         if self.db_name == 'aflowlib':
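_get_status_upload is what makes the update restartable without the removed register_upload/aggregate_procs bookkeeping (see the hunk further below): instead of tracking progress in local files, it asks the API whether an upload with the deterministic name already exists and whether it was published. A sketch of how download_proc consumes the three possible answers:

    # Branch summary (mirrors the download_proc hunk below):
    status, uid = self._get_status_upload(uploadname)
    if status == 'published':
        pass        # batch is already public: skip it entirely
    elif status == 'uploaded':
        pass        # tar was uploaded but not published: skip re-upload, maybe publish
    else:
        pass        # status is None: not seen before, tar and upload from scratch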
@@ -301,11 +363,23 @@ class DbUpdater:
                     'http://aflowlib.org',
                     'http://www.sciencedirect.com/science/article/pii/S0927025612000687'],
                 coauthors=[
-                    148, 149, 150, 151, 152, 146, 145, 138, 137,
-                    136, 135, 134, 147, 125],
+                    'f409d859-2639-4f82-b198-85e1c7c62f8b',
+                    '580f7036-97b8-42b1-a9e6-815058bcac72',
+                    '4c8d767d-335f-4ccd-9459-d0152b2026bc',
+                    '7effd16a-a65c-4d95-b692-652488d94146',
+                    'd59b3610-6335-4ad8-aca0-24a905de3a25',
+                    '3df68fed-6ca0-4bf9-b2b1-d6be71e18f72',
+                    'f7b540eb-a266-4379-9611-911ed8e3630e',
+                    '7b3fe468-0011-4ba8-bd53-1ee65acda114',
+                    '48e7a028-1a41-440d-9986-38540f5079c9',
+                    'd63d07e6-ccc8-4eac-82af-d841627b6c53',
+                    '9c308f66-eed1-4996-b33e-af78fb4944c7',
+                    'd2621bc7-c45a-4d35-9dc1-5c05fa8326cb',
+                    'ecba0e68-65ee-4b40-8fbf-a42714b1072b',
+                    '81b96683-7170-49d7-8c4e-e9f34906b3ea'],
                 shared_with=[],
                 _upload_time=timenow,
-                _uploader=125))
+                _uploader='81b96683-7170-49d7-8c4e-e9f34906b3ea'))

     def publish(self, uids=None):
         print('Publishing')
@@ -321,21 +395,6 @@ class DbUpdater:
                 local_path=os.path.abspath(file_path), name=name).response().result
         return res.upload_id

-    def register_upload(self, donelist, plist, pn):
-        data = []
-        for i in plist:
-            if i in donelist:
-                self.is_updated_list[i] = True
-            else:
-                data.append(self.update_list[i])
-        self._write_to_file(data, self.outfile + '_%d' % pn)
-
-    def aggregate_procs(self):
-        data = []
-        for i in range(self.parallel):
-            data += self._read_from_file(self.outfile + '_%d' % i)
-        self._write_to_file(data, self.outfile + '_updated')
-
     def download_proc(self, plist, pn):
         size = 0.0
         max_zip_size = config.max_upload_size
@@ -349,18 +408,19 @@ class DbUpdater:
                 dirs.append(d)
                 done.append(plist[i])
             if size > max_zip_size or i == (len(plist) - 1):
-                tstamp = datetime.datetime.now().strftime('_%y%m%d%H%M%S%f')
-                tname = self._to_string(dirs[0].lstrip(self._local_path))
-                tname = os.path.join(self._local_path, tname + '%s.tar' % tstamp)
-                self._tar_files(dirs, tname)
-                uid = nomad_upload.upload_file(tname, name='AFLOWLIB_%s' % tstamp, offline=True)
+                tarname, uploadname = self._make_name(dirs)
+                status, uid = self._get_status_upload(uploadname)
+                if status == 'published':
+                    continue
+                if status != 'uploaded':
+                    self._tar_files(dirs, tarname)
+                    uid = nomad_upload.upload_file(tarname, name=uploadname, offline=True)
                 if self.do_publish:
                     self.publish([uid])
                 self.uids.append(uid)
-                self.register_upload(done, plist, pn)
                 if self.cleanup:
                     self._cleanup(dirs)
-                    self._cleanup(tname)
+                    self._cleanup(tarname)
                 size = 0.0
                 dirs = []
@@ -374,13 +434,15 @@ class DbUpdater:
                 continue
             plist[cur % self.parallel].append(i)
             cur += 1
+        if self.parallel > 1:
             procs = []
             for i in range(self.parallel):
                 p = threading.Thread(target=self.download_proc, args=(plist[i], i,))
                 procs.append(p)
             [p.start() for p in procs]
             [p.join() for p in procs]
-        self.aggregate_procs()
+        else:
+            self.download_proc(plist[0], 0)
         print('Time for download and upload (s)', time.time() - s)

     def prep_list(self):
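With register_upload and aggregate_procs gone, the serial path no longer needs a thread at all, so both this method and prep_list below guard the threading code with self.parallel > 1 and fall back to a plain call. The pattern, generalized into a small sketch (function and variable names are illustrative):

    import threading

    def run_partitioned(worker, partitions):
        # Spawn threads only when there is more than one partition;
        # otherwise run the single partition on the calling thread.
        if len(partitions) > 1:
            procs = [threading.Thread(target=worker, args=(p, i))
                     for i, p in enumerate(partitions)]
            [p.start() for p in procs]
            [p.join() for p in procs]
        else:
            worker(partitions[0], 0)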
@@ -396,11 +458,15 @@ class DbUpdater:
                 self.update_list.append(l[0])
                 self.is_updated_list.append(l[1])
         else:
+            if self.parallel > 1:
                 procs = []
                 procs.append(threading.Thread(target=self.get_db_list))
                 procs.append(threading.Thread(target=self.get_nomad_list))
                 [p.start() for p in procs]
                 [p.join() for p in procs]
+            else:
+                self.get_db_list()
+                self.get_nomad_list()
         self.compare_lists()

     def update(self):