Source code for yggdrasil.communication.ServerComm

import os
import uuid
from collections import OrderedDict
from yggdrasil import constants
from yggdrasil.communication import CommBase, get_comm, import_comm


[docs]class Request(object): __slots__ = ['response_address', 'request_id'] def __init__(self, response_address, request_id): self.response_address = response_address self.request_id = request_id super(Request, self).__init__()
[docs]class ServerComm(CommBase.CommBase): r"""Class for handling Server side communication. Args: name (str): The environment variable where communication address is stored. request_commtype (str, optional): Comm class that should be used for the request comm. Defaults to None. response_kwargs (dict, optional): Keyword arguments for the response comm. Defaults to empty dict. direct_connection (bool, optional): If True, the comm will be directly connected to a ServerComm. Defaults to False. **kwargs: Additional keywords arguments are passed to the input comm. Attributes: response_kwargs (dict): Keyword arguments for the response comm. icomm (Comm): Request comm. ocomm (OrderedDict): Response comms for each request. """ _commtype = 'server' _dont_register = True def __init__(self, name, request_commtype=None, response_kwargs=None, dont_open=False, is_async=False, direct_connection=False, **kwargs): if response_kwargs is None: response_kwargs = dict() icomm_name = name icomm_kwargs = kwargs icomm_kwargs.update(direction='recv', dont_open=True, commtype=request_commtype) icomm_kwargs.setdefault('is_server', True) icomm_kwargs.setdefault('use_async', is_async) if icomm_kwargs.get('use_async', False): icomm_kwargs.setdefault('async_recv_method', 'recv_message') self.direct_connection = direct_connection self.response_kwargs = response_kwargs self.icomm = get_comm(icomm_name, **icomm_kwargs) self.ocomm = OrderedDict() self.requests = OrderedDict() self.response_kwargs.setdefault('is_interface', self.icomm.is_interface) self.response_kwargs.setdefault('commtype', self.icomm._commtype) self.response_kwargs.setdefault('recv_timeout', self.icomm.recv_timeout) self.response_kwargs.setdefault('language', self.icomm.language) self.response_kwargs.setdefault('use_async', self.icomm.is_async) self.response_kwargs.setdefault('env', self.icomm.env) self.clients = [] self.closed_clients = [] self.nclients_expected = int(os.environ.get('YGG_NCLIENTS', 0)) super(ServerComm, self).__init__(self.icomm.name, dont_open=dont_open, recv_timeout=self.icomm.recv_timeout, is_interface=self.icomm.is_interface, direction='recv', no_suffix=True, address=self.icomm.address, is_async=self.icomm.is_async, env=self.icomm.env)
[docs] def get_status_message(self, nindent=0, **kwargs): r"""Return lines composing a status message. Args: nindent (int, optional): Number of tabs that should be used to indent each line. Defaults to 0. *kwargs: Additional arguments are passed to the parent class's method. Returns: tuple(list, prefix): Lines composing the status message and the prefix string used for the last message. """ lines, prefix = super(ServerComm, self).get_status_message( nindent=nindent, **kwargs) lines.append('%s%-15s:' % (prefix, 'request comm')) lines += self.icomm.get_status_message(nindent=(nindent + 1))[0] lines.append('%s%-15s:' % (prefix, 'response comms')) for x in self.ocomm.values(): lines += x.get_status_message(nindent=(nindent + 1))[0] return lines, prefix
[docs] @classmethod def is_installed(cls, language=None): r"""Determine if the necessary libraries are installed for this communication class. Args: language (str, optional): Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. Returns: bool: Is the comm installed. """ return import_comm().is_installed(language=language)
@property def maxMsgSize(self): r"""int: Maximum size of a single message that should be sent.""" return self.icomm.maxMsgSize
[docs] @classmethod def new_comm_kwargs(cls, name, request_commtype=None, **kwargs): r"""Initialize communication with new comms. Args: name (str): Name for new comm. request_commtype (str, optional): Name of class for new input comm. Defaults to None. """ args = [name] icomm_class = import_comm(request_commtype) kwargs['direction'] = 'recv' if 'address' not in kwargs: iargs, kwargs = icomm_class.new_comm_kwargs(name, **kwargs) kwargs['request_commtype'] = request_commtype return args, kwargs
@property def opp_comms(self): r"""dict: Name/address pairs for opposite comms.""" out = super(ServerComm, self).opp_comms out.update(**self.icomm.opp_comms) return out
[docs] def opp_comm_kwargs(self, for_yaml=False): r"""Get keyword arguments to initialize communication with opposite comm object. Args: for_yaml (bool, optional): If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False. Returns: dict: Keyword arguments for opposite comm object. """ kwargs = super(ServerComm, self).opp_comm_kwargs(for_yaml=for_yaml) kwargs['commtype'] = "client" kwargs['request_commtype'] = self.icomm._commtype kwargs['response_kwargs'] = self.response_kwargs kwargs['direct_connection'] = self.direct_connection return kwargs
[docs] def open(self): r"""Open the connection.""" super(ServerComm, self).open() self.icomm.open()
[docs] def close(self, *args, **kwargs): r"""Close the connection.""" self.icomm.close(*args, **kwargs) for ocomm in self.ocomm.values(): ocomm.close() super(ServerComm, self).close(*args, **kwargs)
@property def is_open(self): r"""bool: True if the connection is open.""" return self.icomm.is_open @property def is_closed(self): r"""bool: True if the connection is closed.""" return self.icomm.is_closed @property def n_msg_recv(self): r"""int: The number of messages in the connection.""" return self.icomm.n_msg_recv @property def n_msg_recv_drain(self): r"""int: The number of messages in the connection to drain.""" return self.icomm.n_msg_recv_drain @property def n_msg_direct(self): r"""int: Number of messages currently being routed.""" return self.icomm.n_msg_direct @property def open_clients(self): r"""list: Available open clients.""" return list(set(self.clients) - set(self.closed_clients)) @property def all_clients_connected(self): r"""bool: True if all expected clients have connected. False otherwise.""" return ((self.nclients_expected > 0) and (len(self.clients) >= self.nclients_expected)) # RESPONSE COMM
[docs] def create_response_comm(self, header): r"""Create a response comm based on information from the last header.""" if not isinstance(header, dict): # pragma: debug raise RuntimeError("No header received with last message.") meta = header.get('__meta__', {}) if 'response_address' not in meta: # pragma: debug raise RuntimeError(f"Last header does not contain response " f"address: {header}") comm_kwargs = dict(address=meta['response_address'], direction='send', **self.response_kwargs) if self.direct_connection: comm_kwargs['is_response_client'] = True else: comm_kwargs['is_response_server'] = True response_id = meta['request_id'] while response_id in self.requests: # pragma: debug response_id += str(uuid.uuid4()) meta['response_id'] = response_id if meta['response_address'] not in self.ocomm: self.ocomm[meta['response_address']] = get_comm( self.name + '-server_response_comm-' + response_id, **comm_kwargs) client_model = meta.get('model', '') self.ocomm[meta['response_address']].client_model = client_model if client_model and (client_model not in self.clients): self.clients.append(client_model) self.requests[response_id] = Request(meta['response_address'], meta['request_id'])
# SEND METHODS
[docs] def send_to(self, response_id, *args, **kwargs): r"""Send a message to a specific response comm. Args: response_id (str): ID used to register the response comm. *args: Arguments are passed to output comm send method. **kwargs: Keyword arguments are passed to output comm send method. Returns: obj: Output from output comm send method. """ kwargs.setdefault('header_kwargs', {}) kwargs['header_kwargs'].setdefault('__meta__', {}) kwargs['header_kwargs']['__meta__']['response_id'] = response_id return self.send(*args, **kwargs)
[docs] def prepare_message(self, *args, **kwargs): r"""Perform actions preparing to send a message. Args: *args: Components of the outgoing message. **kwargs: Keyword arguments are passed to the request comm's prepare_message method. Returns: CommMessage: Serialized and annotated message. """ if len(self.requests) == 0: # pragma: debug raise RuntimeError("There is no registered request.") kwargs.setdefault('header_kwargs', {}) kwargs['header_kwargs'].setdefault('__meta__', {}) response_id = kwargs['header_kwargs']['__meta__'].get('response_id', None) if response_id is None: response_id = next(iter(self.requests.keys())) kwargs['header_kwargs']['__meta__']['response_id'] = response_id request = self.requests[response_id] kwargs['header_kwargs']['__meta__']['request_id'] = request.request_id return self.ocomm[request.response_address].prepare_message( *args, **kwargs)
[docs] def send_message(self, msg, **kwargs): r"""Send a message encapsulated in a CommMessage object. Args: msg (CommMessage): Message to be sent. **kwargs: Additional keyword arguments are passed to the response comm's send_message method. Returns: bool: Success or failure of send. """ response_id = msg.header['__meta__']['response_id'] response_comm = self.ocomm[self.requests[response_id].response_address] out = response_comm.send_message(msg, **kwargs) self.errors += response_comm.errors if out: self.requests.pop(response_id) return out
# RECV METHODS
[docs] def recv_from(self, *args, **kwargs): r"""Receive a message from the input comm and open a new response comm for output using address from the header, returning the response_id. Args: *args: Arguments are passed to input comm recv method. **kwargs: Keyword arguments are passed to input comm recv method. Returns: tuple(bool, obj, str): Success or failure of recv call, output from input comm recv method, and response_id that response should be sent to. """ return_message_object = kwargs.pop('return_message_object', False) kwargs['return_message_object'] = True response_id = None out = self.recv(*args, **kwargs) if not return_message_object: if out.flag == CommBase.FLAG_SUCCESS: response_id = out.header['__meta__']['response_id'] out = (bool(out.flag), out.args, response_id) return out
[docs] def recv_message(self, *args, **kwargs): r"""Receive a message. Args: *args: Arguments are passed to the request comm's recv_message method. **kwargs: Keyword arguments are passed to the request comm's recv_message method. Returns: CommMessage: Received message. """ out = self.icomm.recv_message(*args, **kwargs) self.errors += self.icomm.errors return out
[docs] def finalize_message(self, msg, **kwargs): r"""Perform actions to decipher a message. Args: msg (CommMessage): Initial message object to be finalized. **kwargs: Keyword arguments are passed to the request comm's finalize_message method. Returns: CommMessage: Deserialized and annotated message. """ def check_for_client_info(msg): if msg.flag == CommBase.FLAG_SUCCESS: if isinstance(msg.args, bytes) and (msg.args == constants.YGG_CLIENT_EOF): self.debug("Client signed off: %s", msg.header['__meta__']['model']) self.closed_clients.append(msg.header['__meta__']['model']) msg.flag = CommBase.FLAG_SKIP else: self.create_response_comm(msg.header) return msg out = self.icomm.finalize_message(msg, **kwargs) return check_for_client_info(out)
# OLD STYLE ALIASES
[docs] def rpcSend(self, *args, **kwargs): r"""Alias for RPCComm.send""" return self.send(*args, **kwargs)
[docs] def rpcRecv(self, *args, **kwargs): r"""Alias for RPCComm.recv""" return self.recv(*args, **kwargs)
[docs] def drain_messages(self, direction='recv', **kwargs): r"""Sleep while waiting for messages to be drained.""" if direction == 'recv': self.icomm.drain_messages(direction='recv', **kwargs) self.errors += self.icomm.errors
[docs] def purge(self): r"""Purge input and output comms.""" self.icomm.purge() # Not sure if server should purge the response queue... # for ocomm in self.ocomm.values(): # ocomm.purge() super(ServerComm, self).purge()
[docs] def drain_server_signon_messages(self, **kwargs): r"""Drain server signon messages. This should only be used for testing purposes.""" self.icomm.drain_server_signon_messages(**kwargs)
[docs] def disconnect(self, *args, **kwargs): r"""Disconnect the comm.""" if hasattr(self, 'icomm'): self.icomm.disconnect() if hasattr(self, 'ocomm'): for k, v in self.ocomm.items(): v.disconnect() super(ServerComm, self).disconnect()
# ALIASED PROPERTIES WITH SETTERS @property def close_on_eof_recv(self): r"""bool: True if the comm will close when EOF is received.""" return self.icomm.close_on_eof_recv @close_on_eof_recv.setter def close_on_eof_recv(self, x): r"""Set close_on_eof_recv.""" self.icomm.close_on_eof_recv = x @property def filter(self): r"""FilterBase: filter for the communicator.""" return self.icomm.filter @filter.setter def filter(self, x): r"""Set the filter.""" self.icomm.filter = x