aiida.engine.daemon package#

Module with resources for the daemon.

Submodules#

Client to interact with the daemon.

class aiida.engine.daemon.client.ControllerProtocol(value)[source]#

Bases: Enum

The protocol to use for the controller of the Circus daemon.

IPC = 0#
TCP = 1#
__module__ = 'aiida.engine.daemon.client'#
class aiida.engine.daemon.client.DaemonClient(profile: Profile)[source]#

Bases: object

Client to interact with the daemon.

_DAEMON_NAME = 'aiida-{name}'#
_ENDPOINT_PROTOCOL = 0#
__dict__ = mappingproxy({'__module__': 'aiida.engine.daemon.client', '__doc__': 'Client to interact with the daemon.', '_DAEMON_NAME': 'aiida-{name}', '_ENDPOINT_PROTOCOL': <ControllerProtocol.IPC: 0>, '__init__': <function DaemonClient.__init__>, 'profile': <property object>, 'daemon_name': <property object>, '_verdi_bin': <property object>, 'cmd_start_daemon': <function DaemonClient.cmd_start_daemon>, 'cmd_start_daemon_worker': <property object>, 'loglevel': <property object>, 'virtualenv': <property object>, 'circus_log_file': <property object>, 'circus_pid_file': <property object>, 'circus_port_file': <property object>, 'circus_socket_file': <property object>, 'circus_socket_endpoints': <property object>, 'daemon_log_file': <property object>, 'daemon_pid_file': <property object>, 'get_circus_port': <function DaemonClient.get_circus_port>, 'get_env': <staticmethod(<function DaemonClient.get_env>)>, 'get_circus_socket_directory': <function DaemonClient.get_circus_socket_directory>, 'get_daemon_pid': <function DaemonClient.get_daemon_pid>, 'is_daemon_running': <property object>, 'delete_circus_socket_directory': <function DaemonClient.delete_circus_socket_directory>, 'get_available_port': <classmethod(<function DaemonClient.get_available_port>)>, 'get_controller_endpoint': <function DaemonClient.get_controller_endpoint>, 'get_pubsub_endpoint': <function DaemonClient.get_pubsub_endpoint>, 'get_stats_endpoint': <function DaemonClient.get_stats_endpoint>, 'get_ipc_endpoint': <function DaemonClient.get_ipc_endpoint>, 'get_tcp_endpoint': <function DaemonClient.get_tcp_endpoint>, 'get_client': <function DaemonClient.get_client>, 'call_client': <function DaemonClient.call_client>, 'get_status': <function DaemonClient.get_status>, 'get_numprocesses': <function DaemonClient.get_numprocesses>, 'get_worker_info': <function DaemonClient.get_worker_info>, 'get_daemon_info': <function DaemonClient.get_daemon_info>, 'increase_workers': <function DaemonClient.increase_workers>, 'decrease_workers': <function DaemonClient.decrease_workers>, 'start_daemon': <function DaemonClient.start_daemon>, 'restart_daemon': <function DaemonClient.restart_daemon>, 'stop_daemon': <function DaemonClient.stop_daemon>, '_clean_potentially_stale_pid_file': <function DaemonClient._clean_potentially_stale_pid_file>, '_is_pid_file_stale': <property object>, '_check_pid_file': <function DaemonClient._check_pid_file>, '_await_condition': <staticmethod(<function DaemonClient._await_condition>)>, '_start_daemon': <function DaemonClient._start_daemon>, '__dict__': <attribute '__dict__' of 'DaemonClient' objects>, '__weakref__': <attribute '__weakref__' of 'DaemonClient' objects>, '__annotations__': {'_socket_directory': 'str | None', '_daemon_timeout': 'int'}})#
__init__(profile: Profile)[source]#

Construct an instance for a given profile.

Parameters:

profile – The profile instance.

__module__ = 'aiida.engine.daemon.client'#
__weakref__#

list of weak references to the object (if defined)

static _await_condition(condition: Callable, exception: Exception, timeout: int = 5, interval: float = 0.1)[source]#

Await a condition to evaluate to True or raise the exception if the timeout is reached.

Parameters:
  • condition – A callable that is waited for to return True.

  • exception – Raise this exception if condition does not return True after timeout seconds.

  • timeout – Wait this number of seconds for condition to return True before raising.

  • interval – The time in seconds to wait between invocations of condition.

Raises:

The exception provided by exception if timeout is reached.

_check_pid_file() None[source]#

Check that the daemon’s PID file is not stale.

Checks if the PID contained in the circus PID file matches a valid running verdi process. The PID file is considered stale if any of the following conditions are true:

  • The process with the given PID no longer exists

  • The process name does not match the command of the circus daemon

  • The process username does not match the username of this Python interpreter

In the latter two cases, the process with the PID of the PID file exists, but it is very likely that it is not the original process that created the PID file, since the command or user is different, indicating the original process died and the PID was recycled for a new process.

The PID file can got stale if a system is shut down suddenly and so the process is killed but the PID file is not deleted in time. When the get_daemon_pid() method is called, an incorrect PID is returned. Alternatively, another process or the user may have meddled with the PID file in some way, corrupting it.

Raises:

DaemonException – If the PID file is likely to be stale.

_clean_potentially_stale_pid_file() None[source]#

Check the daemon PID file and delete it if it is likely to be stale.

property _is_pid_file_stale: bool#

Return whether the daemon PID file is likely to be stale.

Returns:

True if the PID file is likely to be stale, False otherwise.

_start_daemon(number_workers: int = 1, foreground: bool = False) None[source]#

Start the daemon.

Warning

This will daemonize the current process and put it in the background. It is most likely not what you want to call if you want to start the daemon from the Python API. Instead you probably will want to use the aiida.engine.daemon.client.DaemonClient.start_daemon() function instead.

Parameters:
  • number_workers – Number of daemon workers to start.

  • foreground – Whether to launch the subprocess in the background or not.

property _verdi_bin: str#

Return the absolute path to the verdi binary.

Raises:

ConfigurationError – If the path to verdi could not be found

call_client(command: dict[str, Any], timeout: int | None = None) dict[str, Any][source]#

Call the client with a specific command.

Will check whether the daemon is running first by checking for the pid file. When the pid is found yet the call still fails with a timeout, this means the daemon was actually not running and it was terminated unexpectedly causing the pid file to not be cleaned up properly.

Parameters:
  • command – Command to call the circus client with.

  • timeout – Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon instantiation taken from the daemon.timeout config option.

Returns:

The result of the circus client call.

Raises:
property circus_log_file: str#
property circus_pid_file: str#
property circus_port_file: str#
property circus_socket_endpoints: dict[str, str]#
property circus_socket_file: str#
cmd_start_daemon(number_workers: int = 1, foreground: bool = False) list[str][source]#

Return the command to start the daemon.

Parameters:
  • number_workers – Number of daemon workers to start.

  • foreground – Whether to launch the subprocess in the background or not.

property cmd_start_daemon_worker: list[str]#

Return the command to start a daemon worker process.

property daemon_log_file: str#
property daemon_name: str#

Get the daemon name which is tied to the profile name.

property daemon_pid_file: str#
decrease_workers(number: int, timeout: int | None = None) dict[str, Any][source]#

Decrease the number of workers.

Parameters:
  • number – The number of workers to remove.

  • timeout – Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon instantiation taken from the daemon.timeout config option.

Returns:

The client call response.

delete_circus_socket_directory() None[source]#

Attempt to delete the directory used to store the circus endpoint sockets.

Will not raise if the directory does not exist.

classmethod get_available_port()[source]#

Get an available port from the operating system.

Returns:

A currently available port.

get_circus_port() int[source]#

Retrieve the port for the circus controller, which should be written to the circus port file.

If the daemon is running, the port file should exist and contain the port to which the controller is connected. If it cannot be read, a RuntimeError will be thrown. If the daemon is not running, an available port will be requested from the operating system, written to the port file and returned.

Returns:

The port for the circus controller.

get_circus_socket_directory() str[source]#

Retrieve the absolute path of the directory where the circus sockets are stored.

If the daemon is running, the sockets file should exist and contain the absolute path of the directory that contains the sockets of the circus endpoints. If it cannot be read, a RuntimeError will be thrown. If the daemon is not running, a temporary directory will be created and its path will be written to the sockets file and returned.

Note

A temporary folder needs to be used for the sockets because UNIX limits the filepath length to 107 bytes. Placing the socket files in the AiiDA config folder might seem like the more logical choice but that folder can be placed in an arbitrarily nested directory, the socket filename will exceed the limit. The solution is therefore to always store them in the temporary directory of the operation system whose base path is typically short enough as to not exceed the limit

Returns:

The absolute path of directory to write the sockets to.

get_client(timeout: int | None = None) CircusClient[source]#

Return an instance of the CircusClient.

The endpoint is defined by the controller endpoint, which used the port that was written to the port file upon starting of the daemon.

Parameters:

timeout – Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon instantiation taken from the daemon.timeout config option.

Returns:

CircusClient instance

get_controller_endpoint()[source]#

Get the endpoint string for the circus controller.

For the IPC protocol a profile specific socket will be used, whereas for the TCP protocol an available port will be found and saved in the profile specific port file.

Returns:

The endpoint string.

get_daemon_info(timeout: int | None = None) dict[str, Any][source]#

Get statistics about this daemon itself.

Parameters:

timeout – Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon instantiation taken from the daemon.timeout config option.

Returns:

The client call response. If successful, will contain ‘info’ key.

get_daemon_pid() int | None[source]#

Get the daemon pid which should be written in the daemon pid file specific to the profile.

Returns:

The pid of the circus daemon process or None if not found.

static get_env() dict[str, str][source]#

Return the environment for this current process.

This method is used to pass variables from the environment of the current process to a subprocess that is spawned when the daemon or a daemon worker is started.

It replicates the PATH, PYTHONPATH` and the ``AIIDA_PATH environment variables. The PYTHONPATH variable ensures that all Python modules that can be imported by the parent process, are also importable by the subprocess. The AIIDA_PATH variable ensures that the subprocess will use the same AiiDA configuration directory as used by the current process.

get_ipc_endpoint(endpoint)[source]#

Get the ipc endpoint string for a circus daemon endpoint for a given socket.

Parameters:

endpoint – The circus endpoint for which to return a socket.

Returns:

The ipc endpoint string.

get_numprocesses(timeout: int | None = None) dict[str, Any][source]#

Get the number of running daemon processes.

Parameters:

timeout – Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon instantiation taken from the daemon.timeout config option.

Returns:

The client call response. If successful, will contain ‘numprocesses’ key.

get_pubsub_endpoint()[source]#

Get the endpoint string for the circus pubsub endpoint.

For the IPC protocol a profile specific socket will be used, whereas for the TCP protocol any available port will be used.

Returns:

The endpoint string.

get_stats_endpoint()[source]#

Get the endpoint string for the circus stats endpoint.

For the IPC protocol a profile specific socket will be used, whereas for the TCP protocol any available port will be used.

Returns:

The endpoint string.

get_status(timeout: int | None = None) dict[str, Any][source]#

Return the status of the daemon.

Parameters:

timeout – Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon instantiation taken from the daemon.timeout config option.

Returns:

The client call response. If successful, will contain ‘pid’ key.

get_tcp_endpoint(port=None)[source]#

Get the tcp endpoint string for a circus daemon endpoint.

If the port is unspecified, the operating system will be asked for a currently available port.

Parameters:

port – A port to use for the endpoint.

Returns:

The tcp endpoint string.

get_worker_info(timeout: int | None = None) dict[str, Any][source]#

Get workers statistics for this daemon.

Parameters:

timeout – Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon instantiation taken from the daemon.timeout config option.

Returns:

The client call response. If successful, will contain ‘info’ key.

increase_workers(number: int, timeout: int | None = None) dict[str, Any][source]#

Increase the number of workers.

Parameters:
  • number – The number of workers to add.

  • timeout – Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon instantiation taken from the daemon.timeout config option.

Returns:

The client call response.

property is_daemon_running: bool#

Return whether the daemon is running, which is determined by seeing if the daemon pid file is present.

Returns:

True if daemon is running, False otherwise.

property loglevel: str#
property profile: Profile#
restart_daemon(wait: bool = True, timeout: int | None = None) dict[str, Any][source]#

Restart the daemon.

Parameters:
  • wait – Boolean to indicate whether to wait for the result of the command.

  • timeout – Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon instantiation taken from the daemon.timeout config option.

Returns:

The client call response.

Raises:
start_daemon(number_workers: int = 1, foreground: bool = False, wait: bool = True, timeout: int | None = None) None[source]#

Start the daemon in a sub process running in the background.

Parameters:
  • number_workers – Number of daemon workers to start.

  • foreground – Whether to launch the subprocess in the background or not.

  • wait – Boolean to indicate whether to wait for the result of the command.

  • timeout – Optional timeout to set for trying to reach the circus daemon after the subprocess has started. Default is set on the client upon instantiation taken from the daemon.timeout config option.

Raises:
stop_daemon(wait: bool = True, timeout: int | None = None) dict[str, Any][source]#

Stop the daemon.

Parameters:
  • wait – Boolean to indicate whether to wait for the result of the command.

  • timeout – Optional timeout to set for trying to reach the circus daemon. Default is set on the client upon instantiation taken from the daemon.timeout config option.

Returns:

The client call response.

Raises:
property virtualenv: str | None#
exception aiida.engine.daemon.client.DaemonException[source]#

Bases: AiidaException

Base class for exceptions related to the daemon.

__annotations__ = {}#
__module__ = 'aiida.engine.daemon.client'#
exception aiida.engine.daemon.client.DaemonNotRunningException[source]#

Bases: DaemonException

Raised when a connection to the daemon is attempted but it is not running.

__annotations__ = {}#
__module__ = 'aiida.engine.daemon.client'#
exception aiida.engine.daemon.client.DaemonStalePidException[source]#

Bases: DaemonException

Raised when a connection to the daemon is attempted but it fails and the PID file appears to be stale.

__annotations__ = {}#
__module__ = 'aiida.engine.daemon.client'#
exception aiida.engine.daemon.client.DaemonTimeoutException[source]#

Bases: DaemonException

Raised when a connection to the daemon is attempted but it times out.

__annotations__ = {}#
__module__ = 'aiida.engine.daemon.client'#
aiida.engine.daemon.client.get_daemon_client(profile_name: str | None = None) DaemonClient[source]#

Return the daemon client for the given profile or the current profile if not specified.

Parameters:

profile_name – Optional profile name to load.

Returns:

The daemon client.

Raises:

This file contains the main routines to submit, check and retrieve calculation results. These are general and contain only the main logic; where appropriate, the routines make reference to the suitable plugins for all plugin-specific operations.

aiida.engine.daemon.execmanager._copy_local_files(logger, node, transport, inputs, local_copy_list)[source]#

Perform the copy instrctions of the local_copy_list.

aiida.engine.daemon.execmanager._copy_remote_files(logger, node, computer, transport, remote_copy_list, remote_symlink_list)[source]#

Perform the copy instructions of the remote_copy_list and remote_symlink_list.

aiida.engine.daemon.execmanager._copy_sandbox_files(logger, node, transport, folder)[source]#

Copy the contents of the sandbox folder to the working directory.

aiida.engine.daemon.execmanager._find_data_node(inputs: Mapping[str, Any], uuid: str) Node | None[source]#

Find and return the node with the given UUID from a nested mapping of input nodes.

Parameters:
  • inputs – (nested) mapping of nodes

  • uuid – UUID of the node to find

Returns:

instance of Node or None if not found

aiida.engine.daemon.execmanager.kill_calculation(calculation: CalcJobNode, transport: Transport) None[source]#

Kill the calculation through the scheduler

Parameters:
  • calculation – the instance of CalcJobNode to kill.

  • transport – an already opened transport to use to address the scheduler

aiida.engine.daemon.execmanager.retrieve_calculation(calculation: CalcJobNode, transport: Transport, retrieved_temporary_folder: str) FolderData | None[source]#

Retrieve all the files of a completed job calculation using the given transport.

If the job defined anything in the retrieve_temporary_list, those entries will be stored in the retrieved_temporary_folder. The caller is responsible for creating and destroying this folder.

Parameters:
  • calculation – the instance of CalcJobNode to update.

  • transport – an already opened transport to use for the retrieval.

  • retrieved_temporary_folder – the absolute path to a directory in which to store the files listed, if any, in the retrieved_temporary_folder of the jobs CalcInfo.

Returns:

The FolderData into which the files have been retrieved, or None if the calculation already has a retrieved output node attached.

aiida.engine.daemon.execmanager.retrieve_files_from_list(calculation: CalcJobNode, transport: Transport, folder: str, retrieve_list: List[str | Tuple[str, str, int] | list]) None[source]#

Retrieve all the files in the retrieve_list from the remote into the local folder instance through the transport. The entries in the retrieve_list can be of two types:

  • a string

  • a list

If it is a string, it represents the remote absolute filepath of the file. If the item is a list, the elements will correspond to the following:

  • remotepath

  • localpath

  • depth

If the remotepath contains file patterns with wildcards, the localpath will be treated as the work directory of the folder and the depth integer determines upto what level of the original remotepath nesting the files will be copied.

Parameters:
  • transport – the Transport instance.

  • folder – an absolute path to a folder that contains the files to copy.

  • retrieve_list – the list of files to retrieve.

aiida.engine.daemon.execmanager.stash_calculation(calculation: CalcJobNode, transport: Transport) None[source]#

Stash files from the working directory of a completed calculation to a permanent remote folder.

After a calculation has been completed, optionally stash files from the work directory to a storage location on the same remote machine. This is useful if one wants to keep certain files from a completed calculation to be removed from the scratch directory, because they are necessary for restarts, but that are too heavy to retrieve. Instructions of which files to copy where are retrieved from the stash.source_list option.

Parameters:
  • calculation – the calculation job node.

  • transport – an already opened transport.

aiida.engine.daemon.execmanager.submit_calculation(calculation: CalcJobNode, transport: Transport) str | ExitCode[source]#

Submit a previously uploaded CalcJob to the scheduler.

Parameters:
  • calculation – the instance of CalcJobNode to submit.

  • transport – an already opened transport to use to submit the calculation.

Returns:

the job id as returned by the scheduler submit_from_script call

aiida.engine.daemon.execmanager.upload_calculation(node: CalcJobNode, transport: Transport, calc_info: CalcInfo, folder: SandboxFolder, inputs: MappingType[str, Any] | None = None, dry_run: bool = False) RemoteData | None[source]#

Upload a CalcJob instance

Parameters:
  • node – the CalcJobNode.

  • transport – an already opened transport to use to submit the calculation.

  • calc_info – the calculation info datastructure returned by CalcJob.presubmit

  • folder – temporary local file system folder containing the inputs written by CalcJob.prepare_for_submission

Returns:

The RemoteData representing the working directory on the remote if, or None if dry_run=True.

Function that starts a daemon worker.

async aiida.engine.daemon.worker.shutdown_worker(runner: Runner) None[source]#

Cleanup tasks tied to the service’s shutdown.

aiida.engine.daemon.worker.start_daemon_worker(foreground: bool = False) None[source]#

Start a daemon worker for the currently configured profile.

Parameters:

foreground – If true, the logging will be configured to write to stdout, otherwise it will be configured to write to the daemon log file.