diff --git a/gui/src/components/api.js b/gui/src/components/api.js
index b4f005bf6883e134fa97d4384f12823de617510a..8dae504c8f4e6449ad5dab18c3d1962a84971525 100644
--- a/gui/src/components/api.js
+++ b/gui/src/components/api.js
@@ -377,7 +377,7 @@ class Api {
         }
       }
     })
-    return this.post('entries/edit', edit)
+    return this.post('entries/edit_v0', edit)
   }
 }
 
diff --git a/nomad/app/v1/models.py b/nomad/app/v1/models.py
index c30ddfdceabe0b972055beb1ea5197320385676c..b1f7c9453f28f7ec4198ca0cb8d1e91a56ebd2bc 100644
--- a/nomad/app/v1/models.py
+++ b/nomad/app/v1/models.py
@@ -30,13 +30,14 @@ from pydantic import (  # pylint: disable=unused-import
     validator,
     root_validator,
 )
+from pydantic.main import create_model
 import datetime
 import numpy as np
 import re
 import fnmatch
 import json
 
-from nomad import datamodel  # pylint: disable=unused-import
+from nomad import datamodel, metainfo  # pylint: disable=unused-import
 from nomad.utils import strip
 from nomad.metainfo import Datetime, MEnum
 from nomad.metainfo.elasticsearch_extension import DocumentType, material_entry_type, material_type
@@ -51,6 +52,44 @@ Value = Union[StrictInt, StrictFloat, StrictBool, str, datetime.datetime]
 ComparableValue = Union[StrictInt, StrictFloat, str, datetime.datetime]
 
 
+class Owner(str, enum.Enum):
+    '''
+    The `owner` allows you to limit the scope of the search based on entry ownership.
+    This is useful if you only want to search among all publicly downloadable
+    entries, or only among your own entries, etc.
+
+    These are the possible owner values and their meaning:
+    * `all`: Consider all entries.
+    * `public` (default): Consider all entries that can be publicly downloaded,
+        i.e. only published entries without embargo.
+    * `user`: Only consider entries that belong to you.
+    * `shared`: Only consider entries that belong to you or are shared with you.
+    * `visible`: Consider all entries that are visible to you. This includes
+        entries with embargo or unpublished entries that belong to you or are
+        shared with you.
+    * `staging`: Only search through unpublished entries.
+    '''
+
+    # There seems to be a slight bug in FastAPI: when it creates the example in OpenAPI,
+    # it ignores any given default or example and simply takes the first enum value.
+    # Therefore, we put `public` first, as it is the default and safe in most contexts.
+    public = 'public'
+    all_ = 'all'
+    visible = 'visible'
+    shared = 'shared'
+    user = 'user'
+    staging = 'staging'
+    admin = 'admin'
+
+
+class Direction(str, enum.Enum):
+    '''
+    Order direction, either ascending (`asc`) or descending (`desc`)
+    '''
+    asc = 'asc'
+    desc = 'desc'
+
+
 class HTTPExceptionModel(BaseModel):
     detail: str
 
@@ -156,36 +195,6 @@ Not.update_forward_refs()
 Nested.update_forward_refs()
 
 
-class Owner(str, enum.Enum):
-    '''
-    The `owner` allows to limit the scope of the searched based on entry ownership.
-    This is useful, if you only want to search among all publically downloadable
-    entries, or only among your own entries, etc.
-
-    These are the possible owner values and their meaning:
-    * `all`: Consider all entries.
-    * `public` (default): Consider all entries that can be publically downloaded,
-        i.e. only published entries without embargo
-    * `user`: Only consider entries that belong to you.
-    * `shared`: Only consider entries that belong to you or are shared with you.
-    * `visible`: Consider all entries that are visible to you. This includes
-        entries with embargo or unpublished entries that belong to you or are
-        shared with you.
-    * `staging`: Only search through unpublished entries.
-    '''
-
-    # There seems to be a slight bug in fast API. When it creates the example in OpenAPI
-    # it will ignore any given default or example and simply take the first enum value.
-    # Therefore, we put public first, which is the most default and save in most contexts.
-    public = 'public'
-    all_ = 'all'
-    visible = 'visible'
-    shared = 'shared'
-    user = 'user'
-    staging = 'staging'
-    admin = 'admin'
-
-
 class WithQuery(BaseModel):
     owner: Optional[Owner] = Body('public')
     query: Optional[Query] = Body(
@@ -424,14 +433,6 @@ class QueryParameters:
         return WithQuery(query=query, owner=owner)
 
 
-class Direction(str, enum.Enum):
-    '''
-    Order direction, either ascending (`asc`) or descending (`desc`)
-    '''
-    asc = 'asc'
-    desc = 'desc'
-
-
 class MetadataRequired(BaseModel):
     ''' Defines which metadata quantities are included or excluded in the response. '''
 
@@ -1051,6 +1052,52 @@ class Metadata(WithQueryAndPagination):
         '''))
 
 
+class MetadataEditListAction(BaseModel):
+    '''
+    Defines an action to perform on a list quantity. This enables users to add and remove values.
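+
+    For example (illustrative, with a hypothetical dataset name), an action
+    `{"op": "add", "values": ["my_dataset"]}` would add `my_dataset` to a list
+    quantity such as `datasets`.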
+    '''
+    op: str = Field(description=strip('''
+        Defines the type of operation (either `set`, `add` or `remove`)'''))
+    values: Union[str, List[str]] = Field(description=strip('''
+        The value or values to set/add/remove (string or list of strings)'''))
+
+
+# Generate model for MetadataEditActions
+_metadata_edit_actions_fields = {}
+for quantity in datamodel.EditableUserMetadata.m_def.definitions:
+    if quantity.is_scalar:
+        pydantic_type = quantity.type if quantity.type in (str, int, float, bool) else str
+    else:
+        pydantic_type = Union[str, List[str], MetadataEditListAction]
+    if getattr(quantity, 'a_auth_level', None) == datamodel.AuthLevel.admin:
+        description = '**NOTE:** Only editable by an admin user'
+    else:
+        description = None
+    _metadata_edit_actions_fields[quantity.name] = (
+        Optional[pydantic_type], Field(description=description))
+
+MetadataEditActions = create_model('MetadataEditActions', **_metadata_edit_actions_fields)  # type: ignore
+
+
+class MetadataEditRequest(WithQuery):
+    ''' Defines a request to edit metadata. '''
+    metadata: Optional[MetadataEditActions] = Field(  # type: ignore
+        description=strip('''
+            Metadata to set, on the upload and/or selected entries.'''))
+    entries: Optional[Dict[str, MetadataEditActions]] = Field(  # type: ignore
+        description=strip('''
+            An optional dictionary specifying metadata to set on individual entries. The field
+            `entries_key` defines which type of key is used in the dictionary to identify
+            the entries. Note that only quantities defined on the entry level can be set using this method.'''))
+    entries_key: Optional[str] = Field(
+        default='calc_id', description=strip('''
+            Defines which type of key is used in `entries`. Default is `calc_id`.'''))
+    verify_only: Optional[bool] = Field(
+        default=False, description=strip('''
+            Do not execute the request; just verify it and provide detailed feedback on
+            any encountered errors etc.'''))
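+
+# An illustrative sketch (hypothetical ids and values): a request body matching
+# MetadataEditRequest could look like:
+#
+#     {
+#         "query": {"upload_id": "some_upload_id"},
+#         "metadata": {
+#             "comment": "An updated comment",
+#             "datasets": {"op": "add", "values": ["my_dataset"]}
+#         },
+#         "verify_only": true
+#     }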
+
+
 class Files(BaseModel):
     ''' Configures the download of files. '''
     compress: Optional[bool] = Field(
diff --git a/nomad/app/v1/routers/entries.py b/nomad/app/v1/routers/entries.py
index 5c7a83db98e910af53e394ee8cc9385ba16fe619..a6f73db67d661f742a370b929ebd44c11d0bb67f 100644
--- a/nomad/app/v1/routers/entries.py
+++ b/nomad/app/v1/routers/entries.py
@@ -48,8 +48,9 @@ from ..utils import (
     create_download_stream_zipped, create_download_stream_raw_file,
     DownloadItem, create_responses)
 from ..models import (
-    Aggregation, Pagination, PaginationResponse, MetadataPagination, TermsAggregation, WithQuery, WithQueryAndPagination, MetadataRequired,
-    MetadataResponse, Metadata, Files, Query, User, Owner,
+    Aggregation, Pagination, PaginationResponse, MetadataPagination, TermsAggregation,
+    WithQuery, WithQueryAndPagination, MetadataRequired, MetadataResponse, Metadata,
+    MetadataEditRequest, Files, Query, User, Owner,
     QueryParameters, metadata_required_parameters, files_parameters, metadata_pagination_parameters,
     HTTPExceptionModel)
 
@@ -280,6 +281,18 @@ _bad_path_response = status.HTTP_404_NOT_FOUND, {
     'model': HTTPExceptionModel,
     'description': strip('File or directory not found.')}
 
+_bad_edit_request = status.HTTP_400_BAD_REQUEST, {
+    'model': HTTPExceptionModel,
+    'description': strip('Edit request could not be executed.')}
+
+_bad_edit_request_authorization = status.HTTP_401_UNAUTHORIZED, {
+    'model': HTTPExceptionModel,
+    'description': strip('Not enough permissions to execute edit request.')}
+
+_bad_edit_request_empty_query = status.HTTP_404_NOT_FOUND, {
+    'model': HTTPExceptionModel,
+    'description': strip('No matching entries found.')}
+
 _raw_download_response = 200, {
     'content': {'application/zip': {}},
     'description': strip('''
@@ -1161,7 +1174,7 @@ _editable_quantities = {
 
 
 @router.post(
-    '/edit',
+    '/edit_v0',
     tags=[metadata_tag],
     summary='Edit the user metadata of a set of entries',
     response_model=EntryMetadataEditResponse,
@@ -1328,3 +1341,39 @@ async def post_entry_metadata_edit(
                 datamodel.Dataset.m_def.a_mongo.objects(dataset_id=dataset).delete()
 
     return data
+
+
+@router.post(
+    '/edit',
+    tags=[metadata_tag],
+    summary='Edit the user metadata of a set of entries',
+    response_model=MetadataEditRequest,
+    response_model_exclude_unset=True,
+    response_model_exclude_none=True,
+    responses=create_responses(
+        _bad_edit_request, _bad_edit_request_authorization, _bad_edit_request_empty_query))
+async def post_entries_edit(
+        request: Request,
+        data: MetadataEditRequest,
+        user: User = Depends(create_user_dependency(required=True))):
+    '''
+    Updates the metadata of the specified entries.
+
+    **Note:**
+      - Only admins can edit some of the fields.
+      - Only entry level attributes (like `comment`, `references`, etc.) can be set using
+        this endpoint; upload level attributes (like `upload_name`, `coauthors`, embargo
+        settings, etc.) need to be set through the endpoint **uploads/upload_id/edit**.
+      - If the upload is published, the only operation permitted using this endpoint is to
+        edit the entries in datasets that were created by the current user.
+    '''
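+    # Illustrative request body (hypothetical ids/values), following MetadataEditRequest:
+    #     {"query": {"upload_id": "some_upload_id"},
+    #      "metadata": {"references": {"op": "add", "values": ["https://example.org"]}}}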
+    edit_request_json = await request.json()
+    try:
+        verified_json = proc.MetadataEditRequestHandler.edit_metadata(
+            edit_request_json=edit_request_json, upload_id=None, user=user)
+        return verified_json
+    except RequestValidationError:
+        raise  # A problem we have handled explicitly. FastAPI does the JSON conversion.
+    except Exception as e:
+        # The upload is processing, or some kind of unexpected error has occurred
+        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
diff --git a/nomad/app/v1/routers/uploads.py b/nomad/app/v1/routers/uploads.py
index 7a732d8878e8f2413392815da46eebcaa126c29f..1669ff1a6f85cb71b30308df83a48ef4e3e44899 100644
--- a/nomad/app/v1/routers/uploads.py
+++ b/nomad/app/v1/routers/uploads.py
@@ -25,23 +25,28 @@ from fastapi import (
     APIRouter, Request, File, UploadFile, status, Depends, Path, Query as FastApiQuery,
     HTTPException)
 from fastapi.responses import StreamingResponse
+from fastapi.exceptions import RequestValidationError
 
 from nomad import utils, config, files, datamodel
 from nomad.files import UploadFiles, StagingUploadFiles, UploadBundle, is_safe_relative_path
-from nomad.processing import Upload, Calc, ProcessAlreadyRunning, ProcessStatus
+from nomad.processing import Upload, Calc, ProcessAlreadyRunning, ProcessStatus, MetadataEditRequestHandler
 from nomad.utils import strip
 from nomad.search import search
 
 from .auth import create_user_dependency, generate_upload_token
 from ..models import (
-    BaseModel, MetadataPagination, User, Direction, Pagination, PaginationResponse, HTTPExceptionModel,
-    Files, files_parameters, WithQuery)
+    MetadataPagination, User, Direction, Pagination, PaginationResponse, HTTPExceptionModel,
+    Files, files_parameters, WithQuery, MetadataEditRequest)
 from ..utils import (
     parameter_dependency_from_model, create_responses, DownloadItem,
     create_download_stream_zipped, create_download_stream_raw_file, create_stream_from_string)
 
 router = APIRouter()
 default_tag = 'uploads'
+metadata_tag = 'uploads/metadata'
+raw_tag = 'uploads/raw'
+action_tag = 'uploads/action'
+bundle_tag = 'uploads/bundle'
 
 logger = utils.get_logger(__name__)
 
@@ -369,7 +374,7 @@ async def get_command_examples(user: User = Depends(create_user_dependency(requi
 
 
 @router.get(
-    '', tags=[default_tag],
+    '', tags=[metadata_tag],
     summary='List uploads of authenticated user.',
     response_model=UploadProcDataQueryResponse,
     responses=create_responses(_not_authorized, _bad_pagination),
@@ -430,7 +435,7 @@ async def get_uploads(
 
 
 @router.get(
-    '/{upload_id}', tags=[default_tag],
+    '/{upload_id}', tags=[metadata_tag],
     summary='Get a specific upload',
     response_model=UploadProcDataResponse,
     responses=create_responses(_upload_not_found, _not_authorized_to_upload),
@@ -453,7 +458,7 @@ async def get_upload(
 
 
 @router.get(
-    '/{upload_id}/entries', tags=[default_tag],
+    '/{upload_id}/entries', tags=[metadata_tag],
     summary='Get the entries of the specific upload as a list',
     response_model=EntryProcDataQueryResponse,
     responses=create_responses(_upload_not_found, _not_authorized_to_upload, _bad_pagination),
@@ -515,7 +520,7 @@ async def get_upload_entries(
 
 
 @router.get(
-    '/{upload_id}/entries/{entry_id}', tags=[default_tag],
+    '/{upload_id}/entries/{entry_id}', tags=[metadata_tag],
     summary='Get a specific entry for a specific upload',
     response_model=EntryProcDataResponse,
     responses=create_responses(_entry_not_found, _not_authorized_to_entry),
@@ -552,7 +557,7 @@ async def get_upload_entry(
 
 
 @router.get(
-    '/{upload_id}/raw/{path:path}', tags=[default_tag],
+    '/{upload_id}/raw/{path:path}', tags=[raw_tag],
     summary='Get the raw files and folders for a given upload and path.',
     response_class=StreamingResponse,
     responses=create_responses(
@@ -684,7 +689,7 @@ async def get_upload_raw_path(
 
 
 @router.put(
-    '/{upload_id}/raw/{path:path}', tags=[default_tag],
+    '/{upload_id}/raw/{path:path}', tags=[raw_tag],
     summary='Put (add or replace) files to an upload at the specified path.',
     response_class=StreamingResponse,
     responses=create_responses(
@@ -764,7 +769,7 @@ async def put_upload_raw_path(
 
 
 @router.delete(
-    '/{upload_id}/raw/{path:path}', tags=[default_tag],
+    '/{upload_id}/raw/{path:path}', tags=[raw_tag],
     summary='Delete file or folder located at the specified path in the specified upload.',
     response_model=UploadProcDataResponse,
     responses=create_responses(_upload_not_found, _not_authorized_to_upload, _bad_request),
@@ -919,7 +924,7 @@ async def post_upload(
 
 
 @router.put(
-    '/{upload_id}/metadata', tags=[default_tag],
+    '/{upload_id}/metadata', tags=[metadata_tag],
     summary='Updates the metadata of the specified upload.',
     response_model=UploadProcDataResponse,
     responses=create_responses(_upload_not_found, _not_authorized_to_upload, _bad_request),
@@ -950,6 +955,47 @@ async def put_upload_metadata(
     return UploadProcDataResponse(upload_id=upload_id, data=_upload_to_pydantic(upload))
 
 
+@router.post(
+    '/{upload_id}/edit', tags=[metadata_tag],
+    summary='Updates the metadata of the specified upload.',
+    response_model=MetadataEditRequest,
+    responses=create_responses(_upload_not_found, _not_authorized_to_upload, _bad_request),
+    response_model_exclude_unset=True,
+    response_model_exclude_none=True)
+async def post_upload_edit(
+        request: Request,
+        data: MetadataEditRequest,
+        upload_id: str = Path(..., description='The unique id of the upload.'),
+        user: User = Depends(create_user_dependency(required=True))):
+    '''
+    Updates the metadata of the specified upload and entries. An optional `query` can be
+    specified to select only some of the entries of the upload (the query results are
+    automatically restricted to the specified upload).
+
+    **Note:**
+      - Only admins can edit some of the fields.
+      - The embargo of a published upload is lifted by setting the `embargo_length` attribute
+        to 0.
+      - If the upload is published, the only operations permitted using this endpoint are to
+        lift the embargo, i.e. set `embargo_length` to 0, and to edit the entries in datasets
+        that were created by the current user.
+      - If a query is specified, it is not possible to edit upload level metadata (like
+        `upload_name`, `coauthors`, etc.), as the purpose of queries is to select only a
+        subset of the upload entries to edit, but changing upload level metadata would affect
+        **all** entries of the upload.
+    '''
+    edit_request_json = await request.json()
+    try:
+        verified_json = MetadataEditRequestHandler.edit_metadata(
+            edit_request_json=edit_request_json, upload_id=upload_id, user=user)
+        return verified_json
+    except RequestValidationError:
+        raise  # A problem we have handled explicitly. FastAPI does the JSON conversion.
+    except Exception as e:
+        # The upload is processing, or some kind of unexpected error has occurred
+        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
+
+
 @router.delete(
     '/{upload_id}', tags=[default_tag],
     summary='Delete an upload',
@@ -988,7 +1034,7 @@ async def delete_upload(
 
 
 @router.post(
-    '/{upload_id}/action/publish', tags=[default_tag],
+    '/{upload_id}/action/publish', tags=[action_tag],
     summary='Publish an upload',
     response_model=UploadProcDataResponse,
     responses=create_responses(_upload_not_found, _not_authorized_to_upload, _bad_request),
@@ -1071,7 +1117,7 @@ async def post_upload_action_publish(
 
 
 @router.post(
-    '/{upload_id}/action/process', tags=[default_tag],
+    '/{upload_id}/action/process', tags=[action_tag],
     summary='Manually triggers processing of an upload.',
     response_model=UploadProcDataResponse,
     responses=create_responses(_upload_not_found, _not_authorized_to_upload, _bad_request),
@@ -1098,8 +1144,43 @@ async def post_upload_action_process(
         data=_upload_to_pydantic(upload))
 
 
+@router.post(
+    '/{upload_id}/action/lift-embargo', tags=[action_tag],
+    summary='Lifts the embargo of an upload.',
+    response_model=UploadProcDataResponse,
+    responses=create_responses(_upload_not_found, _not_authorized_to_upload, _bad_request),
+    response_model_exclude_unset=True,
+    response_model_exclude_none=True)
+async def post_upload_action_lift_embargo(
+        upload_id: str = Path(
+            ...,
+            description='The unique id of the upload to lift the embargo for.'),
+        user: User = Depends(create_user_dependency(required=True))):
+    ''' Lifts the embargo of an upload. '''
+    upload = _get_upload_with_write_access(
+        upload_id, user, include_published=True, published_requires_admin=False)
+    _check_upload_not_processing(upload)
+    if not upload.published:
+        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=strip('''
+            Upload is not published, no embargo to lift.'''))
+    if not upload.with_embargo:
+        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=strip('''
+            Upload has no embargo.'''))
+    # Lift the embargo using MetadataEditRequestHandler.edit_metadata
+    try:
+        MetadataEditRequestHandler.edit_metadata(
+            edit_request_json={'metadata': {'embargo_length': 0}}, upload_id=upload_id, user=user)
+        upload.reload()
+        return UploadProcDataResponse(
+            upload_id=upload_id,
+            data=_upload_to_pydantic(upload))
+    except Exception as e:
+        # Should only happen if the upload just started processing or something unexpected happens
+        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
+
+
 @router.get(
-    '/bundle/{upload_id}', tags=[default_tag],
+    '/{upload_id}/bundle', tags=[bundle_tag],
     summary='Gets an *upload bundle* for the specified upload.',
     response_class=StreamingResponse,
     responses=create_responses(
@@ -1145,7 +1226,7 @@ async def get_upload_bundle(
 
 
 @router.post(
-    '/bundle', tags=[default_tag],
+    '/bundle', tags=[bundle_tag],
     summary='Posts an *upload bundle* to this NOMAD deployment.',
     response_model=UploadProcDataResponse,
     responses=create_responses(_not_authorized, _bad_request),
@@ -1430,9 +1511,10 @@ def _get_upload_with_write_access(
         raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=strip('''
             The specified upload_id was not found.'''))
     upload = mongodb_query.first()
-    if upload.main_author != str(user.user_id) and not user.is_admin:
-        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strip('''
-            You do not have write access to the specified upload.'''))
+    if not user.is_admin and upload.main_author != str(user.user_id):
+        if not upload.coauthors or str(user.user_id) not in upload.coauthors:
+            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strip('''
+                You do not have write access to the specified upload.'''))
     if upload.published:
         if not include_published:
             raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strip('''
diff --git a/nomad/cli/admin/admin.py b/nomad/cli/admin/admin.py
index 8c4fec38420bebe3de1e465675c05429bf739a64..96d49dcb86005c3aa64883a1dd3388611ed6ed64 100644
--- a/nomad/cli/admin/admin.py
+++ b/nomad/cli/admin/admin.py
@@ -95,7 +95,9 @@ def lift_embargo(dry, parallel):
                 upload.upload_id, upload.publish_time, embargo_length))
 
             if not dry:
-                upload.set_upload_metadata({'embargo_length': 0})
+                upload.edit_upload_metadata(
+                    edit_request_json=dict(metadata={'embargo_length': 0}),
+                    user_id=config.services.admin_user_id)
     return
 
 
diff --git a/nomad/cli/admin/uploads.py b/nomad/cli/admin/uploads.py
index d10decc245cf2c50e1251952ecf989bb1bd52b2e..df7c1c2d8387f5063e9074478afd9b07eac3e872 100644
--- a/nomad/cli/admin/uploads.py
+++ b/nomad/cli/admin/uploads.py
@@ -295,9 +295,10 @@ def chown(ctx, username, uploads):
     print('%d uploads selected, changing owner ...' % uploads.count())
 
     user = datamodel.User.get(username=username)
-    upload_metadata = datamodel.UploadMetadata(main_author=user)
     for upload in uploads:
-        upload.set_upload_metadata(upload_metadata.m_to_dict())
+        upload.edit_upload_metadata(
+            edit_request_json=dict(metadata={'main_author': user.user_id}),
+            user_id=config.services.admin_user_id)
 
 
 @uploads.command(help='Reset the processing state.')
diff --git a/nomad/cli/client/integrationtests.py b/nomad/cli/client/integrationtests.py
index 03e5c7913019b962b07608507392421698c38efd..fe65b943280f09cc35273fa0d6b238772869d121 100644
--- a/nomad/cli/client/integrationtests.py
+++ b/nomad/cli/client/integrationtests.py
@@ -180,7 +180,7 @@ def integrationtests(auth: api.Auth, skip_parsers: bool, skip_publish: bool, ski
             'datasets': [{'value': dataset}]}
 
         response = api.post(
-            'entries/edit',
+            'entries/edit_v0',
             data=json.dumps(dict(actions=actions, **query_request_params)),
             auth=auth)
         assert response.status_code == 200, response.text
diff --git a/nomad/datamodel/__init__.py b/nomad/datamodel/__init__.py
index aa140edd89a052cfc874064a19dd18629311e1d5..7712f596480c112aba73194f6db29ad282ee8da3 100644
--- a/nomad/datamodel/__init__.py
+++ b/nomad/datamodel/__init__.py
@@ -77,7 +77,7 @@ import sys
 from nomad.metainfo import Environment
 
 from .datamodel import (
-    Dataset, User, Author, EditableUserMetadata, UserProvidableMetadata,
+    Dataset, User, Author, EditableUserMetadata, UserProvidableMetadata, AuthLevel,
     UploadMetadata, MongoUploadMetadata, MongoEntryMetadata, MongoSystemMetadata,
     EntryMetadata, EntryArchive)
 from .optimade import OptimadeEntry, Species
diff --git a/nomad/datamodel/datamodel.py b/nomad/datamodel/datamodel.py
index 0b296ce98416e5aab9529b20e1e6139a5af79869..7c0232684351836d2808df3a28afcd4118e9402f 100644
--- a/nomad/datamodel/datamodel.py
+++ b/nomad/datamodel/datamodel.py
@@ -19,6 +19,7 @@
 ''' All generic entry metadata and related classes. '''
 
 from typing import List, Any
+from enum import Enum
 from cachetools import cached, TTLCache
 from elasticsearch_dsl import analyzer, tokenizer
 
@@ -40,6 +41,20 @@ from .metainfo.workflow import Workflow  # noqa
 from .metainfo.common_experimental import Measurement  # noqa
 
 
+class AuthLevel(int, Enum):
+    '''
+    Used to decorate fields with the authorization level required to edit them (using `a_auth_level`).
+    * `none`: No authorization required to edit the field.
+    * `coauthor`: You must be at least a coauthor of the upload to edit the field.
+    * `main_author`: You must be the main author of the upload to edit the field.
+    * `admin`: You must be admin to edit the field.
+    '''
+    none = 0
+    coauthor = 1
+    main_author = 2
+    admin = 3
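+
+    # AuthLevel subclasses int, so levels can be compared directly,
+    # e.g. AuthLevel.coauthor < AuthLevel.admin; this is used when checking
+    # whether a user's auth level suffices to edit a given field.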
+
+
 path_analyzer = analyzer(
     'path_analyzer',
     tokenizer=tokenizer('path_tokenizer', 'pattern', pattern='/'))
@@ -404,13 +419,14 @@ class EntryMetadata(metainfo.MSection):
         a_elasticsearch=Elasticsearch(material_entry_type, metrics=dict(n_uploads='cardinality')))
 
     upload_name = metainfo.Quantity(
-        type=str, categories=[MongoUploadMetadata],
+        type=str, categories=[MongoUploadMetadata, EditableUserMetadata],
         description='The user provided upload name',
         a_elasticsearch=Elasticsearch())
 
     upload_create_time = metainfo.Quantity(
-        type=metainfo.Datetime, categories=[MongoUploadMetadata],
+        type=metainfo.Datetime, categories=[MongoUploadMetadata, EditableUserMetadata],
         description='The date and time when the upload was created in nomad',
+        a_auth_level=AuthLevel.admin,
         a_elasticsearch=Elasticsearch(material_entry_type))
 
     calc_id = metainfo.Quantity(
@@ -421,17 +437,19 @@ class EntryMetadata(metainfo.MSection):
         a_elasticsearch=Elasticsearch(material_entry_type, metrics=dict(n_entries='cardinality')))
 
     calc_hash = metainfo.Quantity(
+        # Note: This attribute is not stored in ES
         type=str,
         description='A raw file content based checksum/hash',
         categories=[MongoEntryMetadata])
 
     entry_create_time = metainfo.Quantity(
-        type=metainfo.Datetime, categories=[MongoEntryMetadata, MongoSystemMetadata],
+        type=metainfo.Datetime, categories=[MongoEntryMetadata, MongoSystemMetadata, EditableUserMetadata],
         description='The date and time when the entry was created in nomad',
-        a_flask=dict(admin_only=True),
+        a_auth_level=AuthLevel.admin,
         a_elasticsearch=Elasticsearch(material_entry_type))
 
     last_edit_time = metainfo.Quantity(
+        # Note: This attribute is not stored in ES
         type=metainfo.Datetime, categories=[MongoEntryMetadata],
         description='The date and time the user metadata was last edited.')
 
@@ -473,7 +491,7 @@ class EntryMetadata(metainfo.MSection):
         a_elasticsearch=Elasticsearch(entry_type))
 
     external_id = metainfo.Quantity(
-        type=str, categories=[MongoEntryMetadata, UserProvidableMetadata],
+        type=str, categories=[MongoEntryMetadata, UserProvidableMetadata, EditableUserMetadata],
         description='''
             A user provided external id. Usually the id for an entry in an external database
             where the data was imported from.
@@ -487,9 +505,9 @@ class EntryMetadata(metainfo.MSection):
         a_elasticsearch=Elasticsearch(material_entry_type))
 
     publish_time = metainfo.Quantity(
-        type=metainfo.Datetime, categories=[MongoUploadMetadata],
+        type=metainfo.Datetime, categories=[MongoUploadMetadata, EditableUserMetadata],
         description='The date and time when the upload was published in nomad',
-        a_flask=dict(admin_only=True),
+        a_auth_level=AuthLevel.admin,
         a_elasticsearch=Elasticsearch(material_entry_type))
 
     with_embargo = metainfo.Quantity(
@@ -498,14 +516,21 @@ class EntryMetadata(metainfo.MSection):
         description='Indicated if this entry is under an embargo',
         a_elasticsearch=Elasticsearch(material_entry_type))
 
+    embargo_length = metainfo.Quantity(
+        # Note: This attribute is not stored in ES
+        type=int, categories=[MongoUploadMetadata, EditableUserMetadata],
+        description='The length of the requested embargo period, in months')
+
     license = metainfo.Quantity(
+        # Note: This attribute is not stored in ES
         type=str,
         description='''
             A short license description (e.g. CC BY 4.0), that refers to the
             license of this entry.
         ''',
         default='CC BY 4.0',
-        categories=[MongoUploadMetadata, EditableUserMetadata])
+        categories=[MongoUploadMetadata, EditableUserMetadata],
+        a_auth_level=AuthLevel.admin)
 
     processed = metainfo.Quantity(
         type=bool, default=False, categories=[MongoEntryMetadata, MongoSystemMetadata],
@@ -546,7 +571,7 @@ class EntryMetadata(metainfo.MSection):
 
     external_db = metainfo.Quantity(
         type=metainfo.MEnum('EELSDB', 'Materials Project', 'AFLOW', 'OQMD'),
-        categories=[MongoUploadMetadata, UserProvidableMetadata],
+        categories=[MongoUploadMetadata, UserProvidableMetadata, EditableUserMetadata],
         description='The repository or external database where the original data resides',
         a_elasticsearch=Elasticsearch(material_entry_type))
 
@@ -560,11 +585,13 @@ class EntryMetadata(metainfo.MSection):
         a_elasticsearch=Elasticsearch(material_entry_type))
 
     main_author = metainfo.Quantity(
-        type=user_reference, categories=[MongoUploadMetadata],
+        type=user_reference, categories=[MongoUploadMetadata, EditableUserMetadata],
         description='The main author of the entry',
+        a_auth_level=AuthLevel.admin,
         a_elasticsearch=Elasticsearch(material_entry_type))
 
     coauthors = metainfo.Quantity(
+        # Note: This attribute is not stored in ES
         type=author_reference, shape=['0..*'], default=[], categories=[MongoUploadMetadata, EditableUserMetadata],
         description='''
             A user provided list of co-authors for the whole upload. These can view and edit the
@@ -572,13 +599,15 @@ class EntryMetadata(metainfo.MSection):
         ''')
 
     entry_coauthors = metainfo.Quantity(
-        type=author_reference, shape=['0..*'], default=[], categories=[MongoEntryMetadata, EditableUserMetadata],
+        # Note: This attribute is not stored in ES
+        type=author_reference, shape=['0..*'], default=[], categories=[MongoEntryMetadata],
         description='''
-            A user provided list of co-authors specific for this entry. Note that normally,
-            coauthors should be set on the upload level.
+            A user provided list of co-authors specific for this entry. This is a legacy field;
+            for new uploads, coauthors should be specified on the upload level only.
         ''')
 
     reviewers = metainfo.Quantity(
+        # Note: This attribute is not stored in ES
         type=user_reference, shape=['0..*'], default=[], categories=[MongoUploadMetadata, EditableUserMetadata],
         description='''
             A user provided list of reviewers. Reviewers can see the whole upload, also if
diff --git a/nomad/metainfo/metainfo.py b/nomad/metainfo/metainfo.py
index 29892598022ec269a6765b0dcc870f0caa03408d..e0e19ace4067a1abeb3624f677ce8e430f11ce25 100644
--- a/nomad/metainfo/metainfo.py
+++ b/nomad/metainfo/metainfo.py
@@ -1356,7 +1356,10 @@ class MSection(metaclass=MObjectMeta):  # TODO find a way to make this a subclas
                 serialize = serialize_dtype
 
             elif isinstance(quantity_type, MEnum):
-                serialize = str
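+                # Keep None as None; plain str would serialize it as the string 'None'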
+                def serialize_enum(value):
+                    return None if value is None else str(value)
+
+                serialize = serialize_enum
 
             elif quantity_type == Any:
                 def _serialize(value: Any):
diff --git a/nomad/processing/__init__.py b/nomad/processing/__init__.py
index c3579263c002d4535501ce3ca16d88bebf1af328..7ca03452e256bcd122d42a462adbffdc25321343 100644
--- a/nomad/processing/__init__.py
+++ b/nomad/processing/__init__.py
@@ -62,4 +62,4 @@ classes do represent the processing state, as well as the respective entity.
 
 from nomad.processing.base import (
     app, InvalidId, ProcNotRegistered, ProcessStatus, ProcessAlreadyRunning)
-from nomad.processing.data import Upload, Calc
+from nomad.processing.data import Upload, Calc, MetadataEditRequestHandler
diff --git a/nomad/processing/data.py b/nomad/processing/data.py
index 0833918a3ea24aa686e127692e0f26e6779aa5c2..1afddfbf8282046639207161f695ad8fda95f71b 100644
--- a/nomad/processing/data.py
+++ b/nomad/processing/data.py
@@ -28,11 +28,13 @@ calculations, and files
 
 '''
 
-from typing import cast, Any, List, Tuple, Set, Iterator, Dict, cast, Iterable, Sequence
+from typing import cast, Any, List, Tuple, Set, Iterator, Dict, Iterable, Sequence, Union, Optional
 from mongoengine import (
     StringField, DateTimeField, BooleanField, IntField, ListField)
+from pymongo import UpdateOne
 from structlog import wrap_logger
 from contextlib import contextmanager
+import copy
 import os.path
 from datetime import datetime, timedelta
 import hashlib
@@ -41,21 +43,25 @@ import yaml
 import json
 from functools import lru_cache
 import requests
+from fastapi.exceptions import RequestValidationError
+from pydantic.error_wrappers import ErrorWrapper
 
 from nomad import utils, config, infrastructure, search, datamodel, metainfo, parsing, client
 from nomad.files import (
     PathObject, UploadFiles, PublicUploadFiles, StagingUploadFiles, UploadBundle, create_tmp_dir)
-from nomad.processing.base import Proc, process, ProcessStatus, ProcessFailure
+from nomad.processing.base import Proc, process, ProcessStatus, ProcessFailure, ProcessAlreadyRunning
 from nomad.parsing import Parser
 from nomad.parsing.parsers import parser_dict, match_parser
 from nomad.normalizing import normalizers
 from nomad.datamodel import (
     EntryArchive, EntryMetadata, MongoUploadMetadata, MongoEntryMetadata, MongoSystemMetadata,
-    EditableUserMetadata, UserProvidableMetadata, UploadMetadata)
+    EditableUserMetadata, UploadMetadata, AuthLevel)
 from nomad.archive import (
     write_partial_archive_to_mongo, delete_partial_archives_from_mongo)
 from nomad.datamodel.encyclopedia import EncyclopediaMetadata
-
+from nomad.app.v1.models import (
+    MetadataEditRequest, And, Aggregation, TermsAggregation, MetadataPagination, MetadataRequired)
+from nomad.search import update_metadata as es_update_metadata
 
 section_metadata = datamodel.EntryArchive.metadata.name
 section_workflow = datamodel.EntryArchive.workflow.name
@@ -69,12 +75,11 @@ _mongo_entry_metadata = tuple(
 _mongo_system_metadata = tuple(
     quantity.name for quantity in MongoSystemMetadata.m_def.definitions)
 _mongo_entry_metadata_except_system_fields = tuple(
-    field for field in _mongo_entry_metadata if field not in _mongo_system_metadata)
-_editable_metadata: Dict[str, metainfo.Definition] = {}
-_editable_metadata.update(**{
-    quantity.name: quantity for quantity in UserProvidableMetadata.m_def.definitions})
-_editable_metadata.update(**{
-    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions})
+    quantity_name for quantity_name in _mongo_entry_metadata
+    if quantity_name not in _mongo_system_metadata)
+_editable_metadata: Dict[str, metainfo.Definition] = {
+    quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions
+    if isinstance(quantity, metainfo.Quantity)}
 
 
 def _pack_log_event(logger, method_name, event_dict):
@@ -134,6 +139,462 @@ def generate_entry_id(upload_id: str, mainfile: str) -> str:
     return utils.hash(upload_id, mainfile)
 
 
+class MetadataEditRequestHandler:
+    '''
+    Class for handling a request to edit metadata. The request may originate either from
+    metadata files in the raw directory or from a JSON dictionary complying with the
+    :class:`MetadataEditRequest` format. If the edit request is limited to a specific upload,
+    `upload_id` should be specified (only when this is the case can upload level metadata be edited).
+    '''
+    @classmethod
+    def edit_metadata(
+            cls, edit_request_json: Dict[str, Any], upload_id: Optional[str],
+            user: datamodel.User) -> Dict[str, Any]:
+        '''
+        Method to verify and execute a generic request to edit metadata from a certain user.
+        The request is specified as a json dictionary. Optionally, the request could be restricted
+        to a single upload by specifying `upload_id` (this is necessary when editing upload
+        level attributes). If `edit_request_json` has `verify_only` set to True, only
+        verification is carried out (i.e. nothing is actually updated). To just run the
+        verification should be quick in comparison to actually executing the request (which
+        may take some time and requires one or more @process to finish). If the request passes
+        the verification step and `verify_only` is not set to True, we will send the request
+        for execution, by initiating the @process :func:`edit_upload_metadata` for each affected
+        upload.
+
+        The method returns a json dictionary with verified data (references resolved to explicit
+        IDs, list actions always expressed as dicts with "op" and "values", etc), or raises
+        an exception, namely:
+         -  A :class:`ValidationError` if the request json can't be parsed by pydantic
+         -  A :class:`RequestValidationError` with information about validation failures and
+            their location (most errors should be of this type, provided that the json is valid)
+         -  A :class:`ProcessAlreadyRunning` exception if one of the affected uploads has
+            a running process
+         -  Some other type of exception, if something goes wrong unexpectedly (should hopefully
+            never happen)
+        '''
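+        # Illustrative call (hypothetical ids):
+        #     MetadataEditRequestHandler.edit_metadata(
+        #         edit_request_json={'metadata': {'comment': 'abc'}},
+        #         upload_id='some_upload_id', user=some_user)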
+        logger = utils.get_logger('nomad.processing.edit_metadata')
+        handler = MetadataEditRequestHandler(
+            logger, user, edit_request_json=edit_request_json, upload_id=upload_id)
+        # Validate the request
+        handler.validate_request()  # Should raise errors if something looks wrong
+
+        if not edit_request_json.get('verify_only'):
+            # Check if any of the affected uploads are processing
+            for upload in handler.affected_uploads:
+                upload.reload()
+                if upload.process_running:
+                    raise ProcessAlreadyRunning(f'Upload {upload.upload_id} is currently processing')
+            # Looks good, try to trigger processing
+            for upload in handler.affected_uploads:
+                upload.edit_upload_metadata(edit_request_json, user.user_id)  # Trigger the process
+        # All went well, return a verified json as response
+        verified_json = copy.deepcopy(handler.edit_request_json)
+        verified_json['metadata'] = handler.root_metadata
+        verified_json['entries'] = handler.entries_metadata
+        return verified_json
+
+    def __init__(
+            self, logger, user: datamodel.User,
+            edit_request_json: Dict[str, Any] = None,
+            upload_files: StagingUploadFiles = None,
+            upload_id: str = None):
+        # Initialization
+        assert user, 'Must specify `user`'
+        assert (edit_request_json is None) != (upload_files is None), (
+            'Must specify either `edit_request_json` or `upload_files`')
+        self.logger = logger
+        self.user = user
+        self.edit_request_json = edit_request_json
+        self.upload_files = upload_files
+        self.upload_id = upload_id
+
+        self.errors: List[ErrorWrapper] = []  # A list of all encountered errors, if any
+        self.edit_attempt_locs: List[Tuple[str, ...]] = []  # locs where user has attempted to edit something
+        self.required_auth_level = AuthLevel.none  # Maximum required auth level for the edit
+        self.required_auth_level_locs: List[Tuple[str, ...]] = []  # locs where maximal auth level is needed
+        self.encountered_users: Dict[str, str] = {}  # { ref: user_id | None }, ref = user_id | username | email
+        self.encountered_datasets: Dict[str, datamodel.Dataset] = {}  # { ref : dataset | None }, ref = dataset_id | dataset_name
+        self.root_metadata: Dict[str, Any] = None  # The metadata specified at the top/root level
+
+        # Specific to the MetadataEditRequest case
+        self.edit_request: MetadataEditRequest = None
+        self.affected_uploads: List['Upload'] = None  # A MetadataEditRequest may involve multiple uploads
+        self.entries_metadata: Dict[str, Dict[str, Any]] = {}  # Metadata specified for individual entries
+
+    def validate_metadata_files(self):
+        pass  # TODO
+
+    def validate_request(self):
+        ''' Validates the provided edit_request_json. '''
+        # Validate the request JSON. Will raise ValidationError if the JSON is malformed
+        self.edit_request = MetadataEditRequest(**self.edit_request_json)
+        try:
+            if not self.upload_id and not self.edit_request.query:
+                return self._loc_error('Must specify `query`', 'query')
+            if self.edit_request.entries and not self.edit_request.entries_key:
+                return self._loc_error('Must specify `entries_key` when specifying `entries`', 'entries_key')
+
+            can_edit_upload_fields = bool(self.upload_id and not self.edit_request.query)
+            if self.edit_request.metadata:
+                self.root_metadata = self._verify_metadata_edit_actions(
+                    self.edit_request_json['metadata'], ('metadata',), can_edit_upload_fields)
+            if self.edit_request.entries:
+                for key, entry_metadata in self.edit_request_json['entries'].items():
+                    verified_metadata = self._verify_metadata_edit_actions(
+                        entry_metadata, ('entries', key), False)
+                    self.entries_metadata[key] = verified_metadata
+
+            if not self.edit_attempt_locs:
+                return self._loc_error('No fields to update specified', 'metadata')
+            if self.required_auth_level == AuthLevel.admin and not self.user.is_admin:
+                for loc in self.required_auth_level_locs:
+                    self._loc_error('Admin rights required', loc)
+                return
+
+            embargo_length: int = (self.root_metadata or {}).get('embargo_length')
+            try:
+                self.affected_uploads = self._find_request_uploads()
+            except Exception as e:
+                return self._loc_error('Could not evaluate query: ' + str(e), 'query')
+            if not self.affected_uploads:
+                if self.edit_request.query:
+                    return self._loc_error('No matching entries found', 'query')
+                return self._loc_error('No matching upload found', 'upload_id')
+            for upload in self.affected_uploads:
+                # Check permissions
+                coauthor = upload.coauthors and self.user.user_id in upload.coauthors
+                main_author = self.user.user_id == upload.main_author
+                admin = self.user.is_admin
+                if self.required_auth_level == AuthLevel.coauthor:
+                    has_access = coauthor or main_author or admin
+                elif self.required_auth_level == AuthLevel.main_author:
+                    has_access = main_author or admin
+                elif self.required_auth_level == AuthLevel.admin:
+                    has_access = admin
+                else:
+                    assert False, 'Invalid required_auth_level'  # Should not happen
+                if not has_access:
+                    for loc in self.required_auth_level_locs:
+                        self._loc_error(
+                            f'{self.required_auth_level.name} access required for upload '
+                            f'{upload.upload_id}', loc)
+                    return
+                # Other checks
+                if embargo_length is not None:
+                    if upload.published and not admin and embargo_length != 0:
+                        self._loc_error(
+                            f'Upload {upload.upload_id} is published, embargo can only be lifted',
+                            ('metadata', 'embargo_length'))
+                if upload.published and not admin:
+                    has_invalid_edit = False
+                    for edit_loc in self.edit_attempt_locs:
+                        if edit_loc[-1] not in ('embargo_length', 'datasets'):
+                            has_invalid_edit = True
+                            self._loc_error(
+                                f'Cannot update, upload {upload.upload_id} is published.', edit_loc)
+                    if has_invalid_edit:
+                        return
+        except Exception as e:
+            # Something unexpected has gone wrong
+            self.logger.error(e)
+            raise
+        finally:
+            if self.errors:
+                raise RequestValidationError(errors=self.errors)
+
+    def get_upload_metadata_to_set(self, upload: 'Upload') -> Dict[str, Any]:
+        '''
+        Returns a dictionary with verified metadata to update on the Upload object. The
+        values have the correct type for mongo. Assumes that the corresponding validation method
+        (i.e. :func:`validate_metadata_files` or :func:`validate_request`) has been run.
+        '''
+        rv: Dict[str, Any] = {}
+        if self.root_metadata:
+            self._applied_mongo_actions(upload, self.root_metadata, rv)
+        return rv
+
+    def get_entry_metadata_to_set(self, upload: 'Upload', entry: 'Calc') -> Dict[str, Any]:
+        '''
+        Returns a dictionary with verified metadata to update on the entry object. The
+        values have the correct type for mongo. Assumes that the corresponding validation method
+        (i.e. :func:`validate_metadata_files` or :func:`validate_request`) has been run.
+        '''
+        rv: Dict[str, Any] = {}
+        if self.root_metadata:
+            self._applied_mongo_actions(entry, self.root_metadata, rv)
+        if self.edit_request:
+            # Source = edit_request
+            if self.entries_metadata:
+                entry_key = self._get_entry_key(entry, self.edit_request.entries_key)
+                entry_metadata = self.entries_metadata.get(entry_key)
+                if entry_metadata:
+                    # We also have actions for this particular entry specified
+                    self._applied_mongo_actions(entry, entry_metadata, rv)
+        else:
+            # Source = metadata files
+            pass  # TODO
+        return rv
+
+    def _loc_error(self, msg: str, loc: Union[str, Tuple[str, ...]]):
+        ''' Registers a located error. '''
+        self.errors.append(ErrorWrapper(Exception(msg), loc=loc))
+        self.logger.error(msg, loc=loc)
+
+    def _verify_metadata_edit_actions(
+            self, metadata_edit_actions: Dict[str, Any], loc: Tuple[str, ...],
+            can_edit_upload_fields: bool, auth_level: AuthLevel = None) -> Dict[str, Any]:
+        '''
+        Performs *basic* validation of a dictionary with metadata edit actions, and returns a
+        dictionary with the same structure, but containing only the *verified* actions. Verified
+        actions are actions that pass validation. Moreover:
+            1)  For actions on lists, the verified action value is always expressed as a
+                list operation (a dictionary with `op` and `values`)
+            2)  User references (which can be specified by a user_id, a username, or an email)
+                are always converted to user_id
+            3)  Dataset references (which can be specified either by dataset_id or dataset_name)
+                are always replaced with dataset_ids, and it is verified that none of the
+                datasets has a doi.
+            4)  Only `add` and `remove` operations are allowed for datasets.
+        '''
+        rv = {}
+        for quantity_name, action in metadata_edit_actions.items():
+            if action is not None:
+                success, verified_action = self._verify_metadata_edit_action(
+                    quantity_name, action, loc + (quantity_name,), can_edit_upload_fields, auth_level)
+                if success:
+                    rv[quantity_name] = verified_action
+        return rv
+
+    def _verify_metadata_edit_action(
+            self, quantity_name: str, action: Any, loc: Tuple[str, ...],
+            can_edit_upload_fields: bool, auth_level: AuthLevel) -> Tuple[bool, Any]:
+        '''
+        Performs basic validation of a single action. Returns (success, verified_action).
+        '''
+        definition = _editable_metadata.get(quantity_name)
+        if not definition:
+            self._loc_error('Unknown quantity', loc)
+            return False, None
+
+        self.edit_attempt_locs.append(loc)
+
+        field_auth_level = getattr(definition, 'a_auth_level', AuthLevel.coauthor)
+
+        if auth_level is not None:
+            # Our auth level is known, check it immediately
+            if field_auth_level > auth_level:
+                self._loc_error(f'{field_auth_level.name} privileges required', loc)
+                return False, None
+        if field_auth_level > self.required_auth_level:
+            self.required_auth_level = field_auth_level
+            self.required_auth_level_locs = [loc]
+        if quantity_name in _mongo_upload_metadata and not can_edit_upload_fields:
+            self._loc_error('Quantity can only be edited on the upload level', loc)
+            return False, None
+
+        try:
+            if definition.is_scalar:
+                return True, self._verified_value(definition, action)
+            else:
+                # We have a non-scalar quantity
+                if type(action) == dict:
+                    # Action is a dict - expected to contain op and values
+                    assert action.keys() == {'op', 'values'}, 'Expected keys `op` and `values`'
+                    op = action['op']
+                    values = action['values']
+                    assert op in ('set', 'add', 'remove'), 'op should be `set`, `add` or `remove`'
+                    if quantity_name == 'datasets' and op == 'set':
+                        self._loc_error(
+                            'Only `add` and `remove` operations permitted for datasets', loc)
+                        return False, None
+                else:
+                    op = 'set'
+                    values = action
+                    if quantity_name == 'datasets':
+                        op = 'add'  # Just specifying a list will be interpreted as add, rather than fail.
+                values = values if type(values) == list else [values]
+                verified_values = [self._verified_value(definition, v) for v in values]
+                return True, dict(op=op, values=verified_values)
+        except Exception as e:
+            self._loc_error(str(e), loc)
+            return False, None
+
+    def _verified_value(
+            self, definition: metainfo.Definition, value: Any) -> Any:
+        '''
+        Verifies a *singular* action value (i.e. for list quantities we should run this method
+        for each value in the list, not with the list itself as input). Returns the verified
+        value, which may be different from the original value. It:
+            1) ensures a return value of a primitive type (str, int, float, bool or None),
+            2) checks that user refs exist,
+            3) checks that dataset refs exist and do not have a doi,
+            4) translates user refs to user_id and dataset refs to dataset_id, if needed.
+        Raises exception in case of failures.
+        '''
+        if definition.type in (str, int, float, bool):
+            assert value is None or type(value) == definition.type, f'Expected a {definition.type.__name__}'
+            if definition.name == 'embargo_length':
+                assert value is not None and 0 <= value <= 36, 'Value should be between 0 and 36'
+            return None if value == '' else value
+        elif definition.type == metainfo.Datetime:
+            if value is not None:
+                datetime.fromisoformat(value)  # Throws exception if badly formatted timestamp
+            return None if value == '' else value
+        elif isinstance(definition.type, metainfo.MEnum):
+            assert type(value) == str, 'Expected a string value'
+            if value == '':
+                return None
+            assert value in definition.type._values, f'Bad enum value {value}'
+            return value
+        elif isinstance(definition.type, metainfo.Reference):
+            assert type(value) == str, 'Expected a string value'
+            reference_type = definition.type.target_section_def.section_cls
+            if reference_type in [datamodel.User, datamodel.Author]:
+                if value in self.encountered_users:
+                    user_id = self.encountered_users[value]
+                else:
+                    # New user reference encountered, try to fetch it
+                    user_id = None
+                    try:
+                        user_id = datamodel.User.get(user_id=value).user_id
+                    except KeyError:
+                        try:
+                            user_id = datamodel.User.get(username=value).user_id
+                        except KeyError:
+                            if '@' in value:
+                                try:
+                                    user_id = datamodel.User.get(email=value).user_id
+                                except KeyError:
+                                    pass
+                    self.encountered_users[value] = user_id
+                assert user_id is not None, f'User reference not found: `{value}`'
+                return user_id
+            elif reference_type == datamodel.Dataset:
+                dataset = self._get_dataset(value)
+                assert dataset is not None, f'Dataset reference not found: `{value}`'
+                assert self.user.is_admin or dataset.user_id == self.user.user_id, (
+                    f'Dataset `{value}` does not belong to you')
+                assert not dataset.doi, f'Dataset `{value}` has a doi and cannot be changed'
+                return dataset.dataset_id
+        else:
+            assert False, 'Unhandled value type'  # Should not happen
+
+    def _applied_mongo_actions(
+            self, mongo_doc: Union['Upload', 'Calc'],
+            verified_actions: Dict[str, Any], applied_actions: Dict[str, Any]):
+        '''
+        Calculates the upload or entry level *applied actions*, i.e. key-value pairs with
+        data to set on the provided `mongo_doc` in order to carry out the actions specified
+        by `verified_actions`. The result is added to `applied_actions`.
+        '''
+        for quantity_name, verified_action in verified_actions.items():
+            if isinstance(mongo_doc, Calc) and quantity_name not in _mongo_entry_metadata:
+                continue
+            elif isinstance(mongo_doc, Upload) and quantity_name not in _mongo_upload_metadata:
+                continue
+            applied_actions[quantity_name] = self._applied_mongo_action(
+                mongo_doc, quantity_name, verified_action)
+
+    def _applied_mongo_action(self, mongo_doc, quantity_name: str, verified_action: Any) -> Any:
+        definition = _editable_metadata[quantity_name]
+        if definition.is_scalar:
+            if definition.type == metainfo.Datetime and verified_action:
+                return datetime.fromisoformat(verified_action)
+            return verified_action
+        # Non-scalar property. The verified action should be a dict with op and values
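+        # For example (illustrative values): {'op': 'add', 'values': ['<user_id>']}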
+        op, values = verified_action['op'], verified_action['values']
+        old_list = getattr(mongo_doc, quantity_name, [])
+        new_list = [] if op == 'set' else old_list.copy()
+        for v in values:
+            if op == 'add' or op == 'set':
+                if v not in new_list:
+                    if quantity_name in ('coauthors', 'reviewers') and v == mongo_doc.main_author:
+                        continue  # Prevent adding the main author to coauthors or reviewers
+                    new_list.append(v)
+            elif op == 'remove':
+                if v in new_list:
+                    new_list.remove(v)
+        return new_list
+
+    def _get_entry_key(self, entry: 'Calc', entries_key: str) -> str:
+        if entries_key == 'calc_id' or entries_key == 'entry_id':
+            return entry.calc_id
+        elif entries_key == 'mainfile':
+            return entry.mainfile
+        assert False, f'Invalid entries_key: {entries_key}'
+
+    def _get_dataset(self, ref: str) -> datamodel.Dataset:
+        '''
+        Gets a dataset. It can be identified either by dataset_id or dataset_name, but
+        only datasets belonging to the user can be referenced by name. If no matching
+        dataset can be found, None is returned.
+        '''
+        if ref in self.encountered_datasets:
+            return self.encountered_datasets[ref]
+        else:
+            # First time we encounter this ref
+            try:
+                dataset = datamodel.Dataset.m_def.a_mongo.get(dataset_id=ref)
+            except KeyError:
+                try:
+                    dataset = datamodel.Dataset.m_def.a_mongo.get(
+                        user_id=self.user.user_id, dataset_name=ref)
+                except KeyError:
+                    dataset = None
+            self.encountered_datasets[ref] = dataset
+        return dataset
+
+    def _restricted_request_query(self, upload_id: str = None):
+        '''
+        Gets the query of the request, if it has any. If both a query and an `upload_id` are
+        specified, we return a modified query, restricting the original query to this upload.
+        '''
+        query = self.edit_request.query
+        if upload_id and query:
+            # Restrict query to the specified upload
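+            # E.g. with upload_id 'u1', the query q becomes {'and': [{'upload_id': 'u1'}, q]}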
+            return And(**{'and': [{'upload_id': upload_id}, query]})
+        return query
+
+    def _find_request_uploads(self) -> List['Upload']:
+        ''' Returns a list of :class:`Upload`s matching the edit request. '''
+        query = self._restricted_request_query(self.upload_id)
+        if query:
+            # Perform the search, aggregating by upload_id
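+            # page_size=0: we only need the aggregation buckets, not the actual hits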
+            search_response = search.search(
+                user_id=self.user.user_id,
+                owner=self.edit_request.owner,
+                query=query,
+                aggregations=dict(agg=Aggregation(terms=TermsAggregation(quantity='upload_id'))),
+                pagination=MetadataPagination(page_size=0))
+            terms = search_response.aggregations['agg'].terms  # pylint: disable=no-member
+            return [Upload.get(bucket.value) for bucket in terms.data]  # type: ignore
+        elif self.upload_id:
+            # Request just specifies an upload_id, no query
+            try:
+                return [Upload.get(self.upload_id)]
+            except KeyError:
+                pass
+        return []
+
+    def find_request_entries(self, upload: 'Upload') -> Iterable['Calc']:
+        ''' Finds the entries of the specified upload which are affected by the request. '''
+        query = self._restricted_request_query(upload.upload_id)
+        if query:
+            # We have a query. Execute it to get the entries.
+            search_result = search.search_iterator(
+                user_id=self.user.user_id,
+                owner=self.edit_request.owner,
+                query=query,
+                required=MetadataRequired(include=['calc_id']))
+            for result in search_result:
+                yield Calc.get(result['calc_id'])
+        else:
+            # We have no query. Return all entries for the upload
+            for entry in Calc.objects(upload_id=upload.upload_id):
+                yield entry
+
+
 class Calc(Proc):
     '''
     Instances of this class represent calculations. This class manages the elastic
@@ -314,11 +775,11 @@ class Calc(Proc):
         '''
         assert upload.upload_id == self.upload_id, 'Could not apply metadata: upload_id mismatch'
         # Upload metadata
-        for field in _mongo_upload_metadata:
-            setattr(entry_metadata, field, getattr(upload, field))
+        for quantity_name in _mongo_upload_metadata:
+            setattr(entry_metadata, quantity_name, getattr(upload, quantity_name))
         # Entry metadata
-        for field in _mongo_entry_metadata:
-            setattr(entry_metadata, field, getattr(self, field))
+        for quantity_name in _mongo_entry_metadata:
+            setattr(entry_metadata, quantity_name, getattr(self, quantity_name))
         # Special case: domain. May be derivable from mongo, or may have to be read from the archive
         if self.parser_name is not None:
             parser = parser_dict[self.parser_name]
@@ -332,8 +793,8 @@ class Calc(Proc):
         but excluding upload level metadata and system fields (like mainfile, parser_name etc.).
         '''
         entry_metadata_dict = entry_metadata.m_to_dict(include_defaults=True)
-        for field in _mongo_entry_metadata_except_system_fields:
-            setattr(self, field, entry_metadata_dict.get(field))
+        for quantity_name in _mongo_entry_metadata_except_system_fields:
+            setattr(self, quantity_name, entry_metadata_dict.get(quantity_name))
 
     def set_mongo_entry_metadata(self, *args, **kwargs):
         '''
@@ -350,7 +811,7 @@ class Calc(Proc):
                 if key in _mongo_entry_metadata_except_system_fields:
                     setattr(self, key, value)
                 else:
-                    assert False, f'Cannot set metadata field: {key}'
+                    assert False, f'Cannot set metadata quantity: {key}'
 
     def full_entry_metadata(self, upload: 'Upload') -> EntryMetadata:
         '''
@@ -1542,6 +2003,68 @@ class Upload(Proc):
                 with utils.timer(logger, 'index updated'):
                     search.update_metadata(entries_metadata, update_materials=True, refresh=True)
 
+    @process
+    def edit_upload_metadata(self, edit_request_json: Dict[str, Any], user_id: str):
+        '''
+        A @process that executes a metadata edit request, restricted to a specific upload,
+        on behalf of the provided user. The `edit_request_json` should be a json dict in the
+        format specified by the pydantic model :class:`MetadataEditRequest` (we need to use
+        primitive data types, i.e. the json format, to be able to pass the request to a
+        rabbitmq task).
+        '''
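+        # An illustrative request (hypothetical values):
+        #   {'metadata': {'comment': 'a comment'}, 'owner': 'user'}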
+        logger = self.get_logger()
+        user = datamodel.User.get(user_id=user_id)
+        assert not edit_request_json.get('verify_only'), 'Request has verify_only'
+
+        # Validate the request (the @process could have been invoked directly, without previous validation)
+        handler = MetadataEditRequestHandler(
+            logger, user, edit_request_json=edit_request_json, upload_id=self.upload_id)
+        handler.validate_request()  # Should raise errors if something looks wrong
+
+        # Upload level metadata
+        old_with_embargo = self.with_embargo
+        upload_updates = handler.get_upload_metadata_to_set(self)
+        if upload_updates:
+            for quantity_name, mongo_value in upload_updates.items():
+                setattr(self, quantity_name, mongo_value)
+            self.save()
+
+        if self.published and old_with_embargo != self.with_embargo:
+            # Need to repack
+            PublicUploadFiles(self.upload_id).re_pack(with_embargo=self.with_embargo)
+
+        # Entry level metadata
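+        # Use a single timestamp, so all affected entries get the same last_edit_time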
+        last_edit_time = datetime.utcnow()
+        entry_mongo_writes = []
+        updated_metadata: List[datamodel.EntryMetadata] = []
+        for entry in handler.find_request_entries(self):
+            entry_updates = handler.get_entry_metadata_to_set(self, entry)
+            entry_updates['last_edit_time'] = last_edit_time
+            # Add mongo entry update operation to bulk write list
+            entry_mongo_writes.append(UpdateOne({'_id': entry.calc_id}, {'$set': entry_updates}))
+            # Create updates for ES
+            entry_metadata = entry.mongo_metadata(self)
+            if upload_updates:
+                entry_metadata.m_update_from_dict(upload_updates)
+            entry_metadata.m_update_from_dict(entry_updates)
+            updated_metadata.append(entry_metadata)
+
+        # Update mongo
+        if entry_mongo_writes:
+            with utils.timer(logger, 'Mongo bulk write completed', nupdates=len(entry_mongo_writes)):
+                mongo_result = Calc._get_collection().bulk_write(entry_mongo_writes)
+            mongo_errors = mongo_result.bulk_api_result.get('writeErrors')
+            if mongo_errors:
+                return self.fail(
+                    f'Failed to update mongo! {len(mongo_errors)} failures, first is {mongo_errors[0]}')
+        # Update ES
+        if updated_metadata:
+            with utils.timer(logger, 'ES updated', nupdates=len(updated_metadata)):
+                failed_es = es_update_metadata(updated_metadata, update_materials=True, refresh=True)
+
+            if failed_es > 0:
+                return self.fail(f'Failed to update ES, there were {failed_es} fails')
+
     def entry_ids(self) -> List[str]:
         return [calc.calc_id for calc in Calc.objects(upload_id=self.upload_id)]
 
@@ -1733,7 +2256,7 @@ class Upload(Proc):
                 if not self.oasis_deployment_id:
                     self.oasis_deployment_id = source_deployment_id
                     # Note, if oasis_deployment_id is set in the bundle_info, we keep this
-                    # field as it is, since it indicates that the upload has been importet from
+                    # value as it is, since it indicates that the upload has been imported from
                     # somewhere else originally (i.e. source_deployment_id would not be the
                     # original source)
 
@@ -1747,17 +2270,19 @@ class Upload(Proc):
                     check_user_ids([dataset_dict['user_id']], 'Invalid dataset creator id: {id}')
                     dataset_id = dataset_dict['dataset_id']
                     try:
-                        existing_dataset = datamodel.Dataset.m_def.a_mongo.get(dataset_name=dataset_dict['dataset_name'])
-                        # Dataset by the given dataset_name already exists
-                        assert existing_dataset.user_id == dataset_dict['user_id'], (
-                            'A dataset with the same dataset_name but different creator exists')
+                        existing_dataset = datamodel.Dataset.m_def.a_mongo.get(
+                            user_id=dataset_dict['user_id'],
+                            dataset_name=dataset_dict['dataset_name'])
+                        # Dataset by the given dataset_name and user_id already exists
                         dataset_id_mapping[dataset_id] = existing_dataset.dataset_id
                         # Note, it may be that a dataset with the same dataset_name and creator
                         # is created in both environments. In that case, we consider them
                         # to be the "same" dataset, even if they do not have the same dataset_id.
                         # Thus, in that case the dataset id needs to be translated.
+                        assert not existing_dataset.doi, (
+                            f'Matched dataset {existing_dataset.dataset_id} has a DOI, cannot be updated')
                     except KeyError:
-                        # Create a new dataset
+                        # Completely new dataset, create it
                         new_dataset = datamodel.Dataset(**dataset_dict)
                         new_dataset.a_mongo.save()
                         new_datasets.append(new_dataset)
@@ -1807,8 +2332,9 @@ class Upload(Proc):
 
             # Validate embargo settings
             if embargo_length is not None:
-                assert 0 <= embargo_length <= 36, 'Invalid embargo_length, must be between 0 and 36 months'
-                self.embargo_length = embargo_length  # Set the flag also on the Upload level
+                self.embargo_length = embargo_length  # Importing with different embargo
+            assert type(self.embargo_length) == int and 0 <= self.embargo_length <= 36, (
+                'Invalid embargo_length, must be between 0 and 36 months')
 
             # Import the files
             bundle.import_upload_files(
diff --git a/nomad/search.py b/nomad/search.py
index 907d1fa338088d91e8dfeae6c5d74b1b293241ba..a7e525f2d17b5f3b89fc29ed0cd059abc9ac67bb 100644
--- a/nomad/search.py
+++ b/nomad/search.py
@@ -32,7 +32,7 @@ update the v1 materials index according to the performed changes. TODO this is o
 partially implemented.
 '''
 
-from typing import Union, List, Iterable, Any, cast, Dict, Generator
+from typing import Union, List, Iterable, Any, cast, Dict, Iterator, Generator
 import json
 import elasticsearch
 from elasticsearch.exceptions import TransportError, RequestError
@@ -1110,6 +1110,35 @@ def search(
     return result
 
 
+def search_iterator(
+        owner: str = 'public',
+        query: Union[Query, EsQuery] = None,
+        order_by: str = 'calc_id',
+        required: MetadataRequired = None,
+        aggregations: Dict[str, Aggregation] = {},
+        user_id: str = None,
+        index: Index = entry_index) -> Iterator[Dict[str, Any]]:
+    '''
+    Works like :func:`search`, but returns an iterator for iterating over the results.
+    Consequently, you cannot specify `pagination`, only `order_by`.
+    '''
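+    # Iterate the results page by page, using the page_after_value cursor for deep pagination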
+    page_after_value = None
+    while True:
+        response = search(
+            owner=owner, query=query,
+            pagination=MetadataPagination(
+                page_size=100, page_after_value=page_after_value, order_by=order_by),
+            required=required, aggregations=aggregations, user_id=user_id, index=index)
+
+        page_after_value = response.pagination.next_page_after_value
+
+        for result in response.data:
+            yield result
+
+        if page_after_value is None or len(response.data) == 0:
+            break
+
+
 def _index(entries, **kwargs):
     index_entries(entries, **kwargs)
 
diff --git a/tests/app/conftest.py b/tests/app/conftest.py
index ab3ad9c07348987e42699eed9215a567f1ba4314..035d5de7a0608262872e12f901af1376e69e13f5 100644
--- a/tests/app/conftest.py
+++ b/tests/app/conftest.py
@@ -45,14 +45,6 @@ def admin_user_auth(admin_user: User):
     return create_auth_headers(admin_user)
 
 
-@pytest.fixture(scope='module')
-def test_users_dict(test_user, other_test_user, admin_user):
-    return {
-        'test_user': test_user,
-        'other_test_user': other_test_user,
-        'admin_user': admin_user}
-
-
 @pytest.fixture(scope='module')
 def test_auth_dict(
         test_user, other_test_user, admin_user,
diff --git a/tests/app/v1/conftest.py b/tests/app/v1/conftest.py
index f109baa5656341f2afaa65aaf43093bc98b30820..d4a1e15fc4a54469a11bf2d6028395a16e2f1c85 100644
--- a/tests/app/v1/conftest.py
+++ b/tests/app/v1/conftest.py
@@ -17,156 +17,8 @@
 #
 
 import pytest
-import math
-
-from nomad.archive import write_partial_archive_to_mongo
-from nomad.datamodel import OptimadeEntry
-from nomad.processing import ProcessStatus
-
-from tests.utils import ExampleData
 
 
 @pytest.fixture(scope='session')
 def client(api_v1):
     return api_v1
-
-
-@pytest.fixture(scope='module')
-def example_data(elastic_module, raw_files_module, mongo_module, test_user, other_test_user, normalized):
-    '''
-    Provides a couple of uploads and entries including metadata, raw-data, and
-    archive files.
-
-    id_embargo:
-        1 entry, 1 material, published with embargo
-    id_embargo_w_coauthor:
-        1 entry, 1 material, published with embargo and coauthor
-    id_embargo_w_reviewer:
-        1 entry, 1 material, published with embargo and reviewer
-    id_unpublished:
-        1 entry, 1 material, unpublished
-    id_unpublished_w_coauthor:
-        1 entry, 1 material, unpublished with coauthor
-    id_unpublished_w_reviewer:
-        1 entry, 1 material, unpublished with reviewer
-    id_published:
-        23 entries, 6 materials published without embargo
-        partial archive exists only for id_01
-        raw files and archive file for id_02 are missing
-        id_10, id_11 reside in the same directory
-    id_processing:
-        unpublished upload without any entries, in status processing
-    id_empty:
-        unpublished upload without any entries
-    '''
-    data = ExampleData(main_author=test_user)
-
-    # 6 uploads with different combinations of main_type and sub_type
-    for main_type in ('embargo', 'unpublished'):
-        for sub_type in ('', 'w_coauthor', 'w_reviewer'):
-            upload_id = 'id_' + main_type + ('_' if sub_type else '') + sub_type
-            if main_type == 'embargo':
-                published = True
-                embargo_length = 12
-                upload_name = 'name_' + upload_id[3:]
-            else:
-                published = False
-                embargo_length = 0
-                upload_name = None
-            calc_id = upload_id + '_1'
-            coauthors = [other_test_user.user_id] if sub_type == 'w_coauthor' else None
-            reviewers = [other_test_user.user_id] if sub_type == 'w_reviewer' else None
-            data.create_upload(
-                upload_id=upload_id,
-                upload_name=upload_name,
-                coauthors=coauthors,
-                reviewers=reviewers,
-                published=published,
-                embargo_length=embargo_length)
-            data.create_entry(
-                upload_id=upload_id,
-                calc_id=calc_id,
-                material_id=upload_id,
-                mainfile=f'test_content/{calc_id}/mainfile.json')
-
-    # one upload with 23 calcs, published, no embargo
-    data.create_upload(
-        upload_id='id_published',
-        upload_name='name_published',
-        published=True)
-    for i in range(1, 24):
-        entry_id = 'id_%02d' % i
-        material_id = 'id_%02d' % (int(math.floor(i / 4)) + 1)
-        mainfile = 'test_content/subdir/test_entry_%02d/mainfile.json' % i
-        kwargs = dict(optimade=OptimadeEntry(nelements=2, elements=['H', 'O']))
-        if i == 11:
-            mainfile = 'test_content/subdir/test_entry_10/mainfile_11.json'
-        if i == 1:
-            kwargs['pid'] = '123'
-        data.create_entry(
-            upload_id='id_published',
-            calc_id=entry_id,
-            material_id=material_id,
-            mainfile=mainfile,
-            **kwargs)
-
-        if i == 1:
-            archive = data.archives[entry_id]
-            write_partial_archive_to_mongo(archive)
-
-    # one upload, no calcs, still processing
-    data.create_upload(
-        upload_id='id_processing',
-        published=False,
-        process_status=ProcessStatus.RUNNING)
-
-    # one upload, no calcs, unpublished
-    data.create_upload(
-        upload_id='id_empty',
-        published=False)
-
-    data.save(with_files=False)
-    del(data.archives['id_02'])
-    data.save(with_files=True, with_es=False, with_mongo=False)
-
-
-@pytest.fixture(scope='function')
-def example_data_writeable(mongo, test_user, normalized):
-    data = ExampleData(main_author=test_user)
-
-    # one upload with one entry, published
-    data.create_upload(
-        upload_id='id_published_w',
-        published=True,
-        embargo_length=12)
-    data.create_entry(
-        upload_id='id_published_w',
-        calc_id='id_published_w_entry',
-        mainfile='test_content/test_embargo_entry/mainfile.json')
-
-    # one upload with one entry, unpublished
-    data.create_upload(
-        upload_id='id_unpublished_w',
-        published=False,
-        embargo_length=12)
-    data.create_entry(
-        upload_id='id_unpublished_w',
-        calc_id='id_unpublished_w_entry',
-        mainfile='test_content/test_embargo_entry/mainfile.json')
-
-    # one upload, no entries, still processing
-    data.create_upload(
-        upload_id='id_processing_w',
-        published=False,
-        process_status=ProcessStatus.RUNNING)
-
-    # one upload, no entries, unpublished
-    data.create_upload(
-        upload_id='id_empty_w',
-        published=False)
-
-    data.save()
-
-    yield
-
-    data.delete()
diff --git a/tests/app/v1/routers/test_entries.py b/tests/app/v1/routers/test_entries.py
index 0e1d777c4a910f80209d725c6eb4915cba7e5741..b8dd813f4a4ad5b399e6d981b5d17d1cfa141065 100644
--- a/tests/app/v1/routers/test_entries.py
+++ b/tests/app/v1/routers/test_entries.py
@@ -34,7 +34,7 @@ from .common import (
     perform_metadata_test, post_query_test_parameters, get_query_test_parameters,
     perform_owner_test, owner_test_parameters, pagination_test_parameters,
     aggregation_test_parameters)
-from ..conftest import example_data as data  # pylint: disable=unused-import
+from tests.conftest import example_data as data  # pylint: disable=unused-import
 
 '''
 These are the tests for all API operations below ``entries``. The tests are organized
diff --git a/tests/app/v1/routers/test_entries_edit.py b/tests/app/v1/routers/test_entries_edit.py
index 3fb3cea9d25a38782777e10f35fcbe30fc65b2c2..baeb3ef66e5775e364d8fecb440e989357241e50 100644
--- a/tests/app/v1/routers/test_entries_edit.py
+++ b/tests/app/v1/routers/test_entries_edit.py
@@ -17,6 +17,7 @@
 #
 
 import pytest
+from datetime import datetime
 
 from nomad import utils
 from nomad.search import search
@@ -24,6 +25,9 @@ from nomad.datamodel import Dataset
 from nomad import processing as proc
 
 from tests.utils import ExampleData
+from tests.app.v1.routers.common import assert_response
+from tests.processing.test_edit_metadata import (
+    assert_metadata_edited, all_coauthor_entry_metadata, all_admin_entry_metadata)
 
 
 logger = utils.get_logger(__name__)
@@ -82,7 +86,7 @@ class TestEditRepo():
         if verify:
             data.update(verify=verify)
 
-        return self.api.post('entries/edit', headers=self.test_user_auth, json=data)
+        return self.api.post('entries/edit_v0', headers=self.test_user_auth, json=data)
 
     def assert_edit(self, rv, quantity: str, success: bool, message: bool, status_code: int = 200):
         data = rv.json()
@@ -137,10 +141,11 @@ class TestEditRepo():
 
     def test_edit_all_properties(self, test_user, other_test_user):
         edit_data = dict(
-            comment='test_edit_props',
-            references=['http://test', 'http://test2'],
             # reviewers=[other_test_user.user_id],  # TODO: need to set on upload level
-            entry_coauthors=[other_test_user.user_id])
+            # entry_coauthors=[other_test_user.user_id]  # Not editable any more
+            comment='test_edit_props',
+            references=['http://test', 'http://test2'])
+
         rv = self.perform_edit(**edit_data, query=self.query('upload_1'))
         result = rv.json()
         assert rv.status_code == 200, result
@@ -155,18 +160,18 @@ class TestEditRepo():
 
         assert self.mongo(1, comment='test_edit_props')
         assert self.mongo(1, references=['http://test', 'http://test2'])
-        assert self.mongo(1, entry_coauthors=[other_test_user.user_id])
+        # assert self.mongo(1, entry_coauthors=[other_test_user.user_id])
         # assert self.mongo(1, reviewers=[other_test_user.user_id])  TODO: need to be set on upload level
 
         self.assert_elastic(1, comment='test_edit_props')
         self.assert_elastic(1, references=['http://test', 'http://test2'])
-        self.assert_elastic(1, authors=[test_user.user_id, other_test_user.user_id])
+        self.assert_elastic(1, authors=[test_user.user_id])
         # self.assert_elastic(1, viewers=[test_user.user_id, other_test_user.user_id])
 
         edit_data = dict(
             comment='',
-            references=[],
-            entry_coauthors=[])
+            # entry_coauthors=[]
+            references=[])
         rv = self.perform_edit(**edit_data, query=self.query('upload_1'))
         result = rv.json()
         assert rv.status_code == 200
@@ -181,7 +186,7 @@ class TestEditRepo():
 
         assert self.mongo(1, comment=None)
         assert self.mongo(1, references=[])
-        assert self.mongo(1, entry_coauthors=[])
+        # assert self.mongo(1, entry_coauthors=[])
         assert self.mongo(1, reviewers=[])
 
         self.assert_elastic(1, comment=None)
@@ -220,19 +225,20 @@ class TestEditRepo():
         assert not self.mongo(1, comment='test_edit_verify', edited=False)
 
     def test_edit_empty_list(self, other_test_user):
-        rv = self.perform_edit(entry_coauthors=[other_test_user.user_id], query=self.query('upload_1'))
-        self.assert_edit(rv, quantity='entry_coauthors', success=True, message=False)
-        rv = self.perform_edit(entry_coauthors=[], query=self.query('upload_1'))
-        self.assert_edit(rv, quantity='entry_coauthors', success=True, message=False)
-        assert self.mongo(1, entry_coauthors=[])
+        rv = self.perform_edit(references=['a'], query=self.query('upload_1'))
+        self.assert_edit(rv, quantity='references', success=True, message=False)
+        rv = self.perform_edit(references=[], query=self.query('upload_1'))
+        self.assert_edit(rv, quantity='references', success=True, message=False)
+        assert self.mongo(1, references=[])
 
     def test_edit_duplicate_value(self, other_test_user):
-        rv = self.perform_edit(entry_coauthors=[other_test_user.user_id, other_test_user.user_id], query=self.query('upload_1'))
-        self.assert_edit(rv, status_code=400, quantity='entry_coauthors', success=False, message=True)
+        rv = self.perform_edit(references=['a', 'a'], query=self.query('upload_1'))
+        self.assert_edit(rv, status_code=400, quantity='references', success=False, message=True)
 
     def test_edit_main_author_as_coauthor(self, test_user):
-        rv = self.perform_edit(entry_coauthors=[test_user.user_id], query=self.query('upload_1'))
-        self.assert_edit(rv, status_code=400, quantity='entry_coauthors', success=False, message=True)
+        pass
+        # rv = self.perform_edit(entry_coauthors=[test_user.user_id], query=self.query('upload_1'))
+        # self.assert_edit(rv, status_code=400, quantity='entry_coauthors', success=False, message=True)
 
     def test_edit_ds(self):
         rv = self.perform_edit(
@@ -286,14 +292,141 @@ class TestEditRepo():
         assert self.mongo(1, datasets=[new_dataset.dataset_id])
 
     def test_edit_bad_user(self):
-        rv = self.perform_edit(entry_coauthors=['bad_user'], query=self.query('upload_1'))
-        self.assert_edit(rv, status_code=400, quantity='entry_coauthors', success=False, message=True)
+        pass
+        # rv = self.perform_edit(entry_coauthors=['bad_user'], query=self.query('upload_1'))
+        # self.assert_edit(rv, status_code=400, quantity='entry_coauthors', success=False, message=True)
 
     def test_edit_user(self, other_test_user):
-        rv = self.perform_edit(entry_coauthors=[other_test_user.user_id], query=self.query('upload_1'))
-        self.assert_edit(rv, quantity='entry_coauthors', success=True, message=False)
+        pass
+        # rv = self.perform_edit(entry_coauthors=[other_test_user.user_id], query=self.query('upload_1'))
+        # self.assert_edit(rv, quantity='entry_coauthors', success=True, message=False)
 
     @pytest.mark.skip(reason='Not necessary during transition. Fails because main_author is not editable anyways.')
     def test_admin_only(self, other_test_user):
         rv = self.perform_edit(main_author=other_test_user.user_id)
         assert rv.status_code != 200
+
+
+@pytest.mark.parametrize('user, kwargs', [
+    pytest.param(
+        'test_user', dict(
+            query={'upload_id': 'id_unpublished_w'},
+            metadata=all_coauthor_entry_metadata,
+            affected_upload_ids=['id_unpublished_w']),
+        id='edit-all'),
+    pytest.param(
+        'admin_user', dict(
+            query={'upload_id': 'id_published_w'},
+            owner='all',
+            metadata=all_admin_entry_metadata,
+            affected_upload_ids=['id_published_w']),
+        id='protected-admin'),
+    pytest.param(
+        'test_user', dict(
+            query={'upload_id': 'id_unpublished_w'},
+            metadata=all_admin_entry_metadata,
+            expected_error_loc=('metadata', 'entry_create_time')),
+        id='protected-not-admin'),
+    pytest.param(
+        'admin_user', dict(
+            query={'upload_id': 'id_published_w'},
+            owner='all',
+            metadata=dict(comment='test comment'),
+            affected_upload_ids=['id_published_w']),
+        id='published-admin'),
+    pytest.param(
+        'test_user', dict(
+            query={'upload_id': 'id_published_w'},
+            metadata=dict(comment='test comment'),
+            expected_error_loc=('metadata', 'comment')),
+        id='published-not-admin'),
+    pytest.param(
+        None, dict(
+            owner='all',
+            query={'upload_id': 'id_unpublished_w'},
+            metadata=dict(comment='test comment'),
+            expected_status_code=401),
+        id='no-credentials'),
+    pytest.param(
+        'invalid', dict(
+            owner='all',
+            query={'upload_id': 'id_unpublished_w'},
+            metadata=dict(comment='test comment'),
+            expected_status_code=401),
+        id='invalid-credentials'),
+    pytest.param(
+        'other_test_user', dict(
+            query={'upload_id': 'id_unpublished_w'},
+            metadata=dict(comment='test comment'),
+            expected_error_loc=('query',)),
+        id='no-access'),
+    pytest.param(
+        'other_test_user', dict(
+            query={'upload_id': 'id_unpublished_w'},
+            metadata=dict(comment='test comment'),
+            affected_upload_ids=['id_unpublished_w'],
+            add_coauthor=True),
+        id='coauthor-access'),
+    pytest.param(
+        'test_user', dict(
+            query={'and': [{'upload_create_time:gt': '2021-01-01'}, {'published': False}]},
+            metadata=dict(comment='a test comment'),
+            affected_upload_ids=['id_unpublished_w']),
+        id='compound-query-ok'),
+    pytest.param(
+        'test_user', dict(
+            query={'upload_id': 'id_unpublished_w'},
+            metadata=dict(upload_name='a test name'),
+            expected_error_loc=('metadata', 'upload_name')),
+        id='query-cannot-edit-upload-data'),
+    pytest.param(
+        'test_user', dict(
+            query={'upload_create_time:lt': '2021-01-01'},
+            metadata=dict(comment='a test comment'),
+            expected_error_loc=('query',)),
+        id='query-no-results')])
+def test_post_entries_edit(
+        client, proc_infra, example_data_writeable, a_dataset, test_auth_dict, test_users_dict,
+        user, kwargs):
+    '''
+    Note, since the endpoint basically just forwards the request to
+    `MetadataEditRequestHandler.edit_metadata`, we only do very simple verification here;
+    the more extensive testing is done in `tests.processing.test_edit_metadata`.
+    '''
+    user_auth, _token = test_auth_dict[user]
+    user = test_users_dict.get(user)
+    query = kwargs.get('query')
+    owner = kwargs.get('owner', 'visible')
+    metadata = kwargs.get('metadata')
+    entries = kwargs.get('entries')
+    entries_key = kwargs.get('entries_key')
+    verify_only = kwargs.get('verify_only', False)
+    expected_error_loc = kwargs.get('expected_error_loc')
+    expected_status_code = kwargs.get('expected_status_code')
+    affected_upload_ids = kwargs.get('affected_upload_ids')
+    expected_metadata = kwargs.get('expected_metadata', metadata)
+
+    add_coauthor = kwargs.get('add_coauthor', False)
+    if add_coauthor:
+        upload = proc.Upload.get(affected_upload_ids[0])
+        upload.edit_upload_metadata(
+            edit_request_json={'metadata': {'coauthors': user.user_id}}, user_id=upload.main_author)
+        upload.block_until_complete()
+
+    edit_request_json = dict(
+        query=query, owner=owner, metadata=metadata, entries=entries, entries_key=entries_key,
+        verify_only=verify_only)
+    url = 'entries/edit'
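+    # Truncated ISO timestamp taken just before the edit; used to verify last_edit_time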
+    edit_start = datetime.utcnow().isoformat()[0:22]
+    response = client.post(url, headers=user_auth, json=edit_request_json)
+    if expected_error_loc:
+        assert_response(response, 422)
+        error_locs = [tuple(d['loc']) for d in response.json()['detail']]
+        assert expected_error_loc in error_locs
+    elif expected_status_code not in (None, 200):
+        assert_response(response, expected_status_code)
+    else:
+        assert_response(response, 200)
+        assert_metadata_edited(
+            user, None, query, metadata, entries, entries_key, verify_only,
+            expected_metadata, affected_upload_ids, edit_start)
diff --git a/tests/app/v1/routers/test_materials.py b/tests/app/v1/routers/test_materials.py
index f0f49ddb2946085df2e65317359891036fc067e8..9014d311568a00e7200798da9a2e863eaa737690 100644
--- a/tests/app/v1/routers/test_materials.py
+++ b/tests/app/v1/routers/test_materials.py
@@ -28,7 +28,7 @@ from .common import (
     perform_metadata_test, perform_owner_test, owner_test_parameters,
     post_query_test_parameters, get_query_test_parameters, pagination_test_parameters,
     aggregation_test_parameters)
-from ..conftest import example_data as data  # pylint: disable=unused-import
+from tests.conftest import example_data as data  # pylint: disable=unused-import
 
 '''
 These are the tests for all API operations below ``entries``. The tests are organized
diff --git a/tests/app/v1/routers/test_uploads.py b/tests/app/v1/routers/test_uploads.py
index 2e8aa0572db4a8a5a797c4f07836f29dd8ee0384..550285c07df0ffb8617e39c04a46676a4250599d 100644
--- a/tests/app/v1/routers/test_uploads.py
+++ b/tests/app/v1/routers/test_uploads.py
@@ -29,6 +29,8 @@ from tests.test_files import (
     example_file_vasp_with_binary, example_file_aux, example_file_corrupt_zip, empty_file,
     assert_upload_files)
 from tests.test_search import assert_search_upload
+from tests.processing.test_edit_metadata import (
+    assert_metadata_edited, all_coauthor_metadata, all_admin_metadata)
 from tests.app.v1.routers.common import assert_response
 from nomad import config, files, infrastructure
 from nomad.processing import Upload, Calc, ProcessStatus
@@ -992,8 +994,7 @@ def test_delete_upload_raw_path(
     pytest.param('invalid', 'id_unpublished_w', dict(upload_name='test_name'), True, 401, id='invalid-credentials-token'),
     pytest.param('other_test_user', 'id_unpublished_w', dict(upload_name='test_name'), False, 401, id='no-access'),
     pytest.param('test_user', 'id_processing_w', dict(upload_name='test_name'), False, 400, id='processing'),
-    pytest.param('test_user', 'id_empty_w', dict(upload_name='test_name'), False, 200, id='empty-upload-ok')]
-)
+    pytest.param('test_user', 'id_empty_w', dict(upload_name='test_name'), False, 200, id='empty-upload-ok')])
 def test_put_upload_metadata(
         client, proc_infra, example_data_writeable, test_auth_dict, test_users_dict,
         user, upload_id, query_args, use_upload_token, expected_status_code):
@@ -1048,6 +1049,133 @@ def test_put_upload_metadata(
                     assert entry_metadata.with_embargo == es_data['with_embargo'] == upload.with_embargo
 
 
+@pytest.mark.parametrize('user, upload_id, kwargs', [
+    pytest.param(
+        'test_user', 'id_unpublished_w', dict(
+            metadata=all_coauthor_metadata),
+        id='edit-all'),
+    pytest.param(
+        'test_user', 'id_published_w', dict(
+            metadata=dict(embargo_length=0)), id='lift-embargo'),
+    pytest.param(
+        'admin_user', 'id_published_w', dict(
+            metadata=all_admin_metadata),
+        id='protected-admin'),
+    pytest.param(
+        'test_user', 'id_unpublished_w', dict(
+            metadata=dict(main_author='lhofstadter'),
+            expected_error_loc=('metadata', 'main_author')),
+        id='protected-not-admin'),
+    pytest.param(
+        'test_user', 'silly_value', dict(
+            metadata=dict(upload_name='test_name'),
+            expected_error_loc=('upload_id',)),
+        id='bad-upload_id'),
+    pytest.param(
+        'admin_user', 'id_published_w', dict(
+            metadata=dict(upload_name='test_name')),
+        id='published-admin'),
+    pytest.param(
+        'test_user', 'id_published_w', dict(
+            metadata=dict(upload_name='test_name'),
+            expected_error_loc=('metadata', 'upload_name')),
+        id='published-not-admin'),
+    pytest.param(
+        None, 'id_unpublished_w', dict(
+            metadata=dict(upload_name='test_name'),
+            expected_status_code=401),
+        id='no-credentials'),
+    pytest.param(
+        'invalid', 'id_unpublished_w', dict(
+            metadata=dict(upload_name='test_name'),
+            expected_status_code=401),
+        id='invalid-credentials'),
+    pytest.param(
+        'other_test_user', 'id_unpublished_w', dict(
+            metadata=dict(upload_name='test_name'),
+            expected_error_loc=('metadata', 'upload_name')),
+        id='no-access'),
+    pytest.param(
+        'other_test_user', 'id_unpublished_w', dict(
+            metadata=dict(upload_name='test_name'),
+            add_coauthor=True),
+        id='coauthor-access'),
+    pytest.param(
+        'test_user', 'id_processing_w', dict(
+            metadata=dict(upload_name='test_name'),
+            expected_status_code=400),
+        id='processing'),
+    pytest.param(
+        'test_user', 'id_empty_w', dict(
+            metadata=dict(upload_name='test_name')),
+        id='empty-upload-ok'),
+    pytest.param(
+        'test_user', 'id_unpublished_w', dict(
+            query={'and': [{'upload_create_time:gt': '2021-01-01'}, {'published': False}]},
+            owner='user',
+            metadata=dict(comment='a test comment')),
+        id='query-ok'),
+    pytest.param(
+        'test_user', 'id_unpublished_w', dict(
+            query={'and': [{'upload_create_time:gt': '2021-01-01'}, {'published': False}]},
+            owner='user',
+            metadata=dict(upload_name='a test name'),
+            expected_error_loc=('metadata', 'upload_name')),
+        id='query-cannot-edit-upload-data'),
+    pytest.param(
+        'test_user', 'id_unpublished_w', dict(
+            query={'upload_create_time:lt': '2021-01-01'},
+            owner='user',
+            metadata=dict(comment='a test comment'),
+            expected_error_loc=('query',)),
+        id='query-no-results')])
+def test_post_upload_edit(
+        client, proc_infra, example_data_writeable, a_dataset, test_auth_dict, test_users_dict,
+        user, upload_id, kwargs):
+    '''
+    Note, since the endpoint basically just forwards the request to
+    `MetadataEditRequestHandler.edit_metadata`, we only do very simple verification here;
+    the more extensive testing is done in `tests.processing.test_edit_metadata`.
+    '''
+    user_auth, _token = test_auth_dict[user]
+    user = test_users_dict.get(user)
+    query = kwargs.get('query')
+    owner = kwargs.get('owner')
+    metadata = kwargs.get('metadata')
+    entries = kwargs.get('entries')
+    entries_key = kwargs.get('entries_key')
+    verify_only = kwargs.get('verify_only', False)
+    expected_error_loc = kwargs.get('expected_error_loc')
+    expected_status_code = kwargs.get('expected_status_code')
+    affected_upload_ids = kwargs.get('affected_upload_ids', [upload_id])
+    expected_metadata = kwargs.get('expected_metadata', metadata)
+
+    add_coauthor = kwargs.get('add_coauthor', False)
+    if add_coauthor:
+        upload = Upload.get(upload_id)
+        upload.edit_upload_metadata(
+            edit_request_json={'metadata': {'coauthors': user.user_id}}, user_id=upload.main_author)
+        upload.block_until_complete()
+
+    edit_request_json = dict(
+        query=query, owner=owner, metadata=metadata, entries=entries, entries_key=entries_key,
+        verify_only=verify_only)
+    url = f'uploads/{upload_id}/edit'
+    edit_start = datetime.utcnow().isoformat()[0:22]
+    response = client.post(url, headers=user_auth, json=edit_request_json)
+    if expected_error_loc:
+        assert_response(response, 422)
+        error_locs = [tuple(d['loc']) for d in response.json()['detail']]
+        assert expected_error_loc in error_locs
+    elif expected_status_code not in (None, 200):
+        assert_response(response, expected_status_code)
+    else:
+        assert_response(response, 200)
+        assert_metadata_edited(
+            user, upload_id, query, metadata, entries, entries_key, verify_only,
+            expected_metadata, affected_upload_ids, edit_start)
+
+
 @pytest.mark.parametrize('mode, source_path, query_args, user, use_upload_token, test_limit, accept_json, expected_status_code', [
     pytest.param('multipart', example_file_vasp_with_binary, dict(upload_name='test_name'), 'test_user', False, False, True, 200, id='multipart'),
     pytest.param('multipart', example_file_vasp_with_binary, dict(), 'test_user', False, False, True, 200, id='multipart-no-name'),
@@ -1241,7 +1369,7 @@ def test_post_upload_action_publish_to_central_nomad(
                 assert new_calc_metadata_dict[k] == (embargo_length > 0)
             elif k not in (
                     'upload_id', 'calc_id', 'upload_create_time', 'entry_create_time',
-                    'last_processing_time', 'publish_time',
+                    'last_processing_time', 'publish_time', 'embargo_length',
                     'n_quantities', 'quantities'):  # TODO: n_quantities and quantities update problem?
                 assert new_calc_metadata_dict[k] == v, f'Metadata not matching: {k}'
         assert new_calc.datasets == ['dataset_id']
@@ -1284,6 +1412,38 @@ def test_post_upload_action_process(
         assert_processing(client, upload_id, test_auth_dict['test_user'][0], check_files=False, published=True)
 
 
+@pytest.mark.parametrize('upload_id, user, preprocess, expected_status_code', [
+    pytest.param('id_published_w', 'test_user', None, 200, id='ok'),
+    pytest.param('id_published_w', 'other_test_user', None, 401, id='no-access'),
+    pytest.param('id_published_w', 'other_test_user', 'make-coauthor', 200, id='ok-coauthor'),
+    pytest.param('id_published_w', None, None, 401, id='no-credentials'),
+    pytest.param('id_published_w', 'invalid', None, 401, id='invalid-credentials'),
+    pytest.param('id_unpublished_w', 'test_user', None, 400, id='not-published'),
+    pytest.param('id_published_w', 'test_user', 'lift', 400, id='already-lifted')])
+def test_post_upload_action_lift_embargo(
+        client, proc_infra, example_data_writeable, test_auth_dict, test_users_dict,
+        upload_id, user, preprocess, expected_status_code):
+
+    user_auth, __token = test_auth_dict[user]
+    user = test_users_dict.get(user)
+
+    if preprocess:
+        if preprocess == 'lift':
+            metadata = {'embargo_length': 0}
+        elif preprocess == 'make-coauthor':
+            metadata = {'coauthors': user.user_id}
+        upload = Upload.get(upload_id)
+        upload.edit_upload_metadata(dict(metadata=metadata), config.services.admin_user_id)
+        upload.block_until_complete()
+
+    response = perform_post_upload_action(client, user_auth, upload_id, 'lift-embargo')
+    assert_response(response, expected_status_code)
+    if expected_status_code == 200:
+        assert_metadata_edited(
+            user, upload_id, None, None, None, None, False,
+            {'embargo_length': 0}, [upload_id], None)
+
+
 @pytest.mark.parametrize('upload_id, user, expected_status_code', [
     pytest.param('id_unpublished_w', 'test_user', 200, id='delete-own'),
     pytest.param('id_unpublished_w', 'other_test_user', 401, id='delete-others-not-admin'),
@@ -1337,7 +1497,7 @@ def test_get_upload_bundle(
     include_raw_files = query_args.get('include_raw_files', True)
     include_archive_files = query_args.get('include_archive_files', True)
 
-    url = build_url(f'uploads/bundle/{upload_id}', query_args)
+    url = build_url(f'uploads/{upload_id}/bundle', query_args)
     response = perform_get(client, url, user_auth=test_auth_dict[user][0])
     assert_response(response, expected_status_code)
     if expected_status_code == 200:
diff --git a/tests/conftest.py b/tests/conftest.py
index 77f07eba4d800b6247be769d3bf0d7f04bd71356..3096b65f7572f07a2c52a0cd7df4850d81f44823 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -17,6 +17,7 @@
 #
 
 from typing import Tuple, List
+import math
 import pytest
 import logging
 from collections import namedtuple
@@ -24,6 +25,7 @@ from smtpd import SMTPServer
 from threading import Lock, Thread
 import asyncore
 import time
+from datetime import datetime
 import shutil
 import os.path
 import elasticsearch.exceptions
@@ -35,9 +37,10 @@ import os.path
 from fastapi.testclient import TestClient
 
 from nomad import config, infrastructure, processing, utils, datamodel, files
-from nomad.datamodel import User, EntryArchive
+from nomad.datamodel import User, EntryArchive, OptimadeEntry
 from nomad.utils import structlogging
-from nomad.archive import write_archive, read_archive
+from nomad.archive import write_archive, read_archive, write_partial_archive_to_mongo
+from nomad.processing import ProcessStatus
 from nomad.processing.data import generate_entry_id
 from nomad.app.main import app
 
@@ -45,7 +48,7 @@ from tests.parsing import test_parsing
 from tests.normalizing.conftest import run_normalize
 from tests.processing import test_data as test_processing
 from tests.test_files import empty_file, example_file_vasp_with_binary
-from tests.utils import create_template_upload_file, set_upload_entry_metadata, build_url
+from tests.utils import create_template_upload_file, set_upload_entry_metadata, build_url, ExampleData
 
 test_log_level = logging.CRITICAL
 
@@ -373,6 +376,14 @@ def admin_user():
     return User(**test_users[test_user_uuid(0)])
 
 
+@pytest.fixture(scope='module')
+def test_users_dict(test_user, other_test_user, admin_user):
+    return {
+        'test_user': test_user,
+        'other_test_user': other_test_user,
+        'admin_user': admin_user}
+
+
 @pytest.fixture(scope='function')
 def no_warn(caplog):
     caplog.handler.formatter = structlogging.ConsoleFormatter()
@@ -736,6 +747,162 @@ def published_wo_user_metadata(non_empty_processed: processing.Upload) -> proces
     return non_empty_processed
 
 
+@pytest.fixture(scope='module')
+def example_data(elastic_module, raw_files_module, mongo_module, test_user, other_test_user, normalized):
+    '''
+    Provides a couple of uploads and entries including metadata, raw-data, and
+    archive files.
+
+    id_embargo:
+        1 entry, 1 material, published with embargo
+    id_embargo_w_coauthor:
+        1 entry, 1 material, published with embargo and coauthor
+    id_embargo_w_reviewer:
+        1 entry, 1 material, published with embargo and reviewer
+    id_unpublished:
+        1 entry, 1 material, unpublished
+    id_unpublished_w_coauthor:
+        1 entry, 1 material, unpublished with coauthor
+    id_unpublished_w_reviewer:
+        1 entry, 1 material, unpublished with reviewer
+    id_published:
+        23 entries, 6 materials published without embargo
+        partial archive exists only for id_01
+        raw files and archive file for id_02 are missing
+        id_10, id_11 reside in the same directory
+    id_processing:
+        unpublished upload without any entries, in status processing
+    id_empty:
+        unpublished upload without any entries
+    '''
+    data = ExampleData(main_author=test_user)
+
+    # 6 uploads with different combinations of main_type and sub_type
+    for main_type in ('embargo', 'unpublished'):
+        for sub_type in ('', 'w_coauthor', 'w_reviewer'):
+            upload_id = 'id_' + main_type + ('_' if sub_type else '') + sub_type
+            if main_type == 'embargo':
+                published = True
+                embargo_length = 12
+                upload_name = 'name_' + upload_id[3:]
+            else:
+                published = False
+                embargo_length = 0
+                upload_name = None
+            calc_id = upload_id + '_1'
+            coauthors = [other_test_user.user_id] if sub_type == 'w_coauthor' else None
+            reviewers = [other_test_user.user_id] if sub_type == 'w_reviewer' else None
+            data.create_upload(
+                upload_id=upload_id,
+                upload_name=upload_name,
+                coauthors=coauthors,
+                reviewers=reviewers,
+                published=published,
+                embargo_length=embargo_length)
+            data.create_entry(
+                upload_id=upload_id,
+                calc_id=calc_id,
+                material_id=upload_id,
+                mainfile=f'test_content/{calc_id}/mainfile.json')
+
+    # one upload with 23 calcs, published, no embargo
+    data.create_upload(
+        upload_id='id_published',
+        upload_name='name_published',
+        published=True)
+    for i in range(1, 24):
+        entry_id = 'id_%02d' % i
+        material_id = 'id_%02d' % (int(math.floor(i / 4)) + 1)
+        mainfile = 'test_content/subdir/test_entry_%02d/mainfile.json' % i
+        kwargs = dict(optimade=OptimadeEntry(nelements=2, elements=['H', 'O']))
+        if i == 11:
+            mainfile = 'test_content/subdir/test_entry_10/mainfile_11.json'
+        if i == 1:
+            kwargs['pid'] = '123'
+        data.create_entry(
+            upload_id='id_published',
+            calc_id=entry_id,
+            material_id=material_id,
+            mainfile=mainfile,
+            **kwargs)
+
+        if i == 1:
+            archive = data.archives[entry_id]
+            write_partial_archive_to_mongo(archive)
+
+    # one upload, no calcs, still processing
+    data.create_upload(
+        upload_id='id_processing',
+        published=False,
+        process_status=ProcessStatus.RUNNING)
+
+    # one upload, no calcs, unpublished
+    data.create_upload(
+        upload_id='id_empty',
+        published=False)
+
+    data.save(with_files=False)
+    del data.archives['id_02']
+    data.save(with_files=True, with_es=False, with_mongo=False)
+
+
+@pytest.fixture(scope='function')
+def example_data_writeable(mongo, test_user, normalized):
+    data = ExampleData(main_author=test_user)
+
+    # one upload with one entry, published
+    data.create_upload(
+        upload_id='id_published_w',
+        published=True,
+        embargo_length=12)
+    data.create_entry(
+        upload_id='id_published_w',
+        calc_id='id_published_w_entry',
+        mainfile='test_content/test_embargo_entry/mainfile.json')
+
+    # one upload with one entry, unpublished
+    data.create_upload(
+        upload_id='id_unpublished_w',
+        published=False,
+        embargo_length=12)
+    data.create_entry(
+        upload_id='id_unpublished_w',
+        calc_id='id_unpublished_w_entry',
+        mainfile='test_content/test_embargo_entry/mainfile.json')
+
+    # one upload, no entries, still processing
+    data.create_upload(
+        upload_id='id_processing_w',
+        published=False,
+        process_status=ProcessStatus.RUNNING)
+
+    # one upload, no entries, unpublished
+    data.create_upload(
+        upload_id='id_empty_w',
+        published=False)
+
+    data.save()
+
+    yield
+
+    data.delete()
+
+
+@pytest.fixture(scope='function')
+def a_dataset(mongo, test_user):
+    now = datetime.utcnow()
+    dataset = datamodel.Dataset(
+        dataset_id=utils.create_uuid(),
+        dataset_name='a dataset',
+        user_id=test_user.user_id,
+        dataset_create_time=now,
+        dataset_modified_time=now,
+        dataset_type='owned')
+    dataset.a_mongo.create()
+    yield dataset
+    dataset.a_mongo.delete()
+
+
 @pytest.fixture
 def reset_config():
     ''' Fixture that resets configuration. '''
diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py
index 513cc7775dac6af9b3088d33c823b08427078f7c..c8e54438c24d16b9d07b2c406aad453bfcc5c3c5 100644
--- a/tests/processing/test_data.py
+++ b/tests/processing/test_data.py
@@ -36,7 +36,6 @@ from nomad.search import search
 
 from tests.test_search import assert_search_upload
 from tests.test_files import assert_upload_files
-from tests.app.conftest import test_users_dict  # pylint: disable=unused-import
 from tests.utils import create_template_upload_file, set_upload_entry_metadata
 
 
@@ -288,7 +287,7 @@ def test_publish_to_central_nomad(
             assert new_calc_metadata_dict[k] == (embargo_length > 0)
         elif k not in (
                 'upload_id', 'calc_id', 'upload_create_time', 'entry_create_time',
-                'last_processing_time', 'publish_time',
+                'last_processing_time', 'publish_time', 'embargo_length',
                 'n_quantities', 'quantities'):  # TODO: n_quantities and quantities update problem?
             assert new_calc_metadata_dict[k] == v, f'Metadata not matching: {k}'
     assert new_calc.datasets == ['dataset_id']
diff --git a/tests/processing/test_edit_metadata.py b/tests/processing/test_edit_metadata.py
new file mode 100644
index 0000000000000000000000000000000000000000..4408e68ac34c1111138cd9ad5625c8326f55f237
--- /dev/null
+++ b/tests/processing/test_edit_metadata.py
@@ -0,0 +1,290 @@
+#
+# Copyright The NOMAD Authors.
+#
+# This file is part of NOMAD. See https://nomad-lab.eu for further info.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import pytest
+from datetime import datetime
+
+from fastapi.exceptions import RequestValidationError
+
+from nomad import datamodel, metainfo
+from nomad.processing import Upload, MetadataEditRequestHandler
+from nomad.processing.data import _editable_metadata, _mongo_upload_metadata
+from nomad.search import search
+
+
+all_coauthor_metadata = dict(
+    # All attributes which a coauthor+ can edit
+    upload_name='a humble upload name',
+    embargo_length=14,
+    coauthors=['lhofstadter'],
+    external_id='31415926536',
+    comment='a humble comment',
+    references=['a reference', 'another reference'],
+    external_db='AFLOW',
+    reviewers=['lhofstadter'],
+    datasets=['a dataset'])
+
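+# Split into upload level and entry level quantities, based on _mongo_upload_metadata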
+all_coauthor_upload_metadata = {
+    k: v for k, v in all_coauthor_metadata.items() if k in _mongo_upload_metadata}
+
+all_coauthor_entry_metadata = {
+    k: v for k, v in all_coauthor_metadata.items() if k not in _mongo_upload_metadata}
+
+all_admin_metadata = dict(
+    # Every attribute which only admins can set
+    upload_create_time='2021-05-04T11:00:00',
+    entry_create_time='2021-05-04T11:00:00',
+    publish_time='2021-05-04T11:00:00',
+    license='a license',
+    main_author='lhofstadter')
+
+all_admin_entry_metadata = {
+    k: v for k, v in all_admin_metadata.items() if k not in _mongo_upload_metadata}
+
+
+def assert_edit_request(user, **kwargs):
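+    # Helper: performs an edit request directly via MetadataEditRequestHandler.edit_metadata
+    # and verifies the outcome (or that the expected validation errors occur).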
+    # Extract test parameters (lots of defaults)
+    upload_id = kwargs.get('upload_id', 'id_unpublished_w')
+    query = kwargs.get('query')
+    owner = kwargs.get('owner')
+    metadata = kwargs.get('metadata')
+    entries = kwargs.get('entries')
+    entries_key = kwargs.get('entries_key', 'calc_id')
+    verify_only = kwargs.get('verify_only', False)
+    expected_error_loc = kwargs.get('expected_error_loc')
+    affected_upload_ids = kwargs.get('affected_upload_ids', [upload_id])
+    expected_metadata = kwargs.get('expected_metadata', metadata)
+    # Perform edit request
+    edit_request_json = dict(
+        query=query, owner=owner, metadata=metadata, entries=entries, entries_key=entries_key,
+        verify=verify_only)
+    edit_start = datetime.utcnow().isoformat()[0:22]
+    error_locs = []
+    try:
+        MetadataEditRequestHandler.edit_metadata(edit_request_json, upload_id, user)
+    except RequestValidationError as e:
+        error_locs = [error_dict['loc'] for error_dict in e.errors()]
+    # Validate result
+    if expected_error_loc:
+        assert expected_error_loc in error_locs, f'Expected validation error at {expected_error_loc}'
+    if not expected_error_loc and not verify_only:
+        assert_metadata_edited(
+            user, upload_id, query, metadata, entries, entries_key, verify_only,
+            expected_metadata, affected_upload_ids, edit_start)
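+
+# Example: assert_edit_request(user, metadata=dict(comment='a comment')) performs
+# a set-comment edit on the default upload 'id_unpublished_w' and verifies the
+# new value in mongo and ES; passing expected_error_loc instead asserts that the
+# request is rejected with a validation error at that location.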
+
+
+def assert_metadata_edited(
+        user, upload_id, query, metadata, entries, entries_key, verify_only,
+        expected_metadata, affected_upload_ids, edit_start):
+
+    for upload_id in affected_upload_ids:
+        upload = Upload.get(upload_id)
+        upload.block_until_complete()
+        for entry in upload.calcs:
+            assert entry.last_edit_time
+            assert edit_start is None or entry.last_edit_time.isoformat()[0:22] >= edit_start
+            entry_metadata_mongo = entry.mongo_metadata(upload).m_to_dict()
+            entry_metadata_es = search(owner=None, query={'calc_id': entry.calc_id}).data[0]
+            values_to_check = expected_metadata
+            for quantity_name, value_expected in values_to_check.items():
+                # Note: the expected value is provided in the "request" format
+                quantity = _editable_metadata[quantity_name]
+                if quantity_name == 'embargo_length':
+                    assert upload.embargo_length == value_expected
+                    assert entry_metadata_mongo['embargo_length'] == value_expected
+                    assert entry_metadata_es['with_embargo'] == (value_expected > 0)
+                else:
+                    value_mongo = entry_metadata_mongo.get(quantity_name)
+                    value_es = entry_metadata_es.get(quantity_name)
+                    # coauthors and reviewers are not stored in ES. Instead check viewers and writers
+                    if quantity_name == 'coauthors':
+                        value_es = entry_metadata_es['writers']
+                    elif quantity_name == 'reviewers':
+                        value_es = entry_metadata_es['viewers']
+                    cmp_value_mongo = convert_to_comparable_value(quantity, value_mongo, 'mongo', user)
+                    cmp_value_es = convert_to_comparable_value(quantity, value_es, 'es', user)
+                    cmp_value_expected = convert_to_comparable_value(quantity, value_expected, 'request', user)
+                    # Verify mongo value
+                    assert cmp_value_mongo == cmp_value_expected, f'Wrong mongo value for {quantity_name}'
+                    # Verify ES value
+                    if quantity_name == 'license':
+                        continue  # license is not stored in the ES index
+                    elif quantity_name == 'coauthors':
+                        # Check that writers == main_author + coauthors
+                        assert cmp_value_es == [upload.main_author] + cmp_value_expected, (
+                            f'Wrong es value for {quantity_name}')
+                    elif quantity_name == 'reviewers':
+                        # Check that viewers == main_author + coauthors + reviewers
+                        assert set(cmp_value_es) == set(
+                            [upload.main_author] + (upload.coauthors or []) + cmp_value_expected), (
+                                f'Wrong es value for {quantity_name}')
+                    else:
+                        assert cmp_value_es == cmp_value_expected, f'Wrong es value for {quantity_name}'
+
+
+def convert_to_comparable_value(quantity, value, from_format, user):
+    '''
+    Converts `value` from the given source format ('mongo', 'es', 'request')
+    to a value that can be compared (user_id for user references, dataset_id
+    for datasets, timestamp strings with no more than millisecond precision, etc).
+    List quantities are also guaranteed to be converted to lists.
+    '''
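+    # For example, a user reference arrives as a username such as 'lhofstadter'
+    # in a request, as a plain user_id string in mongo, and as a dict with a
+    # 'user_id' key in ES; all three normalize to the bare user_id.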
+    if quantity.is_scalar:
+        return convert_to_comparable_value_single(quantity, value, from_format, user)
+    if value is None and from_format == 'es':
+        return []
+    if not isinstance(value, list):
+        value = [value]
+    return [convert_to_comparable_value_single(quantity, v, from_format, user) for v in value]
+
+
+def convert_to_comparable_value_single(quantity, value, from_format, user):
+    if quantity.type in (str, int, float, bool) or isinstance(quantity.type, metainfo.MEnum):
+        if value == '' and from_format == 'request':
+            return None
+        return value
+    elif quantity.type == metainfo.Datetime:
+        if not value:
+            return None
+        return value[0:22]  # Only compare to the millisecond level (mongo's maximal precision)
+    elif isinstance(quantity.type, metainfo.Reference):
+        # Reference type: resolve the value to a comparable id
+        verify_reference = quantity.type.target_section_def.section_cls
+        if verify_reference in [datamodel.User, datamodel.Author]:
+            if from_format == 'mongo':
+                return value
+            elif from_format == 'es':
+                return value['user_id']
+            elif from_format == 'request':
+                try:
+                    return datamodel.User.get(user_id=value).user_id
+                except KeyError:
+                    try:
+                        return datamodel.User.get(username=value).user_id
+                    except KeyError:
+                        return datamodel.User.get(email=value).user_id
+        elif verify_reference == datamodel.Dataset:
+            if from_format == 'mongo':
+                return value
+            elif from_format == 'es':
+                return value['dataset_id']
+            elif from_format == 'request':
+                try:
+                    return datamodel.Dataset.m_def.a_mongo.get(dataset_id=value).dataset_id
+                except KeyError:
+                    return datamodel.Dataset.m_def.a_mongo.get(
+                        user_id=user.user_id, dataset_name=value).dataset_id
+    assert False, f'Unhandled type {quantity.type} or source format {from_format}'
+
+
+@pytest.mark.parametrize('kwargs', [
+    pytest.param(
+        dict(
+            metadata=dict(external_db='bad value'),
+            expected_error_loc=('metadata', 'external_db')),
+        id='bad-external_db'),
+    pytest.param(
+        dict(
+            metadata=dict(coauthors='silly value'),
+            expected_error_loc=('metadata', 'coauthors')),
+        id='bad-coauthor-ref'),
+    pytest.param(
+        dict(
+            metadata=dict(reviewers='silly value'),
+            expected_error_loc=('metadata', 'reviewers')),
+        id='bad-reviewer-ref'),
+    pytest.param(
+        dict(
+            metadata=dict(datasets=['silly value']),
+            expected_error_loc=('metadata', 'datasets')),
+        id='bad-dataset-ref'),
+    pytest.param(
+        dict(
+            upload_id='id_published_w',
+            metadata=dict(embargo_length=0)),
+        id='lift-embargo'),
+    pytest.param(
+        dict(
+            query={'and': [{'upload_create_time:gt': '2021-01-01'}, {'published': False}]},
+            owner='user',
+            upload_id=None,
+            metadata=dict(comment='new comment'),
+            affected_upload_ids=['id_unpublished_w']),
+        id='query-ok'),
+    pytest.param(
+        dict(
+            query={'upload_create_time:lt': '2021-01-01'},
+            owner='user',
+            upload_id=None,
+            metadata=dict(comment='new comment'),
+            expected_error_loc=('query',)),
+        id='query-no-results'),
+    pytest.param(
+        dict(
+            query={'upload_create_time:gt': '2021-01-01'},
+            owner='user',
+            upload_id=None,
+            metadata=dict(comment='new comment'),
+            expected_error_loc=('metadata', 'comment')),
+        id='query-contains-published')])
+def test_edit_metadata(proc_infra, purged_app, example_data_writeable, a_dataset, test_users_dict, kwargs):
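+    # test_users_dict is a fixture mapping user handles (e.g. 'test_user') to
+    # user objects; a param may choose another acting user via the 'user' key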
+    kwargs['user'] = test_users_dict[kwargs.get('user', 'test_user')]
+    assert_edit_request(**kwargs)
+
+
+def test_set_and_clear_all(proc_infra, example_data_writeable, a_dataset, test_user):
+    # Set all fields a coauthor can set
+    assert_edit_request(
+        user=test_user,
+        metadata=all_coauthor_metadata)
+    # Clear all fields that can be cleared with a 'set' operation
+    # = all of the above, except embargo_length and datasets
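+    # (embargo_length is an int where 0 means "no embargo" rather than "unset",
+    # and datasets presumably require a 'remove' operation to be emptied)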
+    assert_edit_request(
+        user=test_user,
+        metadata=dict(
+            upload_name='',
+            coauthors=[],
+            external_id='',
+            comment='',
+            references=[],
+            external_db='',
+            reviewers=[]))
+
+
+def test_admin_quantities(proc_infra, example_data_writeable, test_user, other_test_user, admin_user):
+    assert_edit_request(
+        user=admin_user, upload_id='id_published_w', metadata=all_admin_metadata)
+    # Attempting to set the same attributes as a non-admin should fail
+    for k, v in all_admin_metadata.items():
+        assert_edit_request(
+            user=test_user, upload_id='id_unpublished_w', metadata={k: v}, expected_error_loc=('metadata', k))
+
+
+def test_query_cannot_set_upload_attributes(proc_infra, example_data_writeable, a_dataset, test_user):
+    query = {'and': [{'upload_create_time:gt': '2021-01-01'}, {'published': False}]}
+    for k, v in all_coauthor_upload_metadata.items():
+        # Attempting to edit an upload-level attribute via a query should always
+        # fail, regardless of whether upload_id is specified
+        for upload_id in (None, 'id_unpublished_w'):
+            assert_edit_request(
+                user=test_user, query=query, owner='user', upload_id=upload_id,
+                metadata={k: v},
+                expected_error_loc=('metadata', k))
+    # Attempting to edit entry-level attributes via a query should succeed
+    assert_edit_request(
+        user=test_user, query=query, owner='user', upload_id=None,
+        metadata=all_coauthor_entry_metadata,
+        affected_upload_ids=['id_unpublished_w'])