aiida.engine package#

Module with all the internals that make up the engine of aiida-core.

Subpackages#

Submodules#

Exceptions that can be thrown by parts of the workflow engine.

exception aiida.engine.exceptions.PastException[source]#

Bases: AiidaException

Raised when an attempt is made to continue a Process that has already excepted before.

__annotations__ = {}#
__module__ = 'aiida.engine.exceptions'#

Top level functions that can be used to launch a Process.

aiida.engine.launch.await_processes(nodes: Sequence[ProcessNode], wait_interval: int = 1) None[source]#

Run a loop until all processes are terminated.

Parameters:
  • nodes – Sequence of nodes that represent the processes to await.

  • wait_interval – The interval between each iteration of checking the status of all processes.

aiida.engine.launch.run(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) dict[str, Any][source]#

Run the process with the supplied inputs in a local runner that will block until the process is completed.

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

the outputs of the process

aiida.engine.launch.run_get_node(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) tuple[dict[str, Any], ProcessNode][source]#

Run the process with the supplied inputs in a local runner that will block until the process is completed.

Parameters:
  • process – the process class, instance, builder or function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and the process node

aiida.engine.launch.run_get_pk(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) ResultAndPk[source]#

Run the process with the supplied inputs in a local runner that will block until the process is completed.

Parameters:
  • process – the process class, instance, builder or function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and process node pk

aiida.engine.launch.submit(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, *, wait: bool = False, wait_interval: int = 5, **kwargs: Any) ProcessNode[source]#

Submit the process with the supplied inputs to the daemon immediately returning control to the interpreter.

Parameters:
  • process – the process class, instance or builder to submit

  • inputs – the input dictionary to be passed to the process

  • wait – when set to True, the submission will be blocking and wait for the process to complete at which point the function returns the calculation node.

  • wait_interval – the number of seconds to wait between checking the state of the process when wait=True.

  • kwargs – inputs to be passed to the process. This is an alternative to the positional inputs argument.

Returns:

the calculation node of the process

Definition of AiiDA’s process persister and the necessary object loaders.

class aiida.engine.persistence.AiiDAPersister[source]#

Bases: Persister

Persister to take saved process instance states and persisting them to the database.

__abstractmethods__ = frozenset({})#
__module__ = 'aiida.engine.persistence'#
_abc_impl = <_abc._abc_data object>#
delete_checkpoint(pid: Hashable, tag: str | None = None) None[source]#

Delete a persisted process checkpoint, where no error will be raised if the checkpoint does not exist.

Parameters:
  • pid – the process id of the plumpy.Process

  • tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint

delete_process_checkpoints(pid: Hashable)[source]#

Delete all persisted checkpoints related to the given process id.

Parameters:

pid – the process id of the aiida.engine.processes.process.Process

get_checkpoints()[source]#

Return a list of all the current persisted process checkpoints

Returns:

list of PersistedCheckpoint tuples with element containing the process id and optional checkpoint tag.

get_process_checkpoints(pid: Hashable)[source]#

Return a list of all the current persisted process checkpoints for the specified process.

Parameters:

pid – the process pid

Returns:

list of PersistedCheckpoint tuples with element containing the process id and optional checkpoint tag.

load_checkpoint(pid: Hashable, tag: str | None = None) Bundle[source]#

Load a process from a persisted checkpoint by its process id.

Parameters:
  • pid – the process id of the plumpy.Process

  • tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint

Returns:

a bundle with the process state

Return type:

plumpy.Bundle

Raises:

PersistenceError Raised if there was a problem loading the checkpoint

save_checkpoint(process: Process, tag: str | None = None)[source]#

Persist a Process instance.

Parameters:
  • processaiida.engine.Process

  • tag – optional checkpoint identifier to allow distinguishing multiple checkpoints for the same process

Raises:

PersistenceError Raised if there was a problem saving the checkpoint

class aiida.engine.persistence.ObjectLoader[source]#

Bases: DefaultObjectLoader

Custom object loader for aiida-core.

__abstractmethods__ = frozenset({})#
__module__ = 'aiida.engine.persistence'#
_abc_impl = <_abc._abc_data object>#
load_object(identifier: str) Any[source]#

Attempt to load the object identified by the given identifier.

Note

We override the plumpy.DefaultObjectLoader to be able to throw an ImportError instead of a ValueError which in the context of aiida-core is not as apt, since we are loading classes.

Parameters:

identifier – concatenation of module and resource name

Returns:

loaded object

Raises:

ImportError – if the object cannot be loaded

aiida.engine.persistence.get_object_loader() ObjectLoader[source]#

Return the global AiiDA object loader.

Returns:

The global object loader

Runners that can run and submit processes.

class aiida.engine.runners.ResultAndNode(result, node)[source]#

Bases: NamedTuple

__annotations__ = {'node': ForwardRef('ProcessNode'), 'result': ForwardRef('Dict[str, Any]')}#
__getnewargs__()#

Return self as a plain tuple. Used by copy and pickle.

__match_args__ = ('result', 'node')#
__module__ = 'aiida.engine.runners'#
static __new__(_cls, result: Dict[str, Any], node: ProcessNode)#

Create new instance of ResultAndNode(result, node)

__orig_bases__ = (<function NamedTuple>,)#
__repr__()#

Return a nicely formatted representation string

__slots__ = ()#
_asdict()#

Return a new dict which maps field names to their values.

_field_defaults = {}#
_fields = ('result', 'node')#
classmethod _make(iterable)#

Make a new ResultAndNode object from a sequence or iterable

_replace(**kwds)#

Return a new ResultAndNode object replacing specified fields with new values

node: ProcessNode#

Alias for field number 1

result: Dict[str, Any]#

Alias for field number 0

class aiida.engine.runners.ResultAndPk(result, pk)[source]#

Bases: NamedTuple

__annotations__ = {'pk': ForwardRef('int | None'), 'result': ForwardRef('Dict[str, Any]')}#
__getnewargs__()#

Return self as a plain tuple. Used by copy and pickle.

__match_args__ = ('result', 'pk')#
__module__ = 'aiida.engine.runners'#
static __new__(_cls, result: Dict[str, Any], pk: int | None)#

Create new instance of ResultAndPk(result, pk)

__orig_bases__ = (<function NamedTuple>,)#
__repr__()#

Return a nicely formatted representation string

__slots__ = ()#
_asdict()#

Return a new dict which maps field names to their values.

_field_defaults = {}#
_fields = ('result', 'pk')#
classmethod _make(iterable)#

Make a new ResultAndPk object from a sequence or iterable

_replace(**kwds)#

Return a new ResultAndPk object replacing specified fields with new values

pk: int | None#

Alias for field number 1

result: Dict[str, Any]#

Alias for field number 0

class aiida.engine.runners.Runner(poll_interval: int | float = 0, loop: AbstractEventLoop | None = None, communicator: Communicator | None = None, broker_submit: bool = False, persister: Persister | None = None)[source]#

Bases: object

Class that can launch processes by running in the current interpreter or by submitting them to the daemon.

__annotations__ = {'_closed': 'bool', '_communicator': 'Optional[kiwipy.Communicator]', '_controller': 'Optional[RemoteProcessThreadController]', '_persister': 'Optional[Persister]'}#
__dict__ = mappingproxy({'__module__': 'aiida.engine.runners', '__annotations__': {'_persister': 'Optional[Persister]', '_communicator': 'Optional[kiwipy.Communicator]', '_controller': 'Optional[RemoteProcessThreadController]', '_closed': 'bool'}, '__doc__': 'Class that can launch processes by running in the current interpreter or by submitting them to the daemon.', '_persister': None, '_communicator': None, '_controller': None, '_closed': False, '__init__': <function Runner.__init__>, '__enter__': <function Runner.__enter__>, '__exit__': <function Runner.__exit__>, 'loop': <property object>, 'transport': <property object>, 'persister': <property object>, 'communicator': <property object>, 'plugin_version_provider': <property object>, 'job_manager': <property object>, 'controller': <property object>, 'is_daemon_runner': <property object>, 'is_closed': <function Runner.is_closed>, 'start': <function Runner.start>, 'stop': <function Runner.stop>, 'run_until_complete': <function Runner.run_until_complete>, 'close': <function Runner.close>, 'instantiate_process': <function Runner.instantiate_process>, 'submit': <function Runner.submit>, 'schedule': <function Runner.schedule>, '_run': <function Runner._run>, 'run': <function Runner.run>, 'run_get_node': <function Runner.run_get_node>, 'run_get_pk': <function Runner.run_get_pk>, 'call_on_process_finish': <function Runner.call_on_process_finish>, 'get_process_future': <function Runner.get_process_future>, '_poll_process': <function Runner._poll_process>, '__dict__': <attribute '__dict__' of 'Runner' objects>, '__weakref__': <attribute '__weakref__' of 'Runner' objects>})#
__enter__() Runner[source]#
__exit__(exc_type, exc_val, exc_tb)[source]#
__init__(poll_interval: int | float = 0, loop: AbstractEventLoop | None = None, communicator: Communicator | None = None, broker_submit: bool = False, persister: Persister | None = None)[source]#

Construct a new runner.

Parameters:
  • poll_interval – interval in seconds between polling for status of active sub processes

  • loop – an asyncio event loop, if none is suppled a new one will be created

  • communicator – the communicator to use

  • broker_submit – if True, processes will be submitted to the broker, otherwise they will be scheduled here

  • persister – the persister to use to persist processes

__module__ = 'aiida.engine.runners'#
__weakref__#

list of weak references to the object (if defined)

_closed: bool = False#
_communicator: Communicator | None = None#
_controller: RemoteProcessThreadController | None = None#
_persister: Persister | None = None#
_poll_process(node, callback)[source]#

Check whether the process state of the node is terminated and call the callback or reschedule it.

Parameters:
  • node – the process node

  • callback – callback to be called when process is terminated

_run(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) Tuple[Dict[str, Any], ProcessNode][source]#

Run the process with the supplied inputs in this runner that will block until the process is completed.

The return value will be the results of the completed process

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and the calculation node

call_on_process_finish(pk: int, callback: Callable[[], Any]) None[source]#

Schedule a callback when the process of the given pk is terminated.

This method will add a broadcast subscriber that will listen for state changes of the target process to be terminated. As a fail-safe, a polling-mechanism is used to check the state of the process, should the broadcast message be missed by the subscriber, in order to prevent the caller to wait indefinitely.

Parameters:
  • pk – pk of the process

  • callback – function to be called upon process termination

close() None[source]#

Close the runner by stopping the loop.

property communicator: Communicator | None#

Get the communicator used by this runner.

property controller: RemoteProcessThreadController | None#

Get the controller used by this runner.

get_process_future(pk: int) ProcessFuture[source]#

Return a future for a process.

The future will have the process node as the result when finished.

Returns:

A future representing the completion of the process node

instantiate_process(process: Process | Type[Process] | ProcessBuilder, **inputs)[source]#
is_closed() bool[source]#
property is_daemon_runner: bool#

Return whether the runner is a daemon runner, which means it submits processes over a broker.

Returns:

True if the runner is a daemon runner

property job_manager: JobManager#
property loop: AbstractEventLoop#

Get the event loop of this runner.

property persister: Persister | None#

Get the persister used by this runner.

property plugin_version_provider: PluginVersionProvider#
run(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) Dict[str, Any][source]#

Run the process with the supplied inputs in this runner that will block until the process is completed.

The return value will be the results of the completed process

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

the outputs of the process

run_get_node(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) ResultAndNode[source]#

Run the process with the supplied inputs in this runner that will block until the process is completed.

The return value will be the results of the completed process

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and the calculation node

run_get_pk(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) ResultAndPk[source]#

Run the process with the supplied inputs in this runner that will block until the process is completed.

The return value will be the results of the completed process

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and process node pk

run_until_complete(future: Future) Any[source]#

Run the loop until the future has finished and return the result.

schedule(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any) ProcessNode[source]#

Schedule a process to be executed by this runner.

Parameters:
  • process – the process class to submit

  • inputs – the inputs to be passed to the process

Returns:

the calculation node of the process

start() None[source]#

Start the internal event loop.

stop() None[source]#

Stop the internal event loop.

submit(process: Process | Type[Process] | ProcessBuilder, inputs: dict[str, Any] | None = None, **kwargs: Any)[source]#

Submit the process with the supplied inputs to this runner immediately returning control to the interpreter.

The return value will be the calculation node of the submitted process

Parameters:
  • process – the process class to submit

  • inputs – the inputs to be passed to the process

Returns:

the calculation node of the process

property transport: TransportQueue#

A transport queue to batch process multiple tasks that require a Transport.

class aiida.engine.transports.TransportQueue(loop: AbstractEventLoop | None = None)[source]#

Bases: object

A queue to get transport objects from authinfo. This class allows clients to register their interest in a transport object which will be provided at some point in the future.

Internally the class will wait for a specific interval at the end of which it will open the transport and give it to all the clients that asked for it up to that point. This way opening of transports (a costly operation) can be minimised.

__dict__ = mappingproxy({'__module__': 'aiida.engine.transports', '__doc__': 'A queue to get transport objects from authinfo.  This class allows clients\n    to register their interest in a transport object which will be provided at\n    some point in the future.\n\n    Internally the class will wait for a specific interval at the end of which\n    it will open the transport and give it to all the clients that asked for it\n    up to that point.  This way opening of transports (a costly operation) can\n    be minimised.\n    ', '__init__': <function TransportQueue.__init__>, 'loop': <property object>, 'request_transport': <function TransportQueue.request_transport>, '__dict__': <attribute '__dict__' of 'TransportQueue' objects>, '__weakref__': <attribute '__weakref__' of 'TransportQueue' objects>, '__annotations__': {'_transport_requests': 'Dict[Hashable, TransportRequest]'}})#
__init__(loop: AbstractEventLoop | None = None)[source]#
Parameters:

loop – An asyncio event, will use asyncio.get_event_loop() if not supplied

__module__ = 'aiida.engine.transports'#
__weakref__#

list of weak references to the object (if defined)

property loop: AbstractEventLoop#

Get the loop being used by this transport queue

request_transport(authinfo: AuthInfo) Iterator[Awaitable[Transport]][source]#

Request a transport from an authinfo. Because the client is not allowed to request a transport immediately they will instead be given back a future that can be awaited to get the transport:

async def transport_task(transport_queue, authinfo):
    with transport_queue.request_transport(authinfo) as request:
        transport = await request
        # Do some work with the transport
Parameters:

authinfo – The authinfo to be used to get transport

Returns:

A future that can be yielded to give the transport

class aiida.engine.transports.TransportRequest[source]#

Bases: object

Information kept about request for a transport object

__dict__ = mappingproxy({'__module__': 'aiida.engine.transports', '__doc__': 'Information kept about request for a transport object', '__init__': <function TransportRequest.__init__>, '__dict__': <attribute '__dict__' of 'TransportRequest' objects>, '__weakref__': <attribute '__weakref__' of 'TransportRequest' objects>, '__annotations__': {'future': 'asyncio.Future'}})#
__init__()[source]#
__module__ = 'aiida.engine.transports'#
__weakref__#

list of weak references to the object (if defined)

Utilities for the workflow engine.

class aiida.engine.utils.InterruptableFuture(*, loop=None)[source]#

Bases: Future

A future that can be interrupted by calling interrupt.

__module__ = 'aiida.engine.utils'#
interrupt(reason: Exception) None[source]#

This method should be called to interrupt the coroutine represented by this InterruptableFuture.

async with_interrupt(coro: Awaitable[Any]) Any[source]#

Return result of a coroutine which will be interrupted if this future is interrupted

import asyncio
loop = asyncio.get_event_loop()

interruptable = InterutableFuture()
loop.call_soon(interruptable.interrupt, RuntimeError("STOP"))
loop.run_until_complete(interruptable.with_interrupt(asyncio.sleep(2.)))
>>> RuntimeError: STOP
Parameters:

coro – The coroutine that can be interrupted

Returns:

The result of the coroutine

aiida.engine.utils.ensure_coroutine(fct: Callable[[...], Any]) Callable[[...], Awaitable[Any]][source]#

Ensure that the given function fct is a coroutine

If the passed function is not already a coroutine, it will be made to be a coroutine

Parameters:

fct – the function

Returns:

the coroutine

async aiida.engine.utils.exponential_backoff_retry(fct: Callable[[...], Any], initial_interval: int | float = 10.0, max_attempts: int = 5, logger: Logger | None = None, ignore_exceptions: None | Type[Exception] | Tuple[Type[Exception], ...] = None) Any[source]#

Coroutine to call a function, recalling it with an exponential backoff in the case of an exception

This coroutine will loop max_attempts times, calling the fct function, breaking immediately when the call finished without raising an exception, at which point the result will be returned. If an exception is caught, the function will await a asyncio.sleep with a time interval equal to the initial_interval multiplied by 2 ** (N - 1) where N is the number of excepted calls.

Parameters:
  • fct – the function to call, which will be turned into a coroutine first if it is not already

  • initial_interval – the time to wait after the first caught exception before calling the coroutine again

  • max_attempts – the maximum number of times to call the coroutine before re-raising the exception

  • ignore_exceptions – exceptions to ignore, i.e. when caught do nothing and simply re-raise

Returns:

result if the coro call completes within max_attempts retries without raising

aiida.engine.utils.get_process_state_change_timestamp(process_type: str | None = None) datetime | None[source]#

Get the global setting that reflects the last time a process of the given process type changed its state. The returned value will be the corresponding timestamp or None if the setting does not exist.

Parameters:

process_type – optional process type for which to get the latest state change timestamp. Valid process types are either ‘calculation’ or ‘work’. If not specified, last timestamp for all known process types will be returned.

Returns:

a timestamp or None

aiida.engine.utils.instantiate_process(runner: Runner, process: 'Process' | Type['Process'] | 'ProcessBuilder', **inputs) Process[source]#

Return an instance of the process with the given inputs. The function can deal with various types of the process:

  • Process instance: will simply return the instance

  • ProcessBuilder instance: will instantiate the Process from the class and inputs defined within it

  • Process class: will instantiate with the specified inputs

If anything else is passed, a ValueError will be raised

Parameters:
  • process – Process instance or class, CalcJobNode class or ProcessBuilder instance

  • inputs – the inputs for the process to be instantiated with

aiida.engine.utils.interruptable_task(coro: Callable[[InterruptableFuture], Awaitable[Any]], loop: AbstractEventLoop | None = None) InterruptableFuture[source]#

Turn the given coroutine into an interruptable task by turning it into an InterruptableFuture and returning it.

Parameters:
  • coro – the coroutine that should be made interruptable with object of InterutableFuture as last paramenter

  • loop – the event loop in which to run the coroutine, by default uses asyncio.get_event_loop()

Returns:

an InterruptableFuture

aiida.engine.utils.is_process_function(function: Any) bool[source]#

Return whether the given function is a process function

Parameters:

function – a function

Returns:

True if the function is a wrapped process function, False otherwise

aiida.engine.utils.is_process_scoped() bool[source]#

Return whether the current scope is within a process.

Returns:

True if the current scope is within a nested process, False otherwise

aiida.engine.utils.loop_scope(loop) Iterator[None][source]#

Make an event loop current for the scope of the context

Parameters:

loop – The event loop to make current for the duration of the scope

aiida.engine.utils.prepare_inputs(inputs: dict[str, Any] | None = None, **kwargs: Any) dict[str, Any][source]#

Prepare inputs for launch of a process.

This is a utility function to pre-process inputs for the process that can be specified both through keyword arguments as well as through the explicit inputs argument. When both are specified, a ValueError is raised.

Parameters:
  • inputs – Inputs dictionary.

  • kwargs – Inputs defined as keyword arguments.

Raises:

ValueError – If both kwargs and inputs are defined.

Returns:

The dictionary of inputs for the process.

aiida.engine.utils.set_process_state_change_timestamp(process: Process) None[source]#

Set the global setting that reflects the last time a process changed state, for the process type of the given process, to the current timestamp. The process type will be determined based on the class of the calculation node it has as its database container.

Parameters:

process – the Process instance that changed its state