Commit e4c25332 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Merge branch 'experimental' into 'master'

Occasional merge towards v0.4.4

See merge request !43
parents 3689a5cd 8a7610db
Pipeline #47826 passed with stages
in 17 minutes and 24 seconds
......@@ -16,4 +16,3 @@ target/
*.swp
*.vscode
nomad.yaml
nomad/gitinfo.py
......@@ -199,6 +199,10 @@ class RepoCalcsResource(Resource):
if g.user is None:
abort(401, message='Authentication required for owner value user.')
q = Q('term', published=False) & Q('term', owners__user_id=g.user.user_id)
elif owner == 'admin':
if g.user is None or not g.user.is_admin:
abort(401, message='This can only be used by the admin user.')
q = None
else:
abort(400, message='Invalid owner value. Valid values are all|user|staging, default is all')
......
......@@ -295,19 +295,20 @@ def pid_prefix(prefix: int):
@click.option('--delete-failed', default='', type=str, help='String from N, U, P to determine if empty (N), failed (U), or failed to publish (P) uploads should be deleted or kept for debugging.')
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--create-packages', is_flag=True, help='Indicate that packages should be created, if they do not already exist.')
@click.option('--republish', is_flag=True, help='Will only republish already published packages.')
@click.option('--only-republish', is_flag=True, help='Will only republish already published packages.')
@click.option('--republish', is_flag=True, help='Will process normally and republish already published packages.')
@click.option('--all', is_flag=True, help='Go through all known packages. Ignores pattern and args.')
@click.option('--wait', default=0, type=int, help='Wait for a random (upto given) number of seconds before each upload to scatter io and compute heavy processing tasks.')
def upload(
upload_paths: list, pattern: str, parallel: int, delete_failed: str,
create_packages: bool, republish: bool, wait: int, all: bool):
create_packages: bool, only_republish: bool, republish: bool, wait: int, all: bool):
infrastructure.setup_logging()
infrastructure.setup_mongo()
_Migration(threads=parallel).migrate(
*determine_upload_paths(upload_paths, pattern=pattern, all=all), delete_failed=delete_failed,
create_packages=create_packages, only_republish=republish, wait=wait)
create_packages=create_packages, only_republish=only_republish, wait=wait, republish=republish)
@migration.command(help='Get an report about not migrated calcs based on source calcs.')
......
......@@ -41,11 +41,12 @@ To load an entity from the database use :data:`nomad.infrastructure.repository_d
.. autoclass:: Calc
:members:
:undoc-members:
.. autofunction:: create_handle
.. autoclass:: DataSet
:members:
:undoc-members:
"""
from .user import User, ensure_test_user, admin_user, LoginException
from .calc import Calc, DataSet
from .calc import Calc, DataSet, create_handle
from .upload import UploadMetaData, Upload
......@@ -19,7 +19,7 @@ from sqlalchemy.orm import relationship, aliased
from sqlalchemy.sql.expression import literal
from datetime import datetime
from nomad import infrastructure, utils
from nomad import infrastructure, utils, config
from nomad.datamodel import DFTCalcWithMetadata
from . import base
......@@ -29,6 +29,25 @@ from .base import Base, calc_citation_association, ownership, co_authorship, sha
CodeVersion, StructRatio, UserMetaData
handle_base = '0123456789abcdefghijklmnopqrstuvwxyz'
# Only the first 32 characters are ever produced by :func:`create_handle`
# (digits are 5-bit values, ``value & 31``); the trailing 'wxyz' are unused.


def create_handle(pid: int) -> str:
    """
    Create a handle for the given pid. The pid is an autoincrement number. The
    handle is a 'base32' encoded string of that number; therefore, its string
    representation is a little shorter than the decimal pid. The handle is
    prefixed with the configured handle prefix
    (``config.repository_db.handle_prefix``).
    """
    value = pid
    result = ''
    while value > 0:
        # Take 5 bits at a time, least significant digit first.
        result += handle_base[value & 31]
        value = value >> 5
    if not result:
        # A pid of 0 (autoincrement pids normally start at 1) would otherwise
        # produce an empty handle consisting of only the prefix; encode zero
        # explicitly instead.
        result = handle_base[0]
    return config.repository_db.handle_prefix + result[::-1]
class PublishContext:
"""
Utilities necessary during adding calculations to the repo db.
......@@ -58,6 +77,7 @@ class Calc(Base):
__tablename__ = 'calculations'
coe_calc_id = Column('calc_id', Integer, primary_key=True, autoincrement=True)
handlepid = Column(String)
origin_id = Column(Integer, ForeignKey('uploads.upload_id'))
upload = relationship('Upload', lazy='joined')
checksum = Column(String)
......
......@@ -54,7 +54,7 @@ from sqlalchemy import exc as sa_exc
from nomad import utils, infrastructure, config
from nomad.datamodel import UploadWithMetadata, DFTCalcWithMetadata
from .calc import Calc, PublishContext
from .calc import Calc, PublishContext, create_handle
from .base import Base
from .user import User
......@@ -186,6 +186,7 @@ class Upload(Base): # type: ignore
# reuse the cache for the whole transaction to profit from repeating
# star schema entries for users, ds, topics, etc.
context = PublishContext(upload_id=upload.upload_id)
coe_calcs = []
for calc in upload.calcs:
has_calcs = True
coe_calc = Calc(
......@@ -193,6 +194,7 @@ class Upload(Base): # type: ignore
checksum=calc.calc_id,
upload=coe_upload)
repo_db.add(coe_calc)
coe_calcs.append(coe_calc)
coe_calc.apply_calc_with_metadata(
cast(DFTCalcWithMetadata, calc), context=context)
......@@ -203,6 +205,11 @@ class Upload(Base): # type: ignore
result = None
if has_calcs:
repo_db.flush()
for coe_calc in coe_calcs:
coe_calc.handlepid = create_handle(coe_calc.coe_calc_id)
logger.debug('created all handlepids')
repo_db.commit()
logger.info('committed publish transaction')
result = coe_upload
......
......@@ -97,7 +97,8 @@ repository_db = NomadConfig(
port=5432,
dbname='nomad_fairdi_repo_db',
user='postgres',
password='nomad'
password='nomad',
handle_prefix='21.11132/'
)
mongo = NomadConfig(
......
# This file will be overridden during build.
# The values here are just placeholder
log = "git commit msg placeholder"
ref = "git ref placeholder"
version = "git version placeholder"
commit = "git commit sha placeholder"
......@@ -152,11 +152,15 @@ class Package(Document):
migration_version = IntField(default=-1)
""" The version of the last successful migration of this package """
migration_id = StringField()
""" A random uuid that ids the migration run on this package """
report = DictField()
""" The report of the last successful migration of this package """
migration_failure = StringField()
""" String that describe the cause for last failed migration attempt """
migration_failure_type = StringField()
""" The type of migration failure: ``no_calcs``, ``processing``, ``publish``, ``exception``. """
meta = dict(indexes=['upload_id', 'migration_version'])
......@@ -293,9 +297,9 @@ class Package(Document):
'packaged upload', source_upload_id=upload_id, source_upload_path=upload_path,
restricted=restricted)
return package_query
return package_query.timeout(False)
else:
return cls.objects(upload_id=upload_id)
return cls.objects(upload_id=upload_id).timeout(False)
@classmethod
@contextmanager
......@@ -625,6 +629,7 @@ class NomadCOEMigration:
self.logger = utils.get_logger(__name__, migration_version=migration_version)
self.migration_version = migration_version
self.migration_id = utils.create_uuid()
self.package_directory = package_directory if package_directory is not None else config.fs.migration_packages
self.compress_packages = compress_packages
self._client = None
......@@ -803,7 +808,7 @@ class NomadCOEMigration:
def migrate(
self, *args, delete_failed: str = '',
create_packages: bool = False, only_republish: bool = False,
wait: int = 0) -> utils.POPO:
wait: int = 0, republish: bool = False) -> utils.POPO:
"""
Migrate the given uploads.
......@@ -827,7 +832,12 @@ class NomadCOEMigration:
create_packages: If True, it will attempt to create upload packages if they
do not exists.
only_republish: If the package exists and is published, it will be republished.
Nothing else. Useful to reindex/recreate coe repo, etc.
Nothing else. Useful to reindex/recreate coe repo, etc. This will not
reapply the metadata (see parameter ``republish``).
republish: Normally packages that are already uploaded and published are not republished.
If true, already published packages are republished. This is different from
``only_republish``, because the package metadata will be updated, calc diffs
recomputed, etc.
offset: Will add a random sleep before migrating each package between 0 and
``wait`` seconds.
......@@ -862,25 +872,28 @@ class NomadCOEMigration:
self.logger.info('wait for a random amount of time')
time.sleep(random.randint(0, wait))
package_report = self.migrate_package(package, delete_failed=delete_failed)
package_report = self.migrate_package(package, delete_failed=delete_failed, republish=republish)
except Exception as e:
package_report = Report()
package_report.failed_packages = 1
event = 'unexpected exception while migrating packages'
package.migration_failure = event + ': ' + str(e)
package.migration_failure_type = 'exception'
logger.error(event, exc_info=e)
finally:
package.report = package_report
package.migration_version = self.migration_version
package.save()
with cv:
package.migration_id = self.migration_id
package.save()
try:
overall_report.add(package_report)
migrated_all_packages = all(
p.migration_version == self.migration_version
p.migration_id == self.migration_id
for p in Package.objects(upload_id=package.upload_id))
if migrated_all_packages:
......@@ -931,7 +944,7 @@ class NomadCOEMigration:
logger = self.logger.bind(package_id=package_id, source_upload_id=source_upload_id)
uploads = self.call_api('uploads.get_uploads', name=package_id)
uploads = self.call_api('uploads.get_uploads', all=True, name=package_id)
if len(uploads) > 1:
self.logger.warning('upload name is not unique')
if len(uploads) == 0:
......@@ -971,12 +984,12 @@ class NomadCOEMigration:
if scroll_id != 'first':
scroll_args['scroll_id'] = scroll_id
search = self.call_api('repo.search', upload_id=upload_id, **scroll_args)
search = self.call_api('repo.search', upload_id=upload_id, owner='admin', **scroll_args)
scroll_id = search.scroll_id
for calc in search.results:
yield calc
def migrate_package(self, package: Package, delete_failed: str = '') -> 'Report':
def migrate_package(self, package: Package, delete_failed: str = '', republish: bool = False) -> 'Report':
""" Migrates the given package. For other params see :func:`migrate`. """
source_upload_id = package.upload_id
......@@ -986,15 +999,16 @@ class NomadCOEMigration:
logger.debug('start to process package')
report = Report()
report.total_packages += 1
report.total_packages = 1
# check if the package is already uploaded
upload = None
try:
uploads = self.call_api('uploads.get_uploads', name=package_id)
uploads = self.call_api('uploads.get_uploads', all=True, name=package_id)
if len(uploads) > 1:
event = 'duplicate upload name'
package.migration_failure(event)
package.migration_failure_type = 'exception'
report.failed_packages += 1
return report
elif len(uploads) == 1:
......@@ -1004,6 +1018,7 @@ class NomadCOEMigration:
event = 'could not verify if upload already exists'
logger.error(event, exc_info=e)
package.migration_failure(event)
package.migration_failure_type = 'exception'
report.failed_packages += 1
return report
......@@ -1017,6 +1032,7 @@ class NomadCOEMigration:
event = 'could not upload package'
logger.error(event, exc_info=e)
package.migration_failure = event + ': ' + str(e)
package.migration_failure_type = 'processing'
report.failed_packages += 1
return report
else:
......@@ -1094,6 +1110,7 @@ class NomadCOEMigration:
event = 'failed to process upload'
logger.error(event, process_errors=upload.errors)
package.migration_failure = event + ': ' + str(upload.errors)
package.migration_failure_type = 'processing'
report.failed_packages += 1
delete_upload(FAILED_PROCESSING)
return report
......@@ -1169,7 +1186,7 @@ class NomadCOEMigration:
logger.error('missmatch between processed calcs and calcs found with search')
# publish upload
if len(calc_mainfiles) > 0 and not upload.published:
if len(calc_mainfiles) > 0 and (republish or not upload.published):
with utils.timer(logger, 'upload published'):
upload_metadata = dict(with_embargo=(package.restricted > 0))
upload_metadata['calculations'] = [
......@@ -1194,10 +1211,13 @@ class NomadCOEMigration:
report.new_calcs = 0
report.failed_packages += 1
package.migration_failure = event + ': ' + str(upload.errors)
package.migration_failure_type = 'publish'
delete_upload(FAILED_PUBLISH)
SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
.update(migration_version=-1)
if not upload.published:
# only do this if the upload was not publish with prior migration
delete_upload(FAILED_PUBLISH)
SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
.update(migration_version=-1)
else:
SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
.update(migration_version=self.migration_version)
......@@ -1206,6 +1226,9 @@ class NomadCOEMigration:
logger.info('package upload already published, skip publish')
else:
delete_upload(NO_PROCESSED_CALCS)
report.failed_packages += 1
package.migration_failure = 'no calculcations found'
package.migration_failure_type = 'no_calcs'
logger.info('no successful calcs, skip publish')
logger.info('migrated package', **report)
......@@ -1219,10 +1242,14 @@ class NomadCOEMigration:
_uploader=calc_with_metadata.uploader['id'],
_pid=calc_with_metadata.pid,
references=[ref['value'] for ref in calc_with_metadata.references],
datasets=[dict(
id=ds['id'],
_doi=ds.get('doi', {'value': None})['value'],
_name=ds.get('name', None)) for ds in calc_with_metadata.datasets],
datasets=[
dict(
id=ds['id'],
_doi=ds.get('doi', {'value': None})['value'],
_name=ds.get('name', None))
for ds in calc_with_metadata.datasets
if ds is not None
] if calc_with_metadata.datasets is not None else [],
mainfile=calc_with_metadata.mainfile,
with_embargo=calc_with_metadata.with_embargo,
comment=calc_with_metadata.comment,
......
......@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
import pytest
import time
import json
......@@ -623,6 +624,17 @@ class TestRepo():
calc_id='4', uploader=other_test_user.to_popo(), published=True, with_embargo=True)
search.Entry.from_calc_with_metadata(calc_with_metadata).save(refresh=True)
def assert_search(self, rv: Any, number_of_calcs: int) -> dict:
    """
    Assert that *rv* is a successful search response whose ``results`` list
    contains exactly ``number_of_calcs`` entries. Returns the decoded payload.
    """
    assert rv.status_code == 200
    payload = json.loads(rv.data)
    hits = payload.get('results', None)
    assert hits is not None
    assert isinstance(hits, list)
    assert len(hits) == number_of_calcs
    return payload
def test_own_calc(self, client, example_elastic_calcs, no_warn, test_user_auth):
rv = client.get('/repo/0/1', headers=test_user_auth)
assert rv.status_code == 200
......@@ -663,12 +675,8 @@ class TestRepo():
def test_search_owner(self, client, example_elastic_calcs, no_warn, test_user_auth, other_test_user_auth, calcs, owner, auth):
auth = dict(none=None, test_user=test_user_auth, other_test_user=other_test_user_auth).get(auth)
rv = client.get('/repo/?owner=%s' % owner, headers=auth)
assert rv.status_code == 200
data = json.loads(rv.data)
data = self.assert_search(rv, calcs)
results = data.get('results', None)
assert results is not None
assert isinstance(results, list)
assert len(results) == calcs
if calcs > 0:
for key in ['uploader', 'calc_id', 'formula', 'upload_id']:
assert key in results[0]
......@@ -696,13 +704,7 @@ class TestRepo():
query_string = '?%s' % query_string
rv = client.get('/repo/%s' % query_string)
assert rv.status_code == 200
data = json.loads(rv.data)
results = data.get('results', None)
assert results is not None
assert isinstance(results, list)
assert len(results) == calcs
self.assert_search(rv, calcs)
@pytest.mark.parametrize('calcs, quantity, value', [
(2, 'system', 'bulk'),
......@@ -720,15 +722,9 @@ class TestRepo():
])
def test_search_quantities(self, client, example_elastic_calcs, no_warn, test_user_auth, calcs, quantity, value):
query_string = '%s=%s' % (quantity, ','.join(value) if isinstance(value, list) else value)
rv = client.get('/repo/?%s' % query_string, headers=test_user_auth)
assert rv.status_code == 200
data = json.loads(rv.data)
results = data.get('results', None)
assert results is not None
assert isinstance(results, list)
assert len(results) == calcs
rv = client.get('/repo/?%s' % query_string, headers=test_user_auth)
data = self.assert_search(rv, calcs)
aggregations = data.get('aggregations', None)
assert aggregations is not None
......@@ -740,6 +736,17 @@ class TestRepo():
metrics_permutations = [[], search.metrics_names] + [[metric] for metric in search.metrics_names]
def test_search_admin(self, client, example_elastic_calcs, no_warn, admin_user_auth):
    """An owner=admin search with admin credentials returns all four example calcs."""
    response = client.get('/repo/?owner=admin', headers=admin_user_auth)
    self.assert_search(response, 4)
def test_search_admin_auth(self, client, example_elastic_calcs, no_warn, test_user_auth):
    """owner=admin must be rejected (401) for both non-admin and anonymous requests."""
    rv_user = client.get('/repo/?owner=admin', headers=test_user_auth)
    assert rv_user.status_code == 401
    rv_anonymous = client.get('/repo/?owner=admin')
    assert rv_anonymous.status_code == 401
@pytest.mark.parametrize('metrics', metrics_permutations)
def test_search_total_metrics(self, client, example_elastic_calcs, no_warn, metrics):
rv = client.get('/repo/?total_metrics=%s' % ','.join(metrics))
......
......@@ -17,7 +17,7 @@ from typing import cast
from passlib.hash import bcrypt
from datetime import datetime
from nomad.coe_repo import User, Calc, Upload
from nomad.coe_repo import User, Calc, Upload, create_handle
from nomad.coe_repo.calc import PublishContext
from nomad import processing, parsing, datamodel
......@@ -65,15 +65,18 @@ def assert_coe_upload(upload_id: str, upload: datamodel.UploadWithMetadata = Non
if user_metadata is not None:
calc.apply_user_metadata(user_metadata)
assert_coe_calc(coe_calc, cast(datamodel.DFTCalcWithMetadata, calc))
assert_coe_calc(coe_calc, cast(datamodel.DFTCalcWithMetadata, calc), has_handle=True)
if upload is not None and upload.upload_time is not None:
assert coe_upload.created.isoformat()[:26] == upload.upload_time.isoformat()
def assert_coe_calc(coe_calc: Calc, calc: datamodel.DFTCalcWithMetadata):
def assert_coe_calc(coe_calc: Calc, calc: datamodel.DFTCalcWithMetadata, has_handle: bool = False):
if calc.pid is not None:
assert coe_calc.pid == calc.pid
elif has_handle:
assert coe_calc.pid is not None
assert create_handle(coe_calc.pid) == coe_calc.handlepid
# calc data
assert len(coe_calc.files) == len(calc.files)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment