aiida.brokers.rabbitmq package

Submodules

Implementation of the message broker interface using RabbitMQ through kiwipy.

class aiida.brokers.rabbitmq.broker.RabbitmqBroker(profile: Profile)

Bases: Broker

Implementation of the message broker interface using RabbitMQ through kiwipy.

__annotations__ = {}
__init__(profile: Profile) -> None

Construct a new instance.

Parameters:

profile – The profile.

__module__ = 'aiida.brokers.rabbitmq.broker'
__str__()

Return str(self).

_create_communicator() -> RmqThreadCommunicator

Return an instance of kiwipy.Communicator.

check_rabbitmq_version()

Check the version of the RabbitMQ server that is being connected to and emit a warning if it is not compatible.

close()

Close the broker.

get_communicator() -> RmqThreadCommunicator

Return an instance of kiwipy.Communicator.

get_rabbitmq_version()

Return the version of the RabbitMQ server that the current profile connects to.

Returns:

packaging.version.Version

get_url() -> str

Return the RabbitMQ URL for this profile.

is_rabbitmq_version_supported() -> bool

Return whether the version of RabbitMQ configured for the current profile is supported.

Versions 3.5 and below are not supported at all, whereas versions 3.8.15 and above are incompatible with the default configuration of the RabbitMQ server.

Returns:

True if the current RabbitMQ version is supported, False otherwise.
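The version window described above can be sketched as a simple range check. This is a minimal illustration, not the method's actual implementation: the helper names and the constants are hypothetical, the lower bound of 3.6.0 is an assumption derived from "3.5 and below", and plain integer tuples are used instead of packaging.version.Version to keep the example dependency-free.

```python
from __future__ import annotations


def parse_version(text: str) -> tuple[int, ...]:
    """Convert a dotted version string such as '3.8.14' into a 3-tuple of integers.

    Missing components are padded with zeros, so '3.6' becomes (3, 6, 0).
    """
    parts = [int(part) for part in text.split(".")][:3]
    parts += [0] * (3 - len(parts))
    return tuple(parts)


# Hypothetical constants: the bounds are read off the description above.
MINIMUM_SUPPORTED = (3, 6, 0)    # assumed first version after "3.5 and below"
FIRST_INCOMPATIBLE = (3, 8, 15)  # first version that needs a non-default server configuration


def is_version_supported(text: str) -> bool:
    """Return True if the version lies in the half-open range [3.6.0, 3.8.15)."""
    return MINIMUM_SUPPORTED <= parse_version(text) < FIRST_INCOMPATIBLE
```

Under these assumptions, is_version_supported('3.7.8') is True, while '3.5.7' and '3.8.15' both yield False.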

iterate_tasks()

Return an iterator over the tasks in the launch queue.

Client for RabbitMQ Management HTTP API.

exception aiida.brokers.rabbitmq.client.ManagementApiConnectionError

Bases: AiidaException

Raised when no connection can be made to the management HTTP API.

__module__ = 'aiida.brokers.rabbitmq.client'
class aiida.brokers.rabbitmq.client.RabbitmqManagementClient(username: str, password: str, hostname: str, virtual_host: str)

Bases: object

Client for RabbitMQ Management HTTP API.

This requires the rabbitmq_management plugin (https://www.rabbitmq.com/management.html) to be enabled. Typically this is enabled by running rabbitmq-plugins enable rabbitmq_management.

__init__(username: str, password: str, hostname: str, virtual_host: str)

Construct a new instance.

Parameters:
  • username – The username to authenticate with.

  • password – The password to authenticate with.

  • hostname – The hostname of the RabbitMQ server.

  • virtual_host – The virtual host.

__module__ = 'aiida.brokers.rabbitmq.client'
__weakref__

list of weak references to the object (if defined)

format_url(url: str, url_params: dict[str, str] | None = None) -> str

Format the complete URL from a partial resource path with placeholders.

The base URL will be automatically prepended.

Parameters:
  • url – The resource path with placeholders, e.g., queues/{virtual_host}/{queue}.

  • url_params – Dictionary with values for the placeholders in the url. The virtual_host value is automatically inserted and should not be specified.

Returns:

The complete URL.
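The placeholder substitution described above can be illustrated with a standalone function. This is a sketch under stated assumptions rather than the client's own code: the function name, the explicit base_url and virtual_host arguments, and the percent-encoding of each value are all choices made for this example (15672 is the default port of the RabbitMQ management plugin).

```python
from __future__ import annotations

from urllib.parse import quote


def format_management_url(
    base_url: str,
    virtual_host: str,
    url: str,
    url_params: dict[str, str] | None = None,
) -> str:
    """Fill the placeholders in a resource path and prepend the base URL.

    Hypothetical helper: the real method takes the base URL and virtual host
    from the client instance instead of from arguments.
    """
    params = dict(url_params or {})
    # The virtual host is inserted automatically, so callers do not pass it.
    params["virtual_host"] = virtual_host
    # Percent-encode every value so that characters such as '/' stay inside
    # a single path segment (assumption made for this sketch).
    encoded = {key: quote(value, safe="") for key, value in params.items()}
    return f"{base_url}/{url.format(**encoded)}"


print(
    format_management_url(
        "http://localhost:15672/api",
        "/",
        "queues/{virtual_host}/{queue}",
        {"queue": "tasks"},
    )
)
# prints: http://localhost:15672/api/queues/%2F/tasks
```

The default virtual host "/" has to be encoded as %2F, otherwise it would be read as an extra path separator in the resource path.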

property is_connected: bool

Return whether the API server can be connected to.

Note

Tries to reach the server at the /api/cluster-name endpoint.

Returns:

True if the server can be reached, False otherwise.

request(url: str, url_params: dict[str, str] | None = None, method: str = 'GET', params: dict[str, t.Any] | None = None) -> requests.Response

Make a request.

Parameters:
  • url – The resource path with placeholders, e.g., queues/{virtual_host}/{queue}.

  • url_params – Dictionary with values for the placeholders in the url. The virtual_host value is automatically inserted and should not be specified.

  • method – The HTTP method.

  • params – Query parameters to add to the URL.

Returns:

The response of the request.

Raises:

ManagementApiConnectionError – If connection to the API cannot be made.

Defaults related to RabbitMQ.

Utilities for RabbitMQ.

aiida.brokers.rabbitmq.utils.get_launch_queue_name(prefix=None)

Return the launch queue name with an optional prefix.

Returns:

launch queue name
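The effect of the optional prefix can be sketched as follows. The base queue name and the dot separator used here are assumptions chosen for illustration, and the function is a hypothetical stand-in, not the library's implementation.

```python
from __future__ import annotations

# Hypothetical base name of the launch queue, for illustration only.
LAUNCH_QUEUE = "process.queue"


def launch_queue_name(prefix: str | None = None) -> str:
    """Return the launch queue name, namespaced by the prefix when one is given."""
    if prefix is None:
        return LAUNCH_QUEUE
    # Assumed convention: prefix and base name are joined with a dot.
    return f"{prefix}.{LAUNCH_QUEUE}"
```

A per-profile prefix of this kind keeps the queues of different profiles apart when they share one RabbitMQ server.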

aiida.brokers.rabbitmq.utils.get_message_exchange_name(prefix)

Return the message exchange name for a given prefix.

Returns:

message exchange name

aiida.brokers.rabbitmq.utils.get_rmq_url(protocol=None, username=None, password=None, host=None, port=None, virtual_host=None, **kwargs)

Return the URL to connect to RabbitMQ.

Note

The host defaults to 127.0.0.1 rather than localhost because on some computers localhost resolves first to the IPv6 address ::1, and a warning is emitted if RabbitMQ is not listening on IPv6. For more information, see aiidateam/aiida-core#1142

Parameters:
  • protocol – the protocol to use, amqp or amqps.

  • username – the username for authentication.

  • password – the password for authentication.

  • host – the hostname of the RabbitMQ server.

  • port – the port of the RabbitMQ server.

  • virtual_host – the virtual host to connect to.

  • kwargs – remaining keyword arguments that will be encoded as query parameters.

Returns:

the connection URL string.
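How the parameters above combine into a connection URL can be sketched with the standard library. This is an illustrative stand-in, not the function's real body: the name build_rmq_url and the default values (amqp, guest, guest, 5672, empty virtual host) are assumptions for this example, and the credentials are not percent-encoded here.

```python
from urllib.parse import urlencode


def build_rmq_url(
    protocol="amqp",
    username="guest",
    password="guest",
    host="127.0.0.1",
    port=5672,
    virtual_host="",
    **kwargs,
):
    """Assemble an AMQP connection URL; extra keyword arguments become query parameters.

    Hypothetical helper with assumed defaults.
    """
    path = virtual_host
    # Ensure a non-empty virtual host is separated from the port by a slash.
    if path and not path.startswith("/"):
        path = f"/{path}"
    url = f"{protocol}://{username}:{password}@{host}:{port}{path}"
    if kwargs:
        # Remaining keyword arguments are encoded as the query string.
        url += "?" + urlencode(kwargs)
    return url


print(build_rmq_url(virtual_host="aiida", heartbeat=600))
# prints: amqp://guest:guest@127.0.0.1:5672/aiida?heartbeat=600
```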

aiida.brokers.rabbitmq.utils.get_task_exchange_name(prefix)

Return the task exchange name for a given prefix.

Returns:

task exchange name