import functools
import weakref
from yggdrasil import tools, multitasking
from yggdrasil.communication import (
RMQComm, NoMessages, TemporaryCommunicationError)
from yggdrasil.communication.RMQComm import pika
[docs]class RMQTaskLoop(multitasking.YggTaskLoop):
r"""Task loop for RMQ consumer.

Args:
comm: Comm that owns this loop. Only a weak proxy to it is stored on
the loop.
*args: Additional positional arguments are passed to the parent class.
**kwargs: Additional keyword arguments are passed to the parent class.

"""
def __init__(self, comm, *args, **kwargs):
# Keep a weak proxy rather than the comm itself on this attribute.
self.comm = weakref.proxy(comm)
super(RMQTaskLoop, self).__init__(*args, **kwargs)
[docs] def atexit(self): # pragma: debug
# Exit handler. For an interface comm, an open 'send' comm first sends
# EOF and lingers; the loop is joined (10 s timeout) and the comm is
# closed before the parent class's handler runs.
# NOTE(review): indentation was lost in this copy of the file, so which
# of join()/close() are guarded by is_interface must be confirmed
# against the upstream source.
if self.comm.is_interface:
if self.comm.direction == 'send' and self.comm.is_open:
self.comm.send_eof()
self.comm.linger()
self.join(10.0)
self.comm.close()
super(RMQTaskLoop, self).atexit()
[docs]class RMQAsyncComm(RMQComm.RMQComm):
r"""Class for handling asynchronous RabbitMQ communications. It is not
recommended to use this class as it can leave hanging threads if not
closed properly. The normal RMQComm will cover most use cases.
Args:
name (str): The environment variable where the comm address is stored.
dont_open (bool, optional): If True, the connection will not be opened.
Defaults to False.
**kwargs: Additional keyword arguments are passed to CommBase.
Attributes:
times_connected (int): Number of times that this connection has been
established.
rmq_thread (RMQTaskLoop): Task loop used to run the IO loop.
"""
_commtype = 'rmq_async'
_schema_subtype_description = ('Asynchronous RabbitMQ connection.')
# Parent's list extended with the attributes this class adds: the IO
# thread, the state events, the message buffer and the delivery records.
_disconnect_attr = (RMQComm.RMQComm._disconnect_attr
+ ['rmq_thread', '_consuming', '_reconnecting',
'_buffered_messages', '_deliveries',
'_external_close'])
_deprecated_drivers = ['RMQAsyncInputDriver', 'RMQAsyncOutputDriver']
def _init_before_open(self, **kwargs):
    r"""Create the state used by the async connection before it is opened.

    Args:
        **kwargs: Keyword arguments are passed to the parent class's
            _init_before_open method.

    """
    threaded = dict(task_method='thread')
    # Connection bookkeeping and the task loop that will run the IO loop.
    self.original_queue = None
    self.times_connected = 0
    self.rmq_thread_count = 0
    self.rmq_thread = self.new_run_thread()
    # Thread-based state flags and reconnection/QoS settings.
    self._consuming = multitasking.ProcessEvent(**threaded)
    self._reconnecting = multitasking.ProcessEvent(**threaded)
    self._reconnect_delay = 0
    self._prefetch_count = 2
    # Message buffer, unconfirmed deliveries and the close-request flag.
    self._buffered_messages = multitasking.Queue(**threaded)
    self._deliveries = multitasking.LockedDict(**threaded)
    self._external_close = multitasking.Event(**threaded)
    # Publisher counters.
    self._acked = self._nacked = self._message_number = 0
    super(RMQAsyncComm, self)._init_before_open(**kwargs)
    # Register _reconnecting.stop as a callback on the opening-stopped event.
    self._opening.stopped.add_callback(self._reconnecting.stop)
@property
def rmq_lock(self):
    r"""Lock belonging to the task loop that runs the RMQ IO loop."""
    io_thread = self.rmq_thread
    return io_thread.lock
def new_run_thread(self, name=None):
    r"""Create a new task loop for running the IO loop.

    Args:
        name (str, optional): Prefix for the thread name. Defaults to
            None, in which case the comm's name is used.

    Returns:
        RMQTaskLoop: Thread-based task loop targeting run_thread. The
            thread counter is incremented and used in the thread name.

    """
    prefix = self.name if name is None else name
    self.rmq_thread_count += 1
    thread_name = f'{prefix}.RMQThread{self.rmq_thread_count}'
    return RMQTaskLoop(self, name=thread_name, target=self.run_thread,
                       task_method='thread', daemon=self.is_interface)
def reset_for_reconnection(self):
    r"""Reset variables in preparation for reconnection.

    The connection and channel are dropped, the publisher counters are
    zeroed, the consuming/closing events are cleared and, for a sending
    comm, the record of unconfirmed deliveries is emptied.

    """
    self.connection = None
    self.channel = None
    self._acked = 0
    self._nacked = 0
    self._message_number = 0
    for event in (self._consuming, self._closing):
        event.started.clear()
        event.stopped.clear()
    self._consumer_tag = None
    # start_consuming/stop_consuming store and read the tag under the
    # public name, so clear that one as well; otherwise the tag issued
    # on the old channel survives the reset.
    self.consumer_tag = None
    if self.direction == 'send':  # pragma: intermittent
        # Delivery tags restart with the new channel, so the old
        # records can no longer be matched to confirmations.
        for tag in list(self._deliveries.keys()):
            self._deliveries.pop(tag)
def check_for_close(self):
    r"""Check if a close has been requested from outside the IO thread.

    When the external close flag is set, closing is started, any
    reconnection in progress is stopped and the IO loop is stopped.

    Returns:
        bool: True if a close was requested, False otherwise.

    """
    if not self._external_close.is_set():
        return False
    # pragma: debug
    self._closing.start()
    if self._reconnecting.is_running():
        self._reconnecting.stop()
    self.connection.ioloop.stop()
    return True
[docs] def run_thread(self):
r"""Run one connection cycle: connect, run the IO loop until it
returns, tear the connection down and, if a reconnection is in
progress, wait and reset so that the next cycle can connect again.

Raises:
BreakLoop: Local subclass of multitasking.BreakLoopException. Raised
if closing has already started, if any exception escapes
connect()/the IO loop, or if an external close is requested while
waiting to reconnect.

"""
# NOTE(review): indentation was lost in this copy of the file; the
# nesting of the statements below must be confirmed against the
# upstream source.
# Creating a BreakLoop also calls stop() on the comm. The exception
# instance is named 'solf' so that 'self' still refers to the comm.
class BreakLoop(multitasking.BreakLoopException):
def __init__(solf, *args, **kwargs):
self.stop()
super(BreakLoop, solf).__init__(*args, **kwargs)
if self._closing.has_started():
raise BreakLoop("closing")
try:
self.debug('')
self.connect()
# pika's ioloop.start() does not return until the loop is stopped.
self.connection.ioloop.start()
self._consuming.stop()
self.debug("returning")
# BaseException is caught so every failure is logged and converted into
# a BreakLoop.
except BaseException as e: # pragma: debug
self.error("Error in RMQ thread %s: %s", type(e), e)
raise BreakLoop(e)
# Tear down whatever is left of the connection once the IO loop returns.
self.close_queue(skip_unbind=True)
self.close_channel()
self.close_connection()
if self._closing.has_started():
self._closing.stop()
if self._reconnecting.is_running():
# The delay is reset if consuming had started on this connection;
# otherwise it grows by one second per attempt, capped at 30.
if self._consuming.has_started():
self._reconnect_delay = 0
else:
self._reconnect_delay += 1
self._reconnect_delay = min(self._reconnect_delay, 30)
self.info(f'Reconnecting after {self._reconnect_delay} seconds')
# Waiting on the external close event lets a close request cut the
# delay short.
self._external_close.wait(self._reconnect_delay)
if self._external_close.is_set(): # pragma: debug
self._reconnecting.stop()
raise BreakLoop("external close")
self.reset_for_reconnection()
[docs] def start_run_thread(self):
r"""Start the run thread and wait for the connection to finish opening.

Does nothing if the thread was already started.

Raises:
Exception: If the thread is not alive after waiting.
RuntimeError: If the comm is not open after waiting.

"""
# NOTE(review): indentation was lost in this copy of the file; confirm
# against the upstream source which statements are executed while
# holding rmq_lock.
with self.rmq_lock:
if self.rmq_thread.was_started:
return
self._opening.start()
self.rmq_thread.start()
# Wait for connection to be established
self._opening.stopped.wait(self.timeout)
# Check that connection was established
if not self.rmq_thread.is_alive(): # pragma: debug
self.stop()
raise Exception("Connection ioloop could not be established.")
if not self.is_open: # pragma: debug
self.stop(call_on_thread=True)
raise RuntimeError("Connection never finished opening ")
def bind(self):
    r"""Start the IO thread, register the comm and wait until opening
    has stopped or closing has started. Nothing is done if opening or
    closing has already started."""
    already_underway = (self._opening.has_started()
                        or self._closing.has_started())
    if already_underway:
        return
    # The IO loop runs in its own thread.
    self.start_run_thread()
    if not self.queue:  # pragma: debug
        self.error("Queue was not initialized.")
    self.register_comm(self.address, (self.connection, self.channel))
    # RMQComm's own bind is deliberately skipped.
    super(RMQComm.RMQComm, self).bind()

    def settled():
        return self._opening.has_stopped() or self._closing.has_started()

    multitasking.wait_on_function(settled, timeout=self.timeout)
def _close(self, linger=False):
r"""Close the connection.

The IO thread is asked to stop and this method waits up to the comm's
timeout for closing to finish before releasing the channel and
connection and joining the thread.

Args:
linger (bool, optional): If True, drain messages before closing the
comm. Defaults to False.

Raises:
RuntimeError: If the IO thread is still alive after being joined.

"""
# NOTE(review): indentation was lost in this copy of the file; the
# nesting below (including what is done under rmq_lock) must be
# confirmed against the upstream source.
self.stop(call_on_thread=True)
self._closing.stopped.wait(self.timeout)
# Closing did not finish in time: stop the IO loop directly and mark
# closing as stopped.
if not self._closing.has_stopped(): # pragma: debug
if self.connection is not None:
self.connection.ioloop.stop()
self._closing.stop()
self.error("Closing has not completed, resources may be leaked")
if not self.is_client:
self.unregister_comm(self.address)
with self.rmq_lock:
self.channel = None
self.connection = None
# RMQComm's own _close is deliberately skipped.
super(RMQComm.RMQComm, self)._close(linger=linger)
if self.rmq_thread.is_alive(): # pragma: debug
self.rmq_thread.join(self.timeout)
if self.rmq_thread.is_alive():
raise RuntimeError(
self.logger.process("Thread still running.", {})[0])
def atexit(self):  # pragma: debug
    r"""Run the exit handler of the class that follows RMQComm in the
    method resolution order (RMQComm's own handler is skipped)."""
    beyond_rmq = super(RMQComm.RMQComm, self)
    beyond_rmq.atexit()
@property
def is_open(self):
    r"""bool: True if the parent class reports the comm as open or a
    reconnection is in progress."""
    parent_open = super(RMQAsyncComm, self).is_open
    return parent_open or self._reconnecting.is_running()
@property
def n_msg_recv(self):
    r"""int: Number of buffered messages, or 0 if the comm is not open."""
    return self._buffered_messages.qsize() if self.is_open else 0
@property
def n_msg_send(self):
    r"""int: Number of messages buffered for sending. The same buffer
    backs both directions, so this mirrors n_msg_recv; unconfirmed
    deliveries are not counted."""
    buffered = self.n_msg_recv
    return buffered
def _send(self, msg, **kwargs):
    r"""Buffer a message and schedule its publication on the IO loop.

    Args:
        msg (str, bytes): Message to be sent.
        exchange (str, optional): Exchange that message should be sent
            to. Defaults to self.exchange.
        routing_key (str, optional): Key that exchange should use to route
            the message. Defaults to self.queue.
        **kwargs: Additional keyword arguments are passed to the
            channel's basic_publish method by publish_message.

    Returns:
        bool: Success or failure of send.

    Raises:
        TemporaryCommunicationError: If there is currently no connection
            (e.g. between reconnection attempts) or the buffer is full.

    """
    # is_open stays True while reconnecting even though the connection
    # is reset to None, so check before buffering; otherwise the message
    # would be queued and the call would still fail.
    connection = self.connection
    if connection is None:
        raise TemporaryCommunicationError("Connection is not available.")
    try:
        self._buffered_messages.put((msg, kwargs), block=False)
    except multitasking.queue.Full as err:  # pragma: debug
        raise TemporaryCommunicationError("Queue full.") from err
    # The channel may only be used from the IO thread, so publication
    # is handed to the IO loop.
    connection.ioloop.add_callback_threadsafe(self.publish_message)
    return True
def _recv(self):
    r"""Take the next message from the buffer without blocking.

    Returns:
        tuple (bool, obj): Success or failure of receive and received
            message.

    Raises:
        NoMessages: If the buffer is empty.

    """
    try:
        msg = self._buffered_messages.get(block=False)
    except multitasking.queue.Empty:
        raise NoMessages("No messages in buffer.")
    return (True, msg)
[docs] def close_channel(self):
r"""Close the channel if it exists.

If there is a channel, the queue is closed (without unbinding) and
on_channel_closeok is registered for the Channel.CloseOk reply. The
parent class's close_channel is also called.

"""
# NOTE(review): indentation was lost in this copy of the file; confirm
# against the upstream source whether the parent call is made only when
# a channel exists.
if self.channel:
self.close_queue(skip_unbind=True)
self.channel.add_callback(
self.on_channel_closeok, [pika.spec.Channel.CloseOk])
super(RMQAsyncComm, self).close_channel()
def close_connection(self, *args, **kwargs):
    r"""Close the connection unless it is missing, closing or closed.

    Args:
        *args: Positional arguments are passed to the parent class's
            close_connection method.
        **kwargs: Keyword arguments are passed to the parent class's
            close_connection method.

    """
    if self.direction == 'recv':
        self._consuming.stop()
    if self.connection is None:
        return
    if self.connection.is_closing or self.connection.is_closed:
        self.debug('Connection is closing or already closed')
        return
    super(RMQAsyncComm, self).close_connection(*args, **kwargs)
# CONNECTION
def connect(self):
    r"""Create a new pika SelectConnection for the comm's URL and count
    the attempt. The open, open-error and close events are routed to the
    corresponding on_connection_* methods."""
    self.times_connected += 1
    callbacks = dict(
        on_open_callback=self.on_connection_open,
        on_open_error_callback=self.on_connection_open_error,
        on_close_callback=self.on_connection_closed)
    self.connection = pika.SelectConnection(
        pika.URLParameters(self.url), **callbacks)
def on_connection_open(self, connection):
    r"""Callback for a successfully opened connection; opens the channel.

    Args:
        connection: Connection that was opened (unused).

    """
    self.debug('Connection opened')
    self.open_channel()
def on_connection_open_error(self, unused_connection, err):  # pragma: debug
    r"""Callback for a connection that failed to open; flags a
    reconnection unless an external close has been requested.

    Args:
        unused_connection: Connection that failed to open (unused).
        err: Error describing the failure.

    """
    self.debug(f'Connection open failed: {err}')
    close_requested = self._external_close.is_set()
    if close_requested:
        return
    # NOTE(review): nothing in this callback stops the IO loop; confirm
    # that the reconnection proceeds after a failed open.
    self.reconnect()
[docs] def on_connection_closed(self, connection, reason):
r"""Actions that must be taken when the connection is closed. Close the
queue and set the channel to None. If the connection is meant to be
closing (closing has started or an external close was requested), mark
closing as stopped. Otherwise, flag a reconnection. The IO loop is
stopped.

Args:
connection: Connection that was closed (unused).
reason: Reason given for the closure; only used for logging.

"""
# NOTE(review): indentation was lost in this copy of the file; confirm
# against the upstream source what is done under rmq_lock and whether the
# IO loop is stopped on both branches.
with self.rmq_lock:
self.close_queue(skip_unbind=True)
self.channel = None
if self._closing.has_started() or self._external_close.is_set():
self.debug('Connection closed')
self._closing.stop()
else:
self.debug(f'Connection closed, reconnecting: {reason}')
self.reconnect()
self.connection.ioloop.stop()
[docs] def stop(self, call_on_thread=False):
r"""Stop the ioloop.

Args:
call_on_thread (bool, optional): If True, the external close flag is
set and, when there is a connection, this method is scheduled to
run on the IO loop via add_callback_threadsafe instead of doing
the shutdown work directly. Defaults to False.

"""
# NOTE(review): indentation was lost in this copy of the file; the
# nesting of the branches below must be confirmed against the upstream
# source.
if call_on_thread:
if not self.rmq_thread.is_alive():
# Ensure that shutdown is not prevented by flags
assert self._closing.has_stopped()
return
self._external_close.set()
# While reconnecting, wait (up to 10 s) for the reconnection to stop
# instead of scheduling a callback.
if self._reconnecting.is_running(): # pragma: debug
self._reconnecting.stopped.wait(10.0)
return
if self.connection is not None:
self.connection.ioloop.add_callback_threadsafe(self.stop)
return
# The has_started() guard keeps the shutdown sequence from being started
# more than once.
if not self._closing.has_started():
self._closing.start()
self.debug(f'Stopping {self.direction}')
if self.direction == 'recv':
# An active consumer is cancelled first; on_cancelok then closes the
# channel.
if self._consuming.is_running():
self.stop_consuming()
# Not called here because KeyboardInterrupt is not used
# to stop the loop
# self.connection.ioloop.start()
elif self.connection is not None: # pragma: debug
self.connection.ioloop.stop()
self.close_channel()
self.close_connection()
self._closing.stop()
self.debug('Stopped')
else:
self.close_channel()
self.close_connection()
def reconnect(self):
    r"""Flag that a reconnection should take place.

    The reconnecting event is cleared and restarted and the
    opening-stopped flag is cleared. Nothing is flagged (an error is
    logged) if no original queue name has been recorded.

    """
    self.debug('')
    if not self.original_queue:  # pragma: debug
        self.error("Cannot reconnect to a queue with a generated name.")
        return
    reconnecting = self._reconnecting
    reconnecting.started.clear()
    reconnecting.stopped.clear()
    reconnecting.start()  # stopped by re-opening
    self._opening.stopped.clear()
# CHANNEL
def open_channel(self):
    r"""Request a new channel on the connection unless a close has been
    requested; on_channel_open is called once the channel is open."""
    self.debug('Creating a new channel')
    if not self.check_for_close():
        self.connection.channel(on_open_callback=self.on_channel_open)
def on_channel_open(self, channel):
    r"""Callback for an opened channel. The channel is stored, the close
    callback is added and, unless a close has been requested, the
    exchange is set up.

    Args:
        channel: Channel that was opened.

    """
    self.debug('Channel opened')
    self.channel = channel
    self.add_on_channel_close_callback()
    if not self.check_for_close():
        self.setup_exchange(self.exchange)
def add_on_channel_close_callback(self):
    r"""Register on_channel_closed as the channel's close callback so
    that an unexpected closure by RabbitMQ is handled."""
    self.debug('Adding channel close callback')
    channel = self.channel
    channel.add_on_close_callback(self.on_channel_closed)
def on_channel_closed(self, channel, reason):
    r"""Callback for a closed channel. The queue is closed, the channel
    is dropped and the connection is closed. If the broker closed the
    channel, closing is started, consuming is stopped and the broker's
    reply code and text are forwarded to close_connection.

    Args:
        channel: Channel that was closed.
        reason (Exception): Why the channel was closed.

    """
    self.debug(f'Channel {channel} was closed: {reason}, {type(reason)}')
    self.close_queue(skip_unbind=True)
    closed_by_broker = isinstance(
        reason, pika.exceptions.ChannelClosedByBroker)
    if closed_by_broker:  # pragma: debug
        self._closing.start()
        self._consuming.stop()
        close_kwargs = {'reply_code': reason.reply_code,
                        'reply_text': reason.reply_text}
    else:
        close_kwargs = {}
    self.channel = None
    self.close_connection(**close_kwargs)
# EXCHANGE
def setup_exchange(self, exchange_name):
    r"""Declare the exchange on the channel.

    Args:
        exchange_name (str): Name of the exchange to declare. It is also
            handed to on_exchange_declareok as userdata.

    """
    self.debug('Declaring exchange %s', exchange_name)
    declare_kwargs = dict(
        exchange=exchange_name,
        auto_delete=True,
        callback=functools.partial(
            self.on_exchange_declareok, userdata=exchange_name))
    self.channel.exchange_declare(**declare_kwargs)
def on_exchange_declareok(self, unused_frame, userdata):
    r"""Callback for a successfully declared exchange; sets up the queue
    unless a close has been requested.

    Args:
        unused_frame: Response frame (unused).
        userdata (str): Name of the exchange that was declared.

    """
    self.debug('Exchange declared: %s', userdata)
    if not self.check_for_close():
        self.setup_queue(self.queue)
# QUEUE
def setup_queue(self, queue_name):
    r"""Declare the message queue.

    The first queue name seen is remembered as the original queue. The
    declaration is passive when that original name starts with 'amq.'.

    Args:
        queue_name (str): Name of the queue to declare.

    """
    if self.original_queue is None:
        self.original_queue = queue_name
    passive = self.original_queue.startswith('amq.')
    self.debug(f'Declaring queue {queue_name} (passive={passive})')
    on_declared = functools.partial(
        self.on_queue_declareok, userdata=queue_name)
    self.channel.queue_declare(
        queue=queue_name, callback=on_declared,
        exclusive=False, passive=passive)
def on_queue_declareok(self, method_frame, userdata):
    r"""Callback for a successfully declared queue; binds the queue to
    the exchange unless a close has been requested.

    If the comm has no queue name yet, the name reported in the response
    is appended to the address.

    Args:
        method_frame: Response frame; method_frame.method.queue holds
            the name of the declared queue.
        userdata (str): Queue name that was passed to the declaration.

    """
    self.debug(f'Binding {self.exchange} to {userdata}')
    if not self.queue:
        self.address += method_frame.method.queue
    on_bound = functools.partial(self.on_bindok, userdata=userdata)
    if self.check_for_close():  # pragma: debug
        return
    self.channel.queue_bind(userdata, self.exchange, callback=on_bound)
def on_bindok(self, unused_frame, userdata):
    r"""Callback for a successfully bound queue. Unless a close has been
    requested, a receiving comm goes on to set the QoS and a sending comm
    starts publishing.

    Args:
        unused_frame: Response frame (unused).
        userdata (str): Name of the queue that was bound (unused).

    """
    self.debug('Queue bound')
    if self.check_for_close():  # pragma: debug
        return
    if self.direction != 'recv':
        self.start_publishing()
        self.debug('Finished opening producer')
        return
    self.set_qos()
# CONSUMER (RECV) SPECIFIC
def set_qos(self):
    r"""Request the configured prefetch count on the channel;
    on_basic_qos_ok is called with the response."""
    qos_kwargs = dict(prefetch_count=self._prefetch_count,
                      callback=self.on_basic_qos_ok)
    self.channel.basic_qos(**qos_kwargs)
def on_basic_qos_ok(self, unused_frame):
    r"""Actions to perform once the QoS is set. Consuming is started
    unless a close has been requested.

    Args:
        unused_frame: Response frame (unused).

    """
    # Log the prefetch count that set_qos actually requested rather than
    # a hard-coded value.
    self.debug('QOS set to: %d', self._prefetch_count)
    if self.check_for_close():  # pragma: debug
        return
    self.start_consuming()
def start_consuming(self):
    r"""Start consuming from the queue with automatic acknowledgement.

    The cancellation callback is added, the consumer tag returned by
    basic_consume is stored, consuming is marked as started and opening
    is marked as stopped.

    """
    self.debug('Issuing consumer related RPC commands')
    self.add_on_cancel_callback()
    tag = self.channel.basic_consume(
        self.queue, self.on_message, auto_ack=True)
    self.consumer_tag = tag
    self._consuming.start()
    self._opening.stop()
    self.debug('Finished opening consumer')
def add_on_cancel_callback(self):
    r"""Register on_consumer_cancelled as the channel's consumer
    cancellation callback."""
    self.debug('Adding consumer cancellation callback')
    channel = self.channel
    channel.add_on_cancel_callback(self.on_consumer_cancelled)
def on_consumer_cancelled(self, method_frame):  # pragma: debug
    r"""Callback for a consumer cancelled remotely; closes the channel.

    Args:
        method_frame: Frame announcing the cancellation; only logged.

    """
    message = 'Consumer was cancelled remotely, shutting down: %r'
    self.debug(message, method_frame)
    self.close_channel()
def on_channel_closeok(self, _unused_frame):
    r"""Callback for the Channel.CloseOk reply; only logs the closure.

    Args:
        _unused_frame: Response frame (unused).

    """
    self.debug('Channel closed.')
def on_message(self, _unused_channel, basic_deliver, properties, body):
    r"""Callback for a delivered message; the body is converted to bytes
    and added to the receive buffer.

    Args:
        _unused_channel: Channel the message arrived on (unused).
        basic_deliver: Delivery information; the delivery tag is logged.
        properties: Message properties; the app ID is logged.
        body (str, bytes): Message body.

    """
    self.debug('Received message # %s from %s: %.100s',
               basic_deliver.delivery_tag, properties.app_id, body)
    as_bytes = tools.str2bytes(body)
    # No explicit acknowledgement: the consumer is started with
    # auto_ack=True.
    self._buffered_messages.put(as_bytes)
# def acknowledge_message(self, delivery_tag):
# self.debug('Acknowledging message %s', delivery_tag)
# self.channel.basic_ack(delivery_tag, True)
def stop_consuming(self):
    r"""Cancel the consumer if there is a channel; on_cancelok is called
    with the consumer tag once RabbitMQ confirms the cancellation."""
    if not self.channel:
        return
    self.debug('Sending a Basic.Cancel RPC command to RabbitMQ')
    on_cancelled = functools.partial(
        self.on_cancelok, userdata=self.consumer_tag)
    self.channel.basic_cancel(self.consumer_tag, on_cancelled)
def on_cancelok(self, unused_frame, userdata):
    r"""Callback for a confirmed consumer cancellation; marks consuming
    as stopped and closes the channel.

    Args:
        unused_frame: Response frame (unused).
        userdata (str): Tag of the consumer that was cancelled.

    """
    self._consuming.stop()
    message = 'RabbitMQ acknowledged the cancellation of the consumer: %s'
    self.debug(message, userdata)
    self.close_channel()
# PUBLISHER (SEND) SPECIFIC
def start_publishing(self):
    r"""Turn on delivery confirmations and mark opening as stopped so
    that the comm is ready for sending."""
    self.debug('Issuing publisher related RPC commands')
    self.enable_delivery_confirmations()
    # Ready for sending
    opening = self._opening
    opening.stop()
def enable_delivery_confirmations(self):
    r"""Ask the channel to confirm deliveries; confirmations are passed
    to on_delivery_confirmation."""
    self.debug('Issuing Confirm.Select RPC command')
    channel = self.channel
    channel.confirm_delivery(self.on_delivery_confirmation)
def on_delivery_confirmation(self, method_frame):
    r"""Actions performed when a sent message is confirmed.

    The confirmed deliveries are removed from the record of unconfirmed
    messages and the ack/nack counters are advanced by the number of
    messages removed. A confirmation with the 'multiple' flag set covers
    every outstanding delivery up to and including the delivery tag (a
    tag of 0 covers all of them). Confirmations for tags that are not
    on record are ignored.

    Args:
        method_frame: Confirmation frame; method_frame.method provides
            NAME, delivery_tag and (optionally) multiple.

    """
    method = method_frame.method
    confirmation_type = method.NAME.split('.')[1].lower()
    delivery_tag = method.delivery_tag
    self.debug('Received %s for delivery tag: %i', confirmation_type,
               delivery_tag)
    if getattr(method, 'multiple', False):
        # The broker may confirm a batch with a single frame.
        confirmed = [tag for tag in list(self._deliveries.keys())
                     if delivery_tag == 0 or tag <= delivery_tag]
    else:
        confirmed = [delivery_tag]
    n_confirmed = 0
    for tag in confirmed:
        try:
            self._deliveries.pop(tag)
        except KeyError:
            # Not on record, e.g. dropped by reset_for_reconnection
            # before the confirmation arrived.
            continue
        n_confirmed += 1
    if confirmation_type == 'ack':
        self._acked += n_confirmed
    elif confirmation_type == 'nack':  # pragma: intermittent
        self._nacked += n_confirmed
    self.debug(
        'Published %i messages, %i have yet to be confirmed, '
        '%i were acked and %i were nacked', self._message_number,
        len(self._deliveries), self._acked, self._nacked)
# def schedule_next_message(self):
# r"""Schedule a new send."""
# self.debug('Scheduling next message for %0.1f seconds',
# self._publish_interval)
# self.connection.ioloop.call_later(self._publish_interval,
# self.publish_message)
def publish_message(self):
    r"""Publish the next buffered message, if there is one.

    The exchange and routing key default to the comm's exchange and
    queue and 'mandatory' defaults to True. On success the message is
    recorded as an unconfirmed delivery under the next message number.
    If the message cannot be routed, an error is logged and the comm is
    stopped.

    """
    try:
        queued = self._buffered_messages.get(block=False)
    except multitasking.queue.Empty:  # pragma: intermittent
        return
    msg, kwargs = queued
    exchange = kwargs.pop('exchange', self.exchange)
    routing_key = kwargs.pop('routing_key', self.queue)
    if 'mandatory' not in kwargs:
        kwargs['mandatory'] = True
    try:
        self.channel.basic_publish(exchange, routing_key, msg, **kwargs)
    except pika.exceptions.UnroutableError:  # pragma: debug
        self.error("Failed to publish message")
        self.stop()
        return
    number = self._message_number + 1
    self._message_number = number
    self._deliveries[number] = (msg, kwargs)
    self.debug('Published message # %i', number)