Commit e649a4c5 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Merge branch 'migration' of gitlab.mpcdf.mpg.de:nomad-lab/nomad-FAIR into migration

parents 09798467 ee9b5ba1
Pipeline #44802 canceled with stages
in 7 minutes and 50 seconds
......@@ -9,8 +9,8 @@ import math
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor
nomad_url = 'http://enc-staging-nomad.esc.rzg.mpg.de/fairdi/nomad/migration/api'
# nomad_url = 'http://localhost:8000/nomad/api/'
# nomad_url = 'http://enc-staging-nomad.esc.rzg.mpg.de/fairdi/nomad/migration/api'
nomad_url = 'http://localhost:8000/nomad/api/'
user = 'admin'
password = 'password'
......@@ -22,15 +22,7 @@ http_client = RequestsClient()
http_client.set_basic_auth(host, user, password)
client = SwaggerClient.from_url('%s/swagger.json' % nomad_url, http_client=http_client)
# uploads = [upload.upload_id for upload in client.uploads.get_uploads().response().result]
uploads = [
'SJSSLPKiR1mm6a_kSJAGZQ',
'nfMmrzGBTjmx77g9_C59fw',
'pglT7PYDQ0aB69FBbgIkRg',
'KXaaC5RASryl0wfuiEEUMA',
'xRSZ4xCnQ7uRL-uhtL3cSw'
]
uploads = [upload.upload_id for upload in client.uploads.get_uploads().response().result]
executor = ThreadPoolExecutor(max_workers=10)
......
......@@ -84,7 +84,8 @@ upload_model = api.inherit('UploadProcessing', proc_model, {
'using the name query parameter.'),
'upload_id': fields.String(
description='The unique id for the upload.'),
'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data.'),
# TODO just removed during migration, where this gets particularly large
# 'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data.'),
'local_path': fields.String,
'upload_time': RFC3339DateTime(),
})
......
......@@ -46,8 +46,10 @@ from typing import Type, Callable, Tuple
import datetime
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
import filelock
import os.path
from nomad import utils, infrastructure
from nomad import utils, infrastructure, config
from nomad.datamodel import UploadWithMetadata
from .calc import Calc, PublishContext
......@@ -125,9 +127,23 @@ class Upload(Base): # type: ignore
assert upload.uploader is not None
logger = utils.get_logger(__name__, upload_id=upload.upload_id)
publish_filelock = filelock.FileLock(
os.path.join(config.fs.tmp, 'publish.lock'))
logger.info('waiting for filelock')
while True:
try:
publish_filelock.acquire(timeout=15 * 60, poll_intervall=1)
logger.info('acquired filelock')
break
except filelock.Timeout:
logger.warning('could not acquire publish lock after generous timeout')
repo_db = infrastructure.repository_db
repo_db.expunge_all()
repo_db.begin()
def fill_publish_transaction() -> Tuple[Upload, bool]:
try:
has_calcs = False
# create upload
......@@ -153,56 +169,52 @@ class Upload(Base): # type: ignore
coe_calc.apply_calc_with_metadata(calc, context=context)
logger.debug('added calculation, not yet committed', calc_id=coe_calc.calc_id)
return coe_upload, has_calcs
logger.info('filled publish transaction')
repo_db.expunge_all()
repo_db.begin()
try:
coe_upload, has_calcs = fill_publish_transaction()
upload_id = -1
if has_calcs:
repo_db.commit()
logger.info('committed publish transaction')
upload_id = coe_upload.coe_upload_id
else:
# empty upload case
repo_db.rollback()
return -1
logger.info('added upload')
return upload_id
except Exception as e:
logger.error('Unexpected exception.', exc_info=e)
repo_db.rollback()
raise e
finally:
publish_filelock.release()
logger.info('released filelock')
# commit
def complete(commit: bool) -> int:
upload_to_commit = coe_upload
try:
if commit:
if has_calcs:
last_error = None
repeat_count = 0
while True:
try:
repo_db.commit()
break
except Exception as e:
repo_db.rollback()
error = str(e)
if last_error != error:
last_error = error
logger.info(
'repeat publish transaction',
error=error, repeat_count=repeat_count)
repo_db.expunge_all()
repo_db.begin()
upload_to_commit, _ = fill_publish_transaction()
repeat_count += 1
else:
raise e
return upload_to_commit.coe_upload_id
repo_db.commit()
logger.info('committed publish transaction')
release_lock()
return coe_upload.coe_upload_id
else:
# empty upload case
repo_db.rollback()
release_lock()
return -1
logger.info('added upload')
else:
repo_db.rollback()
repo_db.expunge_all()
release_lock()
return -1
except Exception as e:
logger.error('Unexpected exception.', exc_info=e)
release_lock()
raise e
return complete
......@@ -753,7 +753,7 @@ class NomadCOEMigration:
# check for processing errors
with utils.timer(logger, 'checked upload processing'):
per_page = 200
per_page = 500
for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
upload = self.nomad(
'uploads.get_upload', upload_id=upload.upload_id, per_page=per_page,
......
......@@ -400,41 +400,36 @@ class Upload(Proc):
with utils.timer(
logger, 'upload added to repository', step='repo',
upload_size=self.upload_files.size):
upload_transaction_complete = coe_repo.Upload.publish(upload_with_metadata)
coe_repo.Upload.publish(upload_with_metadata)
try:
with utils.timer(
logger, 'staged upload files packed', step='pack',
upload_size=self.upload_files.size):
coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
if coe_upload is not None:
for coe_calc in coe_upload.calcs:
calc_metadata = coe_calc.to_calc_with_metadata()
calc_metadata.published = True
self.upload_files.metadata.update(
calc_id=calc_metadata.calc_id, updates=calc_metadata.to_dict())
logger.info('metadata updated after publish to coe repo', step='publish')
self.upload_files.pack()
with utils.timer(
logger, 'index updated', step='index',
upload_size=self.upload_files.size):
coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
if coe_upload is not None:
search.publish(
[coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs])
with utils.timer(
logger, 'staged upload deleted', step='delete staged',
upload_size=self.upload_files.size):
self.upload_files.delete()
self.delete()
except Exception as e:
upload_transaction_complete(False)
raise e
with utils.timer(
logger, 'staged upload files packed', step='pack',
upload_size=self.upload_files.size):
coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
if coe_upload is not None:
for coe_calc in coe_upload.calcs:
calc_metadata = coe_calc.to_calc_with_metadata()
calc_metadata.published = True
self.upload_files.metadata.update(
calc_id=calc_metadata.calc_id, updates=calc_metadata.to_dict())
logger.info('metadata updated after publish to coe repo', step='publish')
self.upload_files.pack()
with utils.timer(
logger, 'index updated', step='index',
upload_size=self.upload_files.size):
coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
if coe_upload is not None:
search.publish(
[coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs])
with utils.timer(
logger, 'staged upload deleted', step='delete staged',
upload_size=self.upload_files.size):
self.upload_files.delete()
self.delete()
upload_transaction_complete(True)
return True # do not save the process status on the delete upload
@process
......
......@@ -273,6 +273,7 @@ def aggregate_search(
"""
search = Search(index=config.elastic.index_name)
if q is not None:
search = search.query(q)
......@@ -304,6 +305,8 @@ def aggregate_search(
raise KeyError('Unknown order quantity %s' % order_by)
search = search.sort(order_by if order == 1 else '-%s' % order_by)
search = search.source(exclude=['quantities'])
response = search[(page - 1) * per_page: page * per_page].execute() # pylint: disable=E1101
total_results = response.hits.total
......
#!/bin/sh
# Provision a CentOS/RHEL 7 host as a Kubernetes node: set up the upstream
# yum repo, (re)install kubelet/kubeadm/kubectl, relax SELinux, open the
# control-plane ports, enable bridge netfilter, and start the kubelet.
# NOTE: the node is NOT joined to a cluster here — run `kubeadm init` or
# `kubeadm join` afterwards (see the echo at the end).

# add yum repo
# `exclude=kube*` pins the kube packages so a routine `yum update` cannot
# bump them behind kubeadm's back; the install/erase commands below opt
# back in explicitly with --disableexcludes=kubernetes.
cat <<EOF > /etc/yum.repos.d/kubernetes.repo
[kubernetes]
name=Kubernetes
baseurl=https://packages.cloud.google.com/yum/repos/kubernetes-el7-x86_64
enabled=1
gpgcheck=1
repo_gpgcheck=1
gpgkey=https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg
exclude=kube*
EOF

# delete old
# Remove any previously installed versions so the install below is clean.
yum erase -y kubelet kubeadm kubectl --disableexcludes=kubernetes

# Set SELinux in permissive mode (effectively disabling it)
# `setenforce 0` takes effect immediately; the sed makes it persist reboots.
setenforce 0
sed -i 's/^SELINUX=enforcing$/SELINUX=permissive/' /etc/selinux/config

yum install -y kubelet kubeadm kubectl --disableexcludes=kubernetes
systemctl enable --now kubelet

# firewall
# Open the standard control-plane ports: 6443 API server, 2379-2380 etcd,
# 10250 kubelet API, 10251 scheduler, 10252 controller-manager,
# 10255 read-only kubelet (see the kubeadm install guide's port list).
firewall-cmd --permanent --add-port=6443/tcp
firewall-cmd --permanent --add-port=2379-2380/tcp
firewall-cmd --permanent --add-port=10250/tcp
firewall-cmd --permanent --add-port=10251/tcp
firewall-cmd --permanent --add-port=10252/tcp
firewall-cmd --permanent --add-port=10255/tcp
firewall-cmd --reload

# Load the bridge netfilter module now; the sysctl keys below only exist
# once br_netfilter is loaded.
modprobe br_netfilter

# routing
# Make bridged (pod) traffic visible to iptables, as required by most CNIs.
cat <<EOF > /etc/sysctl.d/k8s.conf
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
EOF
# Apply all sysctl config files, including the one just written.
sysctl --system

# start it up
systemctl daemon-reload
systemctl restart kubelet
echo "Still have to use kubeadm init/join"
\ No newline at end of file
......@@ -107,17 +107,7 @@ def test_add_normalized_calc_with_metadata(
def test_add_upload(processed: processing.Upload):
upload_with_metadata = processed.to_upload_with_metadata()
Upload.publish(upload_with_metadata)(True)
assert_coe_upload(processed.upload_id, upload_with_metadata)
def test_rollback_upload(processed: processing.Upload, postgres):
assert Upload.from_upload_id(processed.upload_id) is None
upload_with_metadata = processed.to_upload_with_metadata()
assert Upload.publish(upload_with_metadata)(False) == -1
assert Upload.from_upload_id(processed.upload_id) is None
Upload.publish(upload_with_metadata)(True)
Upload.publish(upload_with_metadata)
assert_coe_upload(processed.upload_id, upload_with_metadata)
......@@ -142,14 +132,14 @@ def test_rollback_upload(processed: processing.Upload, postgres):
# import time
# start = time.time()
# upload_with_metadata.calcs = many_calcs()
# Upload.publish(upload_with_metadata)(True)
# Upload.publish(upload_with_metadata)
# print('########### %d' % (time.time() - start))
def test_add_upload_with_metadata(processed, example_user_metadata):
processed.metadata = example_user_metadata
upload_with_metadata = processed.to_upload_with_metadata()
Upload.publish(upload_with_metadata)(True)
Upload.publish(upload_with_metadata)
assert_coe_upload(
processed.upload_id, upload_with_metadata)
......
......@@ -57,6 +57,7 @@ def test_search(elastic, normalized: parsing.LocalBackend):
assert hits[0]['calc_id'] == calc_with_metadata.calc_id
assert 'bulk' in aggs['system']
assert aggs['system']['bulk'] == 1
assert 'quantities' not in hits[0]
def test_authors(elastic, normalized: parsing.LocalBackend, test_user: coe_repo.User, other_test_user: coe_repo.User):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment