Source code for cis_interface.communication.IPCComm

import sys
import logging
from subprocess import Popen, PIPE
from cis_interface import platform, tools
from cis_interface.schema import register_component
from cis_interface.communication import CommBase, AsyncComm
try:
    import sysv_ipc
    _ipc_installed = (platform._is_linux or platform._is_mac)
except ImportError:  # pragma: windows
    logging.warn("Could not import sysv_ipc. "
                 + "IPC support will be disabled.")
    sysv_ipc = None
    _ipc_installed = False


[docs]def get_queue(qid=None): r"""Create or return a sysv_ipc.MessageQueue and register it. Args: qid (int, optional): If provided, ID for existing queue that should be returned. Defaults to None and a new queue is returned. Returns: :class:`sysv_ipc.MessageQueue`: Message queue. """ if _ipc_installed: kwargs = dict(max_message_size=tools.get_CIS_MSG_MAX()) if qid is None: kwargs['flags'] = sysv_ipc.IPC_CREX mq = sysv_ipc.MessageQueue(qid, **kwargs) key = str(mq.key) CommBase.register_comm('IPCComm', key, mq) return mq else: # pragma: windows logging.warning("IPC not installed. Queue cannot be returned.") return None
[docs]def remove_queue(mq): r"""Remove a sysv_ipc.MessageQueue and unregister it. Args: mq (:class:`sysv_ipc.MessageQueue`) Message queue. Raises: KeyError: If the provided queue is not registered. """ key = str(mq.key) if not CommBase.is_registered('IPCComm', key): raise KeyError("Queue not registered.") CommBase.unregister_comm('IPCComm', key)
[docs]def ipcs(options=[]): r"""Get the output from running the ipcs command. Args: options (list): List of flags that should be used. Defaults to an empty list. Returns: str: Captured output. """ if _ipc_installed: cmd = ' '.join(['ipcs'] + options) p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True) output, err = p.communicate() exit_code = p.returncode if exit_code != 0: # pragma: debug if not err.isspace(): print(err.decode('utf-8')) raise Exception("Error on spawned process. See output.") return output.decode('utf-8') else: # pragma: windows logging.warn("IPC not installed. ipcs cannot be run.") return ''
[docs]def ipc_queues(): r"""Get a list of active IPC queues. Returns: list: List of IPC queues. """ skip_lines = [ # Linux '------ Message Queues --------', 'key msqid owner perms used-bytes messages ', # MacOS 'IPC status from', 'Message Queues:', 'T ID KEY MODE OWNER GROUP'] out = ipcs(['-q']).split('\n') qlist = [] for l in out: skip = False if len(l) == 0: skip = True else: for ls in skip_lines: if ls in l: skip = True break if not skip: if platform._is_linux: key_col = 0 elif platform._is_mac: key_col = 2 else: # pragma: debug raise NotImplementedError("Unsure what column the queue key " + "is in on this platform " + "(%s)" % sys.platform) qlist.append(l.split()[key_col]) return qlist
[docs]def ipcrm(options=[]): r"""Remove IPC constructs using the ipcrm command. Args: options (list): List of flags that should be used. Defaults to an empty list. """ if _ipc_installed: cmd = ' '.join(['ipcrm'] + options) p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True) output, err = p.communicate() exit_code = p.returncode if exit_code != 0: # pragma: debug if not err.isspace(): print(err.decode('utf-8')) raise Exception("Error on spawned process. See output.") if not output.isspace(): print(output.decode('utf-8')) else: # pragma: windows logging.warn("IPC not installed. ipcrm cannot be run.")
[docs]def ipcrm_queues(queue_keys=None): r"""Delete existing IPC queues. Args: queue_keys (list, str, optional): A list of keys for queues that should be removed. Defaults to all existing queues. """ if _ipc_installed: if queue_keys is None: queue_keys = ipc_queues() if isinstance(queue_keys, str): queue_keys = [queue_keys] for q in queue_keys: ipcrm(["-Q %s" % q]) else: # pragma: windows logging.warn("IPC not installed. ipcrm cannot be run.")
[docs]class IPCServer(CommBase.CommServer): r"""IPC server object for cleaning up server queue."""
[docs] def terminate(self, *args, **kwargs): CommBase.unregister_comm('IPCComm', self.srv_address) super(IPCServer, self).terminate(*args, **kwargs)
[docs]@register_component class IPCComm(AsyncComm.AsyncComm): r"""Class for handling I/O via IPC message queues. Attributes: q (:class:`sysv_ipc.MessageQueue`): Message queue. """ _commtype = 'ipc' _maxMsgSize = 2048 # Based on IPC limit on MacOS def _init_before_open(self, **kwargs): r"""Initialize empty queue and server class.""" self.q = None self._server_class = IPCServer super(IPCComm, self)._init_before_open(**kwargs)
[docs] @classmethod def is_installed(cls, language=None): r"""Determine if the necessary libraries are installed for this communication class. Args: language (str, optional): Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. Returns: bool: Is the comm installed. """ if language in ['python', 'c']: out = _ipc_installed else: out = super(IPCComm, cls).is_installed(language=language) return out
[docs] @classmethod def underlying_comm_class(self): r"""str: Name of underlying communication class.""" return 'IPCComm'
[docs] @classmethod def close_registry_entry(cls, value): r"""Close a registry entry.""" try: value.remove() out = True except sysv_ipc.ExistentialError: # pragma: debug out = False return out
[docs] @classmethod def new_comm_kwargs(cls, *args, **kwargs): r"""Initialize communication with new queue.""" if 'address' not in kwargs: kwargs.setdefault('address', 'generate') return args, kwargs
[docs] def bind(self): r"""Bind to random queue if address is generate.""" if not self._bound: if self.address == 'generate': self._bound = True q = get_queue() self.address = str(q.key) super(IPCComm, self).bind()
[docs] def open_after_bind(self): r"""Open the connection by getting the queue from the bound address.""" qid = int(self.address) self.q = get_queue(qid)
def _open_direct(self): r"""Open the queue.""" if not self.is_open_direct: self.bind() self.open_after_bind() self.debug("qid: %s", self.q.key) def _close_direct(self, skip_remove=False): r"""Close the queue.""" if self._bound and (self.q is None): try: self.open_after_bind() except sysv_ipc.ExistentialError: # pragma: debug self.q = None self._bound = False # Remove the queue dont_close = (skip_remove or self.is_client) if (self.q is not None) and (not dont_close): # Dont close for client because server will not be able # to unregister the comm self.unregister_comm(self.address, dont_close=dont_close) self.q = None self._bound = False @property def is_open_direct(self): r"""bool: True if the queue is not None.""" if self.q is None: return False try: self.q.current_messages except AttributeError: # pragma: debug if self.q is not None: raise return False except sysv_ipc.ExistentialError: # pragma: debug self._close_direct() return False return True
[docs] def confirm_send(self, noblock=False): r"""Confirm that sent message was received.""" if noblock: return True return (self.n_msg_direct_send == 0)
[docs] def confirm_recv(self, noblock=False): r"""Confirm that message was received.""" return True
@property def n_msg_direct_send(self): r"""int: Number of messages in the queue to send.""" if self.is_open_direct: try: return self.q.current_messages except AttributeError: # pragma: debug if self.is_open_direct: raise return 0 except sysv_ipc.ExistentialError: # pragma: debug self._close_direct() return 0 else: return 0 @property def n_msg_direct_recv(self): r"""int: Number of messages in the queue to recv.""" return self.n_msg_direct_send def _send_direct(self, payload): r"""Send a message to the comm directly. Args: payload (str): Message to send. Returns: bool: Success or failure of sending the message. """ if not self.is_open_direct: # pragma: debug return False try: self.debug('Sending %d bytes', len(payload)) self.q.send(payload, block=False) self.debug('Sent %d bytes', len(payload)) except sysv_ipc.BusyError: # pragma: debug self.debug("IPC Queue Full") raise AsyncComm.AsyncTryAgain except OSError: # pragma: debug self.debug("Send failed") self._close_direct() return False except AttributeError: # pragma: debug if self.is_closed: self.debug("Comm closed") return False raise return True def _recv_direct(self): r"""Receive a message from the comm directly. Returns: tuple (bool, str): The success or failure of receiving a message and the message received. """ # Receive message self.debug("Message ready, reading it.") try: data, _ = self.q.receive() # ignore ident self.debug("Received %d bytes", len(data)) except sysv_ipc.ExistentialError: # pragma: debug self.debug("sysv_ipc.ExistentialError: closing") self._close_direct() return (False, self.empty_bytes_msg) except AttributeError: # pragma: debug if self.is_closed: self.debug("Queue closed") return (False, self.empty_bytes_msg) raise return (True, data)
[docs] def purge(self): r"""Purge all messages from the comm.""" super(IPCComm, self).purge() try: while self.n_msg_direct > 0: # pragma: debug self.q.receive() except AttributeError: # pragma: debug if self.is_open_direct: raise