import uuid
from yggdrasil import multitasking
from yggdrasil.tools import ProxyObject
from yggdrasil.components import ComponentBaseUnregistered
from yggdrasil.communication import (
CommBase, TemporaryCommunicationError)
# Result codes returned by AsyncComm.send_direct / AsyncComm.recv_direct.
#   FLAG_FAILURE  (0): the wrapped comm reported a failed send/receive.
#   FLAG_SUCCESS  (1): the message was sent/received.
#   FLAG_TRYAGAIN (2): nothing was transferred this time (e.g. a
#       TemporaryCommunicationError was raised); the caller should retry.
# Only FLAG_FAILURE is falsy, so ``bool(flag)`` means "keep going".
FLAG_FAILURE, FLAG_SUCCESS, FLAG_TRYAGAIN = 0, 1, 2
class AsyncComm(ProxyObject, ComponentBaseUnregistered):
    r"""Class for handling asynchronous I/O.

    An instance wraps an existing comm and routes its traffic through an
    in-memory backlog that is serviced by a background task: outgoing
    messages that cannot be sent immediately are queued and retried, and
    incoming messages are received into the backlog ahead of time.

    Args:
        wrapped: Comm instance that should be driven asynchronously
            (presumably a CommBase subclass; its close_on_eof_recv
            setting is taken over by this wrapper).
        daemon (bool, optional): Passed as the ``daemon`` option of the
            backlog task. Defaults to False.
        async_recv_method (str, optional): The method that should be used
            to receive message into the backlog. Defaults to 'recv'.
        async_send_method (str, optional): The method that should be used
            to send message in the backlog. Defaults to 'send_message'.
        async_recv_kwargs (dict, optional): Keyword arguments to pass to
            calls receiving messages into the backlog. Defaults to {}.
        async_send_kwargs (dict, optional): Keyword arguments to pass to
            calls sending message from the backlog. Defaults to {}.

    Attributes:
        backlog_ready (multitasking.Event): Event set when there is a
            message in the recv backlog.

    Raises:
        RuntimeError: If the wrapped comm reports ``is_interface``.

    """

    __slots__ = ['_backlog_buffer', '_backlog_thread',
                 'backlog_ready', '_used_direct', 'close_on_eof_recv',
                 '_backlog_received_eof', '_used', '_closed',
                 'async_recv_method', 'async_send_method',
                 'async_recv_kwargs', 'async_send_kwargs',
                 '_error_registry', 'daemon']
    __overrides__ = ['_input_args', '_input_kwargs']
    _disconnect_attr = ['backlog_ready', '_backlog_thread', '_wrapped']
    _async_kws = ['async_recv_method', 'async_send_method',
                  'async_recv_kwargs', 'async_send_kwargs', 'daemon']

    def __init__(self, wrapped, daemon=False,
                 async_recv_method='recv', async_send_method='send_message',
                 async_recv_kwargs=None, async_send_kwargs=None):
        self._backlog_buffer = []
        self._backlog_thread = None
        self.backlog_ready = multitasking.Event()
        self._used_direct = False
        # Remember the wrapped comm's EOF policy; it is disabled on the
        # wrapped comm below and applied by this class in
        # finalize_message instead.
        self.close_on_eof_recv = wrapped.close_on_eof_recv
        self._used = False
        self._closed = False
        self._backlog_received_eof = False
        self._error_registry = {}
        self.daemon = daemon
        self.async_recv_method = async_recv_method
        self.async_send_method = async_send_method
        if async_recv_kwargs is None:
            async_recv_kwargs = {}
        if async_send_kwargs is None:
            async_send_kwargs = {}
        self.async_recv_kwargs = async_recv_kwargs
        self.async_send_kwargs = async_send_kwargs
        wrapped.close_on_eof_recv = False
        wrapped.is_async = True
        super(AsyncComm, self).__init__(wrapped)
        # Open backlog to match
        if self._wrapped.is_open:
            self.open()
        if self._wrapped.is_interface:  # pragma: debug
            # atexit.register(self.atexit)
            raise RuntimeError("Use of async comm inside model is untested")

    def __reduce__(self):
        r"""Pickle support: reuse the parent's reduction, but restore the
        original close_on_eof_recv value on the wrapped comm that is
        passed to the constructor."""
        rv = list(super(AsyncComm, self).__reduce__())
        (wrapped, ) = rv[1]
        wrapped.close_on_eof_recv = self.close_on_eof_recv
        rv[1] = (wrapped, )
        return tuple(rv)

    def precheck(self, *args, **kwargs):
        r"""Run CommBase.precheck against this wrapper (rather than the
        wrapped comm). All arguments are forwarded unchanged."""
        CommBase.CommBase.precheck(self, *args, **kwargs)

    def printStatus(self, *args, **kwargs):
        r"""Print status of the communicator.

        Backlog specific lines are appended to any 'extra_lines_after'
        supplied by the caller before delegating to the wrapped comm.

        Args:
            *args: Passed to the wrapped comm's printStatus.
            **kwargs: Passed to the wrapped comm's printStatus.

        Returns:
            Whatever the wrapped comm's printStatus returns.

        """
        lines = ['%-15s: %s' % ('open (backlog)', self.is_open_backlog),
                 '%-15s: %s' % ('close called (backlog)', self._closed)]
        if self.direction == 'send':
            lines.append(
                '%-15s: %s' % ('nsent (backlog)', self.n_msg_backlog_send))
        else:
            lines.append(
                '%-15s: %s' % ('nrecv (backlog)', self.n_msg_backlog_recv))
        # Build a new list so a list passed in by the caller is not
        # modified in place.
        kwargs['extra_lines_after'] = (
            list(kwargs.get('extra_lines_after', [])) + lines)
        return self._wrapped.printStatus(*args, **kwargs)

    @property
    def backlog_thread(self):
        r"""tools.YggTask: Task that will handle sending or receiving
        backlogged messages. Created on first access."""
        if self._backlog_thread is None:
            if self.direction == 'send':
                self._backlog_thread = CommBase.CommTaskLoop(
                    self, target=self.run_backlog_send,
                    daemon=self.daemon, suffix='SendBacklog')
            else:
                self._backlog_thread = CommBase.CommTaskLoop(
                    self, target=self.run_backlog_recv,
                    daemon=self.daemon, suffix='RecvBacklog')
        return self._backlog_thread

    @property
    def errors(self):
        r"""list: Errors raised by the wrapped comm or async thread."""
        out = self._wrapped.errors
        if self._backlog_thread:
            # NOTE(review): if the wrapped comm returns its own list here,
            # += extends that list in place; the thread's list is then
            # emptied so errors are not reported twice.
            out += self._backlog_thread.errors
            self._backlog_thread.errors = []
        return out

    def atexit(self):  # pragma: debug
        r"""Close operations."""
        if (self.direction == 'send') and self.is_open_backlog:
            self.linger()

    def open(self):
        r"""Open the connection by connecting to the queue."""
        self._wrapped.open()
        self._wrapped.suppress_special_debug = True
        if self._wrapped.is_open and (not self.is_open_backlog):
            self.backlog_thread.start()

    def close(self, linger=False):
        r"""Close the connection.

        Args:
            linger (bool, optional): If True, drain messages before closing
                the comm. Defaults to False.

        Raises:
            RuntimeError: If this is a server comm with more than one model
                copy and messages remain in the backlog.

        """
        # NOTE(review): nesting reconstructed from flattened source;
        # confirm that the backlog is closed while holding the closing
        # lock.
        with self._closing_thread.lock:
            self._wrapped.close(linger=linger)
            self._close_backlog(wait=linger)
            with self.backlog_thread.lock:
                self._closed = True
        # Requeue messages for other servers
        if ((self.is_server and (self.model_copies > 1)
             and self._backlog_buffer)):  # pragma: debug
            # client_kws = self.opp_comm_kwargs
            # client_kws.update(use_async=False)
            # client = get_comm(self.name + '_client', **client_kws)
            # for msg in self._backlog_buffer:
            #     self.info("Resending: %s", msg)
            #     client.send(msg[0])
            # client.close()
            raise RuntimeError("Returning backlogged messages to the server "
                               "is untested.")

    def _close_backlog(self, wait=False):
        r"""Close the backlog thread.

        Args:
            wait (bool, optional): If True, wait on the backlog task after
                signalling it to stop. Defaults to False.

        """
        self.debug('')
        if self._backlog_thread is not None:
            self.backlog_thread.set_break_flag()
        # NOTE(review): nesting reconstructed; the event is assumed to be
        # set unconditionally so that anything waiting on it wakes up.
        self.backlog_ready.set()
        if wait and (self._backlog_thread is not None):  # pragma: intermittent
            self.backlog_thread.wait(key=str(uuid.uuid4()))
        if hasattr(self._wrapped, '_close_backlog'):
            self._wrapped._close_backlog(wait=wait)

    @property
    def is_open(self):
        r"""bool: True if the backlog is open."""
        if self.direction == 'send':
            return self._wrapped.is_open and self.is_open_backlog
        else:
            return ((self.is_open_backlog or (self.n_msg_backlog > 0))
                    and (not self._closed))

    @property
    def is_open_direct(self):
        r"""bool: True if the wrapped comm is open."""
        return self._wrapped.is_open

    @property
    def is_open_backlog(self):
        r"""bool: True if the backlog thread is running."""
        return ((self._backlog_thread is not None)
                and (not self.backlog_thread.was_break)
                and (self.backlog_thread.is_alive()))

    @property
    def is_closed(self):
        r"""bool: True if the connection is closed."""
        return (not self.is_open)

    @property
    def n_msg(self):
        r"""int: The number of messages in the connection."""
        if self.direction == 'recv':
            return self.n_msg_recv
        else:
            return self.n_msg_send

    @property
    def n_msg_recv(self):
        r"""int: Number of messages in the receive backlog."""
        return self.n_msg_backlog_recv

    @property
    def n_msg_send(self):
        r"""int: Number of messages in the send backlog."""
        return self.n_msg_backlog_send

    @property
    def n_msg_direct_recv(self):
        r"""int: Number of messages currently being routed in recv."""
        return self._wrapped.n_msg_recv

    @property
    def n_msg_direct_send(self):
        r"""int: Number of messages currently being routed in send."""
        return self._wrapped.n_msg_send

    @property
    def n_msg_direct(self):
        r"""int: Number of messages currently being routed."""
        if self.direction == 'send':
            return self.n_msg_direct_send
        else:
            return self.n_msg_direct_recv

    @property
    def n_msg_backlog_recv(self):
        r"""int: Number of messages in the receive backlog."""
        if self.direction == 'recv':
            return self.n_msg_backlog
        return 0  # pragma: debug

    @property
    def n_msg_backlog_send(self):
        r"""int: Number of messages in the send backlog."""
        if self.direction == 'send':
            return self.n_msg_backlog
        return 0  # pragma: debug

    @property
    def n_msg_backlog(self):
        r"""int: Number of messages in the backlog. For a send comm the
        count is only reported while the backlog thread is running."""
        if (self.direction == 'recv') or self.is_open_backlog:
            return len(self.backlog_buffer)
        return 0

    @property
    def n_msg_recv_drain(self):
        r"""int: Number of messages in the receive backlog and direct comm."""
        return self.n_msg_direct_recv + self.n_msg_backlog_recv

    @property
    def n_msg_send_drain(self):
        r"""int: Number of messages in the send backlog and direct comm."""
        return self.n_msg_direct_send + self.n_msg_backlog_send

    @property
    def is_confirmed_send(self):
        r"""bool: True if all sent messages have been confirmed."""
        if not self._wrapped.is_confirmed_send:
            return False
        return (self.n_msg_send == 0)

    @property
    def backlog_buffer(self):
        r"""list: Backlogged messages (queued sends for a send comm,
        received messages for a recv comm)."""
        with self.backlog_thread.lock:
            return self._backlog_buffer

    def add_backlog(self, payload):
        r"""Add a message to the backlog of messages.

        Args:
            payload (tuple): Arguments and keyword arguments for send
                or data from receive.

        """
        with self.backlog_thread.lock:
            self.debug("Added message to %s backlog.",
                       self.direction)
            if not self._closed:
                self._backlog_buffer.append(payload)
            # NOTE(review): nesting reconstructed; the event is assumed to
            # be set even when the payload is dropped because the comm is
            # closed.
            self.backlog_ready.set()

    def pop_backlog(self):
        r"""Pop a message from the front of the backlog.

        Returns:
            tuple: First backlogged send arguments/keyword arguments
                or received data.

        """
        with self.backlog_thread.lock:
            out = self._backlog_buffer.pop(0)
            self.debug("Removed message from backlog.")
            if len(self._backlog_buffer) == 0:
                self.backlog_ready.clear()
        return out

    def run_backlog_send(self):
        r"""Continue trying to send buffered messages."""
        if not self.send_backlog():  # pragma: debug
            self.debug("Stopping because send_backlog failed")
            self._close_backlog()
            return
        self.periodic_debug('run_backlog_send', period=1000)(
            "Sleeping (is_confirmed_send=%s, n_msg_send=%d)",
            str(self.is_confirmed_send), self.n_msg_backlog_send)
        self.sleep()

    def run_backlog_recv(self):
        r"""Continue buffering received messages."""
        if self.backlog_thread.main_terminated:  # pragma: debug
            self.debug("Main thread terminated")
            self.close()
            return
        if not self.recv_backlog():
            self.debug("Failure to receive message into backlog.")
            self._close_backlog()
            return
        self.periodic_debug('run_backlog_recv', period=1000)(
            "Sleeping (is_confirmed_recv=%s)",
            str(self.is_confirmed_recv))
        self.sleep()

    def send_direct(self, *args, **kwargs):
        r"""Send a message directly to the underlying comm.

        Args:
            *args: Passed to the wrapped comm's async_send_method.
            **kwargs: Passed to the wrapped comm's async_send_method after
                being updated with async_send_kwargs; 'timeout' defaults
                to 0.

        Returns:
            int: FLAG_SUCCESS if the wrapped send returned a true value,
                FLAG_FAILURE if it returned a false value, FLAG_TRYAGAIN
                if it raised TemporaryCommunicationError.

        """
        self.periodic_debug("send_direct", period=1000)(
            "Sending message to %s", self.address)
        self.suppress_special_debug = True
        kwargs.update(self.async_send_kwargs)
        try:
            kwargs.setdefault('timeout', 0)
            if getattr(self._wrapped, self.async_send_method)(*args, **kwargs):
                async_flag = FLAG_SUCCESS
                self._used_direct = True
            else:
                async_flag = FLAG_FAILURE
            self._error_registry = {}
        except TemporaryCommunicationError:
            async_flag = FLAG_TRYAGAIN
        finally:
            # Restore the flag even if an unexpected error propagates.
            self.suppress_special_debug = False
        return async_flag

    def recv_direct(self):
        r"""Receive a message directly from the underlying comm.

        Returns:
            tuple (int, CommMessage): FLAG_SUCCESS/FLAG_FAILURE/
                FLAG_TRYAGAIN and the received message object (None if
                the receive raised TemporaryCommunicationError before a
                message was returned).

        Raises:
            Exception: If the received message carries a flag that is not
                handled here.

        """
        self.periodic_debug("recv_direct", period=1000)(
            "Receiving message from %s", self.address)
        self.suppress_special_debug = True
        msg = None
        # Work on a copy so the per-call defaults added below do not
        # accumulate in the stored async_recv_kwargs.
        kwargs = dict(self.async_recv_kwargs)
        try:
            kwargs.setdefault('timeout', 0)
            if self.async_recv_method != 'recv_message':
                kwargs['return_message_object'] = True
            msg = getattr(self._wrapped, self.async_recv_method)(**kwargs)
            if msg.flag in [CommBase.FLAG_EMPTY, CommBase.FLAG_SKIP]:
                async_flag = FLAG_TRYAGAIN  # pragma: debug
            elif msg.flag in [CommBase.FLAG_SUCCESS, CommBase.FLAG_EOF]:
                async_flag = FLAG_SUCCESS
                self._used_direct = True
                if msg.flag == CommBase.FLAG_EOF:
                    self._backlog_received_eof = True
            elif msg.flag == CommBase.FLAG_FAILURE:
                async_flag = FLAG_FAILURE
            else:  # pragma: debug
                raise Exception("Unsupported flag: %s" % msg.flag)
            self._error_registry = {}
        except TemporaryCommunicationError:
            async_flag = FLAG_TRYAGAIN
        finally:
            # Restore the flag even if an unexpected error propagates.
            self.suppress_special_debug = False
        return async_flag, msg

    def send_backlog(self):
        r"""Send a message from the send backlog to the queue.

        Returns:
            bool: False only if the direct send reported FLAG_FAILURE;
                an empty backlog and FLAG_TRYAGAIN both count as True.

        """
        if len(self.backlog_buffer) == 0:
            flag = True
        else:
            iargs, ikwargs = self.backlog_buffer[0]
            async_flag = self.send_direct(*iargs, **ikwargs)
            # FLAG_TRYAGAIN is truthy: the message stays at the front of
            # the backlog and is retried on the next pass.
            flag = bool(async_flag)
            if async_flag == FLAG_SUCCESS:
                self.pop_backlog()
        self.confirm_send()
        return flag

    def recv_backlog(self):
        r"""Check for any messages in the queue and add them to the recv
        backlog.

        Returns:
            bool: False if the wrapped comm is closed or the direct
                receive reported FLAG_FAILURE, True otherwise.

        """
        if not self.is_open_direct:
            flag = False
        elif self._backlog_received_eof and self.close_on_eof_recv:
            # Don't keep receiving, but don't close so that this thread
            # can continue confirmation until the EOF is actually received
            flag = True
        else:
            async_flag, msg = self.recv_direct()
            flag = bool(async_flag)
            if async_flag == FLAG_SUCCESS:
                self.add_backlog(msg)
        self.confirm_recv()
        return flag

    def send_message(self, msg, **kwargs):
        r"""Send a message encapsulated in a CommMessage object.

        Args:
            msg (CommMessage): Message to be sent.
            **kwargs: Additional keyword arguments are passed to send.

        Returns:
            bool: Success or failure of send.

        """
        # This is required so that call to send_message for work comms
        # in CommBase.send_message will put the messages in the backlog
        kwargs['dont_prepare'] = True
        return self.send(msg, **kwargs)

    def send(self, *args, dont_prepare=False, **kwargs):
        r"""Send a message to the backlog.

        A direct send is attempted first when nothing is waiting in the
        backlog; the message is only queued if that attempt asks for a
        retry (or if the backlog is already in use).

        Args:
            *args: All arguments are assumed to be part of the message.
            dont_prepare (bool, optional): If True, the single positional
                argument must already be a CommMessage and is used as is
                instead of calling prepare_message. Defaults to False.
            **kwargs: Keywords listed in _prepare_message_kws are passed
                to the wrapped comm's prepare_message; the rest are passed
                to the send call.

        Returns:
            bool: Success or failure of sending the message.

        """
        self.precheck('send')
        kws_prepare = {k: kwargs.pop(k) for k in self._prepare_message_kws
                       if k in kwargs}
        if not self.is_open_direct:  # pragma: debug
            return False
        if dont_prepare:
            assert (len(args) == 1) and isinstance(args[0], CommBase.CommMessage)
            msg = args[0]
        else:
            msg = self._wrapped.prepare_message(*args, **kws_prepare)
        if not self.backlog_ready.is_set():
            async_flag = self.send_direct(msg, **kwargs)
            if async_flag != FLAG_TRYAGAIN:
                return bool(async_flag)
        self.add_backlog(((msg, ), kwargs))
        self._used = True
        return True

    def send_eof(self, *args, **kwargs):
        r"""Send the EOF message as a short message.

        Args:
            *args: All arguments are passed to comm send.
            **kwargs: All keywords arguments are passed to comm send.

        Returns:
            bool: Success or failure of send.

        """
        return self.send(self.eof_msg, *args, **kwargs)

    def recv_message(self, timeout=None, **kwargs):
        r"""Receive a message from the backlog.

        Args:
            timeout (float, optional): Time to wait for a message to
                appear in the backlog. Defaults to self.recv_timeout.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            CommMessage: Received message. If the backlog is empty, a
                message flagged FLAG_FAILURE (comm closed) or FLAG_EMPTY
                (comm still open) is returned instead.

        """
        # Sleep until there is a message
        if timeout is None:
            # 'timeout' is a named parameter and so can never be present
            # in kwargs; fall straight back to the comm default.
            timeout = self.recv_timeout
        T = self.start_timeout(timeout, key_suffix='.recv:backlog')
        while (not T.is_out) and (not self.backlog_ready.is_set()):
            self.backlog_ready.wait(self.sleeptime)
        self.stop_timeout(key_suffix='.recv:backlog')
        # Handle absence of messages
        if self.n_msg_backlog == 0:
            self.verbose_debug("No messages waiting.")
            if self.is_closed:
                self.debug(("No messages waiting and comm closed."
                            "%s, %s, %s")
                           % (self.backlog_thread is not None,
                              not self.backlog_thread.was_break,
                              self.backlog_thread.is_alive()))
                self.printStatus(level='debug')
                if self.backlog_thread.was_break:
                    self.debug("Break stack:\n%s",
                               self.backlog_thread.break_stack)
                out = CommBase.CommMessage(flag=CommBase.FLAG_FAILURE)
            else:
                out = CommBase.CommMessage(flag=CommBase.FLAG_EMPTY,
                                           args=self.empty_obj_recv)
        # Return backlogged message
        else:
            self.debug('Returning backlogged received message')
            out = self.pop_backlog()
        return out

    def finalize_message(self, msg, **kwargs):
        r"""Perform actions to decipher a message.

        Args:
            msg (CommMessage): Initial message object to be finalized.
            **kwargs: Keyword arguments are passed to the wrapped comm's
                finalize_message method.

        Returns:
            CommMessage: Deserialized and annotated message.

        """
        orig_flag = msg.flag
        if (msg.flag == CommBase.FLAG_EOF) and self.close_on_eof_recv:
            self.close()
            msg.flag = CommBase.FLAG_FAILURE
        if orig_flag not in [CommBase.FLAG_FAILURE, CommBase.FLAG_EMPTY]:
            self._used = True
        # NOTE(review): nesting reconstructed; this check is assumed to be
        # independent of the flag test above.
        if self.async_recv_method != 'recv_message':
            msg.finalized = False
            kwargs['skip_processing'] = True
        return self._wrapped.finalize_message(msg, **kwargs)

    def recv(self, timeout=None, return_message_object=False, dont_finalize=False,
             **kwargs):
        r"""Receive a message.

        Args:
            timeout (float, optional): Time to wait for a message. Defaults
                to self.recv_timeout.
            return_message_object (bool, optional): If True, the full wrapped
                CommMessage message object is returned instead of the tuple.
                Defaults to False.
            dont_finalize (bool, optional): If True, finalize_message will not
                be called even if async_recv_method is 'recv_message'. Defaults
                to False.
            **kwargs: Passed to recv_message; keywords listed in
                _finalize_message_kws are passed to finalize_message.

        Returns:
            tuple (bool, obj): Success or failure of receive and received
                message.

        """
        self.precheck('recv')
        out = self.recv_message(timeout=timeout, **kwargs)
        if not dont_finalize:
            kws_finalize = {k: kwargs.pop(k) for k in self._finalize_message_kws
                            if k in kwargs}
            out = self.finalize_message(out, **kws_finalize)
        # NOTE(review): nesting reconstructed; the tuple conversion is
        # assumed to apply regardless of dont_finalize.
        if not return_message_object:
            out = (bool(out.flag), out.args)
        return out

    def purge(self):
        r"""Purge all messages from the comm."""
        self._wrapped.purge()
        with self.backlog_thread.lock:
            self.backlog_ready.clear()
            self._backlog_buffer = []

    # ALIASES
    def send_nolimit(self, *args, **kwargs):
        r"""Alias for send_nolimit on wrapped comm."""
        return CommBase.CommBase.send_nolimit(self, *args, **kwargs)

    def recv_nolimit(self, *args, **kwargs):
        r"""Alias for recv_nolimit on wrapped comm."""
        return CommBase.CommBase.recv_nolimit(self, *args, **kwargs)

    def send_array(self, *args, **kwargs):
        r"""Alias for send_array on wrapped comm."""
        return CommBase.CommBase.send_array(self, *args, **kwargs)

    def recv_array(self, *args, **kwargs):
        r"""Alias for recv_array on wrapped comm."""
        return CommBase.CommBase.recv_array(self, *args, **kwargs)

    def send_dict(self, *args, **kwargs):
        r"""Alias for send_dict on wrapped comm."""
        return CommBase.CommBase.send_dict(self, *args, **kwargs)

    def recv_dict(self, *args, **kwargs):
        r"""Alias for recv_dict on wrapped comm."""
        return CommBase.CommBase.recv_dict(self, *args, **kwargs)