Commit 83b65f15 authored by Markus Scheidgen

Merge branch 'v0.6.4' into 'master'

V0.6.4

See merge request !69
parents 5bc2efaf 57bea61f
Pipeline #65657 failed with stages in 15 minutes and 59 seconds
Subproject commit c36949f0c8421fff340d314d16ee83f7da5974ac
Subproject commit 4157d084e43ec08ad2f4975a276e89fa51dcd535
Subproject commit c360a381926d670575079c3a0412c14bc6b4c8ac
Subproject commit b39569c5fa69254c90f91ec430d28a0941efbe95
Subproject commit 7d850bd76e878b1429d6974f5daac456099f6e4f
Subproject commit d9c9b3c14ecab80e58adab70917267e5e7fbe3f2
Subproject commit aee4be7407124f87b0ba99eb7b4af3646b8602e9
Subproject commit c161aa0f8649f8431cf3ad6e0afc509000460161
example-1.tar.gz
\ No newline at end of file
example-1.tar.gz
\ No newline at end of file
@@ -16,6 +16,7 @@ import tarfile
import io
import zipfile
import zipstream
import uuid
# config
nomad_url = 'http://labdev-nomad.esc.rzg.mpg.de/fairdi/nomad/mp/api'
@@ -126,7 +127,7 @@ def upload_next_data(sources: Iterator[Tuple[str, str, str]], upload_name='next
yield chunk
# stream .zip to nomad
response = requests.put(url=url, headers={'X-Token': token}, data=content())
response = requests.put(url=url, headers={'X-Token': token, 'Content-type': 'application/octet-stream'}, data=content())
if response.status_code != 200:
raise Exception('nomad returned status %d' % response.status_code)
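For context, the change above only adds an explicit Content-type header to the streamed upload in upload_next_data. A minimal, self-contained sketch of the same streaming pattern (URL, token and file list are placeholders, not values from this repository):

import requests
import zipstream

def stream_zip_upload(file_paths, url, token):
    # build the .zip lazily; files are only read while streaming
    zip_stream = zipstream.ZipFile(mode='w', compression=zipstream.ZIP_DEFLATED)
    for path in file_paths:
        zip_stream.write(path)

    def content():
        for chunk in zip_stream:
            yield chunk

    # requests consumes the generator chunk-wise; the explicit octet-stream
    # header avoids any content-type guessing on the way to the API
    response = requests.put(
        url=url,
        headers={'X-Token': token, 'Content-type': 'application/octet-stream'},
        data=content())
    if response.status_code != 200:
        raise Exception('nomad returned status %d' % response.status_code)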
from nomad import infrastructure, files, processing as proc
infrastructure.setup_logging()
infrastructure.setup_mongo()
upload_id = 'NvVyk3gATxCJW6dWS4cRWw'
upload = proc.Upload.get(upload_id)
upload_with_metadata = upload.to_upload_with_metadata()
upload_files = files.PublicUploadFiles(upload_id)
upload_files.repack(upload_with_metadata)
# try:
# public_upload_files = files.PublicUploadFiles(upload_id)
# public_upload_files.delete()
# except Exception:
# pass
# staging_upload_files = files.StagingUploadFiles(upload_id)
# staging_upload_files.pack(upload_with_metadata)
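# The snippet above re-packs the public files of a single published upload;
# the commented block is the variant that deletes the public files and packs
# them again from the staging area.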
import sys
from nomad import utils
from nomad.cli.parse import parse
utils.configure_logging()
parse(sys.argv[1], '.')
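# The snippet above runs the NOMAD parsing on a single mainfile given as the
# first command line argument.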
@@ -110,7 +110,10 @@ class DomainProviderBase extends React.Component {
mainfile: {},
calc_hash: {},
formula: {},
optimade: {}
optimade: {},
quantities: {},
spacegroup: {},
spacegroup_symbol: {}
},
/**
* A dict where each object represents a column. Possible keys are label, render.
@@ -84,8 +84,8 @@ class MirrorUploadResource(Resource):
except KeyError:
abort(404, message='Upload with id %s does not exist.' % upload_id)
if not upload.published:
abort(400, message='Only published uploads can be exported')
if upload.tasks_running or upload.process_running:
abort(400, message='Only non-processing uploads can be exported')
return {
'upload_id': upload.upload_id,
@@ -127,7 +127,7 @@ class Calculation(Resource):
), 200
@ns.route('/info/calculation')
@ns.route('/info/calculations')
class CalculationInfo(Resource):
@api.doc('calculations_info')
@api.response(400, 'Invalid requests, e.g. bad parameter.')
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from typing import List, Callable
import click
from tabulate import tabulate
from mongoengine import Q
@@ -216,11 +216,8 @@ def rm(ctx, uploads, with_coe_repo, skip_es, skip_mongo, skip_files):
upload.delete()
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads, parallel: int):
def __run_processing(
ctx, uploads, parallel: int, process: Callable[[proc.Upload], None], label: str):
_, uploads = query_uploads(ctx, uploads)
uploads_count = uploads.count()
uploads = list(uploads) # copy the whole mongo query set to avoid cursor timeouts
@@ -235,29 +232,29 @@ def re_process(ctx, uploads, parallel: int):
logger = utils.get_logger(__name__)
print('%d uploads selected, re-processing ...' % uploads_count)
print('%d uploads selected, %s ...' % (uploads_count, label))
def re_process_upload(upload: proc.Upload):
logger.info('re-processing started', upload_id=upload.upload_id)
def process_upload(upload: proc.Upload):
logger.info('%s started' % label, upload_id=upload.upload_id)
completed = False
if upload.process_running:
logger.warn(
'cannot trigger re-process, since the upload is already/still processing',
'cannot trigger %s, since the upload is already/still processing' % label,
current_process=upload.current_process,
current_task=upload.current_task, upload_id=upload.upload_id)
else:
upload.reset()
upload.re_process_upload()
process(upload)
upload.block_until_complete(interval=.5)
if upload.tasks_status == proc.FAILURE:
logger.info('re-processing with failure', upload_id=upload.upload_id)
logger.info('%s with failure' % label, upload_id=upload.upload_id)
completed = True
logger.info('re-processing complete', upload_id=upload.upload_id)
logger.info('%s complete' % label, upload_id=upload.upload_id)
with cv:
state['completed_count'] += 1 if completed else 0
@@ -265,8 +262,8 @@ def re_process(ctx, uploads, parallel: int):
state['available_threads_count'] += 1
print(
' re-processed %s and skipped %s of %s uploads' %
(state['completed_count'], state['skipped_count'], uploads_count))
' %s %s and skipped %s of %s uploads' %
(label, state['completed_count'], state['skipped_count'], uploads_count))
cv.notify()
@@ -274,7 +271,7 @@ def re_process(ctx, uploads, parallel: int):
with cv:
cv.wait_for(lambda: state['available_threads_count'] > 0)
state['available_threads_count'] -= 1
thread = threading.Thread(target=lambda: re_process_upload(upload))
thread = threading.Thread(target=lambda: process_upload(upload))
threads.append(thread)
thread.start()
@@ -282,6 +279,22 @@ def re_process(ctx, uploads, parallel: int):
thread.join()
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads, parallel: int):
__run_processing(ctx, uploads, parallel, lambda upload: upload.re_process_upload(), 're-processing')
@uploads.command(help='Repack selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_pack(ctx, uploads, parallel: int):
__run_processing(ctx, uploads, parallel, lambda upload: upload.re_pack(), 're-packing')
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
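The hunks above fold the old re_process body into a generic __run_processing helper that receives the per-upload operation as a Callable, so re_process and re_pack now differ only in the lambda and label they pass. A stripped-down sketch of that pattern (names are illustrative, not the repository code):

from typing import Callable, Iterable

def run_processing(uploads: Iterable[str], process: Callable[[str], None], label: str):
    # the real helper fans the work out over a pool of threads guarded by a
    # condition variable; reduced to its essence the idea is just this
    for upload in uploads:
        print('%s started for %s' % (label, upload))
        process(upload)

run_processing(['upload-1', 'upload-2'], lambda u: print('re-processing', u), 're-processing')
run_processing(['upload-1', 'upload-2'], lambda u: print('re-packing', u), 're-packing')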
@@ -13,6 +13,6 @@
# limitations under the License.
from . import local, migration, upload, integrationtests, mirror, statistics
from . import local, migration, upload, integrationtests, mirror, statistics, update_database
from .client import create_client
from .upload import stream_upload_with_client
@@ -24,7 +24,8 @@ import queue
import json
from nomad import config, infrastructure
from nomad.migration import NomadCOEMigration, SourceCalc, Package, missing_calcs_data
from nomad.migration import NomadCOEMigration, SourceCalc, Package, missing_calcs_data, \
update_user_metadata as migration_update_user_metadata
from .client import client
@@ -311,3 +312,11 @@ def missing(start_pid, uploads):
for source_upload_id in uploads:
print(source_upload_id)
@migration.command(help='Updates the user metadata with data from the source calc index.')
@click.option('--update-index', is_flag=True, help='Also update the elastic index')
@click.option('--bulk-size', default=1000, help='Size of the bulk to update with one db query')
def update_user_metadata(update_index, bulk_size):
infrastructure.setup()
migration_update_user_metadata(bulk_size=bulk_size, update_index=update_index)
import requests
import re
import subprocess
from urllib.parse import urlparse
import os
from .client import client
from nomad import config
from nomad.cli.client import upload as nomad_upload
import datetime
import click
import tarfile
import threading
import time
class DbUpdater:
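    # Synchronizes NOMAD with an external database (currently only AFLOWlib):
    # crawl the database's web index, compare against what is already in
    # NOMAD, download the missing calculations and upload/publish them
    # through the NOMAD API.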
    def __init__(self, *args, **kwargs):
        self.db_name = 'aflowlib'
        self.root_url = 'http://aflowlib.duke.edu/AFLOWDATA/LIB1_LIB'
        self.local_path = '/nomad/fairdi/external'
        self.dbfile = None
        self.nomadfile = None
        self.outfile = None
        self.do_download = False
        self.do_upload = False
        self.do_publish = False
        self.cleanup = False
        self.parallel = 2
        self.uids = []
        self._set(**kwargs)
        self._set_db()
        self._set_local_path()
        self.configure_client()
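
    # Hypothetical usage (file names and flags are examples, not defaults):
    #   DbUpdater(db_name='aflowlib', dbfile='db.txt', nomadfile='nomad.txt',
    #             outfile='update.txt', do_download=True, do_upload=True, parallel=2)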

    def _set(self, **kwargs):
        for key, val in kwargs.items():
            key = key.lower()
            if hasattr(self, key):
                setattr(self, key, val)
            else:
                raise KeyError('Invalid key %s' % key)

    def configure_client(self, **kwargs):
        from nomad.cli.client import create_client
        self.client = create_client()

    def _set_local_path(self):
        subdir = ''
        if self.db_name.lower() == 'aflowlib':
            subdir = self.root_url.strip('/').split('/')[-1]
        dbpath = os.path.join(self.local_path, self.db_name)
        self._local_path = os.path.join(dbpath, subdir)
        if not os.path.isdir(dbpath):
            os.mkdir(dbpath)
        if not os.path.isdir(self._local_path):
            os.mkdir(self._local_path)

    def _set_db(self):
        if self.db_name.lower() == 'aflowlib':
            self.max_depth = 4
        else:
            raise NotImplementedError('%s not yet supported.' % self.db_name)

    def _open_page(self, path):
        with requests.Session() as session:
            response = session.get(path, verify=False)
        if not response.ok:
            return response.raise_for_status()
        return response

    def get_paths(self, root):
        response = self._open_page(root)
        paths = []
        for url in re.findall('<a href="([^"]+)">', str(response.content)):
            if url in response.url:
                continue
            paths.append(os.path.join(response.url, url.lstrip('./')))
        return paths

    def _filter_files(self, files):
        new = []
        for f in files:
            if self._is_mainfile(f):
                new.append(f)
        return new

    def _is_mainfile(self, path):
        if 'vasprun.xml' in path:
            return True
        return False

    def _is_dir(self, path):
        path = path.strip()
        if path[-1] == '/' and self.root_url in path:
            return True
        return False

    def _depth(self, path):
        return urlparse(path).path.rstrip('/').count('/')

    def _rules_ok(self, path):
        ok = False
        if self._is_mainfile(path):
            ok = True
        if self._depth(path) >= self.max_depth:
            ok = True
        if '?' in path:
            ok = False
        return ok

    def _to_string(self, path):
        return path.strip('/').replace('/', '_').replace(':', '_')
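
    # _read_from_file/_write_to_file persist the crawled lists as plain text:
    # one entry per line, columns separated by whitespace, booleans written
    # as 'T'/'F'.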

    def _read_from_file(self, filename):
        print('Reading from file %s' % filename)
        with open(filename) as f:
            data = f.readlines()
        data = [s.strip().split() for s in data]
        for i in range(len(data)):
            for j in range(len(data[i])):
                data[i][j] = data[i][j].strip()
                if data[i][j] in ['T', 'True', '1', 'Y']:
                    data[i][j] = True
                elif data[i][j] in ['F', 'False', '0', 'N']:
                    data[i][j] = False
            if len(data[i]) == 1:
                data[i] = data[i][0]
        return data

    def _write_to_file(self, data, filename):
        with open(filename, 'w') as f:
            for i in range(len(data)):
                if isinstance(data[i], str):
                    f.write('%s\n' % data[i])
                else:
                    for j in range(len(data[i])):
                        e = data[i][j]
                        if e is True:
                            e = 'T'
                        elif e is False:
                            e = 'F'
                        else:
                            e = str(e)
                        f.write('%s ' % e)
                    f.write('\n')

    def _gen_db_list(self):
        print('Generating list from %s' % self.root_url)
        self.db_files = []
        todo = self.get_paths(self.root_url)
        while len(todo) > 0:
            cur = todo[-1]
            if self._rules_ok(cur):
                self.db_files.append(cur)
            elif self._is_dir(cur):
                add = self.get_paths(cur)
                todo = add + todo
            todo.pop(-1)
        if self.dbfile is not None:
            self._write_to_file(self.db_files, self.dbfile)

    def get_db_list(self):
        if self.dbfile is not None and os.path.isfile(self.dbfile):
            self.db_files = self._read_from_file(self.dbfile)
        else:
            self._gen_db_list()
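
    # Build the list of mainfiles that are already in NOMAD by scrolling
    # through the repository search API, restricted to the AFLOWlib author
    # and library paths.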

    def _gen_nomad_list(self):
        print('Generating NOMAD list')
        if self.db_name.lower() == 'aflowlib':
            servers = ['LIB1_LIB', 'LIB2_LIB', 'LIB3_LIB']
            paths = []
            for s in servers:
                if s in self.root_url:
                    paths.append(s)
            if len(paths) == 0:
                paths = servers
            kwargs = dict(authors=['Curtarolo, Stefano'], paths=paths, scroll=True)
        self.nomad_files = []
        while True:
            res = self.client.repo.search(**kwargs).response()
            results = res.result.results
            if len(results) == 0:
                break
            for i in range(len(results)):
                self.nomad_files.append(results[i]['mainfile'])
            scroll_id = res.result.scroll.scroll_id
            kwargs.update({'scroll_id': scroll_id})
        if self.nomadfile is not None:
            for i in range(len(self.nomad_files)):
                print(self.nomad_files[i])
            self._write_to_file(self.nomad_files, self.nomadfile)

    def get_nomad_list(self):
        if self.nomadfile is not None and os.path.isfile(self.nomadfile):
            self.nomad_files = self._read_from_file(self.nomadfile)
        else:
            self._gen_nomad_list()

    def _reduce_list(self, ilist):
        olist = []
        for e in ilist:
            p = urlparse(e).path.strip('/')
            olist.append(os.path.join(*p.split('/')[1:self.max_depth]))
        olist = list(set(olist))
        olist.sort()
        return olist
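
    # Diff the two path-reduced lists: entries only present in the source
    # database become self.update_list; entries only present in NOMAD are
    # written to in_nomad.txt as a warning.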

    def compare_lists(self):
        print('Identifying differences')
        db = self._reduce_list(self.db_files)
        nomad = self._reduce_list(self.nomad_files)
        ns = set(nomad)
        self.update_list = [i for i in db if i not in ns]
        ds = set(db)
        self.in_nomad = [i for i in nomad if i not in ds]
        if len(self.in_nomad) > 0:
            fn = 'in_nomad.txt'
            print('Warning: Some NOMAD entries not found in db.')
            print('See %s for list.' % fn)
            self._write_to_file(self.in_nomad, fn)
        # add the root back
        u = urlparse(self.root_url)
        up = u.path.strip('/').split('/')[0]
        root = '%s://%s/%s' % (u.scheme, u.netloc, up)
        self.update_list = [os.path.join(root, e) for e in self.update_list]
        self.is_updated_list = [False] * len(self.update_list)
        print('Found %d entries to be added in NOMAD' % len(self.update_list))
        if self.outfile is not None:
            data = [self.update_list[i] for i in range(len(self.update_list))]
            self._write_to_file(data, self.outfile)

    def _download(self, path, iodir):
        files = self.get_paths(path)
        files = [f for f in files if self._rules_ok(f)]
        size = 0.0
        with requests.Session() as session:
            for f in files:
                res = session.get(f, stream=True)
                fn = res.url.split('/')[-1]
                fn = os.path.join(iodir, fn)
                with open(fn, 'wb') as fb:
                    fb.write(res.content)
                size += os.path.getsize(fn)
        return size

    def get_files(self, path):
        dirname = urlparse(path).path
        dirname = self._to_string(dirname)
        dirname = os.path.join(self._local_path, dirname)
        if os.path.isdir(dirname):
            size = 0.0
            files = os.listdir(dirname)
            complete = False
            for f in files:
                if 'vasprun' in f:
                    complete = True
                size += os.path.getsize(os.path.join(dirname, f))
            if not complete:
                self._cleanup([os.path.join(dirname, f) for f in files])
                size = self._download(path, dirname)
        else:
            os.mkdir(dirname)
            size = self._download(path, dirname)
        return dirname, size

    def _tar_files(self, dirs, tarname):
        with tarfile.open(tarname, 'w') as f:
            for d in dirs:
                files = os.listdir(d)
                for fn in files:
                    f.add(os.path.join(d, fn))

    def _cleanup(self, ilist):
        if isinstance(ilist, str):
            ilist = [ilist]
        for name in ilist:
            subprocess.Popen(['rm', '-rf', name])

    def _is_done_upload(self, uid):
        res = self.client.uploads.get_upload(upload_id=uid).response().result
        Nproc = res.processed_calcs
        Ncalc = res.total_calcs
        if Nproc != Ncalc:
            return False
        return True
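
    # Payload for publishing a finished upload; the references and the
    # hard-coded coauthor/uploader ids are specific to the AFLOWlib data.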

    def get_payload(self, uid):
        timenow = datetime.datetime.utcnow()
        if self.db_name == 'aflowlib':
            return dict(
                operation='publish',
                metadata=dict(
                    with_embargo=False,
                    comment='',
                    references=[
                        'http://www.sciencedirect.com/science/article/pii/S0927025614003322',
                        'http://aflowlib.org',
                        'http://www.sciencedirect.com/science/article/pii/S0927025612000687'],
                    coauthors=[
                        148, 149, 150, 151, 152, 146, 145, 138, 137,
                        136, 135, 134, 147, 125],
                    shared_with=[],
                    _upload_time=timenow,
                    _uploader=125))

    def publish(self, uids=None):
        print('Publishing')
        if uids is None:
            uids = self.uids
        for uid in uids:
            if self._is_done_upload(uid):
                payload = self.get_payload(uid)
                self.client.uploads.exec_upload_operation(upload_id=uid, payload=payload).response()

    def upload(self, file_path, name):
        res = self.client.uploads.upload(
            local_path=os.path.abspath(file_path), name=name).response().result
        return res.upload_id

    def register_upload(self, donelist, plist, pn):
        data = []
        for i in plist:
            if i in donelist:
                self.is_updated_list[i] = True
            else:
                data.append(self.update_list[i])
        self._write_to_file(data, self.outfile + '_%d' % pn)

    def aggregate_procs(self):
        data = []
        for i in range(self.parallel):
            data += self._read_from_file(self.outfile + '_%d' % i)
        self._write_to_file(data, self.outfile + '_updated')

    def download_proc(self, plist, pn):
        size = 0.0
        max_zip_size = config.max_upload_size
        dirs = []
        done = []
        for i in range(len(plist)):
            d, s = self.get_files(self.update_list[plist[i]])
            if not self.do_upload:
                continue
            size += s