Source code for cis_interface.communication

import os
import glob
import importlib
from cis_interface import tools


def determine_suffix(no_suffix=False, reverse_names=False,
                     direction='send', **kwargs):
    r"""Select the suffix appended to a comm name.

    Args:
        no_suffix (bool, optional): If True, an empty string is returned
            (after the direction has been validated). Defaults to False.
        reverse_names (bool, optional): If True, the suffix is the opposite
            of the one normally used for the direction. Defaults to False.
        direction (str, optional): Direction in which the comm will process
            messages, either 'send' or 'recv'. Defaults to 'send'.
        **kwargs: Additional keyword arguments are ignored.

    Returns:
        str: Suffix that will be added to the comm name when producing
            the name of the environment variable where information about
            the comm will be stored. One of '_OUT', '_IN' or ''.

    Raises:
        ValueError: If the direction is not 'recv' or 'send'.

    """
    # The direction is validated first so that an invalid value is reported
    # even when no suffix was requested.
    if direction not in ('send', 'recv'):
        raise ValueError("Unrecognized message direction: %s" % direction)
    if no_suffix:
        return ''
    sending = (direction == 'send')
    swapped = bool(reverse_names)
    # '_OUT' for a plain send or a reversed recv; '_IN' for the other two
    # combinations.
    return '_OUT' if sending != swapped else '_IN'


def get_comm_class(comm=None):
    r"""Return a communication class given its name.

    The class is looked up as the attribute named ``comm`` of the module
    ``cis_interface.communication.<comm>``, which is imported on demand.

    Args:
        comm (str, optional): Name of communicator class. Defaults to
            tools.get_default_comm() if not provided or if 'DefaultComm'
            is given.

    Returns:
        class: Communicator class.

    """
    # Both None and the 'DefaultComm' alias resolve to the default comm name.
    if comm in (None, 'DefaultComm'):
        comm = tools.get_default_comm()
    module_name = 'cis_interface.communication.%s' % comm
    comm_module = importlib.import_module(module_name)
    return getattr(comm_module, comm)
def new_comm(name, comm=None, **kwargs):
    r"""Return a new communicator, creating necessary components for
    communication (queues, sockets, channels, etc.).

    Args:
        name (str): Communicator name.
        comm (str, list, optional): Name of communicator class, or a list
            of per-comm keyword dictionaries (presumably dicts; each must
            at least be convertible with dict()). A list with exactly one
            entry is unpacked: its 'name' entry, if present, replaces
            name and the remaining entries are merged into kwargs. A list
            of any other length is passed on unchanged as the 'comm'
            keyword of the 'ForkComm' class. If None (or 'DefaultComm'),
            get_comm_class falls back to the default comm.
        **kwargs: Additional keyword arguments are passed to communicator
            class method new_comm.

    Returns:
        Comm: Communicator of given class.

    """
    if isinstance(comm, list):
        if len(comm) == 1:
            # Work on a shallow copy so that popping 'name' does not strip
            # the entry from the dictionary owned by the caller.
            comm_kwargs = dict(comm[0])
            name = comm_kwargs.pop('name', name)
            kwargs.update(comm_kwargs)
            # Any 'comm' entry of the unpacked dictionary is now in kwargs
            # and selects the class on the recursive call.
            return new_comm(name, **kwargs)
        else:
            kwargs['comm'] = comm
            comm = 'ForkComm'
    comm_cls = get_comm_class(comm)
    return comm_cls.new_comm(name, **kwargs)
def get_comm(name, **kwargs):
    r"""Return communicator for existing comm components.

    This forwards to new_comm with the 'dont_create' keyword forced to
    True, overriding any value supplied by the caller.

    Args:
        name (str): Communicator name.
        **kwargs: Additional keyword arguments are passed to new_comm.

    Returns:
        Comm: Communicator of given class.

    """
    kwargs.update(dont_create=True)
    return new_comm(name, **kwargs)
def DefaultComm(*args, **kwargs):
    r"""Construct a comm object of the default type.

    Args:
        *args: Positional arguments are passed to the default comm class.
        **kwargs: Keyword arguments are passed to the default comm class.

    Returns:
        Comm: Instance of the class returned by get_comm_class().

    """
    default_cls = get_comm_class()
    return default_cls(*args, **kwargs)
def cleanup_comms(comm=None):
    r"""Call cleanup_comms for the appropriate communicator class.

    Args:
        comm (str, optional): Name of communicator class. Defaults to
            tools.get_default_comm() if not provided.

    Returns:
        int: Number of comms closed.

    """
    comm_cls = get_comm_class(comm)
    return comm_cls.cleanup_comms()
def import_all_comms():
    r"""Import all comms to ensure they are registered.

    Every '*.py' file located in the directory of this module is passed,
    by its file name without the extension, to get_comm_class, which
    imports the corresponding module. Files whose name starts with a
    double underscore (e.g. '__init__.py') are skipped.

    """
    comm_dir = os.path.dirname(__file__)
    for path in glob.glob(os.path.join(comm_dir, '*.py')):
        mod_name = os.path.basename(path)[:-3]
        # Test the file name rather than the globbed path: the path carries
        # the directory prefix, so it would not start with '__' and dunder
        # files would not be skipped.
        if not mod_name.startswith('__'):
            get_comm_class(mod_name)


__all__ = ['new_comm', 'get_comm', 'get_comm_class',
           'cleanup_comms', 'DefaultComm']