# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Uploads contains classes and functions to create and maintain file structures
for uploads.

There are two different structures for uploads in two different states: *staging* and *public*.
Possible operations on uploads differ based on this state. Staging is used for
processing, heavily editing, creating hashes, etc. Public is supposed to be a
almost readonly (beside metadata) storage.

.. code-block:: sh

    fs/staging/<upload>/raw/**
                       /archive/<calc>.json
    fs/public/<upload>/raw-public.plain.zip
                      /raw-restricted.plain.zip
                      /archive-public.json.zip
                      /archive-restricted.json.zip

There is an implicit relationship between files, based on them being in the same
directory. Each directory with at least one *mainfile* is a *calculation directory*
and all its files are *aux* files to that *mainfile*. This is independent of whether the
respective files actually contribute data or not. A *calculation directory* might
contain multiple *mainfiles*, e.g., when a user simulated multiple states of the same
system or based one calculation on another. In this case each other *mainfile* is an *aux*
file to the original *mainfile* and vice versa.

Published files are kept in pairs of public and restricted files. Here, multiple *mainfiles*
per directory pose a dilemma. If one *mainfile* is restricted, all its *aux* files
should be restricted too. But if one of the *aux* files is actually a *mainfile*, it
might be published!

There are multiple ways to solve this. Due to the rarity of the case, we take the
simplest solution: if one file is public, all files are made public, except those
being other mainfiles. Therefore, the aux files of a restricted calc might become public!
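
A rough usage sketch (the upload id, paths, and ``upload_with_metadata`` are hypothetical):

.. code-block:: python

    # during processing, files live in the staging area
    staging = StagingUploadFiles('some_upload_id', create=True, is_authorized=lambda: True)
    staging.add_rawfiles('/tmp/some_upload.zip')

    # once processed, pack all files into the public area
    staging.pack(upload_with_metadata)

    # afterwards, staging and public uploads are accessed through the same interface
    upload_files = UploadFiles.get('some_upload_id', is_authorized=lambda: True)
    with upload_files.raw_file('some/raw/file', 'rt') as f:
        content = f.read()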
"""

from abc import ABCMeta
import sys
from typing import IO, Generator, Dict, Iterable, Callable, List, Tuple
import os.path
import os
import shutil
import tarfile
import hashlib
import io
import pickle
import cachetools

from nomad import config, utils
from nomad.datamodel import UploadWithMetadata


# TODO this should become obsolete, once we move beyond python 3.6. For now
# python 3.6's zipfile does not allow seek/tell within a file-like opened from a
# file in a zipfile.
if sys.version_info >= (3, 7):
    import zipfile
else:
    import zipfile37 as zipfile

user_metadata_filename = 'user_metadata.pickle'


def always_restricted(path: str):
    """
    Used to put general restrictions on files, e.g. due to licensing issues. Will be
    called during packing and while accessing public files.
    """
    if os.path.basename(path) == 'POTCAR':
        return True


def copytree(src, dst):
    """
    A close on ``shutils.copytree`` that does not try to copy the stats on all files.
    This is unecessary for our usecase and also causes permission denies for unknown
    reasons.
    """
    os.makedirs(dst, exist_ok=False)

    for item in os.listdir(src):
        s = os.path.join(src, item)
        d = os.path.join(dst, item)
        if os.path.isdir(s):
            copytree(s, d)
        else:
            shutil.copyfile(s, d)


class PathObject:
    """
    Object storage-like abstraction for paths in general.
    Arguments:
        bucket: The bucket to store this object in
        object_id: The object id (i.e. directory path)
        os_path: Override the "object storage" path with the given path.
        prefix: Add an x-digit prefix directory, e.g. foo/test/ -> foo/tes/test
        create_prefix: Create the prefix right away
    """
    def __init__(
            self, bucket: str, object_id: str, os_path: str = None,
            prefix: bool = False, create_prefix: bool = False) -> None:
        if os_path:
            self.os_path = os_path
        else:
            self.os_path = os.path.join(bucket, object_id)

        if prefix and config.fs.prefix_size > 0:
            segments = list(os.path.split(self.os_path))
            last = segments[-1]
            segments[-1] = last[:config.fs.prefix_size]
            segments.append(last)
            self.os_path = os.path.join(*segments)

            if create_prefix:
                os.makedirs(os.path.dirname(self.os_path), exist_ok=True)

    def delete(self) -> None:
        basename = os.path.basename(self.os_path)
        parent_directory = os.path.dirname(self.os_path)
        parent_name = os.path.basename(parent_directory)

        shutil.rmtree(self.os_path)

        if len(parent_name) == config.fs.prefix_size and basename.startswith(parent_name):
            try:
                if not os.listdir(parent_directory):
                    os.rmdir(parent_directory)
            except Exception as e:
                utils.get_logger(__name__).error(
                    'could not remove empty prefix dir', directory=parent_directory, exc_info=e)

    def exists(self) -> bool:
        return os.path.exists(self.os_path)

    @property
    def size(self) -> int:
        """ The OS-determined file size. """
        return os.stat(self.os_path).st_size

    def __repr__(self) -> str:
        return self.os_path


class DirectoryObject(PathObject):
    """
    Object storage-like abstraction for directories.
    Arguments:
        bucket: The bucket to store this object in
        object_id: The object id (i.e. directory path)
        create: True if the directory structure should be created. Default is False.
    """
    def __init__(self, bucket: str, object_id: str, create: bool = False, **kwargs) -> None:
        super().__init__(bucket, object_id, **kwargs)
        self._create = create
        if create and not os.path.isdir(self.os_path):
            os.makedirs(self.os_path)

    def join_dir(self, path, create: bool = None) -> 'DirectoryObject':
        if create is None:
            create = self._create
        return DirectoryObject(None, None, create=create, os_path=os.path.join(self.os_path, path))

    def join_file(self, path) -> PathObject:
        dirname = os.path.dirname(path)
        if dirname != '':
            return self.join_dir(dirname).join_file(os.path.basename(path))
        else:
            return PathObject(None, None, os_path=os.path.join(self.os_path, path))

    def exists(self) -> bool:
        return os.path.isdir(self.os_path)


class ExtractError(Exception):
    pass


class Restricted(Exception):
    pass


class UploadFiles(DirectoryObject, metaclass=ABCMeta):

    _archive_ext = 'json'

    def __init__(
            self, bucket: str, upload_id: str,
            is_authorized: Callable[[], bool] = lambda: False,
            create: bool = False) -> None:
        self.logger = utils.get_logger(__name__, upload_id=upload_id)

        super().__init__(bucket, upload_id, create=create, prefix=True)

        if not create and not self.exists():
            raise KeyError()

        self.upload_id = upload_id
        self._is_authorized = is_authorized

    @property
    def _user_metadata_file(self):
        return self.join_file('user_metadata.pickle')

    @property
    def user_metadata(self) -> dict:
        if self._user_metadata_file.exists():
            with open(self._user_metadata_file.os_path, 'rb') as f:
                return pickle.load(f)
        else:
            return {}

    @user_metadata.setter
    def user_metadata(self, data: dict) -> None:
        with open(self._user_metadata_file.os_path, 'wb') as f:
            pickle.dump(data, f)

    @staticmethod
    def get(upload_id: str, *args, **kwargs) -> 'UploadFiles':
        if DirectoryObject(config.fs.staging, upload_id, prefix=True).exists():
            return StagingUploadFiles(upload_id, *args, **kwargs)
        elif DirectoryObject(config.fs.public, upload_id, prefix=True).exists():
            return PublicUploadFiles(upload_id, *args, **kwargs)
        else:
            return None

    def raw_file(self, file_path: str, *args, **kwargs) -> IO:
        """
        Opens a raw file and returns a file-like object. Additional args, kwargs are
        delegated to the respective `open` call.
        Arguments:
            file_path: The path to the file relative to the upload.
        Raises:
            KeyError: If the file does not exist.
            Restricted: If the file is restricted and upload access evaluated to False.
        """
        raise NotImplementedError()

    def raw_file_size(self, file_path: str) -> int:
        """
        Returns:
            The size of the given raw file.
        """
        raise NotImplementedError()

    def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
        """
        Returns the paths of all raw files in the upload (with a given prefix).
        Arguments:
            path_prefix: An optional prefix; only returns those files that have the prefix.
        Returns:
            An iterable over all (matching) raw files.
        """
        raise NotImplementedError()

    def raw_file_list(self, directory: str) -> List[Tuple[str, int]]:
        """
        Gives a list of directory contents and their sizes.
        Arguments:
            directory: The directory to list
        Returns:
            A list of tuples with file name and size.
        """
        raise NotImplementedError()

    def archive_file(self, calc_id: str, *args, **kwargs) -> IO:
        """
        Opens an archive file and returns a file-like object. Additional args, kwargs are
        delegated to the respective `open` call.
        Arguments:
            calc_id: The id identifying the calculation.
        Raises:
            KeyError: If the calc does not exist.
            Restricted: If the file is restricted and upload access evaluated to False.
        """
        raise NotImplementedError()

    def archive_log_file(self, calc_id: str, *args, **kwargs) -> IO:
        """
        Opens an archive log file and returns a file-like object. Additional args, kwargs are
        delegated to the respective `open` call.
        Arguments:
            calc_id: The id identifying the calculation.
        Raises:
            KeyError: If the calc does not exist.
            Restricted: If the file is restricted and upload access evaluated to False.
        """
        raise NotImplementedError()


class StagingUploadFiles(UploadFiles):
    def __init__(
            self, upload_id: str, is_authorized: Callable[[], bool] = lambda: False,
            create: bool = False) -> None:
        super().__init__(config.fs.staging, upload_id, is_authorized, create)

        self._raw_dir = self.join_dir('raw')
        self._archive_dir = self.join_dir('archive')
        self._frozen_file = self.join_file('.frozen')

        self._size = 0
        self._shared = DirectoryObject(config.fs.public, upload_id, create=create)

    @property
    def _user_metadata_file(self):
        return self._shared.join_file('user_metadata.pickle')

    @property
    def size(self) -> int:
        return self._size

    def _file(self, path_object: PathObject, *args, **kwargs) -> IO:
        try:
            return open(path_object.os_path, *args, **kwargs)
        except FileNotFoundError:
            raise KeyError()
        except IsADirectoryError:
            raise KeyError()

    def raw_file(self, file_path: str, *args, **kwargs) -> IO:
        if not self._is_authorized():
            raise Restricted
        return self._file(self.raw_file_object(file_path), *args, **kwargs)

    def raw_file_size(self, file_path: str) -> int:
        if not self._is_authorized():
            raise Restricted
        return self.raw_file_object(file_path).size

    def raw_file_object(self, file_path: str) -> PathObject:
        return self._raw_dir.join_file(file_path)

    def archive_file(self, calc_id: str, *args, **kwargs) -> IO:
        if not self._is_authorized():
            raise Restricted
        return self._file(self.archive_file_object(calc_id), *args, **kwargs)

    def archive_log_file(self, calc_id: str, *args, **kwargs) -> IO:
        if not self._is_authorized():
            raise Restricted
        return self._file(self.archive_log_file_object(calc_id), *args, **kwargs)

    def archive_file_object(self, calc_id: str) -> PathObject:
        return self._archive_dir.join_file('%s.%s' % (calc_id, self._archive_ext))

    def archive_log_file_object(self, calc_id: str) -> PathObject:
        return self._archive_dir.join_file('%s.log' % calc_id)

    def add_rawfiles(self, path: str, move: bool = False, prefix: str = None, force_archive: bool = False) -> None:
        """
        Add rawfiles to the upload. The given file will be copied, moved, or extracted.

        Arguments:
            path: Path to a directory, file, or zip file. Zip files will be extracted.
            move: Whether the file should be moved instead of copied. Zips will be extracted and then deleted.
            prefix: Optional path prefix for the added files.
            force_archive: Expect the file to be a zip or other supported archive file.
                Usually such files are only extracted if they can be extracted, and copied otherwise.
        """
        assert not self.is_frozen
        assert os.path.exists(path)
        self._size += os.stat(path).st_size
        target_dir = self._raw_dir
        if prefix is not None:
            target_dir = target_dir.join_dir(prefix, create=True)
        ext = os.path.splitext(path)[1]
        if force_archive or ext == '.zip':
            try:
                with zipfile.ZipFile(path) as zf:
                    zf.extractall(target_dir.os_path)
                if move:
                    os.remove(path)
                return
            except zipfile.BadZipFile:
                pass

        if force_archive or ext in ['.tgz', '.tar.gz', '.tar.bz2']:
            try:
                with tarfile.open(path) as tf:
                    tf.extractall(target_dir.os_path)
                if move:
                    os.remove(path)
                return
            except tarfile.TarError:
                pass

        if force_archive:
            raise ExtractError

        if move:
            shutil.move(path, target_dir.os_path)
        else:
            if os.path.isdir(path):
                shutil.copytree(path, os.path.join(target_dir.os_path, os.path.dirname(path)))
            else:
                shutil.copy(path, target_dir.os_path)

    @property
    def is_frozen(self) -> bool:
        """ Returns True if this upload is already *bagged*. """
        return self._frozen_file.exists()

    def create_extracted_copy(self) -> None:
        """
        Copies all raw files to the extracted bucket to mimic the behaviour of the old
        CoE python API. TODO: should be removed after migration.
        """
        copytree(self._raw_dir.os_path, os.path.join(config.fs.coe_extracted, self.upload_id))

    def pack(self, upload: UploadWithMetadata) -> None:
        """
        Replaces the staging upload data with a public upload record by packing all
        data into files. It is only available if upload *is_bag*.
        This is potentially a long running operation.
        Arguments:
430
431
            calcs: The calculation metadata of the upload used to determine what files to
                pack and what the embargo situation is.
432
        """
        self.logger.debug('started to pack upload')

        # freeze the upload
        assert not self.is_frozen, "Cannot pack an upload that is packed, or packing."
        with open(self._frozen_file.os_path, 'wt') as f:
            f.write('frozen')

        # create a target dir in the public bucket
        target_dir = DirectoryObject(
            config.fs.public, self.upload_id, create=True, prefix=True,
            create_prefix=True)
        assert target_dir.exists()

        # copy user metadata
        target_metadata_file = target_dir.join_file(user_metadata_filename)
        if self._user_metadata_file.exists() and not target_metadata_file.exists():
            shutil.copyfile(
                self._user_metadata_file.os_path,
                target_metadata_file.os_path)

        def create_zipfile(kind: str, prefix: str, ext: str) -> zipfile.ZipFile:
            file = target_dir.join_file('%s-%s.%s.zip' % (kind, prefix, ext))
            return zipfile.ZipFile(file.os_path, mode='w')

        # In prior versions we used bagit on raw files. There was not much purpose for
        # it, so it was removed. Check 0.3.x for the implementation

        # zip raw files
        raw_public_zip = create_zipfile('raw', 'public', 'plain')
        raw_restricted_zip = create_zipfile('raw', 'restricted', 'plain')

        # 1. add all public raw files
        # 1.1 collect all public mainfiles and aux files
        public_files: Dict[str, str] = {}
        for calc in upload.calcs:
            if not calc.with_embargo:
                mainfile = calc.mainfile
                assert mainfile is not None
                # mainfile might already have been added due to being an auxfile to another calc
                if mainfile not in public_files:
                    for filepath in self.calc_files(mainfile, with_cutoff=False):
                        if not always_restricted(filepath):
                            public_files[filepath] = None
        # 1.2 remove the non public mainfiles that have been added as auxfiles of public mainfiles
        for calc in upload.calcs:
            if calc.with_embargo:
                mainfile = calc.mainfile
                assert mainfile is not None
                if mainfile in public_files:
                    del(public_files[mainfile])
        # 1.3 zip all remaining public
        for filepath in public_files.keys():
            raw_public_zip.write(self._raw_dir.join_file(filepath).os_path, filepath)

        # 2. everything else becomes restricted
        for filepath in self.raw_file_manifest():
            if filepath not in public_files:
                raw_restricted_zip.write(self._raw_dir.join_file(filepath).os_path, filepath)

        raw_restricted_zip.close()
        raw_public_zip.close()
        self.logger.debug('packed raw files')

        # zip archives
        archive_public_zip = create_zipfile('archive', 'public', self._archive_ext)
        archive_restricted_zip = create_zipfile('archive', 'restricted', self._archive_ext)

        for calc in upload.calcs:
            archive_zip = archive_restricted_zip if calc.with_embargo else archive_public_zip

            archive_filename = '%s.%s' % (calc.calc_id, self._archive_ext)
            archive_file = self._archive_dir.join_file(archive_filename)
            if archive_file.exists():
                archive_zip.write(archive_file.os_path, archive_filename)

            archive_log_filename = '%s.%s' % (calc.calc_id, 'log')
            log_file = self._archive_dir.join_file(archive_log_filename)
            if log_file.exists():
                archive_zip.write(log_file.os_path, archive_log_filename)

        archive_restricted_zip.close()
        archive_public_zip.close()
        self.logger.debug('packed archives')

        self.logger.debug('packed upload')

    def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
        upload_prefix_len = len(self._raw_dir.os_path) + 1
        for root, _, files in os.walk(self._raw_dir.os_path):
            for file in files:
                path = os.path.join(root, file)[upload_prefix_len:]
                if path_prefix is None or path.startswith(path_prefix):
                    yield path

    def raw_file_list(self, directory: str) -> List[Tuple[str, int]]:
        if not self._is_authorized():
            raise Restricted

        if directory is None or directory == '':
            prefix = self._raw_dir.os_path
        else:
            prefix = os.path.join(self._raw_dir.os_path, directory)

        results: List[Tuple[str, int]] = []
        if not os.path.isdir(prefix):
            return results

        for file in os.listdir(prefix):
            path = os.path.join(prefix, file)
            if os.path.isfile(path):
                results.append((file, os.path.getsize(path)))

        return results

    def calc_files(self, mainfile: str, with_mainfile: bool = True, with_cutoff: bool = True) -> Iterable[str]:
        """
        Returns all the auxfiles and mainfile for a given mainfile. This implements
550
551
        nomad's logic about what is part of a calculation and what not. The mainfile
        is first entry, the rest is sorted.
552
553
554
        Arguments:
            mainfile: The mainfile relative to upload
            with_mainfile: Do include the mainfile, default is True
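
        For example, with a hypothetical layout, ``calc_files('run/vasprun.xml')`` would
        return ``['run/vasprun.xml', 'run/INCAR', 'run/OUTCAR']``.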
        """
        mainfile_object = self._raw_dir.join_file(mainfile)
        if not mainfile_object.exists():
            raise KeyError()

        mainfile_basename = os.path.basename(mainfile)
        calc_dir = os.path.dirname(mainfile_object.os_path)
        calc_relative_dir = calc_dir[len(self._raw_dir.os_path) + 1:]

        file_count = 0
        aux_files: List[str] = []
        for filename in os.listdir(calc_dir):
            if filename != mainfile_basename and os.path.isfile(os.path.join(calc_dir, filename)):
                aux_files.append(os.path.join(calc_relative_dir, filename))
                file_count += 1

            if with_cutoff and file_count > config.auxfile_cutoff:
                # If there are too many of them, it's probably just a directory with lots of
                # calculations. In this case it does not make any sense to provide thousands of
                # aux files.
                break

        aux_files = sorted(aux_files)

        if with_mainfile:
            return [mainfile] + aux_files
        else:
            return aux_files

    def calc_id(self, mainfile: str) -> str:
        """
        Calculates an id for the given calc.
        Arguments:
            mainfile: The mainfile path relative to the upload that identifies the calc in the folder structure.
        Returns:
            The calc id
        Raises:
            KeyError: If the mainfile does not exist.
        """
        return utils.hash(self.upload_id, mainfile)

    def calc_hash(self, mainfile: str) -> str:
        """
        Calculates a hash for the given calc based on file contents and aux file contents.
        Arguments:
            mainfile: The mainfile path relative to the upload that identifies the calc in the folder structure.
        Returns:
            The calculated hash
        Raises:
            KeyError: If the mainfile does not exist.
        """
        hash = hashlib.sha512()
        for filepath in self.calc_files(mainfile):
            with open(self._raw_dir.join_file(filepath).os_path, 'rb') as f:
                for data in iter(lambda: f.read(65536), b''):
                    hash.update(data)

        return utils.make_websave(hash)

    def delete(self) -> None:
        super().delete()
        if self._shared.exists():
            self._shared.delete()


class ArchiveBasedStagingUploadFiles(StagingUploadFiles):
    """
    :class:`StagingUploadFiles` based on a single uploaded archive file (.zip)

    Arguments:
        upload_path: The path to the uploaded file.
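
    A rough usage sketch (the upload id and path are hypothetical):

    .. code-block:: python

        upload_files = ArchiveBasedStagingUploadFiles('some_upload_id', '/tmp/upload.zip', create=True)
        if upload_files.is_valid:
            upload_files.extract()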
    """

    def __init__(
            self, upload_id: str, upload_path: str, *args, **kwargs) -> None:
        super().__init__(upload_id, *args, **kwargs)
        self.upload_path = upload_path

    @property
    def is_valid(self) -> bool:
        if self.upload_path is None:
            return False
        if not os.path.exists(self.upload_path):
            return False
        elif not os.path.isfile(self.upload_path):
            return False
        else:
            return True

    def extract(self) -> None:
        assert next(self.raw_file_manifest(), None) is None, 'can only extract once'
        super().add_rawfiles(self.upload_path, force_archive=True)

    def add_rawfiles(self, path: str, move: bool = False, prefix: str = None, force_archive: bool = False) -> None:
        assert False, 'do not add_rawfiles to a %s' % self.__class__.__name__


class LRUZipFileCache(cachetools.LRUCache):
    """ Specialized cache that closes the cached zipfiles on eviction """
    def __init__(self, maxsize):
        super().__init__(maxsize)

    def popitem(self):
        key, val = super().popitem()
        val.close()
        return key, val


class PublicUploadFiles(UploadFiles):
    __zip_file_cache = LRUZipFileCache(maxsize=128)

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(config.fs.public, *args, **kwargs)

    @cachetools.cached(cache=__zip_file_cache)
    def get_zip_file(self, prefix: str, access: str, ext: str) -> zipfile.ZipFile:
        zip_file = self.join_file('%s-%s.%s.zip' % (prefix, access, ext))
        return zipfile.ZipFile(zip_file.os_path)

    def _file(self, prefix: str, ext: str, path: str, *args, **kwargs) -> IO:
        mode = kwargs.get('mode') if len(args) == 0 else args[0]
        if 'mode' in kwargs:
            del(kwargs['mode'])
        mode = mode if mode else 'rb'

        for access in ['public', 'restricted']:
            try:
                zf = self.get_zip_file(prefix, access, ext)

                f = zf.open(path, 'r', **kwargs)
                if (access == 'restricted' or always_restricted(path)) and not self._is_authorized():
                    raise Restricted
                if 't' in mode:
                    return io.TextIOWrapper(f)
                else:
                    return f
            except FileNotFoundError:
                pass
            except IsADirectoryError:
                pass
            except KeyError:
                pass

        raise KeyError()

    def raw_file(self, file_path: str, *args, **kwargs) -> IO:
        return self._file('raw', 'plain', file_path, *args, **kwargs)

    def raw_file_size(self, file_path: str) -> int:
        for access in ['public', 'restricted']:
            try:
                zf = self.get_zip_file('raw', access, 'plain')
                info = zf.getinfo(file_path)
                if (access == 'restricted' or always_restricted(file_path)) and not self._is_authorized():
                    raise Restricted

                return info.file_size
            except FileNotFoundError:
                pass
            except KeyError:
                pass

        raise KeyError()

    def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
        for access in ['public', 'restricted']:
            try:
                zf = self.get_zip_file('raw', access, 'plain')
                for path in zf.namelist():
                    if path_prefix is None or path.startswith(path_prefix):
                        yield path
            except FileNotFoundError:
                pass

    def raw_file_list(self, directory: str) -> List[Tuple[str, int]]:
        if directory is None:
            directory = ''
        directory_len = len(directory)

        results = []
        for access in ['public', 'restricted']:
            if access == 'restricted' and not self._is_authorized():
                continue

            try:
                zf = self.get_zip_file('raw', access, 'plain')
                for path in zf.namelist():
                    content_path = path[directory_len + (0 if directory_len == 0 else 1):]
                    if path.startswith(directory):
                        if '/' not in content_path:
                            results.append((content_path, zf.getinfo(path).file_size))
                        else:
                            # this asserts that sub directories are always behind their
                            # parents and file siblings
                            break

            except FileNotFoundError:
                pass

        return results

    def archive_file(self, calc_id: str, *args, **kwargs) -> IO:
        return self._file('archive', self._archive_ext, '%s.%s' % (calc_id, self._archive_ext), *args, **kwargs)

    def archive_log_file(self, calc_id: str, *args, **kwargs) -> IO:
        return self._file('archive', self._archive_ext, '%s.log' % calc_id, *args, **kwargs)

    def repack(self) -> None:
        """
        Replaces the existing public/restricted data file pairs with new ones, based
        on current restricted information in the metadata. Should be used after updating
        the restrictions on calculations. This is potentially a long running operation.
        """
        raise NotImplementedError()