aiida.manage.external.rmq package

Module with utilities to interact with RabbitMQ.

Submodules

Client for RabbitMQ Management HTTP API.

exception aiida.manage.external.rmq.client.ManagementApiConnectionError

Bases: AiidaException

Raised when no connection can be made to the management HTTP API.

class aiida.manage.external.rmq.client.RabbitmqManagementClient(username: str, password: str, hostname: str, virtual_host: str)

Bases: object

Client for RabbitMQ Management HTTP API.

This requires the rabbitmq_management plugin (https://www.rabbitmq.com/management.html) to be enabled. Typically this is enabled by running rabbitmq-plugins enable rabbitmq_management.

__init__(username: str, password: str, hostname: str, virtual_host: str)

Construct a new instance.

Parameters:
  • username – The username to authenticate with.

  • password – The password to authenticate with.

  • hostname – The hostname of the RabbitMQ server.

  • virtual_host – The virtual host.

format_url(url: str, url_params: dict[str, str] | None = None) → str

Format the complete URL from a partial resource path with placeholders.

The base URL will be automatically prepended.

Parameters:
  • url – The resource path with placeholders, e.g., queues/{virtual_host}/{queue}.

  • url_params – Dictionary with values for the placeholders in the url. The virtual_host value is automatically inserted and should not be specified.

Returns:

The complete URL.
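
The placeholder substitution described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: the function name `format_management_url` is hypothetical, the base URL `http://{hostname}:15672/api/` is an assumption (15672 is the default port of the RabbitMQ management plugin), and percent-encoding every placeholder value is likewise assumed.

```python
from typing import Dict, Optional
from urllib.parse import quote


def format_management_url(
    hostname: str,
    virtual_host: str,
    url: str,
    url_params: Optional[Dict[str, str]] = None,
) -> str:
    """Hypothetical helper: build a full management API URL from a partial path."""
    params = dict(url_params or {})
    # The virtual host is inserted automatically, so callers never pass it.
    params['virtual_host'] = virtual_host or '/'
    # Percent-encode each value so that a virtual host such as "/" stays a single path segment.
    quoted = {key: quote(value, safe='') for key, value in params.items()}
    # Assumed base URL: default management port 15672 and the /api/ prefix.
    return f'http://{hostname}:15672/api/{url.format(**quoted)}'
```

Under these assumptions, the path `queues/{virtual_host}/{queue}` with the default virtual host `/` has the slash encoded as `%2F`, which keeps it from being read as an extra path separator.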

property is_connected: bool

Return whether the API server can be connected to.

Note

Tries to reach the server at the /api/cluster-name end-point.

Returns:

True if the server can be reached, False otherwise.

request(url: str, url_params: dict[str, str] | None = None, method: str = 'GET', params: dict[str, t.Any] | None = None) → requests.Response

Make a request to the management HTTP API.

Parameters:
  • url – The resource path with placeholders, e.g., queues/{virtual_host}/{queue}.

  • url_params – Dictionary with values for the placeholders in the url. The virtual_host value is automatically inserted and should not be specified.

  • method – The HTTP method.

  • params – Query parameters to add to the URL.

Returns:

The response of the request.

Raises:

ManagementApiConnectionError – If connection to the API cannot be made.

Defaults related to RabbitMQ.

A sub class of plumpy.ProcessLauncher to launch a Process.

class aiida.manage.external.rmq.launcher.ProcessLauncher(loop: AbstractEventLoop | None = None, persister: Persister | None = None, load_context: LoadSaveContext | None = None, loader: ObjectLoader | None = None)

Bases: ProcessLauncher

A sub class of plumpy.ProcessLauncher to launch a Process.

It overrides the _continue method to make sure that the node corresponding to the task can be loaded. If the node is already marked as terminated, the process is not continued; instead, the future is reconstructed and returned.

async _continue(communicator, pid, nowait, tag=None)

Continue the task.

Note that the task may already have been completed, as indicated by the corresponding node, in which case it is not continued, but the corresponding future is reconstructed and returned. This scenario may occur when the Process was already completed by another worker that failed to send the acknowledgment.

Parameters:
  • communicator – the communicator that called this method

  • pid – the pid of the process to continue

  • nowait – if True, don’t wait for the process to finish and just return the pid; otherwise wait and return the results

  • tag – the tag of the checkpoint to continue from
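
The "already terminated" branch described above can be sketched as follows. This is only an illustration of the control flow under stated assumptions: `NodeStub`, `continue_task`, and the `run` callback are hypothetical names, and the real method works with AiiDA nodes and plumpy futures rather than these stand-ins.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class NodeStub:
    """Hypothetical stand-in for a process node; not an AiiDA class."""

    is_terminated: bool = False
    outputs: Any = None
    exception: Optional[str] = None


async def continue_task(node: NodeStub, run: Callable[[], Awaitable[Any]]) -> Any:
    """Continue a task unless its node is already marked as terminated."""
    if node.is_terminated:
        # Rebuild a future from the stored state instead of running the process again.
        future = asyncio.get_running_loop().create_future()
        if node.exception is not None:
            future.set_exception(RuntimeError(node.exception))
        else:
            future.set_result(node.outputs)
        return await future
    # The node is not terminated, so the process is actually continued.
    return await run()
```

In this sketch a worker that picks up a duplicate task returns the stored outcome without repeating the work. That is the situation in which an earlier worker finished the process but its acknowledgment was lost.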

static handle_continue_exception(node, exception, message)

Handle exception raised in _continue call.

If the process state of the node has not yet been set to excepted, the exception was raised before the process instance could be reconstructed, for example when the process class could not be loaded, thereby circumventing the exception handling of the state machine. Raising this exception will then acknowledge the process task with RabbitMQ, leaving an uncleaned node in the CREATED state forever. The node cleaning therefore has to be performed manually.

Parameters:
  • node – the process node

  • exception – the exception object

  • message – string message to use for the log message

Utilities for RabbitMQ.

aiida.manage.external.rmq.utils.get_launch_queue_name(prefix=None)

Return the launch queue name with an optional prefix.

Returns:

launch queue name
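
The optional-prefix behaviour can be illustrated with a small sketch. The base name `process.queue`, the dot separator, and the function name `launch_queue_name` are assumptions made for illustration and are not taken from this page.

```python
from typing import Optional

# Assumed base queue name; the value actually used by the library may differ.
LAUNCH_QUEUE = 'process.queue'


def launch_queue_name(prefix: Optional[str] = None) -> str:
    """Hypothetical helper: return the queue name, namespaced by ``prefix`` if given."""
    if prefix is not None:
        # Assumed convention: prefix and base name joined by a dot.
        return f'{prefix}.{LAUNCH_QUEUE}'
    return LAUNCH_QUEUE
```

A prefix of this kind lets several independent setups share one RabbitMQ server without their queues colliding.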

aiida.manage.external.rmq.utils.get_message_exchange_name(prefix)

Return the message exchange name for a given prefix.

Returns:

message exchange name

aiida.manage.external.rmq.utils.get_rmq_url(protocol=None, username=None, password=None, host=None, port=None, virtual_host=None, **kwargs)

Return the URL to connect to RabbitMQ.

Note

The default of the host is set to 127.0.0.1 instead of localhost because on some computers localhost resolves first to IPv6 with address ::1 and if RMQ is not running on IPv6 one gets an annoying warning. For more info see: aiidateam/aiida-core#1142

Parameters:
  • protocol – the protocol to use, amqp or amqps.

  • username – the username for authentication.

  • password – the password for authentication.

  • host – the hostname of the RabbitMQ server.

  • port – the port of the RabbitMQ server.

  • virtual_host – the virtual host to connect to.

  • kwargs – remaining keyword arguments that will be encoded as query parameters.

Returns:

the connection URL string.
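
As a rough sketch of how such a URL could be assembled, the following uses only the standard library. The function name `build_rmq_url` is hypothetical. The fallback values `amqp`, `guest`/`guest`, and port `5672` are RabbitMQ's stock defaults and are assumed here; the `127.0.0.1` host default follows the note above. The library may also add query parameters of its own that this sketch omits.

```python
from urllib.parse import urlencode


def build_rmq_url(protocol=None, username=None, password=None, host=None, port=None, virtual_host=None, **kwargs):
    """Hypothetical helper: assemble an AMQP connection URL with assumed defaults."""
    scheme = protocol or 'amqp'
    user = username or 'guest'
    secret = password or 'guest'
    hostname = host or '127.0.0.1'  # avoids localhost resolving to ::1
    portnum = port or 5672
    path = virtual_host or ''
    # Make sure a non-empty virtual host is separated from the port by a slash.
    if path and not path.startswith('/'):
        path = f'/{path}'
    url = f'{scheme}://{user}:{secret}@{hostname}:{portnum}{path}'
    # Remaining keyword arguments become query parameters.
    query = urlencode(kwargs)
    return f'{url}?{query}' if query else url
```

For example, `build_rmq_url(protocol='amqps', virtual_host='aiida', heartbeat=600)` yields a URL with the path `/aiida` and the query string `heartbeat=600`.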

aiida.manage.external.rmq.utils.get_task_exchange_name(prefix)

Return the task exchange name for a given prefix.

Returns:

task exchange name