Source code for yggdrasil.communication.RMQComm

from yggdrasil import multitasking
from yggdrasil.communication import CommBase, NoMessages
from yggdrasil.config import ygg_cfg
import logging
logger = logging.getLogger(__name__)
try:
    import pika
    _rmq_installed = True
    _pika_version_maj = int(float(pika.__version__.split('.')[0]))
    if _pika_version_maj < 1:  # pragma: debug
        raise ImportError("pika version <1.0 no longer supported.")
except ImportError:
    logger.debug("Could not import pika. "
                 + "RabbitMQ support will be disabled.")
    pika = None
    _rmq_installed = False
    _pika_version_maj = 0


_rmq_param_sep = '_RMQPARAM_'
_localhost = '127.0.0.1'


[docs]def get_rmq_parameters(url=None, user=None, username=None, password=None, host=None, virtual_host=None, vhost=None, port=None, exchange=None, queue=''): r"""Get RabbitMQ connection parameters. Args: url (str, optional): Address of RMQ server that includes all the necessary information. If this is provided, the remaining arguments are ignored. Defaults to None and the connection parameters are taken from the other arguments. user (str, optional): RabbitMQ server username. Defaults to config option 'user' in section 'rmq' if it exists and 'guest' if it does not. username (str, optional): Alias for user. password (str, optional): RabbitMQ server password. Defaults to config option 'password' in section 'rmq' if it exists and 'guest' if it does not. host (str, optional): RabbitMQ server host. Defaults to config option 'host' in section 'rmq' if it exists and _localhost if it does not. If _localhost, the output of socket.gethostname() is used. virtual_host (str, optional): RabbitMQ server virtual host. Defaults to config option 'vhost' in section 'rmq' if it exists and '/' if it does not. vhost (str, optional): Alias for virtual_host. port (str, optional): Port on host to use. Defaults to config option 'port' in section 'rmq' if it exists and '5672' if it does not. exchange (str, optional): RabbitMQ exchange. Defaults to config option 'namespace' in section 'rmq' if it exits and '' if it does not. queue (str, optional): Name of the queue that messages will be send to or received from. If an empty string, the queue will be a random string and exclusive to a receiving comm. Defaults to ''. Returns: tuple: The connection url, exchange, & queue. """ if url is None: if user is None: user = username or ygg_cfg.get('rmq', 'user', 'guest') if password is None: password = ygg_cfg.get('rmq', 'password', 'guest') if host is None: host = ygg_cfg.get('rmq', 'host', _localhost) if virtual_host is None: virtual_host = vhost or ygg_cfg.get('rmq', 'vhost', '/') if virtual_host == '/': virtual_host = '%2f' if port is None: port = ygg_cfg.get('rmq', 'port', '5672') url = 'amqp://%s:%s@%s:%s/%s' % ( user, password, host, port, virtual_host) if exchange is None: exchange = ygg_cfg.get('rmq', 'namespace', '') return url, exchange, queue
[docs]def check_rmq_server(url=None, **kwargs): r"""Check that connection to a RabbitMQ server is possible. Args: url (str, optional): Address of RMQ server that includes all the necessary information. If this is provided, the remaining arguments are ignored. Defaults to None and the connection parameters are taken from the other arguments. username (str, optional): RMQ server username. Defaults to config value. password (str, optional): RMQ server password. Defaults to config value. host (str, optional): RMQ hostname or IP Address to connect to. Defaults to config value. port (str, optional): RMQ server TCP port to connect to. Defaults to config value. vhost (str, optional): RMQ virtual host to use. Defaults to config value. Returns: bool: True if connection to RabbitMQ server is possible, False otherwise. """ out = True if not _rmq_installed: return False if url is not None: parameters = pika.URLParameters(url) else: username = kwargs.get('username', ygg_cfg.get('rmq', 'user', 'guest')) password = kwargs.get('password', ygg_cfg.get('rmq', 'password', 'guest')) host = kwargs.get('host', ygg_cfg.get('rmq', 'host', _localhost)) port = kwargs.get('port', ygg_cfg.get('rmq', 'port', '5672')) vhost = kwargs.get('vhost', ygg_cfg.get('rmq', 'vhost', '/')) credentials = pika.PlainCredentials(username, password) parameters = pika.ConnectionParameters(host=host, port=int(port), virtual_host=vhost, credentials=credentials) # Try to establish connection logging.getLogger("pika").propagate = False try: from yggdrasil import tools with tools.track_fds("pika test connection: "): connection = pika.BlockingConnection(parameters) if not connection.is_open: # pragma: debug del connection raise BaseException("Connection was not opened.") connection.close() del connection except BaseException as e: # pragma: debug print("Error when attempting to connect to the RabbitMQ server: " + str(e)) out = False logging.getLogger("pika").propagate = True return out
[docs]class RMQServer(CommBase.CommServer): r"""RMQ server object for cleaning up server connections.""" def __init__(self, *args, **kwargs): self.comm_cls = kwargs.get('comm_cls', RMQComm) super(RMQServer, self).__init__(*args, **kwargs)
[docs] def terminate(self, *args, **kwargs): self.comm_cls.unregister_comm(self.srv_address) super(RMQServer, self).terminate(*args, **kwargs)
[docs]class RMQComm(CommBase.CommBase): r"""Class for handling basic RabbitMQ communications. Attributes: connection (:class:`pika.Connection`): RabbitMQ connection. channel (:class:`pika.Channel`): RabbitMQ channel. Raises: RuntimeError: If a connection cannot be established. Developer Notes: It is not advised that new language implement a RabbitMQ communication interface. Rather RMQ communication is included explicitly for connections between models that are not co-located on the same machine and are used by the |yggdrasil| framework connections on the Python side. """ _commtype = 'rmq' _schema_subtype_description = ('RabbitMQ connection.') # Based on limit of 32bit int, this could be 2**30, but this is # too large for stack allocation in C so 2**20 will be used. _maxMsgSize = 2**20 address_description = ("AMPQ queue address of the form " "``<url>_RMQPARAM_<exchange>_RMQPARAM_<queue>`` " "where ``url`` is the broker address (see explanation " "`here <https://pika.readthedocs.io/en/stable/" "examples/using_urlparameters.html>`_), " "``exchange`` is the name of the exchange on the queue " "that should be used, and ``queue`` is the name of " "the queue.") _disconnect_attr = (CommBase.CommBase._disconnect_attr + ['_opening', '_closing']) _deprecated_drivers = ['RMQInputDriver', 'RMQOutputDriver'] def _init_before_open(self, **kwargs): r"""Set null connection and channel.""" if not hasattr(self, 'rmq_lock'): self.rmq_lock = multitasking.RLock() self.connection = None self.channel = None self._opening = multitasking.ProcessEvent() self._closing = multitasking.ProcessEvent() self._server_class = RMQServer self._server_kwargs = {'comm_cls': self.__class__} super(RMQComm, self)._init_before_open(**kwargs) @property def url(self): r"""str: AMQP server address.""" return self.address.split(_rmq_param_sep)[0] @property def exchange(self): r"""str: AMQP exchange.""" return self.address.split(_rmq_param_sep)[1] @property def queue(self): r"""str: AMQP queue.""" return self.address.split(_rmq_param_sep)[2]
[docs] @classmethod def new_comm_kwargs(cls, name, user=None, password=None, host=None, virtual_host=None, port=None, exchange=None, queue='', **kwargs): r"""Initialize communication with new connection. Args: name (str): Name of new connection. user (str, optional): RabbitMQ server username. Defaults to config option 'user' in section 'rmq' if it exists and 'guest' if it does not. password (str, optional): RabbitMQ server password. Defaults to config option 'password' in section 'rmq' if it exists and 'guest' if it does not. host (str, optional): RabbitMQ server host. Defaults to config option 'host' in section 'rmq' if it exists and _localhost if it does not. If _localhost, the output of socket.gethostname() is used. virtual_host (str, optional): RabbitMQ server virtual host. Defaults to config option 'vhost' in section 'rmq' if it exists and '/' if it does not. port (str, optional): Port on host to use. Defaults to config option 'port' in section 'rmq' if it exists and '5672' if it does not. exchange (str, optional): RabbitMQ exchange. Defaults to config option 'namespace' in section 'rmq' if it exits and '' if it does not. queue (str, optional): Name of the queue that messages will be send to or received from. If an empty string, the queue will be a random string and exclusive to a receiving comm. Defaults to ''. **kwargs: Additional keywords arguments are returned as keyword arguments for the new comm. Returns: tuple(tuple, dict): Arguments and keyword arguments for new comm. """ args = [name] if 'address' not in kwargs: (url, exchange, queue) = get_rmq_parameters( user=user, password=password, host=host, virtual_host=virtual_host, port=port, exchange=exchange, queue=queue) kwargs['address'] = _rmq_param_sep.join([url, exchange, queue]) return args, kwargs
[docs] def opp_comm_kwargs(self, for_yaml=False): r"""Get keyword arguments to initialize communication with opposite comm object. Args: for_yaml (bool, optional): If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False. Returns: dict: Keyword arguments for opposite comm object. """ kwargs = super(RMQComm, self).opp_comm_kwargs(for_yaml=for_yaml) return kwargs
[docs] def bind(self): r"""Declare queue to get random new queue.""" if self._opening.has_started() or self._closing.has_started(): return self._opening.start() with self.rmq_lock: if self.connection is None: parameters = pika.URLParameters(self.url) self.connection = pika.BlockingConnection(parameters) self.original_queue = self.queue self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, auto_delete=True) if self.direction == 'send': self.channel.confirm_delivery() res = self.channel.queue_declare( queue=self.queue, exclusive=False, auto_delete=True, passive=self.original_queue.startswith('amq.')) if not self.queue: self.address += res.method.queue self.channel.queue_bind(exchange=self.exchange, queue=self.queue) self.register_comm(self.address, (self.connection, self.channel)) super(RMQComm, self).bind() self._opening.stop()
def _close(self, linger=False): r"""Close the connection. Args: linger (bool, optional): If True, drain messages before closing the comm. Defaults to False. """ if self._closing.has_started(): # pragma: debug return self._closing.start() if self.direction == 'recv': self.close_queue() self.close_channel() self.close_connection() if not self.is_client: self.unregister_comm(self.address) with self.rmq_lock: self.channel = None self.connection = None super(RMQComm, self)._close(linger=linger) self._closing.stop()
[docs] def close_queue(self, skip_unbind=False): r"""Close the queue if the channel exists.""" if self.direction != 'recv': return with self.rmq_lock: if self.channel and (not self.is_client): try: if not skip_unbind: self.channel.queue_unbind(queue=self.queue, exchange=self.exchange) self.channel.queue_delete(queue=self.queue) except (pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.ChannelWrongStateError, pika.exceptions.ConnectionWrongStateError): # pragma: debug pass except AttributeError: # pragma: debug if self.channel is not None: raise
[docs] def close_channel(self): r"""Close the channel if it exists.""" with self.rmq_lock: if self.channel is not None: self.debug('Closing the channel') try: self.channel.close() except (pika.exceptions.ChannelWrongStateError, pika.exceptions.StreamLostError): # pragma: debug pass
[docs] def close_connection(self, *args, **kwargs): r"""Close the connection.""" with self.rmq_lock: if self.connection is not None: self.debug('Closing connection') self.connection.close(*args, **kwargs)
[docs] def atexit(self): # pragma: debug r"""Close operations.""" if self.direction == 'send': self.linger() super(RMQComm, self).atexit()
@property def is_open(self): r"""bool: True if the connection and channel are open.""" with self.rmq_lock: if self.channel is None or self.connection is None: return False if not self.connection.is_open: return False if not self.channel.is_open: return False return (self._opening.has_stopped() and (not self._closing.has_started()))
[docs] def get_queue_result(self): r"""Get the fram from passive queue declare.""" with self.rmq_lock: res = None if self.is_open: try: res = self.channel.queue_declare(queue=self.queue, auto_delete=True, passive=True) except pika.exceptions.ChannelClosedByBroker: # pragma: debug self._close() # except BlockingIOError: # pragma: debug # self.sleep() # res = self.get_queue_result() # except (pika.exceptions.ChannelClosed, # pika.exceptions.ConnectionClosed, # pika.exceptions.ChannelWrongStateError, # pika.exceptions.ConnectionWrongStateError, # pika.exceptions.StreamLostError, # AttributeError): # pragma: debug # self._close() return res
@property def n_msg_recv(self): r"""int: Number of messages in the queue.""" out = 0 res = self.get_queue_result() if res is not None: out = res.method.message_count return out @property def n_msg_send(self): r"""int: Number of messages in the queue.""" return 0 @property def is_confirmed_send(self): r"""bool: True if all sent messages have been confirmed.""" return True @property def is_confirmed_recv(self): r"""bool: True if all received messages have been confirmed.""" return True @property def get_work_comm_kwargs(self): r"""dict: Keyword arguments for an existing work comm.""" out = super(RMQComm, self).get_work_comm_kwargs out['exchange'] = self.exchange return out @property def create_work_comm_kwargs(self): r"""dict: Keyword arguments for a new work comm.""" out = super(RMQComm, self).create_work_comm_kwargs out['exchange'] = self.exchange return out def _send(self, msg, exchange=None, routing_key=None, **kwargs): r"""Send a message. Args: msg (str, bytes): Message to be sent. exchange (str, optional): Exchange that message should be sent to. Defaults to self.exchange. routing_key (str, optional): Key that exchange should use to route the message. Defaults to self.queue. **kwargs: Additional keyword arguments are passed to :method:`pika.BlockingChannel.basic_publish`. Returns: bool: Success or failure of send. """ if exchange is None: exchange = self.exchange if routing_key is None: routing_key = self.queue kwargs.setdefault('mandatory', True) with self.rmq_lock: try: self.channel.basic_publish(exchange, routing_key, msg, **kwargs) except pika.exceptions.UnroutableError: # pragma: debug return False return True def _recv(self): r"""Receive a message. Returns: tuple (bool, obj): Success or failure of receive and received message. """ with self.rmq_lock: method_frame, props, msg = self.channel.basic_get( queue=self.queue, auto_ack=False) if method_frame: self.channel.basic_ack(method_frame.delivery_tag) else: # pragma: debug raise NoMessages("No messages in connection.") return (True, msg)
[docs] def purge(self): r"""Remove all messages from the associated queue.""" if not self._closing.has_started(): with self.rmq_lock: with self._closing_thread.lock: if self.is_open: self.channel.queue_purge(queue=self.queue) super(RMQComm, self).purge()