Commit 0e108ee0 authored by Markus Scheidgen

Added a command to lift the embargo where necessary.

parent df99efb9
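
The heart of the new command is a simple date check: an upload's embargo has expired once its upload time plus the embargo period (stored in months, approximated as 365/12 days per month) lies in the past. A minimal, self-contained sketch of that rule (the helper name is illustrative, not part of the commit):

import datetime

def embargo_expired(upload_time: datetime.datetime, embargo_length_months: int) -> bool:
    # mirrors the check in lift_embargo below: months are approximated
    # as 365 / 12 days before comparing against the current time
    embargo_days = int(embargo_length_months * 365 / 12)
    return upload_time + datetime.timedelta(days=embargo_days) < datetime.datetime.now()

# an upload from 2012 with the default 36 month embargo has long expired
assert embargo_expired(datetime.datetime(2012, 1, 1), 36)
# a fresh upload has not
assert not embargo_expired(datetime.datetime.now(), 36)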
@@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Callable, List
import click
import datetime
from elasticsearch_dsl import Q
import elasticsearch.helpers
import sys
import io
@@ -21,6 +23,7 @@ import re
import uuid
import json
from io import StringIO
import threading
import numpy as np
import requests
@@ -35,6 +38,72 @@ from nomad.cli.cli import cli
from nomad import config
def __run_processing(
        uploads, parallel: int, process: Callable[[proc.Upload], None], label: str):
    if isinstance(uploads, (tuple, list)):
        uploads_count = len(uploads)
    else:
        uploads_count = uploads.count()
        uploads = list(uploads)  # copy the whole mongo query set to avoid cursor timeouts

    cv = threading.Condition()
    threads: List[threading.Thread] = []
    state = dict(
        completed_count=0,
        skipped_count=0,
        available_threads_count=parallel)

    logger = utils.get_logger(__name__)
    print('%d uploads selected, %s ...' % (uploads_count, label))

    def process_upload(upload: proc.Upload):
        logger.info('%s started' % label, upload_id=upload.upload_id)

        completed = False
        if upload.process_running:
            logger.warn(
                'cannot trigger %s, since the upload is already/still processing' % label,
                current_process=upload.current_process,
                current_task=upload.current_task, upload_id=upload.upload_id)
        else:
            upload.reset()
            process(upload)
            upload.block_until_complete(interval=.5)

            if upload.tasks_status == proc.FAILURE:
                logger.info('%s with failure' % label, upload_id=upload.upload_id)

            completed = True
            logger.info('%s complete' % label, upload_id=upload.upload_id)

        with cv:
            state['completed_count'] += 1 if completed else 0
            state['skipped_count'] += 1 if not completed else 0
            state['available_threads_count'] += 1

            print(
                '   %s %s and skipped %s of %s uploads' %
                (label, state['completed_count'], state['skipped_count'], uploads_count))

            cv.notify()

    for upload in uploads:
        with cv:
            cv.wait_for(lambda: state['available_threads_count'] > 0)
            state['available_threads_count'] -= 1
            # pass the upload as an argument; target=lambda: process_upload(upload)
            # would capture the loop variable late and could process the wrong upload
            thread = threading.Thread(target=process_upload, args=(upload,))
            threads.append(thread)
            thread.start()

    for thread in threads:
        thread.join()
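
The helper throttles work with a condition variable instead of a thread pool: the main loop blocks until a worker slot is free, and each finished worker releases its slot and notifies the loop. A self-contained sketch of the same pattern, with illustrative names:

import threading

def run_bounded(items, parallel, work):
    # a condition variable caps the number of concurrent worker threads at
    # `parallel`, the same throttling scheme as __run_processing above
    cv = threading.Condition()
    state = {'available': parallel}
    threads = []

    def worker(item):
        work(item)
        with cv:
            state['available'] += 1
            cv.notify()

    for item in items:
        with cv:
            cv.wait_for(lambda: state['available'] > 0)
            state['available'] -= 1
            thread = threading.Thread(target=worker, args=(item,))
            threads.append(thread)
            thread.start()

    for thread in threads:
        thread.join()

# example: square four numbers with at most two threads running at once
results = []
run_bounded([1, 2, 3, 4], 2, lambda x: results.append(x * x))
print(sorted(results))  # [1, 4, 9, 16]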
@cli.group(help='''The nomad admin commands to do nasty stuff directly on the databases.
Remember: With great power comes great responsibility!''')
@click.pass_context
@@ -57,6 +126,46 @@ def reset(remove, i_am_really_sure):
    infrastructure.reset(remove)
@admin.command(help='Check and lift embargo of data with expired embargo period.')
@click.option('--dry', is_flag=True, help='Do not lift the embargo, just show what needs to be done.')
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
def lift_embargo(dry, parallel):
    infrastructure.setup_logging()
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    request = search.SearchRequest()
    request.q = Q('term', with_embargo=True) & Q('term', published=True)
    request.quantity('upload_id', 1000)
    result = request.execute()

    uploads_to_repack = []
    for upload_id in result['quantities']['upload_id']['values']:
        upload = proc.Upload.get(upload_id)
        embargo_length = upload.embargo_length
        if embargo_length is None:
            # no embargo period stored: fall back to 36 months
            embargo_length = 36
            upload.embargo_length = 36

        if upload.upload_time + datetime.timedelta(days=int(embargo_length * 365 / 12)) < datetime.datetime.now():
            print('need to lift the embargo of %s (upload_time=%s, embargo=%d)' % (
                upload.upload_id, upload.upload_time, embargo_length))

            if not dry:
                proc.Calc._get_collection().update_many(
                    {'upload_id': upload_id},
                    {'$set': {'metadata.with_embargo': False}})
                uploads_to_repack.append(upload)
                upload.save()

                upload_with_metadata = upload.to_upload_with_metadata()
                calcs = upload_with_metadata.calcs
                search.index_all(calcs)

    if not dry:
        __run_processing(uploads_to_repack, parallel, lambda upload: upload.re_pack(), 're-packing')
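
The command can be driven the same way the test below drives it; a dry run only reports which uploads have expired embargoes, without touching mongo or elastic. A sketch, assuming a configured nomad installation:

import click.testing
from nomad import utils
from nomad.cli import cli

# --dry prints what would be lifted; drop it to actually update mongo and elastic
result = click.testing.CliRunner().invoke(
    cli, ['admin', 'lift-embargo', '--dry'],
    catch_exceptions=False, obj=utils.POPO())
print(result.output)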
@admin.command(help='(Re-)index all calcs.')
@click.option('--threads', type=int, default=1, help='Number of threads to use.')
@click.option('--dry', is_flag=True, help='Do not index, just compute entries.')
......
@@ -12,16 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Callable
from typing import List
import click
from tabulate import tabulate
from mongoengine import Q
from pymongo import UpdateOne
import threading
import elasticsearch_dsl as es
from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel
from .admin import admin
from .admin import admin, __run_processing
@admin.group(help='Upload related commands')
@@ -231,75 +230,13 @@ def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
        delete_upload(upload, skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)
def __run_processing(
        ctx, uploads, parallel: int, process: Callable[[proc.Upload], None], label: str):
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()
    uploads = list(uploads)  # copy the whole mongo query set to avoid cursor timeouts

    cv = threading.Condition()
    threads: List[threading.Thread] = []
    state = dict(
        completed_count=0,
        skipped_count=0,
        available_threads_count=parallel)

    logger = utils.get_logger(__name__)
    print('%d uploads selected, %s ...' % (uploads_count, label))

    def process_upload(upload: proc.Upload):
        logger.info('%s started' % label, upload_id=upload.upload_id)

        completed = False
        if upload.process_running:
            logger.warn(
                'cannot trigger %s, since the upload is already/still processing' % label,
                current_process=upload.current_process,
                current_task=upload.current_task, upload_id=upload.upload_id)
        else:
            upload.reset()
            process(upload)
            upload.block_until_complete(interval=.5)

            if upload.tasks_status == proc.FAILURE:
                logger.info('%s with failure' % label, upload_id=upload.upload_id)

            completed = True
            logger.info('%s complete' % label, upload_id=upload.upload_id)

        with cv:
            state['completed_count'] += 1 if completed else 0
            state['skipped_count'] += 1 if not completed else 0
            state['available_threads_count'] += 1

            print(
                '   %s %s and skipped %s of %s uploads' %
                (label, state['completed_count'], state['skipped_count'], uploads_count))

            cv.notify()

    for upload in uploads:
        with cv:
            cv.wait_for(lambda: state['available_threads_count'] > 0)
            state['available_threads_count'] -= 1
            thread = threading.Thread(target=lambda: process_upload(upload))
            threads.append(thread)
            thread.start()

    for thread in threads:
        thread.join()
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads, parallel: int):
    __run_processing(ctx, uploads, parallel, lambda upload: upload.re_process_upload(), 're-processing')
    _, uploads = query_uploads(ctx, uploads)
    __run_processing(uploads, parallel, lambda upload: upload.re_process_upload(), 're-processing')
@uploads.command(help='Repack selected uploads.')
@@ -307,7 +244,8 @@ def re_process(ctx, uploads, parallel: int):
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_pack(ctx, uploads, parallel: int):
    __run_processing(ctx, uploads, parallel, lambda upload: upload.re_pack(), 're-packing')
    _, uploads = query_uploads(ctx, uploads)
    __run_processing(uploads, parallel, lambda upload: upload.re_pack(), 're-packing')
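
Both commands now resolve their uploads via query_uploads before delegating to the shared helper, so invocation is unchanged. A sketch of a call through click's test runner (the upload id is made up):

import click.testing
from nomad import utils
from nomad.cli import cli

# re-pack one upload with two worker threads; 'some_upload_id' is a placeholder
click.testing.CliRunner().invoke(
    cli, ['admin', 'uploads', 're-pack', '--parallel', '2', 'some_upload_id'],
    catch_exceptions=False, obj=utils.POPO())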
@uploads.command(help='Attempt to abort the processing of uploads.')
......
@@ -17,12 +17,14 @@ import pytest
import click.testing
import json
import mongoengine
import datetime
from nomad import utils, search, processing as proc, files
from nomad.cli import cli
from nomad.processing import Upload, Calc
from tests.app.test_app import BlueprintClient
from tests.utils import assert_exception
# TODO there is much more to test
@@ -66,6 +68,32 @@ class TestAdmin:
        assert Calc.objects(upload_id=upload_id).first() is None
        assert search.SearchRequest().search_parameter('upload_id', upload_id).execute()['total'] > 0
    # cases: fresh upload (nothing to lift), expired + dry (report only), expired (lift)
    @pytest.mark.parametrize('upload_time,dry,lifted', [
        (datetime.datetime.now(), False, False),
        (datetime.datetime(year=2012, month=1, day=1), True, False),
        (datetime.datetime(year=2012, month=1, day=1), False, True)])
    def test_lift_embargo(self, published, upload_time, dry, lifted):
        upload_id = published.upload_id
        published.upload_time = upload_time
        published.save()

        calc = Calc.objects(upload_id=upload_id).first()
        assert published.upload_files.exists()
        assert calc.metadata['with_embargo']
        assert search.SearchRequest().owner('public').search_parameter('upload_id', upload_id).execute()['total'] == 0
        with assert_exception():
            files.UploadFiles.get(upload_id=upload_id).archive_file(calc_id=calc.calc_id)

        result = click.testing.CliRunner().invoke(
            cli, ['admin', 'lift-embargo'] + (['--dry'] if dry else []),
            catch_exceptions=False, obj=utils.POPO())
        assert result.exit_code == 0

        # with_embargo must be False exactly when the embargo was lifted
        assert (not Calc.objects(upload_id=upload_id).first().metadata['with_embargo']) == lifted
        assert (search.SearchRequest().owner('public').search_parameter('upload_id', upload_id).execute()['total'] > 0) == lifted
        if lifted:
            assert files.UploadFiles.get(upload_id=upload_id).archive_file(calc_id=calc.calc_id)
    def test_index(self, published):
        upload_id = published.upload_id
        calc = Calc.objects(upload_id=upload_id).first()
......