Source code for aiida.brokers.rabbitmq.defaults
"""Defaults related to RabbitMQ."""
from __future__ import annotations
import typing as t
from aiida.common.extendeddicts import AttributeDict
__all__ = ('BROKER_DEFAULTS',)

# Names of the exchanges and of the launch queue.
# NOTE(review): presumably combined with a per-profile prefix by the broker code — confirm against the callers.
TASK_EXCHANGE = 'tasks'
MESSAGE_EXCHANGE = 'messages'
LAUNCH_QUEUE = 'process.queue'

# Connection parameters used when nothing else has been configured. Kept in an ``AttributeDict`` so the values can be
# read both as ``BROKER_DEFAULTS['host']`` and as ``BROKER_DEFAULTS.host``.
BROKER_DEFAULTS = AttributeDict(
    {
        'protocol': 'amqp',
        # ``guest``/``guest`` is the account that a stock RabbitMQ installation ships with.
        'username': 'guest',
        'password': 'guest',
        # Loopback address: the defaults target a broker running on the local machine.
        'host': '127.0.0.1',
        # 5672 is the registered port for plain (non-TLS) AMQP.
        'port': 5672,
        'virtual_host': '',
        # NOTE(review): presumably expressed in seconds — confirm against the client library.
        'heartbeat': 600,
    }
)
[docs]
def detect_rabbitmq_config() -> dict[str, t.Any] | None:
    """Try to connect to a RabbitMQ server with the default connection parameters.

    The connection is only opened as a probe: once it has been established successfully it is closed again, so that
    calling this function does not leave an open connection to the broker behind.

    :returns: The connection parameters if the RabbitMQ server was successfully connected to, or ``None`` otherwise.
    """
    # Function-scope import: ``kiwipy`` is only loaded when a detection is actually requested.
    from kiwipy.rmq.threadcomms import connect

    # Work on a plain ``dict`` copy so that the returned value is independent of the module-level ``BROKER_DEFAULTS``.
    connection_params = dict(BROKER_DEFAULTS)

    try:
        communicator = connect(connection_params=connection_params)
    except ConnectionError:
        return None

    # The communicator was only needed to prove that the broker is reachable; release its resources right away
    # instead of leaking the connection for the remaining lifetime of the process.
    communicator.close()

    return connection_params