from cis_interface.communication import CommBase, get_comm, get_comm_class
[docs]class ServerComm(CommBase.CommBase):
r"""Class for handling Server side communication.
Args:
name (str): The environment variable where communication address is
stored.
request_comm (str, optional): Comm class that should be used for the
request comm. Defaults to None.
response_kwargs (dict, optional): Keyword arguments for the response
comm. Defaults to empty dict.
**kwargs: Additional keywords arguments are passed to the input comm.
Attributes:
response_kwargs (dict): Keyword arguments for the response comm.
icomm (Comm): Request comm.
ocomm (Comm): Response comm for last request.
"""
def __init__(self, name, request_comm=None, response_kwargs=None,
dont_open=False, **kwargs):
if response_kwargs is None:
response_kwargs = dict()
icomm_name = name
icomm_kwargs = kwargs
icomm_kwargs['direction'] = 'recv'
icomm_kwargs['dont_open'] = True
icomm_kwargs['comm'] = request_comm
self.response_kwargs = response_kwargs
self.icomm = get_comm(icomm_name, **icomm_kwargs)
self.ocomm = None
self.response_kwargs.setdefault('comm', self.icomm.comm_class)
self.response_kwargs.setdefault('recv_timeout', self.icomm.recv_timeout)
self._used_response_comms = dict()
super(ServerComm, self).__init__(self.icomm.name, dont_open=dont_open,
recv_timeout=self.icomm.recv_timeout,
is_interface=self.icomm.is_interface,
direction='recv', no_suffix=True,
address=self.icomm.address)
[docs] @classmethod
def is_installed(cls, language=None):
r"""Determine if the necessary libraries are installed for this
communication class.
Args:
language (str, optional): Specific language that should be checked
for compatibility. Defaults to None and all languages supported
on the current platform will be checked.
Returns:
bool: Is the comm installed.
"""
return get_comm_class().is_installed(language=language)
@property
def maxMsgSize(self):
r"""int: Maximum size of a single message that should be sent."""
return self.icomm.maxMsgSize
[docs] @classmethod
def underlying_comm_class(self):
r"""str: Name of underlying communication class."""
return get_comm_class().underlying_comm_class()
[docs] @classmethod
def comm_count(cls):
r"""int: Number of communication connections."""
return get_comm_class().comm_count()
[docs] @classmethod
def new_comm_kwargs(cls, name, request_comm=None, **kwargs):
r"""Initialize communication with new comms.
Args:
name (str): Name for new comm.
request_comm (str, optional): Name of class for new input comm.
Defaults to None.
"""
args = [name]
icomm_class = get_comm_class(request_comm)
kwargs['direction'] = 'recv'
if 'address' not in kwargs:
iargs, kwargs = icomm_class.new_comm_kwargs(name, **kwargs)
kwargs['request_comm'] = request_comm
return args, kwargs
@property
def opp_comms(self):
r"""dict: Name/address pairs for opposite comms."""
out = super(ServerComm, self).opp_comms
out.update(**self.icomm.opp_comms)
return out
[docs] def opp_comm_kwargs(self):
r"""Get keyword arguments to initialize communication with opposite
comm object.
Returns:
dict: Keyword arguments for opposite comm object.
"""
kwargs = super(ServerComm, self).opp_comm_kwargs()
kwargs['comm'] = "ClientComm"
kwargs['request_comm'] = self.icomm.comm_class
kwargs['response_kwargs'] = self.response_kwargs
return kwargs
[docs] def open(self):
r"""Open the connection."""
super(ServerComm, self).open()
self.icomm.open()
[docs] def close(self, *args, **kwargs):
r"""Close the connection."""
self.icomm.close(*args, **kwargs)
if self.ocomm is not None:
self.ocomm.close()
for ocomm in self._used_response_comms.values():
ocomm.close()
super(ServerComm, self).close(*args, **kwargs)
@property
def is_open(self):
r"""bool: True if the connection is open."""
return self.icomm.is_open
@property
def is_closed(self):
r"""bool: True if the connection is closed."""
return self.icomm.is_closed
@property
def n_msg_recv(self):
r"""int: The number of messages in the connection."""
return self.icomm.n_msg_recv
@property
def n_msg_recv_drain(self):
r"""int: The number of messages in the connection to drain."""
return self.icomm.n_msg_recv_drain
# RESPONSE COMM
[docs] def create_response_comm(self):
r"""Create a response comm based on information from the last header."""
if not isinstance(self.icomm._last_header, dict): # pragma: debug
raise RuntimeError("No header received with last message.")
elif 'response_address' not in self.icomm._last_header: # pragma: debug
raise RuntimeError("Last header does not contain response address.")
comm_kwargs = dict(address=self.icomm._last_header['response_address'],
direction='send', is_response_server=True,
single_use=True, **self.response_kwargs)
self.ocomm = get_comm(self.name + '.server_response_comm',
**comm_kwargs)
[docs] def remove_response_comm(self):
r"""Remove response comm."""
self.icomm._last_header = None
# self.ocomm.close_on_empty(no_wait=True)
self._used_response_comms[self.ocomm.name] = self.ocomm
self.ocomm = None
# SEND METHODS
[docs] def send(self, *args, **kwargs):
r"""Send a message to the output comm.
Args:
*args: Arguments are passed to output comm send method.
**kwargs: Keyword arguments are passed to output comm send method.
Returns:
obj: Output from output comm send method.
"""
# if self.is_closed:
# self.debug("send(): Connection closed.")
# return False
if self.ocomm is None: # pragma: debug
raise RuntimeError("There is no registered response comm.")
out = self.ocomm.send(*args, **kwargs)
self.remove_response_comm()
return out
# RECV METHODS
[docs] def recv(self, *args, **kwargs):
r"""Receive a message from the input comm and open a new response comm
for output using address from the header.
Args:
*args: Arguments are passed to input comm recv method.
**kwargs: Keyword arguments are passed to input comm recv method.
Returns:
obj: Output from input comm recv method.
"""
# if self.is_closed:
# self.debug("recv(): Connection closed.")
# return (False, None)
flag, msg = self.icomm.recv(*args, **kwargs)
if flag and msg and (msg != self.eof_msg):
self.create_response_comm()
return flag, msg
# OLD STYLE ALIASES
[docs] def rpcSend(self, *args, **kwargs):
r"""Alias for RPCComm.send"""
return self.send(*args, **kwargs)
[docs] def rpcRecv(self, *args, **kwargs):
r"""Alias for RPCComm.recv"""
return self.recv(*args, **kwargs)
[docs] def drain_messages(self, direction='recv', **kwargs):
r"""Sleep while waiting for messages to be drained."""
if direction == 'recv':
self.icomm.drain_messages(direction='recv', **kwargs)
[docs] def purge(self):
r"""Purge input and output comms."""
self.icomm.purge()
# Not sure if server should purge the response queue...
# if self.ocomm is not None:
# self.ocomm.purge()
super(ServerComm, self).purge()