import sys
import logging
from subprocess import Popen, PIPE
from yggdrasil import platform, tools
from yggdrasil.communication import (
CommBase, TemporaryCommunicationError, NoMessages)
logger = logging.getLogger(__name__)
try:
import sysv_ipc
_ipc_installed = (platform._is_linux or platform._is_mac)
except ImportError: # pragma: windows
logger.debug("Could not import sysv_ipc."
" IPC support will be disabled.")
sysv_ipc = None
_ipc_installed = False
[docs]def get_queue(qid=None):
r"""Create or return a sysv_ipc.MessageQueue and register it.
Args:
qid (int, optional): If provided, ID for existing queue that should be
returned. Defaults to None and a new queue is returned.
Returns:
:class:`sysv_ipc.MessageQueue`: Message queue.
"""
if _ipc_installed:
kwargs = dict(max_message_size=tools.get_YGG_MSG_MAX())
if qid is None:
kwargs['flags'] = sysv_ipc.IPC_CREX
try:
mq = sysv_ipc.MessageQueue(qid, **kwargs)
except sysv_ipc.ExistentialError as e: # pragma: debug
raise sysv_ipc.ExistentialError(f"{e}: {qid}")
key = str(mq.key)
IPCComm.register_comm(key, mq)
return mq
else: # pragma: windows
logger.warning("IPC not installed. Queue cannot be returned.")
return None
[docs]def remove_queue(mq):
r"""Remove a sysv_ipc.MessageQueue and unregister it.
Args:
mq (:class:`sysv_ipc.MessageQueue`) Message queue.
Raises:
KeyError: If the provided queue is not registered.
"""
key = str(mq.key)
if not IPCComm.is_registered(key):
raise KeyError("Queue not registered.")
IPCComm.unregister_comm(key)
[docs]def ipcs(options=[]):
r"""Get the output from running the ipcs command.
Args:
options (list): List of flags that should be used. Defaults to an empty
list.
Returns:
str: Captured output.
"""
if _ipc_installed:
cmd = ' '.join(['ipcs'] + options)
p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
output, err = p.communicate()
exit_code = p.returncode
if exit_code != 0: # pragma: debug
if not err.isspace():
print(err.decode('utf-8'))
raise Exception("Error on spawned process. See output.")
return output.decode('utf-8')
else: # pragma: windows
logger.warn("IPC not installed. ipcs cannot be run.")
return ''
[docs]def ipc_queues(by_id=False):
r"""Get a list of active IPC queues.
Returns:
list: List of IPC queues.
"""
skip_lines = [
# Linux
'------ Message Queues --------',
'key msqid owner perms used-bytes messages ',
# MacOS
'IPC status from',
'Message Queues:',
'T ID KEY MODE OWNER GROUP']
out = ipcs(['-q']).split('\n')
qlist = []
for line in out:
skip = False
if len(line) == 0:
skip = True
else:
for ls in skip_lines:
if ls in line:
skip = True
break
if not skip:
if platform._is_linux:
if by_id:
key_col = 1
else:
key_col = 0
elif platform._is_mac:
if by_id:
key_col = 1
else:
key_col = 2
else: # pragma: debug
raise NotImplementedError("Unsure what column the queue key "
+ "is in on this platform "
+ "(%s)" % sys.platform)
qlist.append(line.split()[key_col])
return qlist
[docs]def ipcrm(options=[]):
r"""Remove IPC constructs using the ipcrm command.
Args:
options (list): List of flags that should be used. Defaults to an empty
list.
"""
if _ipc_installed:
cmd = ' '.join(['ipcrm'] + options)
p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
output, err = p.communicate()
exit_code = p.returncode
if exit_code != 0: # pragma: debug
if not err.isspace():
print(err.decode('utf-8'))
raise Exception("Error on spawned process. See output.")
if not output.isspace():
print(output.decode('utf-8'))
else: # pragma: windows
logger.warn("IPC not installed. ipcrm cannot be run.")
[docs]def ipcrm_queues(queue_keys=None, by_id=False):
r"""Delete existing IPC queues.
Args:
queue_keys (list, str, optional): A list of keys for queues that should
be removed. Defaults to all existing queues.
"""
if _ipc_installed:
if queue_keys is None:
queue_keys = ipc_queues(by_id=by_id)
if isinstance(queue_keys, str):
queue_keys = [queue_keys]
if by_id:
flags = '-q'
else:
flags = '-Q'
for q in queue_keys:
ipcrm([f"{flags} {q}"])
else: # pragma: windows
logger.warn("IPC not installed. ipcrm cannot be run.")
[docs]class IPCServer(CommBase.CommServer):
r"""IPC server object for cleaning up server queue."""
[docs] def terminate(self, *args, **kwargs):
IPCComm.unregister_comm(self.srv_address)
super(IPCServer, self).terminate(*args, **kwargs)
[docs]class IPCComm(CommBase.CommBase):
r"""Class for handling I/O via IPC message queues.
Attributes:
q (:class:`sysv_ipc.MessageQueue`): Message queue.
Developer Notes:
The default size limit for IPC message queues is 2048 bytes on Mac
operating systems so it is important that implementation of this
communication mechanism properly split and send messages larger than
this limit as more than one message.
"""
_commtype = 'ipc'
_schema_subtype_description = ('Interprocess communication (IPC) queue.')
_maxMsgSize = 2048 # Based on IPC limit on MacOS
address_description = ("An IPC message queue key.")
_deprecated_drivers = ['IPCInputDriver', 'IPCOutputDriver']
def _init_before_open(self, **kwargs):
r"""Initialize empty queue and server class."""
self.q = None
self._server_class = IPCServer
super(IPCComm, self)._init_before_open(**kwargs)
[docs] @classmethod
def close_registry_entry(cls, value):
r"""Close a registry entry."""
try:
value.remove()
out = True
except sysv_ipc.ExistentialError: # pragma: debug
out = False
return out
[docs] @classmethod
def new_comm_kwargs(cls, *args, **kwargs):
r"""Initialize communication with new queue."""
if 'address' not in kwargs:
kwargs.setdefault('address', 'generate')
return args, kwargs
[docs] def bind(self):
r"""Bind to random queue if address is generate."""
if not self._bound:
if self.address == 'generate':
self._bound = True
q = get_queue()
self.address = str(q.key)
super(IPCComm, self).bind()
[docs] def open_after_bind(self):
r"""Open the connection by getting the queue from the bound address."""
qid = int(self.address)
self.q = get_queue(qid)
[docs] def open(self):
r"""Open the queue."""
super(IPCComm, self).open()
if not self.is_open:
self.open_after_bind()
self.debug("qid: %s", self.q.key)
def _close(self, linger=False, skip_remove=False):
r"""Close the queue."""
if self._bound and (self.q is None):
try:
self.open_after_bind()
except sysv_ipc.ExistentialError: # pragma: debug
self.q = None
self._bound = False
# Remove the queue
dont_close = (skip_remove or self.is_client)
if (self.q is not None) and (not dont_close):
# Dont close for client because server will not be able
# to unregister the comm
self.unregister_comm(self.address, dont_close=dont_close)
self.q = None
self._bound = False
super(IPCComm, self)._close(linger=linger)
[docs] def atexit(self): # pragma: debug
r"""Close operations."""
if self.direction == 'send':
self.linger()
super(IPCComm, self).atexit()
@property
def is_open(self):
r"""bool: True if the queue is not None."""
if self.q is None:
return False
try:
self.q.current_messages
except AttributeError: # pragma: debug
if self.q is not None:
raise
return False
except sysv_ipc.ExistentialError: # pragma: debug
self._close()
return False
return True
[docs] def confirm_send(self, noblock=False):
r"""Confirm that sent message was received."""
if noblock:
return True
return (self.n_msg_send == 0)
[docs] def confirm_recv(self, noblock=False):
r"""Confirm that message was received."""
return True
@property
def n_msg_send(self):
r"""int: Number of messages in the queue to send."""
if self.is_open:
try:
return self.q.current_messages
except AttributeError: # pragma: debug
if self.is_open:
raise
return 0
except sysv_ipc.ExistentialError: # pragma: debug
self._close()
return 0
else:
return 0
@property
def n_msg_recv(self):
r"""int: Number of messages in the queue to recv."""
return self.n_msg_send
def _send(self, payload):
r"""Send a message.
Args:
payload (str): Message to send.
Returns:
bool: Success or failure of sending the message.
"""
try:
self.q.send(payload, block=False)
except sysv_ipc.BusyError: # pragma: debug
raise TemporaryCommunicationError("Queue full.")
return True
def _recv(self):
r"""Receive a message from the IPC queue.
Returns:
tuple (bool, str): The success or failure of receiving a message
and the message received.
"""
try:
data, _ = self.q.receive(block=False) # ignore ident
except sysv_ipc.BusyError: # pragma: debug
raise NoMessages("No messages in queue.")
return (True, data)
[docs] def purge(self):
r"""Purge all messages from the comm."""
super(IPCComm, self).purge()
try:
while self.n_msg > 0: # pragma: debug
self.q.receive()
except AttributeError: # pragma: debug
if self.is_open:
raise