Source code for aiida.work.legacy.job_process

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################

import plum.port as port
import plum.process
import plum.util
from aiida.common.lang import override
from aiida.work.process import Process, DictSchema
from voluptuous import Any



[docs]class JobProcess(Process): CALC_NODE_LABEL = 'calc_node' OPTIONS_INPUT_LABEL = '_options' _CALC_CLASS = None
[docs] @classmethod def build(cls, calc_class): from aiida.orm.data import Data from aiida.orm.computer import Computer def define(cls_, spec): super(JobProcess, cls_).define(spec) # Calculation options options = { "max_wallclock_seconds": int, "resources": dict, "custom_scheduler_commands": unicode, "queue_name": basestring, "computer": Computer, "withmpi": bool, "mpirun_extra_params": Any(list, tuple), "import_sys_environment": bool, "environment_variables": dict, "priority": unicode, "max_memory_kb": int, "prepend_text": unicode, "append_text": unicode, 'parser_name': unicode, } spec.input(cls.OPTIONS_INPUT_LABEL, validator=DictSchema(options)) # Inputs from use methods for k, v in calc_class._use_methods.iteritems(): if v.get('additional_parameter'): spec.input_group(k, help=v.get('docstring', None), valid_type=v['valid_types'], required=False) else: spec.input(k, help=v.get('docstring', None), valid_type=v['valid_types'], required=False) # Outputs spec.dynamic_output(valid_type=Data) class_name = "{}_{}".format( JobProcess.__name__, plum.util.fullname(calc_class)) return type(class_name, (JobProcess,), { Process.define.__name__: classmethod(define), '_CALC_CLASS': calc_class })
[docs] @override def _run(self, **kwargs): from aiida.work.legacy.wait_on import wait_on_job_calculation # Create this wait_on here because there is a check to make sure the # callback function is defined correctly which may cause an assertion, in which # case we shouldn't submit wait_on = wait_on_job_calculation( self.calculation_finished, self.calc.pk) self.calc.submit() return wait_on
[docs] def calculation_finished(self, wait_on): """ The callback function that is called when the remote calculation is finished. :param wait_on: The original WaitOnJobCalculation object. :type wait_on: :class:`aiida.work.legacy.wait_on.WaitOnJobCalculation` """ assert not self.calc._is_running() for label, node in self.calc.get_outputs_dict().iteritems(): self.out(label, node)
[docs] @override def create_db_record(self): return self._CALC_CLASS()
[docs] @override def _on_output_emitted(self, output_port, value, dynamic): # Skip over parent _on_output_emitted because it will try to store stuff # which is already done for us by the Calculation plum.process.Process._on_output_emitted( self, output_port, value, dynamic)
[docs] @override def _setup_db_record(self): """ Link up all the retrospective provenance for this JobCalculation """ from aiida.common.links import LinkType parent_calc = self.get_parent_calc() # Set all the attributes using the setter methods for name, value in self.inputs.get(self.OPTIONS_INPUT_LABEL, {}).iteritems(): if value is not None: getattr(self._calc, "set_{}".format(name))(value) # First get a dictionary of all the inputs to link, this is needed to # deal with things like input groups to_link = {} for name, input in self.get_provenance_inputs_iterator(): if input is None or name is self.OPTIONS_INPUT_LABEL: continue # Call the 'use' methods to set up the data-calc links if self.spec().has_input(name) and \ isinstance(self.spec().get_input(name), port.InputGroupPort): additional =\ self._CALC_CLASS._use_methods[name]['additional_parameter'] for k, v in input.iteritems(): try: getattr(self._calc, 'use_{}'.format(name))(v, **{additional: k}) except AttributeError as exception: raise AttributeError("You have provided for an input the key '{}' but" "the JobCalculation has no such use_{} method".format(name, name)) else: getattr(self._calc, 'use_{}'.format(name))(input) # Get the computer from the code if necessary if self._calc.get_computer() is None and 'code' in self.inputs: code = self.inputs['code'] if not code.is_local(): self._calc.set_computer(code.get_remote_computer()) if parent_calc: self._calc.add_link_from(parent_calc, "CALL", LinkType.CALL) self._add_description_and_label()