aiida.engine.processes.calcjobs package#

Module for the CalcJob process and related utilities.

Submodules#

Implementation of the CalcJob process.

class aiida.engine.processes.calcjobs.calcjob.CalcJob(*args: Any, **kwargs: Any)[source]#

Bases: Process

Implementation of the CalcJob process.

CACHE_VERSION: int | None = None#
KEY_CACHE_VERSION: str = 'cache_version'#
__abstractmethods__ = frozenset({})#
__init__(*args, **kwargs) None[source]#

Construct a CalcJob instance.

The instance is constructed only if the class is a subclass of CalcJob; otherwise an InvalidOperation is raised.

See documentation of aiida.engine.Process.

__module__ = 'aiida.engine.processes.calcjobs.calcjob'#
_node_class#

alias of CalcJobNode

_perform_dry_run()[source]#

Perform a dry run.

Instead of performing the normal sequence of steps, only the presubmit is called, which will call the prepare_for_submission method of the plugin to generate the input files based on the inputs. Then the upload action is called, but using a normal local transport that will copy the files to a local sandbox folder. The generated input script and the absolute path to the sandbox folder are stored in the dry_run_info attribute of the node of this process.
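
For illustration, a dry run can be requested through the dry_run metadata input. A minimal sketch, assuming a plugin registered under the hypothetical entry point some.plugin:

    from aiida.engine import run_get_node
    from aiida.plugins import CalculationFactory

    SomeCalcJob = CalculationFactory('some.plugin')  # hypothetical entry point

    builder = SomeCalcJob.get_builder()
    # ... populate the builder with the plugin's required inputs ...
    builder.metadata.dry_run = True

    # Only the presubmit step runs: the input files are written to a local
    # sandbox folder instead of being uploaded to the remote computer.
    _, node = run_get_node(builder)
    print(node.base.attributes.get('dry_run_info'))  # sandbox path and script name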

_perform_import()[source]#

Perform the import of an already completed calculation.

The presence of a RemoteData input under the key remote_folder signals that this process is not supposed to be run as a normal calculation job; rather, the results have already been computed outside of AiiDA and merely need to be imported.
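
A sketch of launching such an import run; the computer, remote path and reconstructed inputs are assumptions for illustration (see also CalcJobImporter below):

    from aiida import orm
    from aiida.engine import run

    remote = orm.RemoteData(computer=computer, remote_path='/scratch/completed_calc')
    inputs['remote_folder'] = remote  # signals an import run rather than a submission

    run(SomeCalcJob, **inputs)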

_setup_inputs() None[source]#

Create the links between the input nodes and the ProcessNode that represents this process.

_setup_metadata(metadata: dict) None[source]#

Store the metadata on the ProcessNode.

_setup_version_info() dict[str, Any][source]#

Store relevant plugin version information.

_spec_class#

alias of CalcJobProcessSpec

classmethod define(spec: CalcJobProcessSpec) None[source]#

Define the process specification, including its inputs, outputs and known exit codes.

Ports are added to the metadata input namespace (inherited from the base Process), and a code input Port, a remote_folder output Port and a retrieved folder output Port are added.

Parameters:

spec – the calculation job process spec to define.
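
For example, a plugin typically extends the base specification in its own define method. A minimal sketch with hypothetical ports:

    from aiida import orm
    from aiida.engine import CalcJob

    class AddCalcJob(CalcJob):
        """Hypothetical plugin adding two integers."""

        @classmethod
        def define(cls, spec):
            super().define(spec)  # inherits metadata, code, remote_folder, retrieved, ...
            spec.input('x', valid_type=orm.Int)
            spec.input('y', valid_type=orm.Int)
            spec.output('sum', valid_type=orm.Int)
            spec.exit_code(410, 'ERROR_NEGATIVE_SUM', message='The sum is negative.')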

classmethod get_importer(entry_point_name: str | None = None) CalcJobImporter[source]#

Load the CalcJobImporter associated with this CalcJob if it exists.

By default, an importer with the same entry point as the CalcJob will be loaded; this can be overridden using the entry_point_name argument.

Parameters:

entry_point_name – optional entry point name of a CalcJobImporter to override the default.

Returns:

the loaded CalcJobImporter.

Raises:

an exception if no importer class could be loaded.
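
A usage sketch, assuming an importer is registered under the same entry point as the hypothetical SomeCalcJob:

    importer = SomeCalcJob.get_importer()
    inputs = importer.parse_remote_data(remote_data)  # remote_data: a RemoteData node
    # the parsed inputs, including remote_folder, can then be passed to run or submit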

classmethod get_state_classes() Dict[Hashable, Type[State]][source]#

A mapping of the State constants to the corresponding state class.

Overrides the waiting state with the CalcJob-specific version.

property node: CalcJobNode#

Return the ProcessNode used by this process to represent itself in the database.

Returns:

an instance of a subclass of ProcessNode

on_terminated() None[source]#

Clean up the node by deleting the calculation job state.

Note

This has to be done before calling the super, since that will seal the node, after which it can no longer be modified.

property options: AttributeDict#

Return the options of the metadata that were specified when this process instance was launched.

Returns:

options dictionary

parse(retrieved_temporary_folder: str | None = None, existing_exit_code: ExitCode | None = None) ExitCode[source]#

Parse a retrieved job calculation.

This is called once the process has finished waiting for the calculation to complete and the data has been retrieved.

Parameters:

retrieved_temporary_folder – The path to the temporary folder

parse_retrieved_output(retrieved_temporary_folder: str | None = None) ExitCode | None[source]#

Parse the retrieved data by calling the parser plugin if it was defined in the inputs.

parse_scheduler_output(retrieved: Node) ExitCode | None[source]#

Parse the output of the scheduler if that functionality has been implemented for the plugin.

prepare_for_submission(folder: Folder) CalcInfo[source]#

Prepare the calculation for submission.

Convert the input nodes into the corresponding input files in the format that the code will expect. In addition, define and return a CalcInfo instance, which is a simple data structure that contains information for the engine, for example, on what files to copy to the remote machine, what files to retrieve once it has completed, specific scheduler settings and more.

Parameters:

folder – a temporary folder on the local file system.

Returns:

the CalcInfo instance
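
A minimal sketch of an implementation, continuing the hypothetical AddCalcJob example above:

    from aiida.common.datastructures import CalcInfo, CodeInfo

    def prepare_for_submission(self, folder):
        # Write the input file into the sandbox folder based on the input nodes.
        with folder.open('aiida.in', 'w') as handle:
            handle.write(f'{self.inputs.x.value} {self.inputs.y.value}\n')

        codeinfo = CodeInfo()
        codeinfo.code_uuid = self.inputs.code.uuid
        codeinfo.stdin_name = 'aiida.in'
        codeinfo.stdout_name = 'aiida.out'

        calcinfo = CalcInfo()
        calcinfo.codes_info = [codeinfo]
        calcinfo.retrieve_list = ['aiida.out']  # files to fetch after completion
        return calcinfo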

presubmit(folder: Folder) CalcInfo[source]#

Prepare the calculation folder with all inputs, ready to be copied to the cluster.

Parameters:

folder – a SandboxFolder that can be used to write calculation input files and the scheduling script.

Return calcinfo:

the CalcInfo object containing the information needed by the daemon to handle operations.

run() Stop | int | Wait[source]#

Run the calculation job.

This means invoking the presubmit and storing the temporary folder in the node's repository. The process is then moved to the Wait state, waiting for the UPLOAD transport task to be started.

Returns:

the Stop command if this is a dry run, an int if the process has an exit status, or the Wait command if the calculation job is to be uploaded

spec_options = <aiida.engine.processes.ports.PortNamespace object>#
static terminate(exit_code: ExitCode) ExitCode[source]#

Terminate the process immediately and return the given exit code.

This method is called by aiida.engine.processes.calcjobs.tasks.Waiting.execute() if a monitor triggered the job to be terminated and specified that parsing should be skipped. It will construct the running state with this method as the callback, which simply returns the given exit code, causing the process to be terminated.

Parameters:

exit_code – The exit code to return.

Returns:

The provided exit code.

aiida.engine.processes.calcjobs.calcjob.validate_additional_retrieve_list(additional_retrieve_list: Any, _: Any) str | None[source]#

Validate the additional retrieve list.

Returns:

string with error message in case the input is invalid.

aiida.engine.processes.calcjobs.calcjob.validate_calc_job(inputs: Any, ctx: PortNamespace) str | None[source]#

Validate the entire set of inputs passed to the CalcJob constructor.

Reasons that will cause this validation to raise an InputValidationError:

  • No Computer has been specified, either directly in metadata.computer or indirectly through the Code input

  • The specified computer is not stored

  • The Computer specified in metadata.computer is not the same as that of the specified Code

  • No Code has been specified and no remote_folder input has been specified, i.e. this is not an import run

Returns:

string with error message in case the inputs are invalid

aiida.engine.processes.calcjobs.calcjob.validate_monitors(monitors: Any, _: PortNamespace) str | None[source]#

Validate the monitors input namespace.

aiida.engine.processes.calcjobs.calcjob.validate_parser(parser_name: Any, _: PortNamespace) str | None[source]#

Validate the parser.

Returns:

string with error message in case the inputs are invalid

aiida.engine.processes.calcjobs.calcjob.validate_stash_options(stash_options: Any, _: Any) str | None[source]#

Validate the stash options.

Abstract utility class that helps to import calculation jobs completed outside of AiiDA.

class aiida.engine.processes.calcjobs.importer.CalcJobImporter[source]#

Bases: ABC

An abstract class to define an importer for computations completed outside of AiiDA.

This class is used to import the results of a calculation that was completed outside of AiiDA. The importer is responsible for parsing the output files of the calculation and creating the corresponding AiiDA nodes.

__abstractmethods__ = frozenset({'parse_remote_data'})#
__module__ = 'aiida.engine.processes.calcjobs.importer'#

abstract static parse_remote_data(remote_data: RemoteData, **kwargs) Dict[str, Node | Dict][source]#

Parse the input nodes from the files in the provided RemoteData.

Parameters:
  • remote_data – the remote data node containing the raw input files.

  • kwargs – additional keyword arguments to control the parsing process.

Returns:

a dictionary with the parsed input nodes that match the input spec of the associated CalcJob.
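
A minimal sketch of a concrete importer; the keys and values below are placeholders for whatever the plugin would actually parse from the raw files:

    from aiida import orm
    from aiida.engine import CalcJobImporter

    class SomeCalcJobImporter(CalcJobImporter):
        """Hypothetical importer for the SomeCalcJob example."""

        @staticmethod
        def parse_remote_data(remote_data, **kwargs):
            # Parse the raw files in remote_data and reconstruct the input nodes.
            return {
                'x': orm.Int(1),  # placeholder values
                'y': orm.Int(2),
                'remote_folder': remote_data,  # marks the run as an import
            }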

Module containing utilities and classes relating to job calculations running on systems that require transport.

class aiida.engine.processes.calcjobs.manager.JobManager(transport_queue: TransportQueue)[source]#

Bases: object

A manager for CalcJob submitted to Computer instances.

When a calculation job is submitted to a Computer, it actually uses a specific AuthInfo, which is a computer configured for a User. The JobManager maintains a mapping of JobsList instances for each authinfo that has active calculation jobs. These jobslist instances are then responsible for bundling scheduler updates for all the jobs they maintain (i.e. that all share the same authinfo) and updating their status.

As long as a Runner creates a single JobManager instance and uses it for its lifetime, the guarantees made by the JobsList about respecting the minimum polling interval of the scheduler will be maintained. Note, however, that since each Runner will create its own job manager, these guarantees only hold per runner.

__init__(transport_queue: TransportQueue) None[source]#
__module__ = 'aiida.engine.processes.calcjobs.manager'#

get_jobs_list(authinfo: AuthInfo) JobsList[source]#

Get or create a new JobsList instance for the given authinfo.

Parameters:

authinfo – the AuthInfo

Returns:

a JobsList instance

request_job_info_update(authinfo: AuthInfo, job_id: Hashable) Iterator[asyncio.Future[JobInfo]][source]#

Get a future that will resolve to information about a given job.

This is a context manager so that if the user leaves the context the request is automatically cancelled.
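
A sketch of how this context manager is used from a coroutine; the manager, authinfo and job_id are assumed to be available:

    async def wait_for_state_change(manager, authinfo, job_id):
        with manager.request_job_info_update(authinfo, job_id) as request:
            job_info = await request  # resolves when the job changes state
        return job_info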

class aiida.engine.processes.calcjobs.manager.JobsList(authinfo: AuthInfo, transport_queue: TransportQueue, last_updated: float | None = None)[source]#

Bases: object

Manager of calculation jobs submitted with a specific AuthInfo, i.e. computer configured for a specific user.

This container of active calculation jobs is used to update their status periodically in batches, ensuring that even when a lot of jobs are running, the scheduler update command is not triggered for each job individually.

In addition, the Computer for which the AuthInfo is configured can define a minimum polling interval. This class will guarantee that the time between update calls to the scheduler is larger than or equal to that minimum interval.

Note that since each instance operates on a specific authinfo, the guarantees of batching scheduler update calls and of limiting the number of calls per unit time, through the minimum polling interval, only apply to jobs launched with that particular authinfo. If multiple authinfo instances for the same computer have active jobs, these limitations are not respected between them, since there is no communication between JobsList instances. See the JobManager for example usage.

__init__(authinfo: AuthInfo, transport_queue: TransportQueue, last_updated: float | None = None)[source]#

Construct an instance for the given authinfo and transport queue.

Parameters:
  • authinfo – The authinfo used to check the jobs list

  • transport_queue – A transport queue

  • last_updated – initialize the last updated timestamp

__module__ = 'aiida.engine.processes.calcjobs.manager'#

_ensure_updating() None[source]#

Ensure that we are updating the job list from the remote resource.

This will automatically stop if there are no outstanding requests.

async _get_jobs_from_scheduler() Dict[Hashable, JobInfo][source]#

Get the current jobs list from the scheduler.

Returns:

a mapping of job ids to JobInfo instances

_get_jobs_with_scheduler() List[str][source]#

Get all the jobs that are currently with the scheduler.

Returns:

the list of jobs with the scheduler

Return type:

list

_get_next_update_delay() float[source]#

Calculate when we are next allowed to poll the scheduler.

This delay is calculated as the minimum polling interval defined by the authentication info for this instance, minus time elapsed since the last update.

Returns:

delay (in seconds) after which the scheduler may be polled again
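
The computed delay is equivalent to the following sketch:

    import time

    def next_update_delay(minimum_interval: float, last_updated: float | None) -> float:
        if last_updated is None:
            return 0.0  # never updated before, may poll immediately
        elapsed = time.time() - last_updated
        return max(0.0, minimum_interval - elapsed)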

static _has_job_state_changed(old: JobInfo | None, new: JobInfo | None) bool[source]#

Return whether the states old and new are different.

async _update_job_info() None[source]#

Update all of the job information objects.

This will set the futures for all pending update requests where the corresponding job has a new status compared to the last update.

_update_requests_outstanding() bool[source]#
get_minimum_update_interval() float[source]#

Get the minimum interval that should be respected between updates of the list.

Returns:

the minimum interval

property last_updated: float | None#

Get the timestamp of when the list was last updated, as produced by time.time().

Returns:

The last update point

property logger: Logger#

Return the logger configured for this instance.

Returns:

the logger

request_job_info_update(authinfo: AuthInfo, job_id: Hashable) Iterator[asyncio.Future[JobInfo]][source]#

Request job info about a job when the job next changes state.

If the job is not found in the jobs list at the time of the update, the future will resolve to None.

Parameters:

job_id – job identifier

Returns:

future that will resolve to a JobInfo object when the job changes state

Utilities to define monitor functions for CalcJobs.

class aiida.engine.processes.calcjobs.monitors.CalcJobMonitor(entry_point: str, kwargs: dict[Any, Any] = <factory>, priority: int = 0, minimum_poll_interval: int | None = None, call_timestamp: datetime | None = None, disabled: bool = False)[source]#

Bases: object

Data class representing a monitor for a CalcJob.
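
For illustration, monitors are attached to a calculation job through the monitors input namespace as Dict nodes whose contents map onto the fields of this data class. A sketch with a hypothetical entry point and monitor function:

    from aiida import orm

    builder.monitors = {
        'output_watcher': orm.Dict({
            'entry_point': 'some.monitor',        # hypothetical entry point
            'kwargs': {'filename': 'aiida.out'},  # forwarded to the monitor
            'minimum_poll_interval': 30,          # seconds between invocations
        })
    }

    # The entry point should resolve to a function with this interface; returning
    # a string (or a CalcJobMonitorResult) makes the engine act, None means no action.
    def watch_output(node, transport, filename='aiida.out'):
        """Hypothetical monitor: kill the job if 'ERROR' appears in the output."""
        ...
        # return 'Detected ERROR in the output file'  # -> engine kills the job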

__eq__(other)#

Return self==value.

__hash__ = None#
__init__(entry_point: str, kwargs: dict[Any, Any] = <factory>, priority: int = 0, minimum_poll_interval: int | None = None, call_timestamp: datetime | None = None, disabled: bool = False) None#
__match_args__ = ('entry_point', 'kwargs', 'priority', 'minimum_poll_interval', 'call_timestamp', 'disabled')#
__module__ = 'aiida.engine.processes.calcjobs.monitors'#
__post_init__()[source]#

Validate the attributes.

__repr__()#

Return repr(self).


call_timestamp: datetime | None = None#

Optional datetime representing the last time this monitor was called.

disabled: bool = False#

If this attribute is set to True, the monitor should not be called when monitors are processed.

entry_point: str#

Entry point in the aiida.calculations.monitors group implementing the monitor interface.

kwargs: dict[Any, Any]#

Keyword arguments that will be passed to the monitor when invoked (should be JSON serializable).

load_entry_point() type[source]#

Return the function associated with the entry point of this monitor.

Raises:

EntryPointError – If the entry point does not exist or cannot be loaded.

minimum_poll_interval: int | None = None#

Optional minimum poll interval. If set, subsequent calls should be at least this many seconds apart.

priority: int = 0#

Determines the order in which monitors should be executed in the case of multiple monitors.

validate()[source]#

Validate the monitor.

Raises:
  • EntryPointError – If the entry point does not exist or cannot be loaded.

  • TypeError – If any of the attributes are of the incorrect type.

  • ValueError – If the entry point has an incorrect function signature.

  • ValueError – If the kwargs specified are not recognized by the function associated with the entry point.

class aiida.engine.processes.calcjobs.monitors.CalcJobMonitorAction(value)[source]#

Bases: Enum

The action the engine should undertake as a result of a monitor.

DISABLE_ALL = 'disable-all'#
DISABLE_SELF = 'disable-self'#
KILL = 'kill'#
__module__ = 'aiida.engine.processes.calcjobs.monitors'#
class aiida.engine.processes.calcjobs.monitors.CalcJobMonitorResult(key: str | None = None, message: str | None = None, action: CalcJobMonitorAction = CalcJobMonitorAction.KILL, retrieve: bool = True, parse: bool = True, override_exit_code: bool = True, outputs: dict[str, Any] | None = None)[source]#

Bases: object

Data class representing the result of a monitor.
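
A sketch of a result that stops the job early but lets the parser determine the final exit code:

    from aiida.engine.processes.calcjobs.monitors import (
        CalcJobMonitorAction,
        CalcJobMonitorResult,
    )

    result = CalcJobMonitorResult(
        message='Requested quantity converged; stopping the job early.',
        action=CalcJobMonitorAction.KILL,
        override_exit_code=False,  # keep the exit code returned by the parser
    )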

__eq__(other)#

Return self==value.

__hash__ = None#
__init__(key: str | None = None, message: str | None = None, action: CalcJobMonitorAction = CalcJobMonitorAction.KILL, retrieve: bool = True, parse: bool = True, override_exit_code: bool = True, outputs: dict[str, Any] | None = None) None#
__match_args__ = ('key', 'message', 'action', 'retrieve', 'parse', 'override_exit_code', 'outputs')#
__module__ = 'aiida.engine.processes.calcjobs.monitors'#
__post_init__()[source]#

Validate the attributes.

__repr__()#

Return repr(self).


action: CalcJobMonitorAction = 'kill'#

The action the engine should take.

key: str | None = None#

Key of the monitor in the monitors input namespace. Will be set automatically by the engine.

message: str | None = None#

Human readable message: could be a warning or an error message.

outputs: dict[str, Any] | None = None#

Optional dictionary of output nodes to be attached to the process.

override_exit_code: bool = True#

If set to False, the engine will keep the exit code returned by the parser.

parse: bool = True#

If set to False, the engine will skip the parsing of the retrieved files, even if a parser was specified in the inputs.

retrieve: bool = True#

If set to False, the engine will skip retrieving the output files.

validate()[source]#

Validate the instance.

Raises:
  • TypeError – If any of the attributes are of the incorrect type.

  • ValueError – If parse == True and retrieve == False.

class aiida.engine.processes.calcjobs.monitors.CalcJobMonitors(monitors: dict[str, Dict])[source]#

Bases: object

Collection of CalcJobMonitor instances.

The collection is initialized from a dictionary where the values are the parameters for initializing an instance of CalcJobMonitor, which are stored as an ordered dictionary. The monitors are sorted first on their priority (reversed, i.e., from high to low) and second alphabetically on their key.

The process() method can be called with an instance of a CalcJobNode and a Transport; it will iterate over the collection of monitors, executing each monitor in order, and stop at the first one that returns a CalcJobMonitorResult, which is passed up to the caller.
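
A simplified sketch of how the collection is driven; the node and transport are assumed to be available and the entry point is hypothetical:

    from aiida import orm
    from aiida.engine.processes.calcjobs.monitors import CalcJobMonitors

    monitors = CalcJobMonitors({'watcher': orm.Dict({'entry_point': 'some.monitor'})})
    result = monitors.process(node, transport)

    if result is not None:
        print(result.action, result.message)  # the first monitor to report wins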

__init__(monitors: dict[str, Dict])[source]#

Construct a new instance.

__module__ = 'aiida.engine.processes.calcjobs.monitors'#

property monitors: OrderedDict#

Return an ordered dictionary of the monitor collection.

Monitors are first sorted on their priority (reversed, i.e., from high to low) and second on their key.

Returns:

Ordered dictionary of monitors.

process(node: CalcJobNode, transport: Transport) CalcJobMonitorResult | None[source]#

Call all monitors in order and return the result as soon as one returns anything other than None.

Parameters:
  • node – The node to pass to the monitor invocation.

  • transport – The transport to pass to the monitor invocation.

Returns:

None or a monitor result.

Transport tasks for calculation jobs.

exception aiida.engine.processes.calcjobs.tasks.PreSubmitException[source]#

Bases: Exception

Raised in the do_upload coroutine when an exception is raised in CalcJob.presubmit.

__module__ = 'aiida.engine.processes.calcjobs.tasks'#

class aiida.engine.processes.calcjobs.tasks.Waiting(process: CalcJob, done_callback: Callable[..., Any] | None, msg: str | None = None, data: Any | None = None)[source]#

Bases: Waiting

The waiting state for the CalcJob process.

__annotations__ = {}#
__init__(process: CalcJob, done_callback: Callable[..., Any] | None, msg: str | None = None, data: Any | None = None)[source]#
Parameters:

process – The process this state belongs to

__module__ = 'aiida.engine.processes.calcjobs.tasks'#
_auto_persist: Set[str] | None = {'_command', '_monitor_result', 'data', 'in_state', 'msg'}#
async _kill_job(node, transport_queue) None[source]#

Kill the job.

async _launch_task(coro, *args, **kwargs)[source]#

Launch a coroutine as a task, making sure to make it interruptable.

async _monitor_job(node, transport_queue, monitors) CalcJobMonitorResult | None[source]#

Process job monitors if any were specified as inputs.

async execute() State[source]#

Override the execute coroutine of the base Waiting state.

interrupt(reason: Any) Future | None[source]#

Interrupt the Waiting state by calling interrupt on the transport task InterruptableFuture.

load_instance_state(saved_state, load_context)[source]#
property monitors: CalcJobMonitors | None#

Return the collection of monitors if specified in the inputs.

Returns:

Instance of CalcJobMonitors containing the monitors, if any were specified in the process inputs.

parse(retrieved_temporary_folder: str, exit_code: ExitCode | None = None) Running[source]#

Return the Running state that will parse the CalcJob.

Parameters:

retrieved_temporary_folder – temporary folder used in retrieving that can be used during parsing.

property process: CalcJob#
Returns:

The process

retrieve(monitor_result: CalcJobMonitorResult | None = None) Waiting[source]#

Return the Waiting state that will retrieve the CalcJob.

stash(monitor_result: CalcJobMonitorResult | None = None) Waiting[source]#

Return the Waiting state that will stash the CalcJob.

submit() Waiting[source]#

Return the Waiting state that will submit the CalcJob.

update() Waiting[source]#

Return the Waiting state that will update the CalcJob.

upload() Waiting[source]#

Return the Waiting state that will upload the CalcJob.

async aiida.engine.processes.calcjobs.tasks.task_kill_job(node: CalcJobNode, transport_queue: TransportQueue, cancellable: InterruptableFuture)[source]#

Transport task that will attempt to kill a job calculation.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will retry after an interval that increases exponentially with the number of retries, for a maximum number of retries. If all retries fail, the task will raise a TransportTaskException.

Parameters:
  • node – the node that represents the job calculation

  • transport_queue – the TransportQueue from which to request a Transport

  • cancellable – the cancelled flag that will be queried to determine whether the task was cancelled

Raises:

TransportTaskException if after the maximum number of retries the transport task still excepted
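
The retry behaviour shared by these transport tasks follows the exponential-backoff pattern. A minimal sketch of the idea (the engine's actual helper is aiida.engine.utils.exponential_backoff_retry):

    import asyncio

    async def with_exponential_backoff(coro_factory, initial_interval=10.0, max_attempts=5):
        """Retry coro_factory() with exponentially increasing waits between attempts."""
        interval = initial_interval
        for attempt in range(max_attempts):
            try:
                return await coro_factory()
            except Exception:
                if attempt == max_attempts - 1:
                    raise  # exhausted; the transport task raises TransportTaskException
                await asyncio.sleep(interval)
                interval *= 2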

async aiida.engine.processes.calcjobs.tasks.task_monitor_job(node: CalcJobNode, transport_queue: TransportQueue, cancellable: InterruptableFuture, monitors: CalcJobMonitors)[source]#

Transport task that will monitor the job calculation if any monitors have been defined.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will retry after an interval that increases exponentially with the number of retries, for a maximum number of retries. If all retries fail, the task will raise a TransportTaskException.

Parameters:
  • node – the node that represents the job calculation

  • transport_queue – the TransportQueue from which to request a Transport

  • cancellable – A cancel flag

  • monitors – An instance of CalcJobMonitors holding the collection of monitors to process.

Returns:

True if the task was successfully completed, False otherwise

async aiida.engine.processes.calcjobs.tasks.task_retrieve_job(process: CalcJob, transport_queue: TransportQueue, retrieved_temporary_folder: str, cancellable: InterruptableFuture)[source]#

Transport task that will attempt to retrieve all files of a completed job calculation.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will retry after an interval that increases exponentially with the number of retries, for a maximum number of retries. If all retries fail, the task will raise a TransportTaskException.

Parameters:
  • process – the job calculation

  • transport_queue – the TransportQueue from which to request a Transport

  • retrieved_temporary_folder – the absolute path to a directory to store files

  • cancellable – the cancelled flag that will be queried to determine whether the task was cancelled

Raises:

TransportTaskException if after the maximum number of retries the transport task still excepted

async aiida.engine.processes.calcjobs.tasks.task_stash_job(node: CalcJobNode, transport_queue: TransportQueue, cancellable: InterruptableFuture)[source]#

Transport task that will optionally stash files of a completed job calculation on the remote.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will retry after an interval that increases exponentially with the number of retries, for a maximum number of retries. If all retries fail, the task will raise a TransportTaskException.

Parameters:
  • node – the node that represents the job calculation

  • transport_queue – the TransportQueue from which to request a Transport

  • cancellable – the cancelled flag that will be queried to determine whether the task was cancelled

Returns:

None once the task has been successfully completed.

Raises:

TransportTaskException if after the maximum number of retries the transport task still excepted

async aiida.engine.processes.calcjobs.tasks.task_submit_job(node: CalcJobNode, transport_queue: TransportQueue, cancellable: InterruptableFuture)[source]#

Transport task that will attempt to submit a job calculation.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will retry after an interval that increases exponentially with the number of retries, for a maximum number of retries. If all retries fail, the task will raise a TransportTaskException.

Parameters:
  • node – the node that represents the job calculation

  • transport_queue – the TransportQueue from which to request a Transport

  • cancellable – the cancelled flag that will be queried to determine whether the task was cancelled

Raises:

TransportTaskException if after the maximum number of retries the transport task still excepted

async aiida.engine.processes.calcjobs.tasks.task_update_job(node: CalcJobNode, job_manager, cancellable: InterruptableFuture)[source]#

Transport task that will attempt to update the scheduler status of the job calculation.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will retry after an interval that increases exponentially with the number of retries, for a maximum number of retries. If all retries fail, the task will raise a TransportTaskException.

Parameters:
  • node – the node that represents the job calculation

  • job_manager – The job manager

  • cancellable – A cancel flag

Returns:

True if the task was successfully completed, False otherwise

async aiida.engine.processes.calcjobs.tasks.task_upload_job(process: CalcJob, transport_queue: TransportQueue, cancellable: InterruptableFuture)[source]#

Transport task that will attempt to upload the files of a job calculation to the remote.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will retry after an interval that increases exponentially with the number of retries, for a maximum number of retries. If all retries fail, the task will raise a TransportTaskException.

Parameters:
  • process – the job calculation

  • transport_queue – the TransportQueue from which to request a Transport

  • cancellable – the cancelled flag that will be queried to determine whether the task was cancelled

Raises:

TransportTaskException if after the maximum number of retries the transport task still excepted