# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module comprises a set of persistent document classes that hold all user-related
data: information about users, their uploads and datasets, the associated
calculations, and files.
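
A typical flow, as a rough sketch (assuming ``some_user`` is a ``coe_repo.User``;
exact call signatures may differ)::

    upload = Upload.create(user=some_user, name='an example upload')
    # the client transfers the file out of band, e.g. via upload.upload_command
    upload.process()                 # extracts files, matches parsers, spawns Calc processings
    calcs = upload.all_calcs(0, 10)  # page through the upload's calculations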


.. autoclass:: Calc
    :members:
.. autoclass:: Upload
    :members:
"""

from typing import List, Any, ContextManager, Tuple, Generator
from elasticsearch.exceptions import NotFoundError
from mongoengine import StringField, BooleanField, DateTimeField, DictField, IntField
import logging
import base64
import time
from structlog import wrap_logger
from contextlib import contextmanager

from nomad import config, utils, coe_repo
from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile, File
from nomad.repo import RepoCalc
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE, RUNNING
from nomad.parsing import parsers, parser_dict
from nomad.normalizing import normalizers
from nomad.utils import lnr


class NotAllowedDuringProcessing(Exception): pass


class Calc(Proc):
    """
    Instances of this class represent calculations. This class manages the elastic
    search index entry, files, and archive for the respective calculation.

    It also contains the calculation's processing state.

    The attribute list does not include the various repository properties generated
    while parsing, including ``program_name``, ``program_version``, etc.

    Attributes:
        archive_id: the hash based archive id of the calc
        parser: the name of the parser used to process this calc
        upload_id: the id of the upload used to create this calculation
        mainfile: the mainfile (including path in upload) that was used to create this calc
        mainfile_tmp_path: path to the mainfile extracted for processing
    """
    archive_id = StringField(primary_key=True)
    upload_id = StringField()
    mainfile = StringField()
    parser = StringField()
    mainfile_tmp_path = StringField()

    meta: Any = {
        'indexes': [
            'upload_id', 'mainfile', 'code', 'parser', 'status'
        ]
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._parser_backend = None
        self._upload = None
        self._calc_proc_logwriter = None
        self._calc_proc_logfile = None
        self._calc_proc_logwriter_ctx: ContextManager = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'archive_id')

    @property
    def mainfile_file(self) -> File:
        return File(self.mainfile_tmp_path)

    @property
    def upload(self) -> 'Upload':
        if not self._upload:
            self._upload = Upload.get(self.upload_id)
        return self._upload

    def delete(self):
        """
        Delete this calculation and all associated data. This includes all files,
        the archive, and this search index entry.
        TODO is this needed? Or do we always delete whole uploads in bulk.
        """
        # delete the archive
        if self.archive_id is not None:
            ArchiveFile(self.archive_id).delete()

        # delete the search index entry
        try:
            elastic_entry = RepoCalc.get(self.archive_id)
            if elastic_entry is not None:
                elastic_entry.delete()
        except NotFoundError:
            pass

        # delete this mongo document
        super().delete()

    def get_logger(self, **kwargs):
        upload_hash, calc_hash = self.archive_id.split('/')
        logger = super().get_logger()
        logger = logger.bind(
            upload_id=self.upload_id, mainfile=self.mainfile,
            upload_hash=upload_hash, calc_hash=calc_hash,
            archive_id=self.archive_id, **kwargs)
        return logger

    def get_calc_logger(self, **kwargs):
        """
        Returns a wrapped logger that additionally saves all entries to the calculation
        processing log in the archive.
        """
        logger = self.get_logger(**kwargs)

        if self._calc_proc_logwriter is None:
            self._calc_proc_logfile = ArchiveLogFile(self.archive_id)
            self._calc_proc_logwriter_ctx = self._calc_proc_logfile.open('wt')
            self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__()  # pylint: disable=E1101

        def save_to_calc_log(logger, method_name, event_dict):
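            # structlog processor: mirrors each log entry into the calc's processing
            # log in the archive, truncating long messages to 120 characters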
            program = event_dict.get('normalizer', 'parser')
            event = event_dict.get('event', '')
            entry = '[%s] %s: %s' % (method_name, program, event)
            if len(entry) > 120:
                self._calc_proc_logwriter.write(entry[:120])
                self._calc_proc_logwriter.write('...')
            else:
                self._calc_proc_logwriter.write(entry)
            self._calc_proc_logwriter.write('\n')
            return event_dict

        return wrap_logger(logger, processors=[save_to_calc_log])

    @process
    def process(self):
        logger = self.get_logger()
        if self.upload is None:
            logger.error('calculation upload does not exist')

        try:
            self.parsing()
            self.normalizing()
            self.archiving()
        finally:
            # close loghandler that was not closed due to failures
            try:
                if self._calc_proc_logwriter is not None:
                    self._calc_proc_logwriter.close()
                    self._calc_proc_logwriter = None
            except Exception as e:
                logger.error('could not close calculation proc log', exc_info=e)

            # inform parent proc about completion
            self.upload.completed_child()

    @task
    def parsing(self):
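        """ Runs the parser on the calc's mainfile and records basic calc info in the backend. """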
        context = dict(parser=self.parser, step=self.parser)
        logger = self.get_calc_logger(**context)
        parser = parser_dict[self.parser]

        with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
            self._parser_backend = parser.run(self.mainfile_tmp_path, logger=logger)

        self._parser_backend.openNonOverlappingSection('section_calculation_info')
        self._parser_backend.addValue('upload_id', self.upload_id)
        self._parser_backend.addValue('archive_id', self.archive_id)
        self._parser_backend.addValue('main_file', self.mainfile)
        self._parser_backend.addValue('parser_name', self.parser)

        if self._parser_backend.status[0] != 'ParseSuccess':
            logger.error(self._parser_backend.status[1])
            error = self._parser_backend.status[1]
            self._parser_backend.addValue('parse_status', 'ParseFailure')
            self.fail(error, level=logging.DEBUG, **context)
        else:
            self._parser_backend.addValue('parse_status', 'ParseSuccess')

        self._parser_backend.closeNonOverlappingSection('section_calculation_info')

        self.add_processor_info(self.parser)

    @contextmanager
    def use_parser_backend(self, processor_name):
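        # reset the backend status before handing it to a processor and record the
        # processor's outcome (status, warnings) in the archive afterwards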
        self._parser_backend.reset_status()
        yield self._parser_backend
        self.add_processor_info(processor_name)

    def add_processor_info(self, processor_name: str) -> None:
        self._parser_backend.openContext('/section_calculation_info/0')
        self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.addValue('archive_processor_name', processor_name)

        if self._parser_backend.status[0] == 'ParseSuccess':
            warnings = getattr(self._parser_backend, '_warnings', [])
            if len(warnings) > 0:
                self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
                self._parser_backend.addValue('archive_processor_warning_number', len(warnings))
                self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
            else:
                self._parser_backend.addValue('archive_processor_status', 'Success')
        else:
            errors = self._parser_backend.status[1]
            self._parser_backend.addValue('archive_processor_error', str(errors))

        self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
        self._parser_backend.closeContext('/section_calculation_info/0')

    @task
    def normalizing(self):
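        """ Runs all registered normalizers on the parsed backend; stops at the first failure. """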
        for normalizer in normalizers:
            normalizer_name = normalizer.__name__
            context = dict(normalizer=normalizer_name, step=normalizer_name)
            logger = self.get_calc_logger(**context)

            with utils.timer(
                    logger, 'normalizer executed', input_size=self.mainfile_file.size):
                with self.use_parser_backend(normalizer_name) as backend:
                    normalizer(backend).normalize(logger=logger)

            failed = self._parser_backend.status[0] != 'ParseSuccess'
            if failed:
                logger.error(self._parser_backend.status[1])
                error = self._parser_backend.status[1]
                self.fail(error, level=logging.WARNING, **context)
                break
            else:
                logger.debug(
                    'completed normalizer successfully', normalizer=normalizer_name)

    @task
    def archiving(self):
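        """ Persists the repository entry, the archive data, and the calc processing log. """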
        logger = self.get_logger()

        upload_hash, calc_hash = self.archive_id.split('/')
        additional = dict(
            mainfile=self.mainfile,
            upload_time=self.upload.upload_time,
            staging=True,
            restricted=False,
            user_id=self.upload.user_id,
            aux_files=list(self.upload.upload_file.get_siblings(self.mainfile)))

        with utils.timer(logger, 'indexed', step='index'):
            # persist to elastic search
            RepoCalc.create_from_backend(
                self._parser_backend,
                additional=additional,
                upload_hash=upload_hash,
                calc_hash=calc_hash,
                upload_id=self.upload_id).persist()

        with utils.timer(
                logger, 'archived', step='archive',
                input_size=self.mainfile_file.size) as log_data:

            # persist the archive
            archive_file = ArchiveFile(self.archive_id)
            with archive_file.write_archive_json() as out:
                self._parser_backend.write_json(out, pretty=True)

            log_data.update(archive_size=archive_file.size)

        # close loghandler
        if self._calc_proc_logwriter is not None:
            with utils.timer(
                    logger, 'archived log', step='archive_log',
                    input_size=self.mainfile_file.size) as log_data:
                self._calc_proc_logwriter_ctx.__exit__(None, None, None)  # pylint: disable=E1101
                self._calc_proc_logwriter = None

                log_data.update(log_size=self._calc_proc_logfile.size)


class Upload(Chord):
    """
    Represents uploads in the database. Provides persistent access to the files storage
    and processing state.

    Attributes:
        name: optional user-provided upload name
        local_path: optional local path, e.g. for files that are already somewhere on the server
        additional_metadata: optional user-provided additional metadata
        upload_id: the upload id generated by the database
        in_staging: true if the upload is still in staging and can be edited by the uploader
        is_private: true if the upload and its derivatives are only visible to the uploader
        upload_time: the timestamp when the system realised the upload
        upload_hash: the hash of the uploaded file
        user_id: the id of the user that created this upload
    """
    id_field = 'upload_id'

    upload_id = StringField(primary_key=True)

    name = StringField(default=None)
    local_path = StringField(default=None)
    additional_metadata = DictField(default=None)

    in_staging = BooleanField(default=True)
    is_private = BooleanField(default=False)

    upload_time = DateTimeField()
    upload_hash = StringField(default=None)

    user_id = StringField(required=True)
    upload_url = StringField(default=None)
    upload_command = StringField(default=None)

    coe_repo_upload_id = IntField(default=None)

    _initiated_parsers = IntField(default=-1)

    meta: Any = {
        'indexes': [
            'upload_hash', 'user_id', 'status'
        ]
    }

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._upload_file = None

    @classmethod
    def get(cls, id):
        return cls.get_by_id(id, 'upload_id')

    @classmethod
    def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
        """ Returns all uploads for the given user. Currently only returns uploads in staging. """
        return cls.objects(user_id=str(user.user_id), in_staging=True)

    def get_logger(self, **kwargs):
        logger = super().get_logger()
        logger = logger.bind(upload_id=self.upload_id, **kwargs)
        return logger

    def delete(self):
        logger = self.get_logger(task='delete')

        if not (self.completed or self.current_task == 'uploading'):
            raise NotAllowedDuringProcessing()

        with lnr(logger, 'delete upload file'):
            try:
                UploadFile(self.upload_id, local_path=self.local_path).delete()
            except KeyError:
                if self.current_task == 'uploading':
                    logger.debug(
                        'Upload exists, but file does not exist. '
                        'It was probably aborted and deleted.')
                else:
                    logger.debug('Upload exists, but uploaded file does not exist.')

        with lnr(logger, 'deleting calcs'):
            # delete archive files
            ArchiveFile.delete_archives(upload_hash=self.upload_hash)

            # delete repo entries
            RepoCalc.delete_upload(upload_id=self.upload_id)

            # delete calc processings
            Calc.objects(upload_id=self.upload_id).delete()

        with lnr(logger, 'deleting upload'):
            super().delete()

    @classmethod
    def _external_objects_url(cls, url):
        """ Replaces the given internal object storage url with a URL that allows
            external access.
        """
        return 'http://%s:%s%s%s' % (config.services.api_host, config.services.api_port, config.services.api_base_path, url)

    @classmethod
    def create(cls, **kwargs) -> 'Upload':
        """
        Creates a new upload for the given user; a user-given name is optional.
        It will populate the record with an upload url/command and a pending processing state.
        The upload is already saved to the database when this method returns.

        Arguments:
            user (coe_repo.User): The user that created the upload.
        """
        user: coe_repo.User = kwargs['user']
        del(kwargs['user'])
        if 'upload_id' not in kwargs:
            kwargs.update(upload_id=utils.create_uuid())
        kwargs.update(user_id=str(user.user_id))
        self = super().create(**kwargs)
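        # the upload command authenticates via HTTP Basic: the user's auth token as
        # user name with an empty password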
        basic_auth_token = base64.b64encode(b'%s:' % user.get_auth_token()).decode('utf-8')

        self.upload_url = cls._external_objects_url('/uploads/%s/file' % self.upload_id)
        self.upload_command = 'curl -H "Authorization: Basic %s" "%s" --upload-file local_file' % (
            basic_auth_token, self.upload_url)

        self._continue_with('uploading')

        return self

    def unstage(self):
        self.get_logger().info('unstage')

        if not (self.completed or self.current_task == 'uploading'):
            raise NotAllowedDuringProcessing()

        self.in_staging = False
        RepoCalc.unstage(upload_id=self.upload_id)
        coe_repo.add_upload(self, restricted=False)  # TODO allow users to choose restricted
        self.save()

    @process
    def process(self):
        self.extracting()
        self.parse_all()

    @task
    def uploading(self):
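        # the actual file transfer happens out of band (see upload_command);
        # this task only marks the corresponding processing state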
        pass

    @property
    def upload_file(self):
        """ The :class:`UploadFile` instance that represents the uploaded file of this upload. """
        if not self._upload_file:
            self._upload_file = UploadFile(self.upload_id, local_path=self.local_path)
        return self._upload_file

    @task
    def extracting(self):
        """
        Task performed before the actual parsing/normalizing: extracting and bagging
        the uploaded files, computing all keys, creating an *upload* entry in the NOMAD-coe
        repository db, etc.
        """
        # extract the uploaded file; this will also create a bagit bag
        logger = self.get_logger()
        try:
            with utils.timer(
                    logger, 'upload extracted', step='extracting',
                    upload_size=self.upload_file.size):
                self.upload_file.extract()
        except KeyError as e:
            self.fail('process request for non existing upload', level=logging.INFO)
            return

        # create and save a hash for the upload
        try:
            self.upload_hash = self.upload_file.upload_hash()
        except Exception as e:
            self.fail('could not create upload hash', e)
            return

        # check if the file was already uploaded and processed before
        if RepoCalc.upload_exists(self.upload_hash):
            self.fail('The same file was already uploaded and processed.', level=logging.INFO)
            return

    def match_mainfiles(self) -> Generator[Tuple[File, str, object], None, None]:
        """
        Generator function that matches all files in the upload to all parsers to
        determine the upload's mainfiles.

        Returns:
            Tuples of mainfile, filename, and parser
        """
        for filename in self.upload_file.filelist:
            potential_mainfile = self.upload_file.get_file(filename)
            for parser in parsers:
                try:
                    with potential_mainfile.open('r') as mainfile_f:
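                        # is_mainfile presumably expects a callable that provides an open
                        # file; the file is already open here, so the lambda just returns it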
                        if parser.is_mainfile(filename, lambda fn: mainfile_f):
                            yield potential_mainfile, filename, parser
                except Exception as e:
                    self.get_logger().error(
                        'exception while matching pot. mainfile',
                        mainfile=filename, exc_info=e)

    @task
    def parse_all(self):
        """
        Identifies mainfile/parser combinations among the upload's files, creates
        respective :class:`Calc` instances, and triggers their processing.
        """
        logger = self.get_logger()

        # TODO: deal with multiple possible parser specs
        with utils.timer(
                logger, 'upload extracted', step='matching',
                upload_size=self.upload_file.size,
                upload_filecount=len(self.upload_file.filelist)):
            total_calcs = 0
            for mainfile, filename, parser in self.match_mainfiles():
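                # the archive id combines the upload hash with a hash of the mainfile path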
                calc = Calc.create(
                    archive_id='%s/%s' % (self.upload_hash, utils.hash(filename)),
                    mainfile=filename, parser=parser.name,
                    mainfile_tmp_path=mainfile.os_path,
                    upload_id=self.upload_id)

                calc.process()
                total_calcs += 1

        # have to save the total_calcs information for chord management
        self.spwaned_childred(total_calcs)

    def join(self):
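        # presumably invoked by the Chord base class once all child calcs completed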
        self.cleanup()

    @task
    def cleanup(self):
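        """ Persists the upload file and removes the extracted files after processing. """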
        try:
            upload = UploadFile(self.upload_id, local_path=self.local_path)
            with utils.timer(
                    self.get_logger(), 'upload persisted', step='cleaning',
                    upload_size=upload.size):
                upload.persist()

            with utils.timer(
                    self.get_logger(), 'processing cleaned up', step='cleaning',
                    upload_size=upload.size):
                upload.remove_extract()
        except KeyError as e:
            self.fail('Upload does not exist', exc_info=e)
            return

        self.get_logger().debug('closed upload')

    @property
    def processed_calcs(self):
        return Calc.objects(upload_id=self.upload_id, status__in=[SUCCESS, FAILURE]).count()

    @property
    def total_calcs(self):
        return Calc.objects(upload_id=self.upload_id).count()

    @property
    def failed_calcs(self):
        return Calc.objects(upload_id=self.upload_id, status=FAILURE).count()

    @property
    def pending_calcs(self):
        return Calc.objects(upload_id=self.upload_id, status=PENDING).count()

    def all_calcs(self, start, end, order_by='mainfile'):
        return Calc.objects(upload_id=self.upload_id)[start:end].order_by(order_by)

    @staticmethod
    def repair_all():
        """
        Utility function that will look for suspicious conditions in
        all uncompleted uploads. It ain't a perfect world.
        """
        # TODO this was added as a quick fix to #37.
        # Even though it might be strictly necessary, there should be a tested backup
        # solution for it. Chords do not work properly when processings fail to fail.
        uploads = Upload.objects(status__in=[PENDING, RUNNING])
        for upload in uploads:
            completed = upload.processed_calcs
            total = upload.total_calcs
            pending = upload.pending_calcs

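            # heuristic: if no calc is running and the pending count does not change
            # within two seconds, assume the remaining calcs are stuck and drop them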
            if completed + pending == total:
                time.sleep(2)
                if pending == upload.pending_calcs:
                    Calc.objects(upload_id=upload.upload_id, status=PENDING).delete()