Commit 526632ae authored by Alvin Noe Ladines's avatar Alvin Noe Ladines
Browse files

Add task in workflow

parent 908fb5f9
Pipeline #118576 passed with stages
in 27 minutes and 40 seconds
......@@ -6,7 +6,7 @@ from nomad.metainfo import ( # pylint: disable=unused-import
Reference, derived)
from nomad.datamodel.metainfo.simulation.calculation import Calculation
from nomad.datamodel.metainfo.simulation.run import Run
from nomad.datamodel.metainfo.simulation.system import System
from nomad.datamodel.metainfo.simulation.system import System, Atoms
from .common import FastAccess
......@@ -515,6 +515,7 @@ class GeometryOptimization(MSection):
List of energy_total values gathered from the single configuration
calculations that are a part of the optimization trajectory.
''')
is_converged_geometry = Quantity(
type=bool,
shape=[],
......@@ -1301,6 +1302,55 @@ class SinglePoint(MSection):
''')
class Task(MSection):
'''
Section defining a specific task in the workflow chain. It has an input and an output,
both can either be a workflow or a calculation and their relation is noted in the
description.
'''
m_def = Section(
validate=False)
input_workflow = Quantity(
type=Reference(SectionProxy('Workflow')),
shape=[],
description='''
Reference to the input workflow.
''')
output_workflow = Quantity(
type=Reference(SectionProxy('Workflow')),
shape=[],
description='''
Reference to the output workflow.
''')
input_calculation = Quantity(
type=Reference(Calculation.m_def),
shape=[],
description='''
Reference to the input calculation.
''')
output_calculation = Quantity(
type=Reference(Calculation.m_def),
shape=[],
description='''
Reference to the output calculation.
''')
description = Quantity(
type=str,
shape=[],
description='''
Descibes the relationship between the input and output workflows. For example, if
a single_point workflow uses the relaxed structure from a geometry_optimization as
an input, the description may be "relaxed structure from geometry_optimization
used to calucalate single_point properties"
''')
class Workflow(MSection):
'''
Section containing the results of a workflow.
......@@ -1319,6 +1369,13 @@ class Workflow(MSection):
thermodyanamics, magnetic_ordering
''')
initial_structure = Quantity(
type=Reference(Atoms.m_def),
shape=[],
description='''
Starting structure for geometry optimization.
''')
calculator = Quantity(
type=str,
shape=[],
......@@ -1376,6 +1433,8 @@ class Workflow(MSection):
''',
categories=[FastAccess])
task = SubSection(sub_section=Task.m_def, repeats=True)
single_point = SubSection(
sub_section=SinglePoint.m_def,
# TODO determine if there is a need for this to be a repeating section
......
......@@ -17,11 +17,12 @@
#
import numpy as np
from nomad.datamodel.metainfo.simulation.calculation import Calculation
from nomad.normalizing.normalizer import Normalizer
from nomad.datamodel import EntryArchive
from nomad.datamodel.metainfo.workflow import (
Workflow, SinglePoint, GeometryOptimization, MolecularDynamics, Phonon, Elastic,
Workflow, Task, SinglePoint, GeometryOptimization, MolecularDynamics, Phonon, Elastic,
Thermodynamics)
......@@ -45,6 +46,37 @@ class TaskNormalizer(Normalizer):
run = self.workflow.run_ref
self.run = run if run else entry_archive.run[-1]
def normalize_task(self, kind=None):
if kind is None:
serial_tasks = ['geometry_optimization', 'molecular_dynamics']
kind = 'serial' if self.workflow.type in serial_tasks else 'parallel'
# TODO serialization of external archives does not work.
references = []
if self.workflow.workflows_ref:
references = self.workflow.workflows_ref
elif self.workflow.calculations_ref:
references = self.workflow.calculations_ref
references += [self.workflow]
if kind == 'kind':
references = self.workflow.workflows_ref
references = references if references else self.workflow.calculations_ref
for reference in references:
for n in range(2):
input, output = (self.workflow, reference) if n == 0 else (reference, self.workflow)
sec_task = self.workflow.m_create(Task)
sec_task.input_workflow = input if isinstance(input, Workflow) else None
sec_task.input_calculation = input if isinstance(input, Calculation) else None
sec_task.output_workflow = output if isinstance(output, Workflow) else None
sec_task.output_calculation = output if isinstance(output, Calculation) else None
else:
for n, reference in enumerate(references):
sec_task = self.workflow.m_create(Task)
input = references[-1] if n == 0 else references[n - 1]
sec_task.input_workflow = input if isinstance(input, Workflow) else None
sec_task.input_calculation = input if isinstance(input, Calculation) else None
sec_task.output_workflow = reference if isinstance(reference, Workflow) else None
sec_task.output_calculation = reference if isinstance(reference, Calculation) else None
class SinglePointNormalizer(TaskNormalizer):
def normalize(self):
......@@ -229,6 +261,17 @@ class GeometryOptimizationNormalizer(TaskNormalizer):
if criteria:
self.section.is_converged_geometry = True in criteria
# references to calculations (C)/single point (SP) workflows
# for geometry optimization (GO), calculations/workflows are run in series, i.e.
# input of a step is the preceeding step
# It starts from the input structure in GO and ends from the relaxed calculation
# reported to the the GO workflow
# GO <----------------------------
# | |
# v |
# SP1/C1 -> SP2/C2 -> .... -> SPN/CN
self.normalize_task('serial')
class PhononNormalizer(TaskNormalizer):
def _get_n_imaginary_frequencies(self):
......@@ -255,6 +298,19 @@ class PhononNormalizer(TaskNormalizer):
# get number from bands (not complete as this is not the whole mesh)
self.section.n_imaginary_frequencies = self._get_n_imaginary_frequencies()
# references to calculations/workflows
# for phonon workflow (Ph), calculations/workflows are run in parallel and
# for each run, two tasks are realized, first is when the structure is generated
# by the phonon code (input) and run by the force calculator (output) and the
# second is when the forces (input) are fed to the phonon code (output). We then
# have twice as number of tasks as workflows/calculations.
#
# Ph -------------...
# |^ |^ |^
# v| v| v|
# SP1/C1 SP2/C2 SPN/CN
self.normalize_task('parallel')
class ElasticNormalizer(TaskNormalizer):
def _resolve_mechanical_stability(self):
......
......@@ -69,6 +69,11 @@ def test_geometry_optimization_workflow(workflow_archive):
assert sec_workflow.geometry_optimization.final_force_maximum > 0.0
assert sec_workflow.geometry_optimization.is_converged_geometry
task = sec_workflow.task
assert len(task) == len(sec_workflow.calculations_ref) + 1
assert task[1].input_calculation == sec_workflow.calculations_ref[0]
assert task[-1].output_workflow == sec_workflow
def test_elastic_workflow(workflow_archive):
elastic_archive = workflow_archive(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment