Commit f3ee41d2 authored by Markus Scheidgen

Merge branch 'v0.7.5' into 'master'

V0.7.5

See merge request !89
parents 4e198668 d1b24bd0
Pipeline #69075 failed in 14 minutes and 40 seconds
@@ -31,6 +31,8 @@ Omitted versions are plain bugfix releases with only minor changes and fixes.
### v0.7.5
- AFLOWLIB prototypes (archive)
- primitive label search
- improved search performance by excluding large per-entry fields from results (illustrated in the sketch below)
- improved logs
- minor bugfixes
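For illustration, a minimal sketch of how a client could use the new `exclude` parameter that this release adds to the repo search endpoint. The base URL, endpoint path, and response keys are assumptions; the excluded field names are taken from the diff below.

import requests

API_BASE = 'https://nomad-lab.eu/prod/rae/api'  # assumed deployment URL, adjust as needed

# `exclude` takes a comma-separated list of keys to drop from every result;
# the GUI now excludes large per-entry fields like these by default.
response = requests.get(
    API_BASE + '/repo/',
    params={
        'exclude': 'atoms,only_atoms,files,quantities,optimade,labels,geometries',
        'per_page': 10})
response.raise_for_status()
for entry in response.json().get('results', []):
    print(entry.get('calc_id'), entry.get('mainfile'))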
@@ -207,6 +207,8 @@ class Api {
this.onStartLoading = () => null
this.onFinishLoading = () => null
this.statistics = {}
this._swaggerClient = Swagger(`${apiBase}/swagger.json`)
this.keycloak = keycloak
@@ -384,9 +386,34 @@ class Api {
async search(search) {
this.onStartLoading()
return this.swagger()
.then(client => client.apis.repo.search(search))
.then(client => client.apis.repo.search({
exclude: ['atoms', 'only_atoms', 'files', 'quantities', 'optimade', 'labels', 'geometries'],
...search}))
.catch(handleApiError)
.then(response => response.body)
.then(response => {
// fill absent statistics values with values from prior searches
// this helps to keep consistent values, e.g. in the metadata search view
if (response.statistics) {
const empty = {}
Object.keys(response.statistics.total.all).forEach(metric => empty[metric] = 0)
Object.keys(response.statistics)
.filter(key => !['total', 'authors', 'atoms'].includes(key))
.forEach(key => {
if (!this.statistics[key]) {
this.statistics[key] = new Set()
}
const values = this.statistics[key]
Object.keys(response.statistics[key]).forEach(value => values.add(value))
values.forEach(value => {
if (!response.statistics[key][value]) {
response.statistics[key][value] = empty
}
})
})
}
return response
})
.finally(this.onFinishLoading)
}
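The zero-filling above keeps aggregation buckets from earlier searches visible (with zero counts) when a narrower query no longer returns them. A minimal Python sketch of the same idea, with illustrative names that are not part of the codebase:

def backfill_statistics(statistics, seen_values):
    # one all-zero metrics dict, shaped like the 'total' bucket
    empty = {metric: 0 for metric in statistics['total']['all']}
    for key, buckets in statistics.items():
        if key in ('total', 'authors', 'atoms'):
            continue
        # remember every value ever seen for this statistic across searches ...
        seen = seen_values.setdefault(key, set())
        seen.update(buckets)
        # ... and re-add the ones missing from this response with zero counts
        for value in seen:
            buckets.setdefault(value, dict(empty))
    return statistics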
@@ -113,8 +113,10 @@ class DomainProviderBase extends React.Component {
optimade: {},
quantities: {},
spacegroup: {},
specegroup_symbol: {},
labels: {}
spacegroup_symbol: {},
labels: {},
raw_id: {},
upload_name: {}
},
/**
* A dict where each object represents a column. Possible keys are label, render.
@@ -177,7 +177,7 @@ class RawFiles extends React.Component {
}
filterPotcar(file) {
if (file.toLowerCase().endsWith('potcar')) {
if (file.includes('POTCAR') && !file.endsWith('.stripped')) {
return this.props.user && this.props.data.uploader.user_id === this.props.user.sub
} else {
return true
@@ -147,6 +147,7 @@ class ArchiveDownloadResource(Resource):
search_request = search.SearchRequest()
apply_search_parameters(search_request, args)
search_request.include('calc_id', 'upload_id', 'mainfile')
calcs = search_request.execute_scan(
order_by='upload_id',
@@ -273,6 +274,7 @@ class ArchiveQueryResource(Resource):
search_request = search.SearchRequest()
apply_search_parameters(search_request, args)
search_request.include('calc_id', 'upload_id', 'mainfile')
try:
if scroll:
@@ -421,6 +421,7 @@ class RawFileQueryResource(Resource):
search_request = search.SearchRequest()
apply_search_parameters(search_request, _raw_file_from_query_parser.parse_args())
search_request.include('calc_id', 'upload_id', 'mainfile')
def path(entry):
return '%s/%s' % (entry['upload_id'], entry['mainfile'])
@@ -84,6 +84,8 @@ _search_request_parser.add_argument(
'Possible values are %s.' % ', '.join(datamodel.Domain.instance.metrics_names)))
_search_request_parser.add_argument(
'statistics', type=bool, help=('Return statistics.'))
_search_request_parser.add_argument(
'exclude', type=str, action='split', help='Excludes the given keys in the returned data.')
for group_name in search.groups:
_search_request_parser.add_argument(
group_name, type=bool, help=('Return %s group data.' % group_name))
@@ -150,8 +152,9 @@ class RepoCalcsResource(Resource):
"""
try:
parsed_args = _search_request_parser.parse_args()
args = {
key: value for key, value in _search_request_parser.parse_args().items()
key: value for key, value in parsed_args.items()
if value is not None}
scroll = args.get('scroll', False)
@@ -202,6 +205,11 @@ class RepoCalcsResource(Resource):
elif len(metrics) > 0:
search_request.totals(metrics_to_use=metrics)
if 'exclude' in parsed_args:
excludes = parsed_args['exclude']
if excludes is not None:
search_request.exclude(*excludes)
try:
if scroll:
results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)
@@ -297,7 +305,7 @@ _repo_edit_model = api.model('RepoEdit', {
def edit(parsed_query: Dict[str, Any], mongo_update: Dict[str, Any] = None, re_index=True) -> List[str]:
# get all calculations that have to change
with utils.timer(common.logger, 'edit query executed'):
search_request = search.SearchRequest()
search_request = search.SearchRequest().include('calc_id', 'upload_id')
apply_search_parameters(search_request, parsed_query)
upload_ids = set()
calc_ids = []
@@ -689,7 +697,7 @@ class RepoPidResource(Resource):
except ValueError:
abort(400, 'Wrong PID format')
search_request = search.SearchRequest()
search_request = search.SearchRequest().include('upload_id', 'calc_id')
if g.user is not None:
search_request.owner('all', user_id=g.user.user_id)
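Most of the search speedup in this release comes from the pattern visible above: each resource restricts the stored fields it reads from elasticsearch with `include`, or drops expensive ones with `exclude`. A condensed sketch of that pattern, assuming the `nomad.search` module import used elsewhere in the codebase; the surrounding request setup (owner, query parameters) is abridged:

from nomad import search

search_request = search.SearchRequest()
# load only the identifiers that are actually needed downstream
search_request.include('calc_id', 'upload_id', 'mainfile')
# or, alternatively, drop expensive fields and keep the rest
# search_request.exclude('atoms', 'only_atoms', 'quantities')
for entry in search_request.execute_scan(order_by='upload_id'):
    print(entry['upload_id'], entry['mainfile'])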
@@ -65,7 +65,7 @@ class CalculationList(Resource):
except Exception:
abort(400, message='bad parameter types') # TODO Specific json API error handling
search_request = base_search_request()
search_request = base_search_request().include('calc_id')
if filter is not None:
try:
# Copyright 2019 Alvin Noe Ladines, Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Automatically synchronizes nomad with a given external database. It creates a list of
paths to mainfiles in nomad and compares it with the paths in the external database.
Paths missing in nomad are then downloaded from the external database and subsequently
uploaded to nomad. The downloaded files are saved under '/nomad/fairdi/external' by default.
"""
import requests
import re
import subprocess
from urllib.parse import urlparse
import os
from .client import client
from nomad import config
from nomad.cli.client import upload as nomad_upload
import datetime
import click
import tarfile
import threading
import time
from .client import client
from nomad import config
from nomad.cli.client import upload as nomad_upload
class DbUpdater:
def __init__(self, *args, **kwargs):
@@ -109,7 +131,8 @@ class DbUpdater:
return ok
def _to_string(self, path):
return path.strip('/').replace('/', '_').replace(':', '_')
path = path.strip('/')
return re.sub(r'[^\w\d-]', '_', path)
def _read_from_file(self, filename):
print('Reading from file %s' % filename)
@@ -168,7 +191,7 @@ class DbUpdater:
def _gen_nomad_list(self):
print('Generating NOMAD list')
if self.db_name.lower() == 'aflowlib':
servers = ['LIB1_LIB', 'LIB2_LIB', 'LIB3_LIB']
servers = ['LIB%d_LIB' % n for n in range(1, 10)]
paths = []
for s in servers:
if s in self.root_url:
@@ -247,20 +270,29 @@ class DbUpdater:
size += os.path.getsize(fn)
return size
def _download_size(self, dirname):
complete = False
files = os.listdir(dirname)
if self.db_name.lower() == 'aflowlib':
if 'vasprun.xml.relax2.xz' in files:
complete = True
else:
complete = True
size = 0.0
if not complete:
self._cleanup([os.path.join(dirname, f) for f in files])
else:
for f in files:
size += os.path.getsize(os.path.join(dirname, f))
return size
def get_files(self, path):
dirname = urlparse(path).path
dirname = self._to_string(dirname)
dirname = os.path.join(self._local_path, dirname)
if os.path.isdir(dirname):
size = 0.0
files = os.listdir(dirname)
complete = False
for f in files:
if 'vasprun' in f:
complete = True
size += os.path.getsize(os.path.join(dirname, f))
if not complete:
self._cleanup([os.path.join(dirname, f) for f in files])
size = self._download_size(dirname)
if size == 0.0:
size = self._download(path, dirname)
else:
os.mkdir(dirname)
@@ -268,11 +300,27 @@ class DbUpdater:
return dirname, size
def _tar_files(self, dirs, tarname):
with tarfile.open(tarname, 'w') as f:
if os.path.isfile(tarname):
return
try:
f = tarfile.open(tarname, 'w')
for d in dirs:
files = os.listdir(d)
for fn in files:
f.add(os.path.join(d, fn))
f.close()
except Exception as e:
os.remove(tarname)
print('Error writing tar file %s. %s' % (tarname, e))
def _make_name(self, dirs):
# name will be first and last entries
d1 = self._to_string(dirs[0].lstrip(self._local_path))
d2 = self._to_string(dirs[-1].lstrip(self._local_path))
tarname = '%s-%s' % (d1, d2)
uploadname = 'AFLOWLIB_%s' % tarname
tarname = os.path.join(self._local_path, tarname + '.tar')
return tarname, uploadname
def _cleanup(self, ilist):
if isinstance(ilist, str):
@@ -288,6 +336,20 @@ class DbUpdater:
return False
return True
def _get_status_upload(self, uploadname):
res = self.client.uploads.get_uploads(name=uploadname, state='all').response().result
entries = res.results
status = None
upload_id = None
for entry in entries:
if entry['name'] == uploadname:
status = 'uploaded'
if entry['published']:
status = 'published'
upload_id = entry['upload_id']
break
return status, upload_id
def get_payload(self, uid):
timenow = datetime.datetime.utcnow()
if self.db_name == 'aflowlib':
@@ -301,11 +363,23 @@ class DbUpdater:
'http://aflowlib.org',
'http://www.sciencedirect.com/science/article/pii/S0927025612000687'],
coauthors=[
148, 149, 150, 151, 152, 146, 145, 138, 137,
136, 135, 134, 147, 125],
'f409d859-2639-4f82-b198-85e1c7c62f8b',
'580f7036-97b8-42b1-a9e6-815058bcac72',
'4c8d767d-335f-4ccd-9459-d0152b2026bc',
'7effd16a-a65c-4d95-b692-652488d94146',
'd59b3610-6335-4ad8-aca0-24a905de3a25',
'3df68fed-6ca0-4bf9-b2b1-d6be71e18f72',
'f7b540eb-a266-4379-9611-911ed8e3630e',
'7b3fe468-0011-4ba8-bd53-1ee65acda114',
'48e7a028-1a41-440d-9986-38540f5079c9',
'd63d07e6-ccc8-4eac-82af-d841627b6c53',
'9c308f66-eed1-4996-b33e-af78fb4944c7',
'd2621bc7-c45a-4d35-9dc1-5c05fa8326cb',
'ecba0e68-65ee-4b40-8fbf-a42714b1072b',
'81b96683-7170-49d7-8c4e-e9f34906b3ea'],
shared_with=[],
_upload_time=timenow,
_uploader=125))
_uploader='81b96683-7170-49d7-8c4e-e9f34906b3ea'))
def publish(self, uids=None):
print('Publishing')
@@ -321,21 +395,6 @@ class DbUpdater:
local_path=os.path.abspath(file_path), name=name).response().result
return res.upload_id
def register_upload(self, donelist, plist, pn):
data = []
for i in plist:
if i in donelist:
self.is_updated_list[i] = True
else:
data.append(self.update_list[i])
self._write_to_file(data, self.outfile + '_%d' % pn)
def aggregate_procs(self):
data = []
for i in range(self.parallel):
data += self._read_from_file(self.outfile + '_%d' % i)
self._write_to_file(data, self.outfile + '_updated')
def download_proc(self, plist, pn):
size = 0.0
max_zip_size = config.max_upload_size
@@ -349,18 +408,21 @@ class DbUpdater:
dirs.append(d)
done.append(plist[i])
if size > max_zip_size or i == (len(plist) - 1):
tstamp = datetime.datetime.now().strftime('_%y%m%d%H%M%S%f')
tname = self._to_string(dirs[0].lstrip(self._local_path))
tname = os.path.join(self._local_path, tname + '%s.tar' % tstamp)
self._tar_files(dirs, tname)
uid = nomad_upload.upload_file(tname, name='AFLOWLIB_%s' % tstamp, offline=True)
if len(dirs) == 0:
continue
tarname, uploadname = self._make_name(dirs)
status, uid = self._get_status_upload(uploadname)
if status == 'published':
continue
if status != 'uploaded':
self._tar_files(dirs, tarname)
uid = nomad_upload.upload_file(tarname, name=uploadname, offline=True)
if self.do_publish:
self.publish([uid])
self.uids.append(uid)
self.register_upload(done, plist, pn)
if self.cleanup:
self._cleanup(dirs)
self._cleanup(tname)
self._cleanup(tarname)
size = 0.0
dirs = []
@@ -374,13 +436,15 @@ class DbUpdater:
continue
plist[cur % self.parallel].append(i)
cur += 1
procs = []
for i in range(self.parallel):
p = threading.Thread(target=self.download_proc, args=(plist[i], i,))
procs.append(p)
[p.start() for p in procs]
[p.join() for p in procs]
self.aggregate_procs()
if self.parallel > 1:
procs = []
for i in range(self.parallel):
p = threading.Thread(target=self.download_proc, args=(plist[i], i,))
procs.append(p)
[p.start() for p in procs]
[p.join() for p in procs]
else:
self.download_proc(plist[0], 0)
print('Time for download and upload (s)', time.time() - s)
def prep_list(self):
@@ -396,11 +460,15 @@ class DbUpdater:
self.update_list.append(l[0])
self.is_updated_list.append(l[1])
else:
procs = []
procs.append(threading.Thread(target=self.get_db_list))
procs.append(threading.Thread(target=self.get_nomad_list))
[p.start() for p in procs]
[p.join() for p in procs]
if self.parallel > 1:
procs = []
procs.append(threading.Thread(target=self.get_db_list))
procs.append(threading.Thread(target=self.get_nomad_list))
[p.start() for p in procs]
[p.join() for p in procs]
else:
self.get_db_list()
self.get_nomad_list()
self.compare_lists()
def update(self):
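The module docstring at the top of this file describes the synchronization strategy: build the list of mainfile paths known to nomad, compare it with the external database, and only download and upload what is missing. A self-contained sketch of that comparison step, with purely illustrative names and data that are not part of the codebase:

def missing_entries(external_paths, nomad_paths):
    """Return external-database paths that have no mainfile in nomad yet."""
    known = set(nomad_paths)
    return [path for path in external_paths if path not in known]

# illustrative data only
todo = missing_entries(
    ['LIB1_LIB/entry1', 'LIB1_LIB/entry2', 'LIB2_LIB/entry3'],
    ['LIB1_LIB/entry1'])
print(todo)  # ['LIB1_LIB/entry2', 'LIB2_LIB/entry3']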
@@ -193,8 +193,6 @@ class DomainQuantity:
0 (the default) means no aggregations.
metric: Indicates that this quantity should be used as a search metric. Values need
to be tuples with metric name and elastic aggregation (e.g. sum, cardinality)
zero_aggs: Return aggregation values for values with zero hits in the search. Default
is with zero aggregations.
elastic_mapping: An optional elasticsearch_dsl mapping. Default is ``Keyword``.
elastic_search_type: An optional elasticsearch search type. Default is ``term``.
elastic_field: An optional elasticsearch key. Default is the name of the quantity.
@@ -206,8 +204,7 @@ class DomainQuantity:
def __init__(
self, description: str = None, multi: bool = False, aggregations: int = 0,
order_default: bool = False, metric: Tuple[str, str] = None,
zero_aggs: bool = True, metadata_field: str = None,
elastic_mapping: type = None,
metadata_field: str = None, elastic_mapping: type = None,
elastic_search_type: str = 'term', elastic_field: str = None,
elastic_value: Callable[[Any], Any] = None,
argparse_action: str = 'append'):
@@ -218,7 +215,6 @@ class DomainQuantity:
self.order_default = order_default
self.aggregations = aggregations
self.metric = metric
self.zero_aggs = zero_aggs
self.elastic_mapping = elastic_mapping
self.elastic_search_type = elastic_search_type
self.metadata_field = metadata_field
@@ -18,7 +18,7 @@ DFT specific metadata
from typing import List
import re
from elasticsearch_dsl import Integer, Object, InnerDoc
from elasticsearch_dsl import Integer, Object, InnerDoc, Keyword
import ase.data
from nomadcore.local_backend import ParserEvent
@@ -82,9 +82,15 @@ class Label(MSection):
m_def = Section(a_elastic=dict(type=InnerDoc))
label = Quantity(type=str)
type = Quantity(type=MEnum('compound_class', 'classification', 'prototype', 'prototype_id'))
source = Quantity(type=MEnum('springer', 'aflow_prototype_library'))
label = Quantity(type=str, a_elastic=dict(type=Keyword))
type = Quantity(type=MEnum(
'compound_class', 'classification', 'prototype', 'prototype_id'),
a_elastic=dict(type=Keyword))
source = Quantity(
type=MEnum('springer', 'aflow_prototype_library'),
a_elastic=dict(type=Keyword))
ESLabel = elastic_mapping(Label.m_def, InnerDoc)
@@ -253,6 +259,13 @@ def only_atoms(atoms):
return ''.join(only_atoms)
def _elastic_label_value(label):
if isinstance(label, str):
return label
else:
return elastic_obj(label, ESLabel)
Domain(
'DFT', DFTCalcWithMetadata,
quantities=dict(
@@ -261,7 +274,7 @@ Domain(
order_default=True),
atoms=DomainQuantity(
'The atom labels of all atoms in the simulated system.',
aggregations=len(ase.data.chemical_symbols), multi=True, zero_aggs=False),
aggregations=len(ase.data.chemical_symbols), multi=True),
only_atoms=DomainQuantity(
'The atom labels concatenated in species-number order. Used with keyword search '
'to facilitate exclusive searches.',
@@ -304,7 +317,7 @@ Domain(
'Search based on springer classification and aflow prototypes',
elastic_field='labels.label',
elastic_mapping=Object(ESLabel),
elastic_value=lambda labels: [elastic_obj(label, ESLabel) for label in labels],
elastic_value=lambda labels: [_elastic_label_value(label) for label in labels],
multi=True),
optimade=DomainQuantity(
'Search based on optimade\'s filter query language',
@@ -132,6 +132,10 @@ class DOI(Document):
status_code=response.status_code, body=response.content,
doi=self.doi)
return False
else:
return True
def create_draft(self):
if config.datacite.enabled:
assert self.state == 'created', 'can only create a draft for created DOIs'
@@ -140,9 +144,9 @@ class DOI(Document):
headers={'Content-Type': 'application/xml;charset=UTF-8'},
data=self.metadata_xml, **_requests_args())
self.__handle_datacite_errors(response, 'create draft DOI')
self.state = 'draft'
self.save()
if self.__handle_datacite_errors(response, 'create draft DOI'):
self.state = 'draft'
self.save()
def delete(self, *args, **kwargs):
if config.datacite.enabled:
@@ -156,10 +160,11 @@ class DOI(Document):
def make_findable(self):
if config.datacite.enabled:
assert self.state == 'draft', 'can only make drafts findable'
body = ('doi= %s\nurl= %s' % (self.doi, self.url)).encode('utf-8')
requests.post(
body = ('doi=%s\nurl=%s' % (self.doi, self.url)).encode('utf-8')
response = requests.put(
self.doi_url, **_requests_args(),
headers={'Content-Type': 'text/plain;charset=UTF-8'}, data=body)
self.state = 'findable'
self.save()
if self.__handle_datacite_errors(response, 'make DOI findable'):
self.state = 'findable'
self.save()
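make_findable now issues a PUT with a plain-text `doi=...` / `url=...` body, which is how the DataCite MDS API registers a landing-page URL for a draft DOI and makes it findable. A hedged sketch of the equivalent standalone request; the endpoint is the public MDS API, and the DOI, URL, and credentials are placeholders:

import requests

doi = '10.17172/NOMAD/2019.10.07-1'  # placeholder DOI
url = 'https://example.org/entry'    # placeholder landing page
body = ('doi=%s\nurl=%s' % (doi, url)).encode('utf-8')
response = requests.put(
    'https://mds.datacite.org/doi/%s' % doi,   # public MDS endpoint, assumed
    headers={'Content-Type': 'text/plain;charset=UTF-8'},
    data=body,
    auth=('DATACITE_USER', 'DATACITE_PASSWORD'))  # placeholder credentials
print(response.status_code)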
@@ -55,7 +55,6 @@ def setup():
Will create client instances for the databases and has to be called before they
can be used.
"""
global elastic_client
setup_logging()
setup_mongo()
setup_elastic()
@@ -461,6 +461,9 @@ class NomadCeleryRequest(Request):
def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
if isinstance(exc_info.exception, WorkerLostError):
infrastructure.setup()
utils.get_logger(__name__).error(
'detected WorkerLostError', exc_info=exc_info.exception)
self._fail(
'task failed due to worker lost: %s' % str(exc_info.exception),
exc_info=exc_info)
@@ -158,9 +158,8 @@ class Entry(Document, metaclass=WithDomain):
self.external_id = source.external_id
for quantity in datamodel.Domain.instance.domain_quantities.values():
setattr(
self, quantity.name,
quantity.elastic_value(getattr(source, quantity.metadata_field)))
quantity_value = quantity.elastic_value(getattr(source, quantity.metadata_field))
setattr(self, quantity.name, quantity_value)
def delete_upload(upload_id):
......@@ -441,10 +440,7 @@ class SearchRequest:
The basic doc_count metric ``code_runs`` is always given.
"""
quantity = quantities[quantity_name]
min_doc_count = 0 if quantity.zero_aggs else 1
terms = A(
'terms', field=quantity.elastic_field, size=size, min_doc_count=min_doc_count,
order=dict(_key='asc'))
terms = A('terms'