Commit c2191f15 authored by Markus Scheidgen

Merge branch 'migration' into 'master'

Merged latest changes towards 4.2.

See merge request !34
parents ec46f648 0d98bc8f
Pipeline #44885 failed with stages in 11 seconds
......@@ -17,10 +17,12 @@ stages:
- deploy
variables:
TEST_IMAGE: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair:${CI_COMMIT_REF_NAME}
RELEASE_IMAGE: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair:latest
FRONTEND_TEST_IMAGE: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair/frontend:${CI_COMMIT_REF_NAME}
FRONTEND_RELEASE_IMAGE: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair/frontend:latest
TEST_IMAGE: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair:test_${CI_COMMIT_REF_NAME}
RELEASE_IMAGE: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair:${CI_COMMIT_REF_NAME}
LATEST_IMAGE: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair:latest
FRONTEND_TEST_IMAGE: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair/frontend:test_${CI_COMMIT_REF_NAME}
FRONTEND_RELEASE_IMAGE: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair/frontend:${CI_COMMIT_REF_NAME}
FRONTEND_LATEST_IMAGE: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair/frontend:latest
RAWAPI_TEST_IMAGE: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair/rawapi:${CI_COMMIT_REF_NAME}
RAWAPI_RELEASE_IMAGE: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair/rawapi:latest
......@@ -38,6 +40,7 @@ build:
- docker push $TEST_IMAGE
except:
- /^dev-.*$/
- tags
buildgui:
......@@ -51,6 +54,7 @@ buildgui:
- docker push $FRONTEND_TEST_IMAGE
except:
- /^dev-.*$/
- tags
buildrawapi:
......@@ -72,6 +76,8 @@ linting:
- python -m pylint --load-plugins=pylint_mongoengine nomad tests
- python -m mypy --ignore-missing-imports --follow-imports=silent --no-strict-optional nomad tests
except:
refs:
- tags
variables:
- $CI_COMMIT_REF_NAME =~ /^dev-.*$/
- $CI_COMMIT_MESSAGE =~ /\[skip[ _-]tests?\]/i
......@@ -106,6 +112,8 @@ tests:
- cd /app
- python -m pytest --cov=nomad -sv tests
except:
refs:
- tags
variables:
- $CI_COMMIT_REF_NAME =~ /^dev-.*$/
- $CI_COMMIT_MESSAGE =~ /\[skip[ _-]tests?\]/i
......@@ -154,14 +162,31 @@ release:
script:
- docker login -u gitlab-ci-token -p $CI_BUILD_TOKEN gitlab-registry.mpcdf.mpg.de
- docker pull $TEST_IMAGE
- docker tag $TEST_IMAGE $LATEST_IMAGE
- docker push $LATEST_IMAGE
- docker pull $FRONTEND_TEST_IMAGE
- docker tag $FRONTEND_TEST_IMAGE $FRONTEND_LATEST_IMAGE
- docker push $FRONTEND_LATEST_IMAGE
except:
- /^dev-.*$/
when: manual
release_version:
stage: release
script:
- docker login -u gitlab-ci-token -p $CI_BUILD_TOKEN gitlab-registry.mpcdf.mpg.de
- docker pull $TEST_IMAGE
- docker tag $TEST_IMAGE $LATEST_IMAGE
- docker push $LATEST_IMAGE
- docker tag $TEST_IMAGE $RELEASE_IMAGE
- docker push $RELEASE_IMAGE
- docker pull $FRONTEND_TEST_IMAGE
- docker tag $FRONTEND_TEST_IMAGE $FRONTEND_LATEST_IMAGE
- docker push $FRONTEND_LATEST_IMAGE
- docker tag $FRONTEND_TEST_IMAGE $FRONTEND_RELEASE_IMAGE
- docker push $FRONTEND_RELEASE_IMAGE
except:
- /^dev-.*$/
when: manual
only:
- tags
release_rawapi:
stage: release
......@@ -194,7 +219,7 @@ deploy:
- mkdir -p /etc/deploy
# kube_config is a CI/CD variable set in GitLab GUI
- echo $CI_KUBE_CONFIG | base64 -d > /etc/deploy/config
- helm init --client-only
- helm init --upgrade
- helm repo add stable https://kubernetes-charts.storage.googleapis.com/
- helm repo add incubator https://kubernetes-charts-incubator.storage.googleapis.com/
- helm repo update
......@@ -208,9 +233,11 @@ deploy:
- export DEPLOYS=$(helm ls | grep $RELEASE_NAME | wc -l)
- export EXTERNAL_PATH="/fairdi/nomad/v${NOMAD_VERSION}"
- export DBNAME="fairdi_nomad_v${NOMAD_VERSION//./_}"
- export FILES_PATH="/scratch/nomad-fair/fs/nomad_v${NOMAD_VERSION}"
- export FILES_PATH="/nomad/fairdi/nomad_v${NOMAD_VERSION}/fs"
- if [ ${DEPLOYS} -eq 0 ]; then
helm install --name=${RELEASE_NAME} . --namespace=${STAGING_NAMESPACE}
--set images.nomad.tag=${CI_COMMIT_REF_NAME}
--set images.frontend.tag=${CI_COMMIT_REF_NAME}
--set api.disableReset="false"
--set proxy.nodePort="300${NUMERIC_VERSION//./}"
--set proxy.external.path=${EXTERNAL_PATH}
......@@ -219,8 +246,10 @@ deploy:
--set worker.memrequest=32
--set volumes.files=${FILES_PATH};
else
helm upgrade ${RELEASE_NAME} . --namespace=${STAGING_NAMESPACE} --recreate-pods;
helm upgrade ${RELEASE_NAME} . --namespace=${STAGING_NAMESPACE}
--set images.nomad.tag=${CI_COMMIT_REF_NAME}
--set images.frontend.tag=${CI_COMMIT_REF_NAME}
--recreate-pods;
fi
except:
- /^dev-.*$/
when: manual
only:
- tags
......@@ -52,3 +52,10 @@ python -m http.server 8888
```
Open [http://localhost:8888/html/setup.html](http://localhost:8888/html/setup.html) in
your browser.
## Change log
### v0.4.2
- bugfixes regarding the migration
- better migration configurability and reproducibility
- scales to multi-node kubernetes deployments
\ No newline at end of file
Subproject commit 386c64df4c8e4acba3d3339a5b018f1178c5294f
Subproject commit aa7414dad899c0ff568802e80e030c13f97afe3c
"""
This is a brief example demonstrating the public nomad@FAIRDI API for doing operations
that might be necessary to integrate external project data.
"""
from bravado.requests_client import RequestsClient
from bravado.client import SwaggerClient
import math
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor
# nomad_url = 'http://enc-staging-nomad.esc.rzg.mpg.de/fairdi/nomad/migration/api'
nomad_url = 'http://localhost:8000/nomad/api/'
user = 'admin'
password = 'password'
upload_file = 'external_project_example.zip'
# create the bravado client
host = urlparse(nomad_url).netloc.split(':')[0]
http_client = RequestsClient()
http_client.set_basic_auth(host, user, password)
client = SwaggerClient.from_url('%s/swagger.json' % nomad_url, http_client=http_client)
uploads = [upload.upload_id for upload in client.uploads.get_uploads().response().result]
executor = ThreadPoolExecutor(max_workers=10)
def run(upload_id):
upload = client.uploads.get_upload(upload_id=upload_id).response().result
upload_total_calcs = upload.calcs.pagination.total
per_page = 200
for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
search = client.repo.search(
page=page, per_page=per_page, order_by='mainfile',
upload_id=upload_id).response().result
print(search.pagination.page)
for upload in uploads:
    executor.submit(run, upload)  # pass the argument directly to avoid the late-binding lambda pitfall
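Since the docstring above promises operations needed to integrate external project data, a small helper sketch may be useful for waiting until an upload has finished processing before searching it. It only reuses the `client.uploads.get_upload` call already shown; the `process_running` flag is taken from the API's processing model further below, so treat the field name as an assumption here.

```python
import time

def wait_for_processing(upload_id, interval=10):
    # poll the upload until background processing has finished
    while True:
        upload = client.uploads.get_upload(upload_id=upload_id).response().result
        if not upload.process_running:
            return upload
        time.sleep(interval)

# e.g. wait_for_processing(uploads[0]) before calling run(uploads[0])
```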
......@@ -17,12 +17,14 @@ All APIs are served by one Flask app (:py:mod:`nomad.api.app`) under different p
"""
from flask import Flask, jsonify
from flask_restplus import Api
from flask_restplus import Api, fields
from flask_cors import CORS
from werkzeug.exceptions import HTTPException
from werkzeug.wsgi import DispatcherMiddleware
import os.path
import inspect
from datetime import datetime
import pytz
from nomad import config, utils
......@@ -120,3 +122,12 @@ def with_logger(func):
wrapper.__signature__ = wrapper_signature
return wrapper
class RFC3339DateTime(fields.DateTime):
def format(self, value):
if isinstance(value, datetime):
return super().format(value.replace(tzinfo=pytz.utc))
else:
    return str(value)
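A minimal illustration of what the `tzinfo` replacement buys for the naive UTC datetimes typically stored in the databases; plain `isoformat()` is used here only to show the difference, the actual serialization still goes through `fields.DateTime`.

```python
from datetime import datetime
import pytz

naive = datetime(2019, 2, 1, 12, 30)    # naive UTC value, as usually stored
aware = naive.replace(tzinfo=pytz.utc)  # what RFC3339DateTime does before formatting

print(naive.isoformat())   # 2019-02-01T12:30:00        -> no offset, not RFC 3339
print(aware.isoformat())   # 2019-02-01T12:30:00+00:00  -> offset included
```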
......@@ -41,7 +41,7 @@ from datetime import datetime
from nomad import config, processing, files, utils, coe_repo
from nomad.coe_repo import User, LoginException
from .app import app, api
from .app import app, api, RFC3339DateTime
app.config['SECRET_KEY'] = config.services.api_secret
auth = HTTPBasicAuth()
......@@ -145,7 +145,7 @@ user_model = api.model('User', {
'token': fields.String(
description='The access token that authenticates the user with the API. '
'Use the HTTP header "X-Token" to provide it in API requests.'),
'created': fields.DateTime(dt_format='iso8601', description='The create date for the user.')
'created': RFC3339DateTime(description='The creation date of the user.')
})
......@@ -229,7 +229,7 @@ class UserResource(Resource):
token_model = api.model('Token', {
'user': fields.Nested(user_model),
'token': fields.String(description='The short term token to sign URLs'),
'experies_at': fields.DateTime(desription='The time when the token expires')
'expires_at': RFC3339DateTime(description='The time when the token expires')
})
......
......@@ -29,7 +29,7 @@ from nomad.processing import Upload, FAILURE
from nomad.processing import ProcessAlreadyRunning
from nomad.files import ArchiveBasedStagingUploadFiles
from .app import api, with_logger
from .app import api, with_logger, RFC3339DateTime
from .auth import login_really_required
from .common import pagination_request_parser, pagination_model
......@@ -46,8 +46,8 @@ proc_model = api.model('Processing', {
'tasks_status': fields.String,
'errors': fields.List(fields.String),
'warnings': fields.List(fields.String),
'create_time': fields.DateTime(dt_format='iso8601'),
'complete_time': fields.DateTime(dt_format='iso8601'),
'create_time': RFC3339DateTime,
'complete_time': RFC3339DateTime,
'current_process': fields.String,
'process_running': fields.Boolean,
})
......@@ -64,7 +64,7 @@ metadata_model = api.model('MetaData', {
'references': fields.List(fields.String, description='References allow linking calculations to external sources, e.g. URLs.'),
'coauthors': fields.List(fields.Integer, description='A list of co-authors given by user_id.'),
'shared_with': fields.List(fields.Integer, description='A list of users to share calculations with given by user_id.'),
'_upload_time': fields.DateTime(dt_format='iso8601', description='Overrride the upload time.'),
'_upload_time': RFC3339DateTime(description='Override the upload time.'),
'_uploader': fields.Integer(description='Override the uploader with the given user id.'),
'datasets': fields.List(fields.Nested(model=dataset_model), description='A list of datasets.')
})
......@@ -84,9 +84,10 @@ upload_model = api.inherit('UploadProcessing', proc_model, {
'using the name query parameter.'),
'upload_id': fields.String(
description='The unique id for the upload.'),
'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data.'),
# TODO just removed during migration, where this gets particularly large
# 'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data.'),
'local_path': fields.String,
'upload_time': fields.DateTime(dt_format='iso8601'),
'upload_time': RFC3339DateTime(),
})
calc_model = api.inherit('UploadCalculationProcessing', proc_model, {
......@@ -198,7 +199,7 @@ class UploadListResource(Resource):
data = request.stream.read(io.DEFAULT_BUFFER_SIZE)
received_data += len(data)
received_last += len(data)
if received_last > 1e6:
if received_last > 1e9:
received_last = 0
# TODO remove this logging or reduce it to debug
logger.info('received streaming data', size=received_data)
......
......@@ -15,9 +15,8 @@
import click
import time
import datetime
from ppworkflows import Workflow, GeneratorTask, StatusTask, SimpleTask
from nomad import config, infrastructure, utils
from nomad import config, infrastructure
from nomad.migration import NomadCOEMigration
from .main import cli
......@@ -96,36 +95,16 @@ def pid_prefix(prefix: int):
@click.argument('paths', nargs=-1)
@click.option('--create-packages', help='Allow migration to create package entries on the fly.', is_flag=True)
@click.option('--local', help='Create local upload files.', is_flag=True)
@click.option('--delete-local', help='Delete created local upload files after upload.', is_flag=True)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--migration-version', default=0, type=int, help='The migration version. Only packages with a lower version or no version will be migrated.')
def upload(paths: list, create_packages, local: bool, parallel: int, migration_version: int):
def upload(
paths: list, create_packages, local: bool, delete_local: bool, parallel: int,
migration_version: int):
def producer():
for path in paths:
yield path
def work(task):
infrastructure.setup_logging()
infrastructure.setup_mongo()
infrastructure.setup_logging()
infrastructure.setup_mongo()
logger = utils.get_logger(__name__)
migration = NomadCOEMigration(migration_version=migration_version)
while True:
path = task.get_one()
report = migration.migrate(path, create_packages=create_packages, local=local)
logger.info('migrated upload with result', upload_path=path, **report)
task.put([
('uploads: {:5d}', 1),
('migrated_calcs: {:7d}', report.migrated_calcs),
('failed_calcs: {:7d}', report.failed_calcs),
('calcs_with_diffs: {:7d}', report.calcs_with_diffs),
('new_calcs: {:7d}', report.new_calcs),
('missing_calcs: {:7d}', report.missing_calcs)])
workflow = Workflow()
workflow.add_task(GeneratorTask(producer), outputs=["paths"])
workflow.add_task(SimpleTask(work), input="paths", outputs=["sums"], runner_count=parallel)
workflow.add_task(StatusTask(), input="sums")
workflow.run()
migration = NomadCOEMigration(migration_version=migration_version, threads=parallel)
migration.migrate(
*paths, local=local, delete_local=delete_local, create_packages=create_packages)
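For reference, the new threaded interface used by this command can presumably also be driven programmatically; a sketch using only the calls visible above, with placeholder upload paths and option values:

```python
from nomad import infrastructure
from nomad.migration import NomadCOEMigration

infrastructure.setup_logging()
infrastructure.setup_mongo()

# placeholder upload paths; options mirror the CLI flags of the upload command
migration = NomadCOEMigration(migration_version=1, threads=4)
migration.migrate(
    '/nomad/repository/uploads/example1', '/nomad/repository/uploads/example2',
    local=True, delete_local=True, create_packages=True)
```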
......@@ -42,12 +42,14 @@ This module also provides functionality to add parsed calculation data to the db
:undoc-members:
"""
from typing import Type, Callable
from typing import Type
import datetime
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
import filelock
import os.path
from nomad import utils, infrastructure
from nomad import utils, infrastructure, config
from nomad.datamodel import UploadWithMetadata
from .calc import Calc, PublishContext
......@@ -109,7 +111,7 @@ class Upload(Base): # type: ignore
return self.created
@staticmethod
def publish(upload: UploadWithMetadata) -> Callable[[bool], int]:
def publish(upload: UploadWithMetadata) -> int:
"""
Add the upload to the NOMAD-coe repository db. It creates an
uploads-entry, respective calculation and property entries. Everything in one
......@@ -124,55 +126,75 @@ class Upload(Base): # type: ignore
"""
assert upload.uploader is not None
repo_db = infrastructure.repository_db
repo_db.begin()
logger = utils.get_logger(__name__, upload_id=upload.upload_id)
has_calcs = False
try:
# create upload
coe_upload = Upload(
upload_name=upload.upload_id,
created=upload.upload_time,
user_id=upload.uploader.id,
is_processed=True)
repo_db.add(coe_upload)
# add calculations and metadata
# reuse the cache for the whole transaction to profit from repeating
# star schema entries for users, ds, topics, etc.
context = PublishContext(upload_id=upload.upload_id)
for calc in upload.calcs:
has_calcs = True
coe_calc = Calc(
coe_calc_id=calc.pid,
checksum=calc.calc_id,
upload=coe_upload)
repo_db.add(coe_calc)
coe_calc.apply_calc_with_metadata(calc, context=context)
logger.debug('added calculation, not yet committed', calc_id=coe_calc.calc_id)
# commit
def complete(commit: bool) -> int:
if commit:
if has_calcs:
# empty upload case
repo_db.commit()
return coe_upload.coe_upload_id
else:
repo_db.rollback()
return -1
logger.info('added upload')
last_error = None
while True:
publish_filelock = filelock.FileLock(
os.path.join(config.fs.tmp, 'publish.lock'))
logger.info('waiting for filelock')
while True:
try:
publish_filelock.acquire(timeout=15 * 60, poll_intervall=1)
logger.info('acquired filelock')
break
except filelock.Timeout:
logger.warning('could not acquire publish lock after generous timeout')
repo_db = infrastructure.repository_db
repo_db.expunge_all()
repo_db.begin()
try:
has_calcs = False
# create upload
coe_upload = Upload(
upload_name=upload.upload_id,
created=upload.upload_time,
user_id=upload.uploader.id,
is_processed=True)
repo_db.add(coe_upload)
# add calculations and metadata
# reuse the cache for the whole transaction to profit from repeating
# star schema entries for users, ds, topics, etc.
context = PublishContext(upload_id=upload.upload_id)
for calc in upload.calcs:
has_calcs = True
coe_calc = Calc(
coe_calc_id=calc.pid,
checksum=calc.calc_id,
upload=coe_upload)
repo_db.add(coe_calc)
coe_calc.apply_calc_with_metadata(calc, context=context)
logger.debug(
'added calculation, not yet committed', calc_id=coe_calc.calc_id)
logger.info('filled publish transaction')
upload_id = -1
if has_calcs:
repo_db.commit()
logger.info('committed publish transaction')
upload_id = coe_upload.coe_upload_id
else:
# empty upload case
repo_db.rollback()
logger.info('rolled upload back')
return -1
return complete
except Exception as e:
logger.error('Unexpected exception.', exc_info=e)
repo_db.rollback()
raise e
logger.info('added upload')
return upload_id
except Exception as e:
repo_db.rollback()
if last_error != str(e):
last_error = str(e)
logger.error('Retry publish after unexpected exception.', exc_info=e)
else:
logger.error('Unexpected exception.', exc_info=e)
raise e
finally:
publish_filelock.release()
logger.info('released filelock')
......@@ -30,7 +30,7 @@ FilesConfig = namedtuple(
'FilesConfig', ['uploads_bucket', 'raw_bucket', 'archive_bucket', 'staging_bucket', 'public_bucket'])
""" API independent configuration for the object storage. """
CeleryConfig = namedtuple('Celery', ['broker_url', 'max_memory', 'timeout'])
CeleryConfig = namedtuple('Celery', ['broker_url', 'max_memory', 'timeout', 'acks_late'])
""" Used to configure the RabbitMQ for celery. """
FSConfig = namedtuple('FSConfig', ['tmp', 'objects', 'nomad_tmp'])
......@@ -86,7 +86,8 @@ def get_loglevel_from_env(key, default_level=logging.INFO):
celery = CeleryConfig(
broker_url=rabbit_url,
max_memory=int(os.environ.get('NOMAD_CELERY_MAXMEMORY', 64e6)), # 64 GB
timeout=int(os.environ.get('NOMAD_CELERY_TIMEOUT', 3 * 3600)) # 3h
timeout=int(os.environ.get('NOMAD_CELERY_TIMEOUT', 3 * 3600)), # 3h
acks_late=os.environ.get('NOMAD_CELERY_ACKS_LATE', 'True').lower() in ('1', 'true', 'yes')  # bool() of a non-empty string is always True
)
fs = FSConfig(
......
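A sketch of how the new `acks_late` value would typically be handed to Celery elsewhere in the code base. The app construction and attribute wiring below are assumptions for illustration; `task_acks_late`, `task_time_limit`, and `worker_max_memory_per_child` are standard Celery 4 settings.

```python
from celery import Celery
from nomad import config

app = Celery('nomad.processing', broker=config.celery.broker_url)

# with late acknowledgement a task is only acked after it ran, so a crashed
# worker does not silently drop in-flight processing tasks
app.conf.task_acks_late = config.celery.acks_late
app.conf.task_time_limit = config.celery.timeout                        # seconds
app.conf.worker_max_memory_per_child = int(config.celery.max_memory)   # kilobytes
```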
......@@ -182,6 +182,11 @@ def sqlalchemy_repository_db(exists: bool = False, readonly: bool = True, **kwar
params = config.repository_db._asdict()
params.update(**kwargs)
url = 'postgresql://%s:%s@%s:%d/%s' % utils.to_tuple(params, 'user', 'password', 'host', 'port', 'dbname')
# We tried to set a very high isolation level to prevent conflicts between transactions on the
# star-shaped schema, which usually involve reads and writes to many tables at once.
# Unfortunately, this had weak performance, and postgres was not even able to serialize on all
# occasions. We now simply roll back and retry on conflicts.
# engine = create_engine(url, echo=False, isolation_level="SERIALIZABLE")
engine = create_engine(url, echo=False)
repository_db_conn = engine.connect()
......
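A minimal sketch of the rollback-and-retry pattern referred to in the comment above; the actual retry loop lives in `coe_repo.Upload.publish`, and the helper below is purely illustrative:

```python
import time
from sqlalchemy.exc import OperationalError

def commit_with_retry(session, work, retries: int = 3, wait: float = 1.0):
    """ Run ``work(session)`` in a transaction, rolling back and retrying on conflicts. """
    for _ in range(retries):
        try:
            work(session)
            session.commit()
            return
        except OperationalError:
            # e.g. deadlocks or lock conflicts on the star-shaped schema
            session.rollback()
            time.sleep(wait)
    raise RuntimeError('giving up after %d failed commit attempts' % retries)
```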
......@@ -20,7 +20,7 @@ other/older nomad@FAIRDI instances to mass upload it to a new nomad@FAIRDI insta
.. autoclass:: SourceCalc
"""
from typing import Generator, Tuple, List, Iterable, IO, Any
from typing import Generator, Tuple, List, Iterable, IO, Any, Dict
import os
import os.path
import zipstream
......@@ -34,6 +34,7 @@ import glob
import os
import runstats
import io
import threading
from nomad import utils, infrastructure, config
from nomad.coe_repo import User, Calc, LoginException
......@@ -123,8 +124,11 @@ class Package(Document):
return iterable_to_stream(zip_file) # type: ignore
def create_package_upload_file(self) -> str:
""" Creates a zip file for the package in tmp and returns its path. """
def create_package_upload_file(self) -> Tuple[str, bool]:
"""
Creates a zip file for the package in tmp and returns its path and whether it
was created (``True``) or existed before (``False``).
"""
upload_filepath = os.path.join(config.fs.nomad_tmp, '%s.zip' % self.package_id)
if not os.path.exists(os.path.dirname(upload_filepath)):
os.mkdir(os.path.dirname(upload_filepath))
......@@ -135,8 +139,11 @@ class Package(Document):
for filename in self.filenames:
filepath = os.path.join(self.upload_path, filename)
zip_file.write(filepath, filename)
created = True
else:
created = False
return upload_filepath
return upload_filepath, created
@classmethod
def index(cls, *upload_paths):
......@@ -344,6 +351,8 @@ class NomadCOEMigration:
Arguments:
migration_version: The migration version. Only packages/calculations with
no migration version or a lower migration version are migrated.
threads: Number of threads to run migration in parallel.
quiet: Suppresses printing of migration stats if set.
"""
default_sites = [
......@@ -357,10 +366,12 @@ class NomadCOEMigration:
archive_filename = 'archive.tar.gz'
""" The standard name for tarred uploads in the CoE repository. """
def __init__(self, migration_version: int = 0) -> None:
def __init__(self, migration_version: int = 0, threads: int = 1, quiet: bool = False) -> None:
self.logger = utils.get_logger(__name__, migration_version=migration_version)
self.migration_version = migration_version
self._client = None
self._threads = threads
self._quiet = quiet
self.source = infrastructure.repository_db
......@@ -414,6 +425,13 @@ class NomadCOEMigration:
else:
yield item
expected_differences = {
'0d': 'molecule / cluster',
'3d': 'bulk',
'2d': '2d / surface',
'+u': 'gga'
}
def _validate(self, repo_calc: dict, source_calc: CalcWithMetadata, logger) -> bool:
"""
Validates the given processed calculation, assuming that the data in the given
......@@ -435,7 +453,8 @@ class NomadCOEMigration:
def check_mismatch() -> bool:
# some exceptions
if source_value == '3d' and target_value == 'bulk':
if source_value in NomadCOEMigration.expected_differences and \
target_value == NomadCOEMigration.expected_differences.get(source_value):
return True
logger.info(
......@@ -468,7 +487,7 @@ class NomadCOEMigration:
def _packages(
self, source_upload_path: str,
create: bool = False) -> Tuple[Iterable[Package], str]:
create: bool = False) -> Tuple[Any, str]:
"""
Creates an iterator over packages for the given upload path. Packages are