import subprocess
import os
import copy
import pprint
from contextlib import contextmanager
from yggdrasil.components import import_component
_temp_error_registry = {}
class AddressError(Exception):
r"""An exception class for errors in establishing a comm address."""
class TemporaryCommunicationError(Exception):
r"""Raised when the comm is open, but send/recv is temporarily disabled.
Args:
msg (str): Error message.
max_consecutive_allowed (int, optional): Maximum number of times this
error should be raised as a TemporaryCommunicationError before
being elevated to a FatalCommunicationError. Defaults to None and
is never elevated.
registry_key (str, optional): Key that should be used to register the
error. Defaults to msg.
Raises:
FatalCommunicationError: If the error is raised more than the number
of times specified via max_consecutive_allowed.
"""
def __init__(self, msg, max_consecutive_allowed=None,
registry_key=None, **kwargs):
super(TemporaryCommunicationError, self).__init__(msg, **kwargs)
self.max_consecutive_allowed = max_consecutive_allowed
if max_consecutive_allowed is not None:
global _temp_error_registry
assert registry_key is not None
_temp_error_registry.setdefault(registry_key, 0)
_temp_error_registry[registry_key] += 1
if ((_temp_error_registry[registry_key]
> max_consecutive_allowed)): # pragma: debug
raise FatalCommunicationError(msg, **kwargs)
@classmethod
def reset(cls, registry_key):
r"""Reset the registry for a TemporaryCommunicationError."""
global _temp_error_registry
_temp_error_registry.pop(registry_key, None)
class NoMessages(TemporaryCommunicationError):
r"""Raised when the comm is open, but there are no messages waiting."""
pass
class FatalCommunicationError(Exception):
r"""Raised when the comm cannot recover."""
pass
def check_env_for_address(env, name):
r"""Check for a channel name in a dictionary of environment variables.
Args:
env (dict): Environment variables to check.
name (str): Name of the channel to check for.
Returns:
str: The value stored in the environment variable for the channel.
Raises:
AddressError: If the channel cannot be located.
"""
check_names = [name, name.replace(':', '__COLON__')]
check_names += [x.upper() for x in copy.copy(check_names)]
for x in check_names:
if x in env:
return env[x]
raise AddressError(f'Cannot see {name} in env. '
f'Env:\n{pprint.pformat(env)}')
def import_comm(commtype=None):
r"""Import a comm class from its component subtype.
Args:
commtype (str, optional): Communication class subtype. Defaults to
the default comm type for the current OS.
Returns:
CommBase: Associated communication class.
"""
if commtype in ['server', 'client', 'fork']:
commtype = '%sComm' % commtype.title()
return import_component('comm', commtype)
def determine_suffix(no_suffix=False, reverse_names=False,
direction='send', **kwargs):
r"""Determine the suffix that should be used for the comm name.
Args:
no_suffix (bool, optional): If True, the suffix will be an empty
string. Defaults to False.
reverse_names (bool, optional): If True, the suffix will be
opposite that indicated by the direction. Defaults to False.
direction (str, optional): The direction that the comm will
processing messages. Defaults to 'send'.
**kwargs: Additional keyword arguments are ignored.
Returns:
str: Suffix that will be added to the comm name when producing
the name of the environment variable where information about
the comm will be stored.
Raises:
ValueError: If the direction is not 'recv' or 'send'.
"""
if direction not in ['send', 'recv']:
raise ValueError("Unrecognized message direction: %s" % direction)
if no_suffix:
suffix = ''
else:
if ((((direction == 'send') and (not reverse_names))
or ((direction == 'recv') and reverse_names))):
suffix = '_OUT'
else:
suffix = '_IN'
return suffix
[docs]def new_comm(name, commtype=None, use_async=False, **kwargs):
r"""Return a new communicator, creating necessary components for
communication (queues, sockets, channels, etc.).
Args:
name (str): Communicator name.
commtype (str, list, optional): Name of communicator type or a list
of specifiers for multiple communicators that should be joined.
Defaults to None.
use_async (bool, optional): If True, send/recv operations will
be performed asynchronously on new threads. Defaults to
False.
**kwargs: Additional keyword arguments are passed to communicator
class method new_comm.
Returns:
Comm: Communicator of given class.
"""
assert 'comm' not in kwargs
if isinstance(commtype, list):
if len(commtype) == 1:
kwargs.update(commtype[0])
kwargs.setdefault('name', name)
kwargs.setdefault('use_async', use_async)
return new_comm(**kwargs)
else:
kwargs['comm_list'] = commtype
commtype = 'fork'
if (commtype is None) and kwargs.get('filetype', None):
commtype = kwargs.pop('filetype')
comm_cls = import_comm(commtype)
if kwargs.get('is_interface', False):
use_async = False
async_kws = {}
if use_async:
from yggdrasil.communication.AsyncComm import AsyncComm
async_kws = {k: kwargs.pop(k) for k in AsyncComm._async_kws
if k in kwargs}
if use_async:
kwargs['is_async'] = True
out = comm_cls.new_comm(name, **kwargs)
if use_async and (out._commtype not in [None, 'client',
'server', 'fork']):
from yggdrasil.communication.AsyncComm import AsyncComm
out = AsyncComm(out, **async_kws)
return out
[docs]def get_comm(name, **kwargs):
r"""Return communicator for existing comm components.
Args:
name (str): Communicator name.
**kwargs: Additional keyword arguments are passed to new_comm.
Returns:
Comm: Communicator of given class.
"""
kwargs['dont_create'] = True
return new_comm(name, **kwargs)
@contextmanager
def open_file_comm(fname, mode, filetype='binary', **kwargs):
r"""Context manager to open a file comm in a way similar to how
Python opens file descriptors.
Args:
fname (str): Path to file that should be opened.
mode (str): Mode that file should be opened in. Supported values
include 'r', 'w', and 'a'.
filetype (str, optional): Type of file being opened. Defaults to
'binary'.
Returns:
CommBase: File comm.
"""
comm = None
try:
comm_cls = import_component('file', filetype)
if mode == 'r':
kwargs['direction'] = 'recv'
elif mode in ['w', 'a']:
kwargs['direction'] = 'send'
if mode == 'a':
kwargs['append'] = True
else:
raise ValueError("Unsupported mode: '%s'" % mode)
comm = comm_cls('file', address=fname, **kwargs)
yield comm
finally:
if comm is not None:
comm.close()
def get_open_fds(): # pragma: debug
'''
return the number of open file descriptors for current process
.. warning: will only work on UNIX-like os-es.
'''
procs = subprocess.check_output(["lsof", '-w', "-p", str(os.getpid())])
nprocs = len(procs.split(b'\n'))
# nprocs = len(
# list(filter(
# lambda s: s and s[ 0 ] == b'f' and s[1: ].isdigit(),
# procs.split( b'\n' ) ) )
# )
return nprocs
__all__ = ['new_comm', 'get_comm']