import uuid
from collections import OrderedDict
from yggdrasil.communication import CommBase, new_comm, get_comm, import_comm
[docs]class ClientComm(CommBase.CommBase):
r"""Class for handling Client side communication.
Args:
name (str): The environment variable where communication address is
stored.
request_commtype (str, optional): Comm class that should be used for
the request comm. Defaults to None.
response_kwargs (dict, optional): Keyword arguments for the response
comm. Defaults to empty dict.
direct_connection (bool, optional): If True, the comm will be
directly connected to a ServerComm. Defaults to False.
**kwargs: Additional keywords arguments are passed to the output comm.
Attributes:
response_kwargs (dict): Keyword arguments for the response comm.
request_order (list): Order of request IDs.
responses (dict): Mapping between request IDs and response messages.
ocomm (Comm): Request comm.
icomm (Comm): Response comm.
"""
_commtype = 'client'
_dont_register = True
def __init__(self, name, request_commtype=None, response_kwargs=None,
dont_open=False, is_async=False, direct_connection=False,
**kwargs):
if response_kwargs is None:
response_kwargs = dict()
ocomm_name = name
ocomm_kwargs = kwargs
ocomm_kwargs['direction'] = 'send'
ocomm_kwargs['dont_open'] = True
ocomm_kwargs['commtype'] = request_commtype
ocomm_kwargs.setdefault('use_async', is_async)
if direct_connection:
ocomm_kwargs.setdefault('is_client', True)
self.direct_connection = direct_connection
self.response_kwargs = response_kwargs
self.ocomm = get_comm(ocomm_name, **ocomm_kwargs)
self.icomm = None
self.request_order = []
self.responses = OrderedDict()
for k, v in self.ocomm.get_response_comm_kwargs.items():
self.response_kwargs.setdefault(k, v)
self.response_kwargs.setdefault('is_interface', self.ocomm.is_interface)
self.response_kwargs.setdefault('recv_timeout', self.ocomm.recv_timeout)
self.response_kwargs.setdefault('language', self.ocomm.language)
self.response_kwargs.setdefault('use_async', self.ocomm.is_async)
self.response_kwargs.setdefault('env', self.ocomm.env)
super(ClientComm, self).__init__(self.ocomm.name, dont_open=dont_open,
recv_timeout=self.ocomm.recv_timeout,
is_interface=self.ocomm.is_interface,
direction='send', no_suffix=True,
address=self.ocomm.address,
is_async=self.ocomm.is_async,
env=self.ocomm.env)
[docs] def get_status_message(self, nindent=0, **kwargs):
r"""Return lines composing a status message.
Args:
nindent (int, optional): Number of tabs that should
be used to indent each line. Defaults to 0.
*kwargs: Additional arguments are passed to the
parent class's method.
Returns:
tuple(list, prefix): Lines composing the status message and the
prefix string used for the last message.
"""
lines, prefix = super(ClientComm, self).get_status_message(
nindent=nindent, **kwargs)
lines.append('%s%-15s:' % (prefix, 'request comm'))
lines += self.ocomm.get_status_message(nindent=(nindent + 1))[0]
lines.append('%s%-15s:' % (prefix, 'response comms'))
if self.icomm:
lines += self.icomm.get_status_message(nindent=(nindent + 1))[0]
return lines, prefix
[docs] @classmethod
def is_installed(cls, language=None):
r"""Determine if the necessary libraries are installed for this
communication class.
Args:
language (str, optional): Specific language that should be checked
for compatibility. Defaults to None and all languages supported
on the current platform will be checked.
Returns:
bool: Is the comm installed.
"""
return import_comm().is_installed(language=language)
@property
def maxMsgSize(self):
r"""int: Maximum size of a single message that should be sent."""
return self.ocomm.maxMsgSize
[docs] @classmethod
def new_comm_kwargs(cls, name, request_commtype=None, **kwargs):
r"""Initialize communication with new comms.
Args:
name (str): Name for new comm.
request_commtype (str, optional): Name of class for new output
comm. Defaults to None.
"""
args = [name]
ocomm_class = import_comm(request_commtype)
kwargs['direction'] = 'send'
if 'address' not in kwargs:
oargs, kwargs = ocomm_class.new_comm_kwargs(name, **kwargs)
kwargs['request_commtype'] = request_commtype
return args, kwargs
@property
def opp_address(self):
r"""str: Address for opposite comm."""
return self.ocomm.opp_address
@property
def opp_comms(self):
r"""dict: Name/address pairs for opposite comms."""
out = super(ClientComm, self).opp_comms
out.update(**self.ocomm.opp_comms)
return out
[docs] def opp_comm_kwargs(self, for_yaml=False):
r"""Get keyword arguments to initialize communication with opposite
comm object.
Args:
for_yaml (bool, optional): If True, the returned dict will only
contain values that can be specified in a YAML file. Defaults
to False.
Returns:
dict: Keyword arguments for opposite comm object.
"""
kwargs = super(ClientComm, self).opp_comm_kwargs(for_yaml=for_yaml)
kwargs['commtype'] = "server"
kwargs['request_commtype'] = self.ocomm._commtype
kwargs['response_kwargs'] = self.response_kwargs
kwargs['direct_connection'] = self.direct_connection
return kwargs
[docs] def open(self):
r"""Open the connection."""
super(ClientComm, self).open()
self.ocomm.open()
[docs] def close(self, *args, **kwargs):
r"""Close the connection."""
self.ocomm.close(*args, **kwargs)
if self.icomm is not None:
self.icomm.close(*args, **kwargs)
super(ClientComm, self).close(*args, **kwargs)
@property
def is_open(self):
r"""bool: True if the connection is open."""
return self.ocomm.is_open
@property
def is_closed(self):
r"""bool: True if the connection is closed."""
return self.ocomm.is_closed
@property
def n_msg_send(self):
r"""int: The number of messages in the connection."""
return self.ocomm.n_msg_send
@property
def n_msg_send_drain(self):
r"""int: The number of outgoing messages in the connection to drain."""
return self.ocomm.n_msg_send_drain
@property
def n_msg_direct(self):
r"""int: Number of messages currently being routed."""
return self.ocomm.n_msg_direct
[docs] def atexit(self): # pragma: debug
r"""Close operations."""
self.send_eof()
super(ClientComm, self).atexit()
# RESPONSE COMM
[docs] def create_response_comm(self):
r"""Create a response comm based on information from the last header."""
header = {'request_id': str(uuid.uuid4())}
while header['request_id'] in self.request_order: # pragma: debug
header['request_id'] += str(uuid.uuid4())
if self.icomm is None:
comm_kwargs = dict(direction='recv', is_response_client=True,
**self.response_kwargs)
if comm_kwargs.get('use_async', False):
comm_kwargs['async_recv_method'] = 'recv_message'
self.icomm = new_comm(self.name + '-client_response_comm', **comm_kwargs)
header['response_address'] = self.icomm.opp_address
self.request_order.append(header['request_id'])
return header
# SEND METHODS
[docs] def prepare_message(self, *args, **kwargs):
r"""Perform actions preparing to send a message.
Args:
*args: Components of the outgoing message.
**kwargs: Keyword arguments are passed to the request comm's
prepare_message method.
Returns:
CommMessage: Serialized and annotated message.
"""
def add_response_address(msg):
if msg.flag == CommBase.FLAG_SUCCESS:
msg.header.setdefault('__meta__', {})
msg.header['__meta__'].update(self.create_response_comm())
return msg
kwargs.setdefault('after_prepare_message', [])
kwargs['after_prepare_message'].append(add_response_address)
return self.ocomm.prepare_message(*args, **kwargs)
[docs] def send_message(self, msg, **kwargs):
r"""Send a message encapsulated in a CommMessage object.
Args:
msg (CommMessage): Message to be sent.
**kwargs: Additional keyword arguments are passed to the request
comm's send_message method.
Returns:
bool: Success or failure of send.
"""
out = self.ocomm.send_message(msg, **kwargs)
self.errors += self.ocomm.errors
return out
# RECV METHODS
[docs] def recv_message(self, *args, **kwargs):
r"""Receive a message.
Args:
*args: Arguments are passed to the response comm's recv_message method.
**kwargs: Keyword arguments are passed to the response comm's recv_message
method.
Returns:
CommMessage: Received message.
"""
if not self.request_order: # pragma: debug
raise RuntimeError("There are not any requests registered.")
while self.responses.get(self.request_order[0], None) is None:
msg = self.icomm.recv_message(*args, **kwargs)
self.errors += self.icomm.errors
if msg.flag != CommBase.FLAG_SUCCESS: # pragma: debug
break
assert msg.header['__meta__']['request_id'] not in self.responses
self.responses[msg.header['__meta__']['request_id']] = msg
if self.request_order[0] in self.responses:
msg = self.responses.pop(self.request_order[0])
self.request_order.pop(0)
return msg
[docs] def finalize_message(self, msg, **kwargs):
r"""Perform actions to decipher a message.
Args:
msg (CommMessage): Initial message object to be finalized.
**kwargs: Keyword arguments are passed to the response comm's
finalize_message method.
Returns:
CommMessage: Deserialized and annotated message.
"""
if self.icomm is None: # pragma: debug
raise RuntimeError("There are not any registered response comms.")
return self.icomm.finalize_message(msg, **kwargs)
# CALL
[docs] def call(self, *args, **kwargs):
r"""Do RPC call. The request message is sent to the output comm and the
response is received from the input comm.
Args:
*args: Arguments are passed to output comm send method.
**kwargs: Keyword arguments are passed to output comm send method
Returns:
obj: Output from input comm recv method.
"""
flag = self.send(*args, **kwargs)
if not flag: # pragma: debug
return (False, self.empty_obj_recv)
return self.recv(timeout=False)
[docs] def call_nolimit(self, *args, **kwargs):
r"""Alias for call."""
return self.call(*args, **kwargs)
# OLD STYLE ALIASES
[docs] def rpcSend(self, *args, **kwargs):
r"""Alias for RPCComm.send"""
return self.send(*args, **kwargs)
[docs] def rpcRecv(self, *args, **kwargs):
r"""Alias for RPCComm.recv"""
return self.recv(*args, **kwargs)
[docs] def rpcCall(self, *args, **kwargs):
r"""Alias for RPCComm.call"""
return self.call(*args, **kwargs)
[docs] def drain_messages(self, direction='send', **kwargs):
r"""Sleep while waiting for messages to be drained."""
if direction == 'send':
self.ocomm.drain_messages(direction='send', **kwargs)
[docs] def disconnect(self, *args, **kwargs):
r"""Disconnect the comm."""
if hasattr(self, 'ocomm'):
self.ocomm.disconnect()
if getattr(self, 'icomm', None):
self.icomm.disconnect()
super(ClientComm, self).disconnect()
# ALIASED PROPERTIES WITH SETTERS
@property
def filter(self):
r"""FilterBase: filter for the communicator."""
return self.ocomm.filter
@filter.setter
def filter(self, x):
r"""Set the filter."""
self.ocomm.filter = x