import threading
from cis_interface import backwards, tools
from cis_interface.communication import RMQComm
from cis_interface.schema import register_component
if RMQComm._rmq_installed:
import pika
_pika_version_maj = int(float(pika.__version__.split('.')[0]))
if _pika_version_maj >= 1: # pragma: debug
raise ImportError("pika version 1.0 not yet supported.")
else:
pika = False
_pika_version_maj = 0
[docs]@register_component
class RMQAsyncComm(RMQComm.RMQComm):
r"""Class for handling asynchronous RabbitMQ communications. It is not
recommended to use this class as it can leave hanging threads if not
closed propertly. The normal RMQComm will cover most use cases.
Args:
name (str): The environment variable where the comm address is stored.
dont_open (bool, optional): If True, the connection will not be opened.
Defaults to False.
**kwargs: Additional keyword arguments are passed to CommBase.
Attributes:
times_connected (int): Number of times that this connections has been
established.
rmq_thread (tools.CisThread): Thread used to run IO loop.
"""
_commtype = 'rmq_async'
def _init_before_open(self, **kwargs):
r"""Initialize null variables and RMQ async thread."""
self.times_connected = 0
self.rmq_thread_count = 0
self.rmq_thread = self.new_run_thread()
self._opening = False
self._closing = False
self._reconnecting = False
self._close_called = False
self._buffered_messages = []
self._qres = None
self._qres_lock = threading.RLock()
self._qres_event = threading.Event()
self._qres_event.set()
super(RMQAsyncComm, self)._init_before_open(**kwargs)
@property
def rmq_lock(self):
r"""Lock associated with RMQ ioloop thread."""
return self.rmq_thread.lock
[docs] def new_run_thread(self, name=None):
r"""Get a new thread for running."""
if name is None:
name = self.name
self.rmq_thread_count += 1
return tools.CisThread(name=name + '.RMQThread%d' % self.rmq_thread_count,
target=self.run_thread)
[docs] def run_thread(self):
r"""Connect to the connection and begin the IO loop."""
self.debug('')
self.connect()
self.connection.ioloop.start()
self.debug("returning")
[docs] def start_run_thread(self):
r"""Start the run thread and wait for it to finish."""
with self.rmq_lock:
if self.rmq_thread.was_started:
return
self._opening = True
self.rmq_thread.start()
# Wait for connection to be established
T = self.start_timeout()
# interval = 1 # timeout / 5
while (not T.is_out) and (not self.channel_stable) and self.rmq_thread.is_alive():
self.sleep() # 0.5 # interval)
self.stop_timeout()
# Check that connection was established
if not self.rmq_thread.is_alive(): # pragma: debug
self._opening = False
self.force_close()
raise Exception("Connection ioloop could not be established.")
if not self.channel_stable: # pragma: debug
self.force_close()
raise RuntimeError("Connection never finished opening "
+ "(%f/%f timeout)." % (T.elapsed, T.max_time))
[docs] def bind(self):
r"""Declare queue to get random new queue."""
# Start ioloop in a new thread
with self.rmq_lock:
# Don't bind if already closing
if self.is_open or self._close_called: # pragma: debug
return
self._bound = True
self.start_run_thread()
# Register queue
if not self.queue: # pragma: debug
self.error("Queue was not initialized.")
self.register_comm(self.address, (self.connection, self.channel))
super(RMQComm.RMQComm, self).bind()
def _open_direct(self):
r"""Open connection and bind/connect to queue as necessary."""
super(RMQAsyncComm, self)._open_direct()
T = self.start_timeout()
while (not T.is_out) and self._opening: # pragma: debug
self.sleep()
self.stop_timeout()
def _close_direct(self, linger=False):
r"""Close the connection.
Args:
linger (bool, optional): If True, drain messages before closing the
comm. Defaults to False.
"""
with self.rmq_lock:
self._close_called = True
if self._closing: # pragma: debug
return # Don't close more than once
# Wait for connection to finish opening to close it
T = self.start_timeout(key=self.timeout_key + '_opening')
while (not T.is_out) and self._opening: # pragma: debug
self.sleep()
self.stop_timeout(key=self.timeout_key + '_opening')
# Close by cancelling consumption
if self.is_open or self._bound:
with self.rmq_lock:
self._closing = True
self._is_open = False
self._bound = False
if self.channel is not None:
try:
if self.direction == 'recv':
self.channel.basic_cancel(callback=self.on_cancelok,
consumer_tag=self.consumer_tag)
else:
self.channel.close()
except (pika.exceptions.ChannelClosed,
pika.exceptions.ConnectionClosed): # pragma: debug
self._closing = False
if not self.is_client:
self.unregister_comm(self.address)
# Wait for connection to finish closing & then force if it dosn't
T = self.start_timeout(key=self.timeout_key + '_closing')
while (not T.is_out) and self._closing:
self.sleep()
self.stop_timeout(key=self.timeout_key + '_closing')
if self._closing: # pragma: debug
self.force_close()
if self.rmq_thread.is_alive(): # pragma: debug
self.rmq_thread.join(self.timeout)
if self.rmq_thread.is_alive():
raise RuntimeError("Thread still running.")
# Close workers
# with self.rmq_lock:
super(RMQAsyncComm, self)._close_direct(linger=linger)
def _set_qres(self, res):
r"""Callback for getting message count."""
self._qres = res
self._qres_event.set()
[docs] def get_queue_result(self):
r"""Get the fram from passive queue declare."""
res = None
if self.is_open_direct:
with self._qres_lock:
if self._qres_event.is_set():
self._qres_event.clear()
try:
self.channel.queue_declare(queue=self.queue,
callback=self._set_qres,
# , auto_delete=True,
passive=True)
except (pika.exceptions.ChannelClosed,
pika.exceptions.ConnectionClosed): # pragma: debug
if not self._reconnecting:
self._close_direct()
else:
self._qres = None
self._qres_event.set()
self._qres_event.wait()
res = self._qres
return res
@property
def n_msg_direct_recv(self):
r"""int: Number of messages in the queue."""
if self.is_open_direct:
return len(self._buffered_messages)
return 0
# Access work comms with lock
# def get_work_comm(self, *args, **kwargs):
# r"""Alias for parent class that wraps method in Lock."""
# with self.rmq_lock:
# return super(RMQAsyncComm, self).get_work_comm(*args, **kwargs)
# def create_work_comm(self, *args, **kwargs):
# r"""Alias for parent class that wraps method in Lock."""
# with self.rmq_lock:
# return super(RMQAsyncComm, self).create_work_comm(*args, **kwargs)
# def add_work_comm(self, *args, **kwargs):
# r"""Alias for parent class that wraps method in Lock."""
# with self.rmq_lock:
# return super(RMQAsyncComm, self).add_work_comm(*args, **kwargs)
# def remove_work_comm(self, *args, **kwargs):
# r"""Alias for parent class that wraps method in Lock."""
# with self.rmq_lock:
# return super(RMQAsyncComm, self).remove_work_comm(*args, **kwargs)
def _send_direct(self, msg, exchange=None, routing_key=None, **kwargs):
r"""Send a message.
Args:
msg (str, bytes): Message to be sent.
exchange (str, optional): Exchange that message should be sent
to. Defaults to self.exchange.
routing_key (str, optional): Key that exchange should use to route
the message. Defaults to self.queue.
**kwargs: Additional keyword arguments are passed to
:method:`pika.BlockingChannel.basic_publish`.
Returns:
bool: Success or failure of send.
"""
with self.rmq_lock:
if self.is_closed: # pragma: debug
return False
out = super(RMQAsyncComm, self)._send_direct(msg, exchange=exchange,
routing_key=routing_key,
**kwargs)
# Basic publish returns None for asynchronous connection
if out is None:
out = True
return out
def _recv_direct(self):
r"""Receive a message.
Returns:
tuple (bool, obj): Success or failure of receive and received
message.
"""
with self.rmq_lock:
return (True, self._buffered_messages.pop(0))
# if self.n_msg_recv != 0:
# with self.rmq_lock:
# out = (True, self._buffered_messages.pop(0))
# return out
# if self.is_closed: # pragma: debug
# self.debug("Connection closed.")
# return (False, None)
# if self.n_msg_recv == 0:
# # self.debug(".recv(): No buffered messages.")
# out = (True, self.empty_bytes_msg)
# else:
# with self.rmq_lock:
# out = (True, self._buffered_messages.pop(0))
# return out
[docs] def on_message(self, ch, method, props, body):
r"""Buffer received messages."""
if self.direction == 'send': # pragma: debug
raise Exception("Send comm received a message.")
with self.rmq_lock:
self._buffered_messages.append(backwards.as_bytes(body))
ch.basic_ack(delivery_tag=method.delivery_tag)
# CONNECTION
[docs] def connect(self):
r"""Establish the connection."""
self.times_connected += 1
parameters = pika.URLParameters(self.url)
kwargs = dict(on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_open_error)
if _pika_version_maj < 1:
kwargs['stop_ioloop_on_close'] = False
self.connection = pika.SelectConnection(parameters, **kwargs)
[docs] def on_connection_open(self, connection):
r"""Actions that must be taken when the connection is opened.
Add the close connection callback and open the RabbitMQ channel."""
self.debug('::Connection opened')
connection.add_on_close_callback(self.on_connection_closed)
self.open_channel()
[docs] def on_connection_open_error(self, unused_connection): # pragma: debug
r"""Actions that must be taken when the connection fails to open."""
self.debug('::Connection could not be opened')
self.close()
raise Exception('Could not connect.')
[docs] def on_connection_closed(self, connection, *args):
r"""Actions that must be taken when the connection is closed. Set the
channel to None. If the connection is meant to be closing, stop the
IO loop. Otherwise, wait 5 seconds and try to reconnect."""
if _pika_version_maj < 1:
reply_code = args[0]
reply_text = args[1]
# else:
# reply_code = 0
# reply_text = args[0]
with self.rmq_lock:
self.debug('::on_connection_closed code %s (code = %d)', reply_text,
reply_code)
if self._closing or reply_code == 200:
connection.ioloop.stop()
self.connection = None
self._closing = False
self._qres_event.set()
else:
self.warning('Connection closed, reopening in %f seconds: (%s) %s',
self.sleeptime, reply_code, reply_text)
self._reconnecting = True
if _pika_version_maj < 1:
connection.add_timeout(self.sleeptime, self.reconnect)
# else:
# connection.ioloop.call_later(self.sleeptime, self.reconnect)
[docs] def reconnect(self):
r"""Try to re-establish a connection and resume a new IO loop."""
self.debug('')
# This is the old connection IOLoop instance, stop its ioloop
# with self.rmq_lock:
self.connection.ioloop.stop()
if not self._closing:
# self.run_thread()
# Create a new connection
self.connect()
# There is now a new connection, needs a new ioloop to run
self._reconnecting = False
self.connection.ioloop.start()
# CHANNEL
@property
def channel_open(self):
r"""bool: True if connection ready for messages, False otherwise."""
with self.rmq_lock:
if self.channel is None or self.connection is None:
return False
if self.channel.is_open:
if not self.channel.is_closing:
return True
return False # pragma: debug
@property
def channel_stable(self):
r"""bool: True if the connection ready for messages and not about to
close. False otherwise."""
with self.rmq_lock:
return (self.channel_open and (not self._closing)
and (not self._opening))
[docs] def open_channel(self):
r"""Open a RabbitMQ channel."""
self.debug('::Creating a new channel')
self.connection.channel(on_open_callback=self.on_channel_open)
[docs] def on_channel_open(self, channel):
r"""Actions to perform after a channel is opened. Add the channel
close callback and setup the exchange."""
self.debug('::Channel opened')
self.channel = channel
channel.add_on_close_callback(self.on_channel_closed)
self.setup_exchange(self.exchange)
[docs] def on_channel_closed(self, channel, *args):
r"""Actions to perform when the channel is closed. Close the
connection."""
with self.rmq_lock:
if _pika_version_maj < 1:
reply_code = args[0]
reply_text = args[1]
self.debug('::channel %i was closed: (%s) %s',
channel, reply_code, reply_text)
# else:
# reason = args[0]
# self.debug('::channel %i was closed: %s',
# channel, reason)
if not (channel.connection.is_closing or channel.connection.is_closed):
channel.connection.close()
self.channel = None
# if self.connection is not None:
# self.connection.close()
# EXCHANGE
[docs] def setup_exchange(self, exchange_name):
r"""Setup the exchange."""
self.debug('::Declaring exchange %s', exchange_name)
self.channel.exchange_declare(callback=self.on_exchange_declareok,
exchange=exchange_name,
auto_delete=True)
[docs] def on_exchange_declareok(self, unused_frame):
r"""Actions to perform once an exchange is succesfully declared.
Set up the queue."""
self.debug('::Exchange declared')
self.setup_queue()
# QUEUE
[docs] def setup_queue(self):
r"""Set up the message queue."""
self.debug('::Declaring queue %s', self.queue)
if self.direction == 'recv' and not self.queue:
exclusive = False # True
else:
exclusive = False
if self.queue.startswith('amq.'):
passive = True
else:
passive = False
self.channel.queue_declare(queue=self.queue,
callback=self.on_queue_declareok,
exclusive=exclusive,
# , auto_delete=True,
passive=passive)
[docs] def on_queue_declareok(self, method_frame):
r"""Actions to perform once the queue is succesfully declared. Bind
the queue."""
self.debug('::Binding')
with self.rmq_lock:
if not self.queue:
self.address += method_frame.method.queue
self.channel.queue_bind(callback=self.on_bindok,
exchange=self.exchange,
# routing_key=self.routing_key,
queue=self.queue)
[docs] def on_bindok(self, unused_frame):
r"""Actions to perform once the queue is succesfully bound. Start
consuming messages."""
self.debug('::Queue bound')
self.channel.basic_qos(prefetch_count=1)
self.channel.add_on_cancel_callback(self.on_cancelok)
if self.direction == 'recv':
kwargs = dict(on_message_callback=self.on_message,
queue=self.queue)
if _pika_version_maj < 1:
kwargs['consumer_callback'] = kwargs.pop('on_message_callback')
self.consumer_tag = self.channel.basic_consume(**kwargs)
with self.rmq_lock:
self._opening = False
# GENERAL
[docs] def on_cancelok(self, unused_frame):
r"""Actions to perform after succesfully cancelling consumption. Closes
the channel."""
self.debug('::on_cancelok()')
with self.rmq_lock:
self.close_queue()
self.close_channel()
[docs] def close_connection(self):
r"""Stop the ioloop and close the connection."""
if self.connection: # pragma: debug
self.connection.ioloop.stop()
super(RMQAsyncComm, self).close_connection()
[docs] def force_close(self): # pragma: debug
r"""Force stop by removing the queue and stopping the IO loop."""
with self.rmq_lock:
self.close_queue()
self.close_connection()
self.channel = None
self.connection = None
self._closing = False
[docs] def purge(self):
r"""Remove all messages from the associated queue."""
with self.rmq_lock:
self._buffered_messages = []
if not self.channel_stable: # pragma: debug
return
super(RMQAsyncComm, self).purge()