Commit 26b1b913 authored by Markus Scheidgen

Merged lazyimport.

parent c7224e50
Pipeline #71910 failed with stages in 13 minutes and 30 seconds
#!/usr/bin/env bash
# Install shell autocompletion for the nomad CLI. The eval'd
# "_NOMAD_COMPLETE=source nomad" is click's completion bootstrap: it makes the
# nomad entry point print a completion script for the current shell.
if [[ $(basename "$SHELL") = 'bash' ]]; then
    if [ -f ~/.bashrc ]; then
        echo "Installing bash autocompletion..."
        # only append the hook if it is not already installed
        if ! grep -q '_NOMAD_COMPLETE' ~/.bashrc; then
            echo "" >> ~/.bashrc
            echo 'eval "$(_NOMAD_COMPLETE=source nomad)"' >> ~/.bashrc
        fi
    fi
elif [[ $(basename "$SHELL") = 'zsh' ]]; then
    if [ -f ~/.zshrc ]; then
        echo "Installing zsh autocompletion..."
        # zsh needs bashcompinit to run the bash-style completion script
        if ! grep -q 'nomad-autocompletion' ~/.zshrc; then
            echo "" >> ~/.zshrc
            echo "autoload bashcompinit" >> ~/.zshrc
            echo "bashcompinit" >> ~/.zshrc
            echo 'eval "$(_NOMAD_COMPLETE=source nomad)"' >> ~/.nomad-autocompletion.sh
            echo "source ~/.nomad-autocompletion.sh" >> ~/.zshrc
        fi
    fi
fi
from types import ModuleType
import sys
from importlib._bootstrap import _ImportLockContext
from six import raise_from
from importlib import reload as reload_module

__all__ = ['lazy_module', 'LazyModule', '_MSG']

_CLS_ATTRS = (
    '_lazy_import_error_strings', '_lazy_import_error_msgs', '_lazy_import_callables',
    '_lazy_import_submodules', '__repr__'
)
_DICT_DELETION = ('_lazy_import_submodules',)

_MSG = ("{caller} attempted to use a functionality that requires module "
        "{module}, but it couldn't be loaded. Please install {install_name} "
        "and retry.")

class LazyModule(ModuleType):
    '''A module placeholder whose first real attribute access triggers the import.'''

    def __getattribute__(self, attr):
        # these attributes can be read without loading the module
        if attr not in ('__name__', '__class__', '__spec__'):
            # registered lazy submodules can also be returned without loading
            try:
                name = '%s.%s' % (self.__name__, attr)
                return sys.modules[name]
            except KeyError:
                pass
            try:
                return type(self)._lazy_import_callables[attr]
            except (AttributeError, KeyError):
                _load_module(self)
        return super(LazyModule, self).__getattribute__(attr)

    def __setattr__(self, attr, value):
        _load_module(self)
        return super(LazyModule, self).__setattr__(attr, value)

def _clean_lazy_submodule_refs(module):
    module_class = type(module)
    for entry in _DICT_DELETION:
        try:
            names = getattr(module_class, entry)
        except AttributeError:
            continue
        for name in names:
            try:
                super(LazyModule, module).__delattr__(name)
            except AttributeError:
                pass


def _clean_lazymodule(module):
    module_class = type(module)
    _clean_lazy_submodule_refs(module)
    module_class.__getattribute__ = ModuleType.__getattribute__
    module_class.__setattr__ = ModuleType.__setattr__
    class_attrs = {}
    for attr in _CLS_ATTRS:
        try:
            class_attrs[attr] = getattr(module_class, attr)
            delattr(module_class, attr)
        except AttributeError:
            pass
    return class_attrs


def _reset_lazy_submodule_refs(module):
    module_class = type(module)
    for entry in _DICT_DELETION:
        try:
            names = getattr(module_class, entry)
        except AttributeError:
            continue
        for name, submodule in names.items():
            super(LazyModule, module).__setattr__(name, submodule)


def _reset_lazymodule(module, class_attrs):
    module_class = type(module)
    del module_class.__getattribute__
    del module_class.__setattr__
    try:
        del module_class._LOADING
    except AttributeError:
        pass
    for attr in _CLS_ATTRS:
        try:
            setattr(module_class, attr, class_attrs[attr])
        except KeyError:
            pass
    _reset_lazy_submodule_refs(module)

def _load_module(module):
    '''Replace the placeholder with the real module by (re)importing it.'''
    module_class = type(module)
    if not issubclass(module_class, LazyModule):
        raise TypeError('Not an instance of LazyModule')
    with _ImportLockContext():
        parent, _, module_name = module.__name__.rpartition('.')
        if not hasattr(module_class, '_lazy_import_error_msgs'):
            # the module was already loaded, e.g. by a concurrent caller
            return
        module_class._LOADING = True
        try:
            if parent:
                setattr(sys.modules[parent], module_name, module)
            if not hasattr(module_class, '_LOADING'):
                return
            cached_data = _clean_lazymodule(module)
            try:
                reload_module(module)
            except Exception:
                # loading failed: restore the lazy state before re-raising
                _reset_lazymodule(module, cached_data)
                raise
            else:
                delattr(module_class, '_LOADING')
                _reset_lazy_submodule_refs(module)
        except (AttributeError, ImportError):
            msg = module_class._lazy_import_error_msgs['msg']
            raise_from(ImportError(msg.format(**module_class._lazy_import_error_strings)), None)

def _lazy_module(module_name, error_strings):
    with _ImportLockContext():
        full_module_name = module_name
        full_submodule_name = None
        submodule_name = ''
        # walk the dotted name from leaf to base, registering a placeholder
        # for every part that is not in sys.modules yet
        while module_name:
            try:
                module = sys.modules[module_name]
                module_name = ''  # this part is already loaded, stop here
            except KeyError:
                err_strs = error_strings.copy()
                err_strs.setdefault('module', module_name)

                class _LazyModule(LazyModule):
                    _lazy_import_error_msgs = {'msg': err_strs.pop('msg')}
                    msg_callable = err_strs.pop('msg_callable', None)
                    if msg_callable:
                        _lazy_import_error_msgs['msg_callable'] = msg_callable
                    _lazy_import_error_strings = err_strs
                    _lazy_import_callables = {}
                    _lazy_import_submodules = {}

                    def __repr__(self):
                        return 'Lazily-loaded module %s' % self.__name__

                _LazyModule.__name__ = 'module'
                module = sys.modules[module_name] = _LazyModule(module_name)
            if full_submodule_name:
                # link the previously created child into its parent
                submodule = sys.modules[full_submodule_name]
                ModuleType.__setattr__(module, submodule_name, submodule)
                _LazyModule._lazy_import_submodules[submodule_name] = submodule
            full_submodule_name = module_name
            module_name, _, submodule_name = module_name.rpartition('.')
        return sys.modules[full_module_name]

def lazy_module(module_name, level='leaf'):
    module_base_name = module_name.partition('.')[0]
    error_strings = {}
    try:
        caller = sys._getframe(3).f_globals['__name__']
    except AttributeError:
        caller = 'Python'
    error_strings.setdefault('caller', caller)
    error_strings.setdefault('install_name', module_base_name)
    error_strings.setdefault('msg', _MSG)
    module = _lazy_module(module_name, error_strings)
    if level == 'base':
        return sys.modules[module_base_name]
    elif level == 'leaf':
        return module
    else:
        raise ValueError("level must be 'base' or 'leaf'")
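
A minimal usage sketch (illustrative only, not part of the commit; `numpy` is just an example name): `lazy_module` puts a `LazyModule` placeholder into `sys.modules`, the real import is deferred to the first attribute access, and a module that cannot be loaded surfaces as an `ImportError` built from `_MSG` at use time instead of import time.

    import lazy_import

    np = lazy_import.lazy_module('numpy')  # registers a placeholder, imports nothing yet
    print(np)      # "Lazily-loaded module numpy" -- still the placeholder
    print(np.pi)   # first attribute access triggers the real import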
@@ -19,11 +19,26 @@ that offers various functionality to the command line user.
Use it from the command line with ``nomad --help`` or ``python -m nomad.cli --help`` to learn
more.
'''
+import lazy_import
-from nomad.utils import POPO
+lazy_import.lazy_module('click')
+lazy_import.lazy_module('logging')
+lazy_import.lazy_module('os')
+lazy_import.lazy_module('typing')
+lazy_import.lazy_module('json')
+lazy_import.lazy_module('sys')
+lazy_import.lazy_module('nomad.config')
+lazy_import.lazy_module('nomad.infrastructure')
+lazy_import.lazy_module('nomad.utils')
+lazy_import.lazy_module('nomad.parsing')
+lazy_import.lazy_module('nomad.normalizing')
+lazy_import.lazy_module('nomad.datamodel')
+lazy_import.lazy_module('nomadcore')
-from . import dev, admin, client, parse
-from .cli import cli
+from nomad.utils import POPO # noqa
+from . import dev, admin, client, parse # noqa
+from .cli import cli # noqa
def run_cli():
......
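
For context (not part of the diff): each `lazy_module` call above only registers a placeholder, so `import nomad.cli` becomes cheap and the heavy dependencies are materialized on first use. The `# noqa` markers silence the linter about imports that now follow executable statements. An illustrative sketch:

    import sys
    import nomad.cli  # fast: the modules above are only registered, not imported

    placeholder = sys.modules['nomad.infrastructure']
    # placeholder is still a LazyModule; touching any attribute, e.g.
    # placeholder.setup_mongo, triggers the real import at that point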
@@ -12,5 +12,42 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import lazy_import
-from . import admin, uploads, entries, run, clean, users
+lazy_import.lazy_module('typing')
+lazy_import.lazy_module('click')
+lazy_import.lazy_module('asyncio')
+lazy_import.lazy_module('concurrent.futures')
+lazy_import.lazy_module('datetime')
+lazy_import.lazy_module('time')
+lazy_import.lazy_module('elasticsearch_dsl')
+lazy_import.lazy_module('elasticsearch')
+lazy_import.lazy_module('os')
+lazy_import.lazy_module('shutil')
+lazy_import.lazy_module('tabulate')
+lazy_import.lazy_module('sys')
+lazy_import.lazy_module('io')
+lazy_import.lazy_module('re')
+lazy_import.lazy_module('uuid')
+lazy_import.lazy_module('json')
+lazy_import.lazy_module('threading')
+lazy_import.lazy_module('numpy')
+lazy_import.lazy_module('requests')
+lazy_import.lazy_module('pymongo')
+lazy_import.lazy_module('mongoengine')
+lazy_import.lazy_module('ase')
+lazy_import.lazy_module('bs4')
+lazy_import.lazy_module('matid')
+lazy_import.lazy_module('matid.symmetry.symmetryanalyzer')
+lazy_import.lazy_module('matid.utils.segfault_protect')
+lazy_import.lazy_module('nomad.normalizing')
+lazy_import.lazy_module('nomad.processing')
+lazy_import.lazy_module('nomad.search')
+lazy_import.lazy_module('nomad.datamodel')
+lazy_import.lazy_module('nomad.infrastructure')
+lazy_import.lazy_module('nomad.utils')
+lazy_import.lazy_module('nomad.config')
+lazy_import.lazy_module('nomad.files')
+lazy_import.lazy_module('nomad.archive')
+from . import admin, uploads, entries, run, clean, users # noqa
@@ -11,37 +11,31 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Callable, List
+import typing
import click
import datetime
-from elasticsearch_dsl import Q
-import elasticsearch.helpers
+import elasticsearch_dsl
+import elasticsearch
import sys
+import io
import re
import uuid
import json
-from io import StringIO
import threading
import numpy as np
import requests
-from ase import Atoms
-import ase.io
-from bs4 import BeautifulSoup
-from matid import SymmetryAnalyzer
+import ase
+import bs4
+import matid
from nomad import processing as proc, search, datamodel, infrastructure, utils, config
-from nomad.normalizing.aflow_prototypes import get_normalized_wyckoff
+from nomad import normalizing
from nomad.cli.cli import cli
-from nomad import config
-from nomad.normalizing.springer import update_springer_data
def __run_processing(
-        uploads, parallel: int, process: Callable[[proc.Upload], None], label: str,
-        reprocess_running: bool = False):
+        uploads, parallel: int, process, label: str, reprocess_running: bool = False):
    if isinstance(uploads, (tuple, list)):
        uploads_count = len(uploads)
@@ -50,7 +44,7 @@ def __run_processing(
    uploads = list(uploads) # copy the whole mongo query set to avoid cursor timeouts
    cv = threading.Condition()
-    threads: List[threading.Thread] = []
+    threads: typing.List[threading.Thread] = []
    state = dict(
        completed_count=0,
@@ -137,7 +131,7 @@ def lift_embargo(dry, parallel):
    infrastructure.setup_elastic()
    request = search.SearchRequest()
-    request.q = Q('term', with_embargo=True) & Q('term', published=True)
+    request.q = elasticsearch_dsl.Q('term', with_embargo=True) & elasticsearch_dsl.Q('term', published=True)
    request.quantity('upload_id', 1000)
    result = request.execute()
@@ -413,7 +407,7 @@ def prototypes_update(ctx, filepath, matches_only):
        newdict = {}
        # Make prototype plaintext
-        prototype = BeautifulSoup(protodict["Prototype"], "html5lib").getText()
+        prototype = bs4.BeautifulSoup(protodict["Prototype"], "html5lib").getText()
        # Add to new dictionary
        newdict['Notes'] = protodict['Notes']
@@ -425,12 +419,12 @@ def prototypes_update(ctx, filepath, matches_only):
        newdict['aflow_prototype_id'] = protodict['AFLOW Prototype']
        newdict['aflow_prototype_url'] = 'http://www.aflowlib.org/CrystalDatabase/' + protodict['href'][2:]
-        # Download cif or poscar if possible make ASE Atoms object if possible
+        # Download cif or poscar if possible make ASE ase.Atoms object if possible
        # to obtain labels, positions, cell
        cifurl = 'http://www.aflowlib.org/CrystalDatabase/CIF/' + protodict['href'][2:-5] + '.cif'
        r = requests.get(cifurl, allow_redirects=True)
        cif_str = r.content.decode("utf-8")
-        cif_file = StringIO()
+        cif_file = io.StringIO()
        cif_file.write(cif_str)
        cif_file.seek(0)
        try:
@@ -442,7 +436,7 @@ def prototypes_update(ctx, filepath, matches_only):
            poscarurl = 'http://www.aflowlib.org/CrystalDatabase/POSCAR/' + protodict['href'][2:-5] + '.poscar'
            r = requests.get(poscarurl, allow_redirects=True)
            poscar_str = r.content.decode("utf-8")
-            poscar_file = StringIO()
+            poscar_file = io.StringIO()
            poscar_file.write(poscar_str)
            poscar_file.seek(0)
            atoms = ase.io.read(poscar_file, format='vasp')
@@ -496,7 +490,7 @@ def prototypes_update(ctx, filepath, matches_only):
        pos = np.array(prototype["atom_positions"])
        labels = prototype["atom_labels"]
        cell = np.array(prototype["lattice_vectors"])
-        atoms = Atoms(
+        atoms = ase.Atoms(
            symbols=labels,
            positions=pos,
            cell=cell,
@@ -506,7 +500,7 @@ def prototypes_update(ctx, filepath, matches_only):
        # Try to first see if the space group can be matched with the one in AFLOW
        tolerance = config.normalize.symmetry_tolerance
        try:
-            symm = SymmetryAnalyzer(atoms, tolerance)
+            symm = matid.SymmetryAnalyzer(atoms, tolerance)
            spg_number = symm.get_space_group_number()
            wyckoff_matid = symm.get_wyckoff_letters_conventional()
            norm_system = symm.get_conventional_system()
@@ -517,7 +511,7 @@ def prototypes_update(ctx, filepath, matches_only):
        # letters to the data.
        if spg_number == aflow_spg_number:
            atomic_numbers = norm_system.get_atomic_numbers()
-            normalized_wyckoff_matid = get_normalized_wyckoff(atomic_numbers, wyckoff_matid)
+            normalized_wyckoff_matid = normalizing.aflow_prototypes.get_normalized_wyckoff(atomic_numbers, wyckoff_matid)
            prototype["normalized_wyckoff_matid"] = normalized_wyckoff_matid
        else:
            n_unmatched += 1
@@ -537,4 +531,4 @@ def prototypes_update(ctx, filepath, matches_only):
@click.option('--max-n-query', default=10, type=int, help='Number of unsuccessful springer request before returning an error. Default is 10.')
@click.option('--retry-time', default=120, type=int, help='Time in seconds to retry after unsuccessful request. Default is 120.')
def springer_update(max_n_query, retry_time):
-    update_springer_data(max_n_query, retry_time)
+    normalizing.springer.update_springer_data(max_n_query, retry_time)
@@ -15,11 +15,11 @@
import click
import os
import shutil
-from tabulate import tabulate
-from elasticsearch_dsl import A
+import tabulate
+import elasticsearch_dsl
from nomad import config as nomad_config, infrastructure, processing
-from nomad.search import Search
+from nomad import search as nomad_search
from .admin import admin
@@ -53,7 +53,7 @@ def clean(dry, skip_calcs, skip_fs, skip_es, staging_too, force):
        for upload in missing_uploads:
            mongo_client[nomad_config.mongo.db_name]['calc'].remove(dict(upload_id=upload))
-            Search(index=nomad_config.elastic.index_name).query('term', upload_id=upload).delete()
+            elasticsearch_dsl.Search(index=nomad_config.elastic.index_name).query('term', upload_id=upload).delete()
    else:
        print('Found %s uploads that have calcs in mongo, but there is no upload entry.' % len(missing_uploads))
        print('List first 10:')
@@ -103,8 +103,8 @@ def clean(dry, skip_calcs, skip_fs, skip_es, staging_too, force):
            print(path)
    if not skip_es:
-        search = Search(index=nomad_config.elastic.index_name)
-        search.aggs.bucket('uploads', A('terms', field='upload_id', size=12000))
+        search = nomad_search.Search(index=nomad_config.elastic.index_name)
+        search.aggs.bucket('uploads', elasticsearch_dsl.A('terms', field='upload_id', size=12000))
        response = search.execute()
        to_delete = list(
@@ -122,8 +122,8 @@ def clean(dry, skip_calcs, skip_fs, skip_es, staging_too, force):
                'Will delete %d calcs in %d uploads from ES. Press any key to continue ...' %
                (calcs, len(to_delete)))
            for upload, _ in to_delete:
-                Search(index=nomad_config.elastic.index_name).query('term', upload_id=upload).delete()
+                nomad_search.Search(index=nomad_config.elastic.index_name).query('term', upload_id=upload).delete()
        else:
            print('Found %d calcs in %d uploads from ES with no upload in mongo.' % (calcs, len(to_delete)))
            print('List first 10:')
-            tabulate(to_delete, headers=['id', '#calcs'])
+            tabulate.tabulate(to_delete, headers=['id', '#calcs'])
@@ -12,22 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Dict, Any
-from mongoengine import Document, IntField, StringField, DictField
-from pymongo import UpdateOne
+import typing
+import mongoengine
+import pymongo
import time
import datetime
import json
from nomad import utils, processing as proc, search
-from nomad.datamodel import EntryMetadata
-from nomad.cli.client.mirror import transform_reference, tarnsform_user_id, transform_dataset
+from nomad import datamodel
+from nomad.cli.client import mirror
__logger = utils.get_logger(__name__)
-class SourceCalc(Document):
+class SourceCalc(mongoengine.Document):
    '''
    Mongo document used as a calculation, upload, and metadata db and index
    built from a given source db. Each :class:`SourceCalc` entry relates
@@ -36,12 +37,12 @@ class SourceCalc(Document):
    specific path segment that identifies an upload on the CoE repo FS(s) without
    any prefixes (e.g. $EXTRACTED, /data/upload, etc.)
    '''
-    pid = IntField(primary_key=True)
-    mainfile = StringField()
-    upload = StringField()
-    metadata = DictField()
+    pid = mongoengine.IntField(primary_key=True)
+    mainfile = mongoengine.StringField()
+    upload = mongoengine.StringField()
+    metadata = mongoengine.DictField()
-    migration_version = IntField(default=-1)
+    migration_version = mongoengine.IntField(default=-1)
    extracted_prefix = '$EXTRACTED/'
    sites = ['/data/nomad/extracted/', '/nomad/repository/extracted/']
@@ -67,7 +68,7 @@ def update_user_metadata(bulk_size: int = 1000, update_index: bool = False, **kw
    # iterate the source index in bulk
    size = SourceCalc.objects(**kwargs).count()
    count = 0
-    important_changes: Dict[str, Any] = dict(missing_calcs=dict(), replaced=dict(), lifted_embargo=list())
+    important_changes: typing.Dict[str, typing.Any] = dict(missing_calcs=dict(), replaced=dict(), lifted_embargo=list())
    try:
        for start in range(0, size, bulk_size):
@@ -96,16 +97,16 @@ def update_user_metadata(bulk_size: int = 1000, update_index: bool = False, **kw
                    important_changes['missing_calcs'].setdefault(source.upload, []).append(source.pid)
                    continue
-                target_metadata = EntryMetadata(**target.metadata)
-                source_metadata_normalized: Dict[str, Any] = dict(
+                target_metadata = datamodel.EntryMetadata(**target.metadata)
+                source_metadata_normalized: typing.Dict[str, typing.Any] = dict(
                    comment=source.metadata.get('comment'),
-                    references={transform_reference(ref) for ref in source.metadata['references']},
-                    coauthors={tarnsform_user_id(user['id']) for user in source.metadata['coauthors']},
-                    shared_with={tarnsform_user_id(user['id']) for user in source.metadata['shared_with']},
-                    datasets={transform_dataset(ds) for ds in source.metadata['datasets']},
+                    references={mirror.transform_reference(ref) for ref in source.metadata['references']},
+                    coauthors={mirror.tarnsform_user_id(user['id']) for user in source.metadata['coauthors']},
+                    shared_with={mirror.tarnsform_user_id(user['id']) for user in source.metadata['shared_with']},
+                    datasets={mirror.transform_dataset(ds) for ds in source.metadata['datasets']},
                    with_embargo=source.metadata['with_embargo'])
-                target_metadata_normalized: Dict[str, Any] = dict(
+                target_metadata_normalized: typing.Dict[str, typing.Any] = dict(
                    comment=target_metadata.comment,
                    references=set(target_metadata.references),
                    coauthors=set(target_metadata.coauthors),
@@ -115,7 +116,7 @@ def update_user_metadata(bulk_size: int = 1000, update_index: bool = False, **kw
            if source_metadata_normalized != target_metadata_normalized:
                # do a full update of all metadata!
-                update = UpdateOne(
+                update = pymongo.UpdateOne(
                    dict(_id=target.calc_id),
                    {
                        "$set": {
......
@@ -14,7 +14,7 @@
import click
import asyncio
-from concurrent.futures import ProcessPoolExecutor
+from concurrent import futures as concurrent_futures
from nomad import config
from .admin import admin
@@ -54,7 +54,7 @@ def run_worker():
@run.command(help='Run both app and worker.')
def appworker():
-    executor = ProcessPoolExecutor(2)
+    executor = concurrent_futures.ProcessPoolExecutor(2)
    loop = asyncio.get_event_loop()
    loop.run_in_executor(executor, run_app)
    loop.run_in_executor(executor, run_worker)
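
Side note on the surrounding code (an illustrative sketch, not part of the diff): `loop.run_in_executor` submits each blocking entry point to the two-process pool, which is what lets app and worker run concurrently. A standalone sketch of the same pattern, with placeholder functions standing in for nomad's real entry points:

    import asyncio
    from concurrent import futures as concurrent_futures

    def run_app():
        print('app running')  # stands in for nomad's real app entry point

    def run_worker():
        print('worker running')  # stands in for nomad's real worker entry point

    if __name__ == '__main__':  # required for process pools under the spawn start method
        executor = concurrent_futures.ProcessPoolExecutor(2)
        loop = asyncio.get_event_loop()
        loop.run_in_executor(executor, run_app)
        loop.run_in_executor(executor, run_worker)
        loop.run_forever()  # keep the loop alive so both processes keep running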
@@ -11,13 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import List
+import typing
import click
-from tabulate import tabulate
-from mongoengine import Q
-from elasticsearch_dsl import Q as ESQ
-from pymongo import UpdateOne
+import tabulate
+import mongoengine
+import pymongo
+import elasticsearch_dsl as es
import json
@@ -34,23 +32,25 @@ from .admin import admin, __run_processing
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.option('--query-mongo', is_flag=True, help='Select query mongo instead of elastic search.')
@click.pass_context
-def uploads(ctx, user: str, staging: bool, processing: bool, outdated: bool, code: List[str], query_mongo):
+def uploads(
+        ctx, user: str, staging: bool, processing: bool, outdated: bool,
+        code: typing.List[str], query_mongo: bool):
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()
-    query = Q()
+    query = mongoengine.Q()
    if user is not None:
-        query &= Q(user_id=user)
+        query &= mongoengine.Q(user_id=user)
    if staging:
-        query &= Q(published=False)
+        query &= mongoengine.Q(published=False)