Source code for cis_interface.communication.ClientComm

import uuid
from cis_interface.communication import (
    CommBase, new_comm, get_comm, get_comm_class)


class ClientComm(CommBase.CommBase):
    r"""Class for handling Client side communication.

    Requests are sent through a single output comm (``ocomm``). For every
    non-EOF request a dedicated, single-use response comm is created and its
    address is sent along in the message header; responses are then read
    from those comms in the order the requests were made.

    Args:
        name (str): The environment variable where communication address is
            stored.
        request_comm (str, optional): Comm class that should be used for the
            request comm. Defaults to None.
        response_kwargs (dict, optional): Keyword arguments for the response
            comm. Defaults to empty dict. The dictionary is copied, so the
            caller's object is not modified.
        dont_open (bool, optional): Forwarded to the base class constructor.
            The request comm itself is always created with
            ``dont_open=True``. Defaults to False.
        **kwargs: Additional keywords arguments are passed to the output comm.

    Attributes:
        response_kwargs (dict): Keyword arguments for the response comm.
        icomm (dict): Response comms keyed to the ID of the associated
            request.
        icomm_order (list): Response comm keys in the order of the requests.
        ocomm (Comm): Request comm.

    """

    def __init__(self, name, request_comm=None, response_kwargs=None,
                 dont_open=False, **kwargs):
        # Work on a copy so that the setdefault calls below do not leak
        # into a dictionary owned by the caller.
        if response_kwargs is None:
            response_kwargs = dict()
        else:
            response_kwargs = dict(response_kwargs)
        ocomm_name = name
        ocomm_kwargs = kwargs
        ocomm_kwargs['direction'] = 'send'
        ocomm_kwargs['dont_open'] = True
        ocomm_kwargs['comm'] = request_comm
        self.response_kwargs = response_kwargs
        self.ocomm = get_comm(ocomm_name, **ocomm_kwargs)
        self.icomm = dict()
        self.icomm_order = []
        # Response comms default to the same comm class and receive timeout
        # as the request comm unless the caller specified otherwise.
        self.response_kwargs.setdefault('comm', self.ocomm.comm_class)
        self.response_kwargs.setdefault('recv_timeout',
                                        self.ocomm.recv_timeout)
        super(ClientComm, self).__init__(
            self.ocomm.name, dont_open=dont_open,
            recv_timeout=self.ocomm.recv_timeout,
            is_interface=self.ocomm.is_interface,
            direction='send', no_suffix=True,
            address=self.ocomm.address)

    @classmethod
    def is_installed(cls, language=None):
        r"""Determine if the necessary libraries are installed for this
        communication class.

        The check is delegated to the class returned by ``get_comm_class()``
        called without arguments.

        Args:
            language (str, optional): Specific language that should be
                checked for compatibility. Defaults to None and all
                languages supported on the current platform will be checked.

        Returns:
            bool: Is the comm installed.

        """
        return get_comm_class().is_installed(language=language)

    @property
    def maxMsgSize(self):
        r"""int: Maximum size of a single message that should be sent (taken
        from the request comm)."""
        return self.ocomm.maxMsgSize

    @classmethod
    def underlying_comm_class(cls):
        r"""str: Name of underlying communication class (as reported by the
        class returned by ``get_comm_class()``)."""
        return get_comm_class().underlying_comm_class()

    @classmethod
    def comm_count(cls):
        r"""int: Number of communication connections (as reported by the
        class returned by ``get_comm_class()``)."""
        return get_comm_class().comm_count()

    @classmethod
    def new_comm_kwargs(cls, name, request_comm=None, **kwargs):
        r"""Initialize communication with new comms.

        Args:
            name (str): Name for new comm.
            request_comm (str, optional): Name of class for new output comm.
                Defaults to None.
            **kwargs: Additional keyword arguments. If 'address' is not
                among them, they are passed through the request comm
                class's own ``new_comm_kwargs`` method.

        Returns:
            tuple(list, dict): Positional and keyword arguments for creating
                the new comm.

        """
        args = [name]
        ocomm_class = get_comm_class(request_comm)
        kwargs['direction'] = 'send'
        if 'address' not in kwargs:
            # Only the keyword arguments from the request comm class are
            # needed; its positional arguments are discarded.
            _, kwargs = ocomm_class.new_comm_kwargs(name, **kwargs)
        kwargs['request_comm'] = request_comm
        return args, kwargs

    @property
    def opp_comms(self):
        r"""dict: Name/address pairs for opposite comms, including those of
        the request comm."""
        out = super(ClientComm, self).opp_comms
        out.update(**self.ocomm.opp_comms)
        return out

    def opp_comm_kwargs(self):
        r"""Get keyword arguments to initialize communication with opposite
        comm object.

        Returns:
            dict: Keyword arguments for opposite comm object.

        """
        kwargs = super(ClientComm, self).opp_comm_kwargs()
        kwargs['comm'] = "ServerComm"
        kwargs['request_comm'] = self.ocomm.comm_class
        kwargs['response_kwargs'] = self.response_kwargs
        return kwargs

    def open(self):
        r"""Open the connection (base class first, then the request comm)."""
        super(ClientComm, self).open()
        self.ocomm.open()

    def close(self, *args, **kwargs):
        r"""Close the connection.

        The request comm is closed first, followed by every registered
        response comm and finally the base class.

        Args:
            *args: Arguments are passed to the request comm and base class
                close methods.
            **kwargs: Keyword arguments are passed to the request comm and
                base class close methods.

        """
        self.ocomm.close(*args, **kwargs)
        for k in self.icomm_order:
            self.icomm[k].close()
        super(ClientComm, self).close(*args, **kwargs)

    @property
    def is_open(self):
        r"""bool: True if the connection is open (state of the request
        comm)."""
        return self.ocomm.is_open

    @property
    def is_closed(self):
        r"""bool: True if the connection is closed (state of the request
        comm)."""
        return self.ocomm.is_closed

    @property
    def n_msg_send(self):
        r"""int: The number of messages in the connection (as reported by
        the request comm)."""
        return self.ocomm.n_msg_send

    @property
    def n_msg_send_drain(self):
        r"""int: The number of outgoing messages in the connection to drain
        (as reported by the request comm)."""
        return self.ocomm.n_msg_send_drain

    # RESPONSE COMM
    def create_response_comm(self):
        r"""Create and register a response comm for a new request.

        A fresh request ID is generated, a single-use receiving comm is
        created for it and registered in ``icomm``/``icomm_order``.

        Returns:
            dict: Header information for the request containing the
                'request_id' and the 'response_address' of the new comm.

        Raises:
            ValueError: If the generated request ID is already in use.

        """
        comm_kwargs = dict(direction='recv', is_response_client=True,
                           single_use=True, **self.response_kwargs)
        header = dict(request_id=str(uuid.uuid4()))
        if header['request_id'] in self.icomm:  # pragma: debug
            raise ValueError("Request ID %s already in use."
                             % header['request_id'])
        c = new_comm('client_response_comm.' + header['request_id'],
                     **comm_kwargs)
        header['response_address'] = c.address
        self.icomm[header['request_id']] = c
        self.icomm_order.append(header['request_id'])
        return header

    def remove_response_comm(self, request_id=None):
        r"""Remove a response comm and close it.

        Args:
            request_id (str, optional): ID of the request whose response
                comm should be removed. Defaults to None, in which case the
                oldest registered response comm is removed.

        Raises:
            IndexError: If request_id is None and there are no registered
                response comms.
            ValueError: If request_id is provided but is not registered.

        """
        if request_id is None:
            request_id = self.icomm_order.pop(0)
        else:
            self.icomm_order.remove(request_id)
        icomm = self.icomm.pop(request_id)
        icomm.close()

    # SEND METHODS
    def send(self, *args, **kwargs):
        r"""Create a response comm and then send a message to the output comm
        with the response address in the header.

        No response comm is created for the EOF message. If the send fails
        (falsy result or exception), the response comm created for this
        request is removed again; other pending requests are not affected.

        Args:
            *args: Arguments are passed to output comm send method. The
                first argument is the message.
            **kwargs: Keyword arguments are passed to output comm send
                method. Entries in 'header_kwargs' are preserved, but the
                response information ('request_id', 'response_address')
                takes precedence.

        Returns:
            obj: Output from output comm send method.

        """
        msg = args[0]
        # if self.is_closed:
        #     self.debug("send(): Connection closed.")
        #     return False
        request_id = None
        if msg != self.eof_msg:
            # Merge with (rather than discard) caller supplied header info.
            header = dict(kwargs.get('header_kwargs') or {})
            header.update(self.create_response_comm())
            kwargs['header_kwargs'] = header
            request_id = header['request_id']
        try:
            out = self.ocomm.send(*args, **kwargs)
        except Exception:
            # Do not leave a response comm behind for a request that was
            # never sent.
            if request_id is not None:
                self.remove_response_comm(request_id)
            raise
        if (not out) and (request_id is not None):
            # Remove the comm created for *this* request, which is not
            # necessarily the oldest one when requests are outstanding.
            self.remove_response_comm(request_id)
        return out

    # RECV METHODS
    def recv(self, *args, **kwargs):
        r"""Receive the response for the oldest outstanding request.

        The message is read from the response comm registered first and
        that comm is then removed.

        Args:
            *args: Arguments are passed to response comm recv method.
            **kwargs: Keyword arguments are passed to response comm recv
                method.

        Returns:
            obj: Output from response comm recv method.

        Raises:
            RuntimeError: If there are not any registered response comms.

        """
        # if self.is_closed:
        #     self.debug("recv(): Connection closed.")
        #     return (False, None)
        if not self.icomm:  # pragma: debug
            raise RuntimeError("There are not any registered response comms.")
        out = self.icomm[self.icomm_order[0]].recv(*args, **kwargs)
        self.remove_response_comm()
        return out

    # CALL
    def call(self, *args, **kwargs):
        r"""Do RPC call. The request message is sent to the output comm and
        the response is received from the response comm.

        Args:
            *args: Arguments are passed to output comm send method.
            **kwargs: Keyword arguments are passed to output comm send method

        Returns:
            obj: Output from recv method called with ``timeout=False``, or
                ``(False, self.empty_obj_recv)`` if the send failed.

        """
        flag = self.send(*args, **kwargs)
        if not flag:  # pragma: debug
            return (False, self.empty_obj_recv)
        return self.recv(timeout=False)

    def call_nolimit(self, *args, **kwargs):
        r"""Alias for call."""
        return self.call(*args, **kwargs)

    # OLD STYLE ALIASES
    def rpcSend(self, *args, **kwargs):
        r"""Alias for RPCComm.send"""
        return self.send(*args, **kwargs)

    def rpcRecv(self, *args, **kwargs):
        r"""Alias for RPCComm.recv"""
        return self.recv(*args, **kwargs)

    def rpcCall(self, *args, **kwargs):
        r"""Alias for RPCComm.call"""
        return self.call(*args, **kwargs)

    def drain_messages(self, direction='send', **kwargs):
        r"""Sleep while waiting for messages to be drained.

        Args:
            direction (str, optional): Direction that should be drained.
                Only 'send' results in any action; it is forwarded to the
                request comm. Defaults to 'send'.
            **kwargs: Keyword arguments are passed to the request comm's
                drain_messages method.

        """
        if direction == 'send':
            self.ocomm.drain_messages(direction='send', **kwargs)
# def purge(self): # r"""Purge input and output comms.""" # self.ocomm.purge() # # Unsure if client should purge all input comms... # # for k in self.icomm_order: # # self.icomm[k].purge() # super(ClientComm, self).purge()