yggdrasil.communication package

Subpackages

Submodules

yggdrasil.communication.AsciiFileComm module

class yggdrasil.communication.AsciiFileComm.AsciiFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a file on disk.

Parameters:
  • name (str) – The environment variable where communication address is stored.

  • comment (str, optional) – String indicating a comment. If ‘read_meth’ is ‘readline’ and this is provided, lines starting with a comment will be skipped.

  • **kwargs – Additional keywords arguments are passed to parent class.

classmethod get_testing_options(**kwargs)[source]

Method to return a dictionary of testing options for this class.

Returns:

Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the

provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by sending

the messages in ‘send’.

Return type:

dict

yggdrasil.communication.AsciiMapComm module

class yggdrasil.communication.AsciiMapComm.AsciiMapComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a ASCII map on disk.

Parameters:
  • name (str) – The environment variable where file path is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.AsciiTableComm module

class yggdrasil.communication.AsciiTableComm.AsciiTableComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a file on disk.

yggdrasil.communication.AsyncComm module

class yggdrasil.communication.AsyncComm.AsyncComm(wrapped, daemon=False, async_recv_method='recv', async_send_method='send_message', async_recv_kwargs=None, async_send_kwargs=None)[source]

Bases: ProxyObject, ComponentBaseUnregistered

Class for handling asynchronous I/O.

Parameters:
  • name (str) – The name of the message queue.

  • async_recv_method (str, optional) – The method that should be used to receive message into the backlog. Defaults to ‘recv’.

  • async_send_method (dict, optional) – The method that should be used to send message in the backlog. Defaults to ‘send_message’.

  • async_recv_kwargs (dict, optional) – Keyword arguments to pass to calls receiving messages into the backlog. Defaults to {}.

  • async_send_method – Keyword arguments to pass to calls sending message from the backlog. Defaults to {}.

  • **kwargs – Additional keyword arguments are passed to CommBase.

backlog_ready

Event set when there is a message in the recv backlog.

Type:

multitasking.Event

add_backlog(payload)[source]

Add a message to the backlog of messages.

Parameters:

payload (tuple) – Arguments and keyword argumetns for send or data from receive.

async_recv_kwargs
async_recv_method
async_send_kwargs
async_send_method
atexit()[source]

Close operations.

property backlog_buffer

Messages that have been received.

Type:

list

backlog_ready
property backlog_thread

Task that will handle sinding or receiving backlogged messages.

Type:

tools.YggTask

close(linger=False)[source]

Close the connection.

Parameters:

linger (bool, optional) – If True, drain messages before closing the comm. Defaults to False.

close_on_eof_recv
daemon
property errors

Errors raised by the wrapped comm or async thread.

Type:

list

finalize_message(msg, **kwargs)[source]

Perform actions to decipher a message.

Parameters:
  • msg (CommMessage) – Initial message object to be finalized.

  • **kwargs – Keyword arguments are passed to the request comm’s finalize_message method.

Returns:

Deserialized and annotated message.

Return type:

CommMessage

property is_closed

True if the connection is closed.

Type:

bool

property is_confirmed_send

True if all sent messages have been confirmed.

Type:

bool

property is_open

True if the backlog is open.

Type:

bool

property is_open_backlog

True if the backlog thread is running.

Type:

bool

property is_open_direct

True if the direct comm is not None.

Type:

bool

property n_msg

The number of messages in the connection.

Type:

int

property n_msg_backlog

Number of messages in the backlog.

Type:

int

property n_msg_backlog_recv

Number of messages in the receive backlog.

Type:

int

property n_msg_backlog_send

Number of messages in the send backlog.

Type:

int

property n_msg_direct

Number of messages currently being routed.

Type:

int

property n_msg_direct_recv

Number of messages currently being routed in recv.

Type:

int

property n_msg_direct_send

Number of messages currently being routed in send.

Type:

int

property n_msg_recv

Number of messages in the receive backlog.

Type:

int

property n_msg_recv_drain

Number of messages in the receive backlog and direct comm.

Type:

int

property n_msg_send

Number of messages in the send backlog.

Type:

int

property n_msg_send_drain

Number of messages in the send backlog and direct comm.

Type:

int

next(*args, **kwargs)
open()[source]

Open the connection by connecting to the queue.

pop_backlog()[source]

Pop a message from the front of the backlog.

Returns:

First backlogged send arguments/keyword arguments

or received data.

Return type:

tuple

precheck(*args, **kwargs)[source]
printStatus(*args, **kwargs)[source]

Print status of the communicator.

purge()[source]

Purge all messages from the comm.

recv(timeout=None, return_message_object=False, dont_finalize=False, **kwargs)[source]

Receive a message.

Parameters:
  • *args – All arguments are passed to comm _recv method.

  • return_message_object (bool, optional) – If True, the full wrapped CommMessage message object is returned instead of the tuple. Defaults to False.

  • dont_finalize (bool, optional) – If True, finalize_message will not be called even if async_recv_method is ‘recv_message’. Defaults to False.

  • **kwargs – All keywords arguments are passed to comm _recv method.

Returns:

Success or failure of receive and received

message.

Return type:

tuple (bool, obj)

recv_array(*args, **kwargs)[source]

Alias for recv_array on wrapped comm.

recv_backlog()[source]

Check for any messages in the queue and add them to the recv backlog.

recv_dict(*args, **kwargs)[source]

Alias for recv_dict on wrapped comm.

recv_direct()[source]

Receive a message directly from the underlying comm.

recv_message(timeout=None, **kwargs)[source]

Receive a message.

Parameters:
  • *args – Arguments are passed to the response comm’s recv_message method.

  • **kwargs – Keyword arguments are passed to the response comm’s recv_message method.

Returns:

Received message.

Return type:

CommMessage

recv_nolimit(*args, **kwargs)[source]

Alias for recv_nolimit on wrapped comm.

run_backlog_recv()[source]

Continue buffering received messages.

run_backlog_send()[source]

Continue trying to send buffered messages.

send(*args, dont_prepare=False, **kwargs)[source]

Send a message to the backlog.

Parameters:
  • *args – All arguments are assumed to be part of the message.

  • **kwargs – All keywords arguments are passed to comm _send method.

Returns:

Success or failure of sending the message.

Return type:

bool

send_array(*args, **kwargs)[source]

Alias for send_array on wrapped comm.

send_backlog()[source]

Send a message from the send backlog to the queue.

send_dict(*args, **kwargs)[source]

Alias for send_dict on wrapped comm.

send_direct(*args, **kwargs)[source]

Send a message directly to the underlying comm.

send_eof(*args, **kwargs)[source]

Send the EOF message as a short message.

Parameters:
  • *args – All arguments are passed to comm send.

  • **kwargs – All keywords arguments are passed to comm send.

Returns:

Success or failure of send.

Return type:

bool

send_message(msg, **kwargs)[source]

Send a message encapsulated in a CommMessage object.

Parameters:
  • msg (CommMessage) – Message to be sent.

  • **kwargs – Additional keyword arguments are passed to _safe_send.

Returns:

Success or failure of send.

Return type:

bool

send_nolimit(*args, **kwargs)[source]

Alias for send_nolimit on wrapped comm.

yggdrasil.communication.BufferComm module

class yggdrasil.communication.BufferComm.BufferComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling I/O to an in-memory buffer.

Parameters:
  • name (str) – The name of the message queue.

  • address (LockedBuffer, optional) – Existing buffer that should be used. If not provided, a new buffer is created.

  • buffer_task_method (str, optional) – Type of tasks that buffer will be used to share information between. Defaults to ‘thread’.

  • **kwargs – Additional keyword arguments are passed to CommBase.

address

Buffer containing messages.

Type:

LockedBuffer

buffer_task_method

Type of tasks that buffer will be used to share information between.

Type:

str

bind()[source]

Bind to new buffer.

property is_confirmed_recv

True if all received messages have been confirmed.

Type:

bool

property is_confirmed_send

True if all sent messages have been confirmed.

Type:

bool

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.

Returns:

Is the comm installed.

Return type:

bool

property is_open

True if the connection is open.

Type:

bool

property n_msg_recv

Number of messages in the receive backlog.

Type:

int

property n_msg_send

Number of messages in the send backlog.

Type:

int

no_serialization = True
open()[source]

Open the buffer.

purge()[source]

Purge all messages from the comm.

class yggdrasil.communication.BufferComm.LockedBuffer(*args, **kwargs)[source]

Bases: Queue

Buffer intended to be shared between threads/processes.

append(x)[source]

Add an element to the queue.

clear()[source]

Remove all elements from the queue.

close(join=False)[source]

Close the buffer.

property closed

True if the queue is closed, False otherwise.

Type:

bool

disconnect()[source]

Disconnect from the aliased object by replacing it with a dummy object.

pop(index=0, default=None)[source]

Remove the first element from the queue.

yggdrasil.communication.CABOFileComm module

class yggdrasil.communication.CABOFileComm.CABOFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a CABO parameter file on disk.

yggdrasil.communication.ClientComm module

class yggdrasil.communication.ClientComm.ClientComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling Client side communication.

Parameters:
  • name (str) – The environment variable where communication address is stored.

  • request_commtype (str, optional) – Comm class that should be used for the request comm. Defaults to None.

  • response_kwargs (dict, optional) – Keyword arguments for the response comm. Defaults to empty dict.

  • direct_connection (bool, optional) – If True, the comm will be directly connected to a ServerComm. Defaults to False.

  • **kwargs – Additional keywords arguments are passed to the output comm.

response_kwargs

Keyword arguments for the response comm.

Type:

dict

request_order

Order of request IDs.

Type:

list

responses

Mapping between request IDs and response messages.

Type:

dict

ocomm

Request comm.

Type:

Comm

icomm

Response comm.

Type:

Comm

atexit()[source]

Close operations.

call(*args, **kwargs)[source]

Do RPC call. The request message is sent to the output comm and the response is received from the input comm.

Parameters:
  • *args – Arguments are passed to output comm send method.

  • **kwargs – Keyword arguments are passed to output comm send method

Returns:

Output from input comm recv method.

Return type:

obj

call_nolimit(*args, **kwargs)[source]

Alias for call.

close(*args, **kwargs)[source]

Close the connection.

create_response_comm()[source]

Create a response comm based on information from the last header.

disconnect(*args, **kwargs)[source]

Disconnect the comm.

drain_messages(direction='send', **kwargs)[source]

Sleep while waiting for messages to be drained.

property filter

filter for the communicator.

Type:

FilterBase

finalize_message(msg, **kwargs)[source]

Perform actions to decipher a message.

Parameters:
  • msg (CommMessage) – Initial message object to be finalized.

  • **kwargs – Keyword arguments are passed to the response comm’s finalize_message method.

Returns:

Deserialized and annotated message.

Return type:

CommMessage

get_status_message(nindent=0, **kwargs)[source]

Return lines composing a status message.

Parameters:
  • nindent (int, optional) – Number of tabs that should be used to indent each line. Defaults to 0.

  • *kwargs – Additional arguments are passed to the parent class’s method.

Returns:

Lines composing the status message and the

prefix string used for the last message.

Return type:

tuple(list, prefix)

property is_closed

True if the connection is closed.

Type:

bool

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked.

Returns:

Is the comm installed.

Return type:

bool

property is_open

True if the connection is open.

Type:

bool

property maxMsgSize

Maximum size of a single message that should be sent.

Type:

int

property n_msg_direct

Number of messages currently being routed.

Type:

int

property n_msg_send

The number of messages in the connection.

Type:

int

property n_msg_send_drain

The number of outgoing messages in the connection to drain.

Type:

int

classmethod new_comm_kwargs(name, request_commtype=None, **kwargs)[source]

Initialize communication with new comms.

Parameters:
  • name (str) – Name for new comm.

  • request_commtype (str, optional) – Name of class for new output comm. Defaults to None.

open()[source]

Open the connection.

property opp_address

Address for opposite comm.

Type:

str

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters:

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns:

Keyword arguments for opposite comm object.

Return type:

dict

property opp_comms

Name/address pairs for opposite comms.

Type:

dict

prepare_message(*args, **kwargs)[source]

Perform actions preparing to send a message.

Parameters:
  • *args – Components of the outgoing message.

  • **kwargs – Keyword arguments are passed to the request comm’s prepare_message method.

Returns:

Serialized and annotated message.

Return type:

CommMessage

recv_message(*args, **kwargs)[source]

Receive a message.

Parameters:
  • *args – Arguments are passed to the response comm’s recv_message method.

  • **kwargs – Keyword arguments are passed to the response comm’s recv_message method.

Returns:

Received message.

Return type:

CommMessage

rpcCall(*args, **kwargs)[source]

Alias for RPCComm.call

rpcRecv(*args, **kwargs)[source]

Alias for RPCComm.recv

rpcSend(*args, **kwargs)[source]

Alias for RPCComm.send

send_message(msg, **kwargs)[source]

Send a message encapsulated in a CommMessage object.

Parameters:
  • msg (CommMessage) – Message to be sent.

  • **kwargs – Additional keyword arguments are passed to the request comm’s send_message method.

Returns:

Success or failure of send.

Return type:

bool

yggdrasil.communication.CommBase module

class yggdrasil.communication.CommBase.CommBase(*args, **kwargs)[source]

Bases: YggClass

Class for handling I/O.

Parameters:
  • name (str) – The environment variable where communication address is stored.

  • address (str, optional) – Communication info. Default to None and address is taken from the environment variable.

  • direction (str, optional) – The direction that messages should flow through the connection. ‘send’ if the connection will send messages, ‘recv’ if the connecton will receive messages. Defaults to ‘send’.

  • is_interface (bool, optional) – Set to True if this comm is a Python interface binding. Defaults to False.

  • language (str, optional) – Programming language of the calling model. Defaults to ‘python’.

  • env (dict, optional) – Environment variable that should be used. Defaults to os.environ if not provided.

  • partner_model (str, optional) – Name of model that this comm is partnered with. Default to None, indicating that the partner is not a model.

  • partner_language (str, optional) – Programming language of this comm’s partner comm. Defaults to ‘python’.

  • partner_mpi_ranks (list, optional) – Ranks of processes of this comm’s partner comm(s). Defaults to [].

  • datatype (schema, optional) – JSON schema (with expanded core types defined by yggdrasil) that constrains the type of data that should be sent/received by this object. Defaults to {‘type’: ‘bytes’}. Additional information on specifying datatypes can be found here.

  • field_names (list, optional) – [DEPRECATED] Field names that should be used to label fields in sent/received tables. This keyword is only valid for table-like datatypes. If not provided, field names are created based on the field order.

  • field_units (list, optional) – [DEPRECATED] Field units that should be used to convert fields in sent/received tables. This keyword is only valid for table-like datatypes. If not provided, all fields are assumed to be unitless.

  • as_array (bool, optional) – [DEPRECATED] If True and the datatype is table-like, tables are sent/recieved with either columns rather than row by row. Defaults to False.

  • ( (default_file) – class:.DefaultSerialize, optional): Class with serialize and deserialize methods that should be used to process sent and received messages or a dictionary describing a serializer that obeys the serializer schema.

  • format_str (str, optional) – String that should be used to format/parse messages. Default to None.

  • dont_open (bool, optional) – If True, the connection will not be opened. Defaults to False.

  • is_interface – Set to True if this comm is a Python interface binding. Defaults to False.

  • recv_timeout (float, optional) – Time that should be waited for an incoming message before returning None. Defaults to 0 (no wait). A value of False indicates that recv should block.

  • close_on_eof_recv (bool, optional) – If True, the comm will be closed when it receives an end-of-file messages. Otherwise, it will remain open. Defaults to True.

  • close_on_eof_send (bool, optional) – If True, the comm will be closed after it sends an end-of-file messages. Otherwise, it will remain open. Defaults to False.

  • single_use (bool, optional) – If True, the comm will only be used to send/recv a single message. Defaults to False.

  • reverse_names (bool, optional) – If True, the suffix added to the comm with be reversed. Defaults to False.

  • no_suffix (bool, optional) – If True, no directional suffix will be added to the comm name. Defaults to False.

  • allow_multiple_comms (bool, optional) – If True, initialize the comm such that mulitiple comms can connect to the same address. Defaults to False.

  • is_client (bool, optional) – If True, the comm is one of many potential clients that will be sending messages to one or more servers. Defaults to False.

  • is_response_client (bool, optional) – If True, the comm is a client-side response comm. Defaults to False.

  • is_server (bool, optional) – If True, the commis one of many potential servers that will be receiving messages from one or more clients. Defaults to False.

  • is_response_server (bool, optional) – If True, the comm is a server-side response comm. Defaults to False.

  • recv_converter (func, optional) – Converter that should be used on received objects. Defaults to None.

  • send_converter (func, optional) – Converter that should be used on sent objects. Defaults to None.

  • vars (list, optional) – Names of variables to be sent/received by this comm. Defaults to [].

  • length_map (dict, optional) – Map from pointer variable names to the names of variables where their length will be stored. Defaults to {}.

  • comm (str, optional) – The comm that should be created. This only serves as a check that the correct class is being created. Defaults to None.

  • ( – class:.FilterBase, optional): Filter that will be used to determine when messages should be sent/received. Ignored if not provided.

  • ( – class:.TransformBase, optional): One or more transformations that will be applied to messages that are sent/received. Ignored if not provided.

  • is_default (bool, optional) – If True, this comm was created to handle all input/output variables to/from a model. Defaults to False. This variable is used internally and should not be set explicitly in the YAML.

  • outside_loop (bool, optional) – If True, and the comm is an input/outputs to/from a model being wrapped. The receive/send calls for this comm will be outside the loop for the model. Defaults to False.

  • dont_copy (bool, optional) – If True, the comm will not be duplicated in the even a model is duplicated via the ‘copies’ parameter. Defaults to False except for in the case that a model is wrapped and the comm is inside the loop or that a model is a RPC input to a model server.

  • ( – class:FileComm, optional): Comm information for a file that input should be drawn from (for input comms) or that output should be sent to (for output comms) in the event that a yaml does not pair the comm with another model comm or a file.

  • default_value (object, optional) – Value that should be returned in the event that a yaml does not pair the comm with another model comm or a file.

  • for_service (bool, optional) – If True, this comm bridges the gap to an integration running as a service, possibly on a remote machine. Defaults to False.

  • **kwargs – Additional keywords arguments are passed to parent class.

Class Attributes:

is_file (bool): True if the comm accesses a file. _maxMsgSize (int): Maximum size of a single message that should be sent. address_description (str): Description of the information constituting

an address for this communication mechanism.

name

The environment variable where communication address is stored.

Type:

str

address

Communication info.

Type:

str

direction

The direction that messages should flow through the connection.

Type:

str

is_interface

True if this comm is a Python interface binding.

Type:

bool

language

Language that this comm is being called from.

Type:

str

env

Environment variable that should be used.

Type:

dict

partner_model

Name of model that this comm is partnered with.

Type:

str

partner_language

Programming language of this comm’s partner comm.

Type:

str

partner_mpi_ranks

Ranks of processes of this comm’s partner comm(s).

Type:

list

serializer (

class:.DefaultSerialize): Object that will be used to serialize/deserialize messages to/from python objects.

recv_timeout

Time that should be waited for an incoming message before returning None.

Type:

float

close_on_eof_recv

If True, the comm will be closed when it receives an end-of-file messages. Otherwise, it will remain open.

Type:

bool

close_on_eof_send

If True, the comm will be closed after it sends an end-of-file messages. Otherwise, it will remain open.

Type:

bool

single_use

If True, the comm will only be used to send/recv a single message.

Type:

bool

allow_multiple_comms

If True, initialize the comm such that mulitiple comms can connect to the same address.

Type:

bool

is_client

If True, the comm is one of many potential clients that will be sending messages to one or more servers.

Type:

bool

is_response_client

If True, the comm is a client-side response comm.

Type:

bool

is_server

If True, the comm is one of many potential servers that will be receiving messages from one or more clients.

Type:

bool

is_response_server

If True, the comm is a server-side response comm.

Type:

bool

recv_converter

Converter that should be used on received objects.

Type:

func

send_converter

Converter that should be used on sent objects.

Type:

func

filter (

class:.FilterBase): Callable class that will be used to determine when messages should be sent/received.

Raises:
  • RuntimeError – If the comm class is not installed.

  • AddressError – If there is not an environment variable with the specified name.

  • ValueError – If directions is not ‘send’ or ‘recv’.

add_work_comm(comm)[source]

Add work comm to dict.

Parameters:

( (comm) – class:.CommBase): Comm that should be added.

Raises:

KeyError – If there is already a comm associated with the key.

address_description = None
property any_files

True if the comm interfaces with any files.

Type:

bool

apply_transform(msg_in, for_empty=False, header=False)[source]

Evaluate the transform to alter the emssage being sent/received.

Parameters:
  • msg_in (object) – Message being transformed.

  • for_empty (bool, optional) – If True, the transformation is being used to check for an empty message and errors will be caught. Defaults to False.

  • header (dict, optional) – Header keyword arguments associated with a message. Defaults to False and is ignored.

  • typedef (dict, optiona) – Type to transform. Default to None and will be determined by the serializer if receiving.

Returns:

Transformed message.

Return type:

object

apply_transform_to_type(typedef)[source]

Evaluate the transform to alter the type definition.

Parameters:

typedef (dict) – Type definition to transform.

Returns:

Transformed type definition.

Return type:

dict

atexit()[source]

Close operations.

bind()[source]

Bind in place of open.

chunk_message(msg)[source]

Yield chunks of message of size maxMsgSize

Parameters:

msg (str, bytes) – Raw message bytes to be chunked.

Returns:

Chunks of message.

Return type:

str

classmethod cleanup_comms()[source]

Cleanup registered comms of this class.

close(linger=False, **kwargs)[source]

Close the connection.

Parameters:
  • linger (bool, optional) – If True, drain messages before closing the comm. Defaults to False.

  • **kwargs – Additional keyword arguments are passed to linger method if linger is True.

close_in_thread(no_wait=False, timeout=None)[source]

In a new thread, close the comm when it is empty.

Parameters:
  • no_wait (bool, optional) – If True, don’t wait for closing thread to stop.

  • timeout (float, optional) – Time that should be waited for the comm to close. Defaults to None and is set to self.timeout. If False, this will block until the comm is closed.

classmethod close_registry_entry(value)[source]

Close a registry entry.

coerce_to_dict(msg, key_order, metadata)[source]

Convert a message to a dictionary.

Parameters:
  • msg (object) – Message to convert to a dictionary.

  • key_order (list) – Key order to use for the output dictionary.

  • metadata (dict) – Header data to accompany the message.

Returns:

Converted message.

Return type:

dict

classmethod comm_count()[source]

int: Number of communication connections.

classmethod comm_registry()[source]

dict: Registry of comms of this class.

confirm(direction=None, noblock=False)[source]

Confirm message.

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

create_work_comm(work_comm_name=None, **kwargs)[source]

Create a temporary work comm.

Parameters:
  • work_comm_name (str, optional) – Name that should be used for the work comm. If not provided, one is created from the header id and the comm class.

  • **kwargs – Keyword arguments for new_comm that should override work_comm_kwargs.

Returns:

class:.CommBase: Work comm.

property create_work_comm_kwargs

Keyword arguments for a new work comm.

Type:

dict

deserialize(*args, **kwargs)[source]

Deserialize a message using the associated deserializer.

drain_messages(direction=None, timeout=None, variable=None)[source]

Sleep while waiting for messages to be drained.

drain_server_signon_messages(**kwargs)[source]

Drain server signon messages. This should only be used for testing purposes.

property empty_bytes_msg

Empty serialized message.

Type:

str

property empty_obj_recv

Empty message object.

Type:

obj

property eof_msg

Message indicating EOF.

Type:

str

evaluate_filter(*msg_in)[source]

Evaluate the filter to determine how the message should be handled.

Parameters:

*msg_in (object) – Parts of message being evaluated.

Returns:

True if the filter evaluates to True, False otherwise.

Return type:

bool

extract_key_order(kwargs)[source]

Extract the key order from keyword arguments.

Parameters:

kwargs (dict) – Keyword arguments.

Returns:

Key order.

Return type:

list

finalize_message(msg, skip_processing=False, skip_python2language=False, after_finalize_message=None)[source]

Perform actions to decipher a message. The order of steps is

  1. Transform the message

  2. Filter

  3. python2language

  4. Close comm on EOF if close_on_eof_recv set

  5. Check for empty recv after processing

  6. Mark comm as used and close if single use

  7. Apply after_finalize_message functions

Parameters:
  • msg (CommMessage) – Initial message object to be finalized.

  • skip_processing (bool, optional) – If True, filters, transformations, and after_finalize_message funciton applications will not be performed. Defaults to False.

  • skip_python2language (bool, optional) – If True, python2language will not be applied. Defaults to False.

  • after_finalize_message (list, optional) – A set of function that should be applied to received CommMessage objects following the standard finalization. Defaults to None and is ignored.

Returns:

Deserialized and annotated message.

Return type:

CommMessage

property full_model_name

Name of the model using the comm w/ copy suffix.

Type:

str

property get_response_comm_kwargs

Keyword arguments to use for a response comm.

Type:

dict

get_status_message(nindent=0, extra_lines_before=None, extra_lines_after=None)[source]

Return lines composing a status message.

Parameters:
  • nindent (int, optional) – Number of tabs that should be used to indent each line. Defaults to 0.

  • extra_lines_before (list, optional) – Additional lines that should be added to the beginning of the default print message. Defaults to empty list if not provided.

  • extra_lines_after (list, optional) – Additional lines that should be added to the end of the default print message. Defaults to empty list if not provided.

Returns:

Lines composing the status message and the

prefix string used for the last message.

Return type:

tuple(list, prefix)

classmethod get_testing_options(serializer=None, test_dir=None, **kwargs)[source]

Method to return a dictionary of testing options for this class.

Parameters:

serializer (str, optional) – The name of the serializer that should be used. If not provided, the _default_serializer class attribute will be used.

Returns:

Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the

provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by sending

the messages in ‘send’.

Return type:

dict

get_work_comm(header, **kwargs)[source]

Get temporary work comm, creating as necessary.

Parameters:
  • header (dict) – Information that will be sent in the message header to the work comm.

  • **kwargs – Additional keyword arguments are passed to header2workcomm.

Returns:

class:.CommBase: Work comm.

property get_work_comm_kwargs

Keyword arguments for an existing work comm.

Type:

dict

header2workcomm(header, work_comm_name=None, **kwargs)[source]

Get a work comm based on header info.

Parameters:
  • header (dict) – Information that will be sent in the message header to the work comm.

  • work_comm_name (str, optional) – Name that should be used for the work comm. If not provided, one is created from the header id and the comm class.

  • **kwargs – Additional keyword arguments are added to the returned dictionary.

Returns:

class:.CommBase: Work comm.

property is_closed

True if the connection is closed.

Type:

bool

property is_confirmed

True if all messages have been confirmed.

Type:

bool

property is_confirmed_recv

True if all received messages have been confirmed.

Type:

bool

property is_confirmed_send

True if all sent messages have been confirmed.

Type:

bool

is_empty(msg, emsg)[source]

Check that a message matches an empty message object.

Parameters:
  • msg (object) – Message object.

  • emsg (object) – Empty message object.

Returns:

True if the object is empty, False otherwise.

Return type:

bool

is_empty_recv(msg)[source]

Check if a received message object is empty.

Parameters:

msg (obj) – Message object.

Returns:

True if the object is empty, False otherwise.

Return type:

bool

is_eof(msg)[source]

Determine if a message is an EOF.

Parameters:

msg (obj) – Message object to be tested.

Returns:

True if the message indicates an EOF, False otherwise.

Return type:

bool

is_file = False
classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.

Returns:

Is the comm installed.

Return type:

bool

property is_open

True if the connection is open.

Type:

bool

classmethod is_registered(key)[source]

bool: True if the comm is registered, False otherwise.

language_atexit()[source]

Close operations specific to the language.

linger(active_confirm=False)[source]

Wait for messages to drain.

linger_close(**kwargs)[source]

Wait for messages to drain, then close.

property maxMsgSize

Maximum size of a single message that should be sent.

Type:

int

property model_copies

Number of copies of the model using the comm.

Type:

int

property model_env

Mapping between model name and opposite comm environment variables that need to be provided to the model.

Type:

dict

property model_name

Name of the model using the comm.

Type:

str

property n_msg

The number of messages in the connection.

Type:

int

property n_msg_recv

The number of incoming messages in the connection.

Type:

int

property n_msg_recv_drain

The number of incoming messages in the connection to drain.

Type:

int

property n_msg_send

The number of outgoing messages in the connection.

Type:

int

property n_msg_send_drain

The number of outgoing messages in the connection to drain.

Type:

int

classmethod new_comm(name, *args, **kwargs)[source]

Initialize communication with new queue.

classmethod new_comm_kwargs(*args, **kwargs)[source]

Get keyword arguments for new comm.

new_server(srv_address)[source]

Create a new server.

Parameters:

srv_address (str) – Address of server comm.

no_serialization = False
open()[source]

Open the connection.

property opp_address

Address for opposite comm.

Type:

str

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters:

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns:

Keyword arguments for opposite comm object.

Return type:

dict

property opp_comms

Name/address pairs for opposite comms.

Type:

dict

property opp_name

Name that should be used for the opposite comm.

Type:

str

precheck(direction)[source]

Check that comm is ready for action in specified direction, raising errors if it is not.

Parameters:

direction (str) – Check that comm is ready to perform this action.

prepare_header(header_kwargs)[source]

Prepare header kwargs for the communicator.

prepare_message(*args, header_kwargs=None, skip_serialization=False, skip_processing=False, skip_language2python=False, after_prepare_message=None, flag=None)[source]

Perform actions preparing to send a message. The order of steps is

  1. Convert the message based on the language

  2. Isolate the message if there is only one

  3. Check if the message is EOF

  4. Check if the message should be filtered

  5. Transform the message

  6. Apply after_prepare_message functions

  7. Serialize the message

  8. Create a work comm if the message is too large to be sent all at once

Parameters:
  • *args – Components of the outgoing message.

  • header_kwargs (dict, optional) – Header options that should be set.

  • skip_serialization (bool, optional) – If True, serialization will not be performed. Defaults to False.

  • skip_processing (bool, optional) – If True, filters, transformations, and after_prepare_message function applications will not be performed. Defaults to False.

  • skip_language2python (bool, optional) – If True, language2python will be skipped. Defaults to False.

  • after_prepare_message (list, optional) – Functions that should be applied after transformation, but before serialization. Defaults to None and is ignored.

  • flag (int, optional) – Flag that should be added to the message before any additional processing is performed. Defaults to None and is ignored.

Returns:

Serialized and annotated message.

Return type:

CommMessage

printStatus(*args, level='info', return_str=False, **kwargs)[source]

Print status of the communicator.

purge()[source]

Purge all messages from the comm.

recv(*args, return_message_object=False, **kwargs)[source]

Receive a message.

Parameters:
  • *args – All arguments are passed to comm _recv method.

  • return_message_object (bool, optional) – If True, the full wrapped CommMessage message object is returned instead of the tuple. Defaults to False.

  • **kwargs – All keywords arguments are passed to comm _recv method.

Returns:

Success or failure of receive and received

message. If return_message_object is True, the CommMessage object will be returned instead.

Return type:

tuple (bool, obj)

recv_array(*args, **kwargs)[source]

Alias for recv.

recv_dict(*args, **kwargs)[source]

Return a received message as a dictionary of fields. If there are not any fields specified, the fields will have the form ‘f0’, ‘f1’, ‘f2’, …

Parameters:
  • *args – Arguments are passed to recv.

  • **kwargs – Keyword arguments are passed to recv.

Returns:

Success/failure of receive and a dictionar of

message fields.

Return type:

tuple(bool, dict)

Raises:

recv_message(*args, skip_deserialization=False, **kwargs)[source]

Receive a message.

Parameters:
  • *args – Arguments are passed to _safe_recv.

  • skip_deserialization (bool, optional) – If True, deserialization is not performed. Defaults to False.

  • **kwargs – Additional keyword arguments are passed to _safe_recv.

Returns:

Received message.

Return type:

CommMessage

recv_nolimit(*args, **kwargs)[source]

Alias for recv.

classmethod register_comm(key, value)[source]

Register a comm.

remove_work_comm(key, in_thread=False, linger=False)[source]

Close and remove a work comm.

Parameters:
  • key (str) – Key of comm that should be removed.

  • in_thread (bool, optional) – If True, close the work comm in a thread. Defaults to False.

  • linger (bool, optional) – If True, drain messages before closing the comm. Defaults to False.

send(*args, **kwargs)[source]

Send a message.

Parameters:
  • *args – All arguments are assumed to be part of the message.

  • **kwargs – All keywords arguments are passed to prepare_message or send_message.

Returns:

Success or failure of send.

Return type:

bool

send_array(*args, **kwargs)[source]

Alias for send.

send_dict(args_dict, **kwargs)[source]

Send a message with fields specified in the input dictionary.

Parameters:
  • args_dict (dict) – Dictionary of arguments to send.

  • **kwargs – Additiona keyword arguments are passed to send.

Returns:

Success/failure of send.

Return type:

bool

Raises:

RuntimeError – If the field order can not be determined.

send_eof(*args, **kwargs)[source]

Send the EOF message as a short message.

Parameters:
  • *args – All arguments are passed to comm send.

  • **kwargs – All keywords arguments are passed to comm send.

Returns:

Success or failure of send.

Return type:

bool

send_message(msg, skip_safe_send=False, **kwargs)[source]

Send a message encapsulated in a CommMessage object.

Parameters:
  • msg (CommMessage) – Message to be sent.

  • skip_safe_send (bool, optional) – If True, no actual send will take place. Defaults to False.

  • **kwargs – Additional keyword arguments are passed to _safe_send.

Returns:

Success or failure of send.

Return type:

bool

send_nolimit(*args, **kwargs)[source]

Alias for send.

serialize(*args, **kwargs)[source]

Serialize a message using the associated serializer.

server_exists(srv_address)[source]

Determine if a server exists.

Parameters:

srv_address (str) – Address of server comm.

Returns:

True if a server with the provided address exists, False

otherwise.

Return type:

bool

signoff_from_server()[source]

Remove a client from the server.

signon_to_server()[source]

Add a client to an existing server or create one.

classmethod underlying_comm_class()[source]

str: Name of underlying communication class.

classmethod unregister_comm(key, dont_close=False)[source]

Unregister a comm.

update_message_from_serializer(msg)[source]

Update a message with information about the serializer.

Parameters:

msg (CommMessage) – Incoming message.

update_serializer_from_message(msg)[source]

Update the serializer based on information stored in a message.

Parameters:

msg (CommMessage) – Outgoing message.

wait_for_confirm(timeout=None, direction=None, active_confirm=False, noblock=False)[source]

Sleep until all messages are confirmed.

wait_for_workers(timeout=None)[source]

Sleep until all workers are closed or have been used.

workcomm2header(work_comm, **kwargs)[source]

Get header information from a comm.

Parameters:
  • ( (work_comm) – class:.CommBase): Work comm that header describes.

  • **kwargs – Additional keyword arguments are added to the header.

Returns:

Header information that will be sent with a message.

Return type:

dict

class yggdrasil.communication.CommBase.CommMessage(msg=None, length=0, flag=None, args=None, header=None)[source]

Bases: object

Class for passing messages around with additional information required to send/receive them.

msg

The serialized message including the header.

Type:

bytes

length

The size of the message.

Type:

int

flag

Indicates the result of processing the message. Values are: FLAG_FAILURE: Processing was unsuccessful. FLAG_SUCCESS: Processing was successful. FLAG_SKIP: The message should be skipped. FLAG_EOF: The message indicates that there will be no more messages.

Type:

int

args

The unserialized message (post-transformation).

Type:

object

header

Parameters sent in the header of the message.

Type:

dict

additional_messages

Messages that should be sent along with this message as in the case that the message was an iterator.

Type:

list

worker

Worker communicator that should be used to send worker messages in the case that the original message had to be split.

Type:

CommBase

worker_messages

Messages that should be sent via the worker comm comm as the original message had to be split due to its size.

Type:

list

sent

True if the message has been sent, False otherwise.

Type:

bool

singular

True if there was only one argument.

Type:

bool

add_message(*args, **kwargs)[source]

Add a message to the list of additional messages that should be sent following this one.

Parameters:
  • *args – Arguments are passed to the CommMessage constructor.

  • *kwargs – Keyword arguments are passed to the CommMessage constructor.

add_worker_message(*args, **kwargs)[source]

Add a message to the list of messages that should be sent via work comm following this one.

Parameters:
  • *args – Arguments are passed to the CommMessage constructor.

  • *kwargs – Keyword arguments are passed to the CommMessage constructor.

additional_messages
apply_function(x)[source]

Apply a function to the message.

Parameters:

x (function) – Function to apply.

args
finalized
flag
header
length
msg
send_worker_messages(**kwargs)[source]

Send the worker messages via the worker comm.

Parameters:

**kwargs – Keyword arguments are passed to the send_message method of the worker comm for each message.

Returns:

Success of the send operations.

Return type:

bool

sent
sinfo
singular
stype
property tuple_args

Form that arguments were originally supplied.

Type:

tuple

worker
worker_messages
class yggdrasil.communication.CommBase.CommServer(*args, **kwargs)[source]

Bases: YggTaskLoop

Basic server object to keep track of clients.

cli_count

Number of clients that have connected to this server.

Type:

int

add_client()[source]

Increment the client count.

remove_client()[source]

Decrement the client count, closing the server if all clients done.

class yggdrasil.communication.CommBase.CommTaskLoop(*args, **kwargs)[source]

Bases: YggTaskLoop

Task loop for comms to ensure cleanup.

Parameters:
  • ( (comm) – class:.CommBase): Comm class that thread is for.

  • name (str, optional) – Name for the thread. If not provided, one is created by combining the comm name and the provided suffix.

  • suffix (str, optional) – Suffix that should be added to comm name to name the thread. Defaults to ‘CommTask’.

  • **kwargs – Additional keyword arguments are passed to the parent class.

comm (

class:.CommBase): Comm class that thread is for.

on_main_terminated()[source]

Actions taken on the backlog thread when the main thread stops.

exception yggdrasil.communication.CommBase.IncompleteBaseComm[source]

Bases: Exception

An exception class for methods that are incomplete for base classes.

exception yggdrasil.communication.CommBase.NeverMatch[source]

Bases: Exception

An exception class that is never raised by any code anywhere

yggdrasil.communication.CommBase.cleanup_comms(commtype, close_func=None)[source]

Clean up comms of a certain type.

Parameters:

commtype (str) – Comm class that should be cleaned up.

Returns:

Number of comms closed.

Return type:

int

yggdrasil.communication.CommBase.get_comm_registry(commtype)[source]

Get the comm registry for a comm class.

Parameters:

commtype (str) – Comm class to get registry for.

Returns:

Dictionary of registered comm objects.

Return type:

dict

yggdrasil.communication.CommBase.is_registered(commtype, key)[source]

Determine if a comm object has been registered under the specified key.

Parameters:
  • commtype (str) – Comm class to check for the key under.

  • key (str) – Key that should be checked.

yggdrasil.communication.CommBase.register_comm(commtype, key, value)[source]

Add a comm object to the global registry.

Parameters:
  • commtype (str) – Comm class to register the object under.

  • key (str) – Key that should be used to register the object.

  • value (obj) – Object being registered.

yggdrasil.communication.CommBase.unregister_comm(commtype, key, dont_close=False)[source]

Remove a comm object from the global registry and close it.

Parameters:
  • commtype (str) – Comm class to check for key under.

  • key (str) – Key for object that should be removed from the registry.

  • dont_close (bool, optional) – If True, the comm will be removed from the registry, but it won’t be closed. Defaults to False.

Returns:

True if an object was closed.

Return type:

bool

yggdrasil.communication.DedicatedFileBase module

class yggdrasil.communication.DedicatedFileBase.DedicatedFileBase(*args, **kwargs)[source]

Bases: FileComm

Base class for handling I/O via a dedicated library.

property concats_as_str

True if concatenating file contents result in a valid file.

Type:

bool

deserialize(msg, **kwargs)[source]

Don’t deserialize for dedicated comms since using a serializer is inefficient.

file_flush()[source]

Flush the file.

file_seek(pos, whence=0)[source]

Move in the file to the specified position.

Parameters:
  • pos (int) – Position (in bytes) to move file to.

  • whence (int, optional) – Flag indicating position that pos is relative to. 0 for the beginning of the file, 1 for from the current location, and 2 from the end of the file.

property file_size

Current size of file.

Type:

int

file_tell()[source]

int: Current position in the file.

classmethod get_testing_options(**kwargs)[source]

Method to return a dictionary of testing options for this class.

Returns:

Dictionary of variables to use for testing. Items:
kwargs (dict): Keyword arguments for comms tested with

the provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a

test file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by

sending the messages in ‘send’.

Return type:

dict

property is_open

True if the connection is open.

Type:

bool

property remaining_bytes

Remaining bytes in the file.

Type:

int

property requires_refresh

True if a refresh is necessary.

Type:

bool

serialize(obj, **kwargs)[source]

Don’t serialize for dedicated comms since using a serializer is inefficient.

yggdrasil.communication.DefaultComm module

class yggdrasil.communication.DefaultComm.DefaultComm(*args, **kwargs)[source]

Bases: CommBase

Simple wrapper for default class that allows it to be registered.

yggdrasil.communication.ExcelFileComm module

class yggdrasil.communication.ExcelFileComm.ExcelFileComm(*args, **kwargs)[source]

Bases: DedicatedFileBase

Class for handling I/O from/to an Excel file.

advance_in_series(series_index=None)[source]

Advance to a certain file in a series.

Parameters:

series_index (int, optional) – Index of file in the series that should be moved to. Defaults to None and call will advance to the next file in the series.

Returns:

True if the file was advanced in the series, False otherwise.

Return type:

bool

concats_as_str = True
property current_sheet

Current sheet name.

Type:

str

file_seek(*args, **kwargs)[source]

Move in the file to the specified position.

property file_size

Current size of file.

Type:

int

classmethod get_testing_options(sheets=None, columns=None, **kwargs)[source]

Method to return a dictionary of testing options for this class.

Parameters:
  • sheets (list, optional) – Sheet names that should be specified.

  • columns (list, optional) – Column names that should be specified.

Returns:

Dictionary of variables to use for testing. Items:
kwargs (dict): Keyword arguments for comms tested with

the provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a

test file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by

sending the messages in ‘send’.

Return type:

dict

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.

Returns:

Is the comm installed.

Return type:

bool

no_serialization = True
pop_current_sheet()[source]

Remove the current sheet from the list of remaining sheets.

read_header()[source]

Read header lines from the file and update serializer info.

write_header()[source]

Write header lines to the file based on the serializer info.

yggdrasil.communication.FileComm module

class yggdrasil.communication.FileComm.FileComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling I/O from/to a file on disk.

>>> x = FileComm('test_send', address='test_file.txt', direction='send')
>>> x.send('Test message')
True
>>> with open('test_file.txt', 'r') as fd:
...     print(fd.read())
Test message
>>> x = FileComm('test_recv', address='test_file.txt', direction='recv')
>>> x.recv()
(True, b'Test message')
Parameters:
  • name (str) – The environment variable where communication address is stored.

  • read_meth (str, optional) – Method that should be used to read data from the file. Defaults to ‘read’. Ignored if direction is ‘send’.

  • append (bool, optional) – If True and writing, file is openned in append mode. If True and reading, file is kept open even if the end of the file is reached to allow for another process to write to the file in append mode. Defaults to False.

  • in_temp (bool, optional) – If True, the path will be considered relative to the platform temporary directory. Defaults to False.

  • is_series (bool, optional) – If True, input/output will be done to a series of files. If reading, each file will be processed until the end is reached. If writing, each output will be to a new file in the series. The addressed is assumed to contain a format for the index of the file. Defaults to False.

  • count (int, optional) – When reading a file, read the file this many of times. Defaults to 0.

  • wait_for_creation (float, optional) – Time (in seconds) that should be waited before opening for the file to be created if it dosn’t exist. Defaults to 0 s and file will attempt to be opened immediately.

  • **kwargs – Additional keywords arguments are passed to parent class.

fd

File that should be read/written.

Type:

file

read_meth

Method that should be used to read data from the file.

Type:

str

append

If True and writing, file is openned in append mode.

Type:

bool

in_temp

If True, the path will be considered relative to the platform temporary directory.

Type:

bool

is_series

If True, input/output will be done to a series of files. If reading, each file will be processed until the end is reached. If writing, each output will be to a new file in the series.

Type:

bool

platform_newline

String indicating a newline on the current platform.

Type:

str

Raises:

ValueError – If the read_meth is not one of the supported values.

advance_in_file(file_pos)[source]

Advance to a certain position in the current file.

Parameters:

file_pos (int) – Position that should be moved to in the current. file.

advance_in_series(series_index=None)[source]

Advance to a certain file in a series.

Parameters:

series_index (int, optional) – Index of file in the series that should be moved to. Defaults to None and call will advance to the next file in the series.

Returns:

True if the file was advanced in the series, False otherwise.

Return type:

bool

static before_registration(cls)[source]

Operations that should be performed to modify class attributes prior to registration.

change_position(file_pos, series_index=None, header_was_read=None, header_was_written=None)[source]

Change the position in the file/series.

Parameters:
  • file_pos (int) – Position that should be moved to in the file.

  • series_index (int, optinal) – Index of the file in the series that should be moved to. Defaults to None and will be set to the current series index.

  • header_was_read (bool, optional) – Status of if header has been read or not. Defaults to None and will be set to the current value.

  • header_was_written (bool, optional) – Status of if header has been written or not. Defaults to None and will be set to the current value.

classmethod close_registry_entry(value)[source]

Close a registry entry.

property concats_as_str

True if concatenating file contents result in a valid file.

Type:

bool

property current_address

Address of file currently being used.

Type:

str

disable_header()[source]

Turn off header so that it will not be written.

dump(obj, **kwargs)[source]

Serialize to a file.

Parameters:

**kwargs – Keyword arguments are passed to the send call.

Raises:

SerializationError – If the send call fails.

enable_header()[source]

Turn on header so that it will be written.

property fd

Associated file identifier.

file_flush()[source]

Flush the file.

file_seek(pos, whence=0)[source]

Move in the file to the specified position.

Parameters:
  • pos (int) – Position (in bytes) to move file to.

  • whence (int, optional) – Flag indicating position that pos is relative to. 0 for the beginning of the file, 1 for from the current location, and 2 from the end of the file.

property file_size

Current size of file.

Type:

int

file_tell()[source]

int: Current position in the file.

get_series_address(index=None)[source]

Get the address of a file in the series.

Parameters:

index (int, optional) – Index in series to get address for. Defaults to None and the current index is used.

Returns:

Address for the file in the series.

Return type:

str

classmethod get_test_contents(data, **kwargs)[source]

Method for returning the serialized form of a set of test data.

Parameters:

data (list) – List of test data objects to serialize.

Returns:

Serialized test data.

Return type:

bytes

classmethod get_testing_options(read_meth=None, serializer=None, **kwargs)[source]

Method to return a dictionary of testing options for this class.

Parameters:
  • read_meth (str, optional) – Read method that will be used by the test class. Defaults to None and will be set by the serialier.

  • **kwargs – Additional keyword arguments are passed to the parent class’s method and the serializers methods for determining the default read_meth and concatenating the sent objects into the objects that are expected to be received.

Returns:

Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the

provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by sending

the messages in ‘send’.

Return type:

dict

is_file = True
classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.

Returns:

Is the comm installed.

Return type:

bool

property is_open

True if the connection is open.

Type:

bool

load(return_message_object=False, **kwargs)[source]

Deserialize all contents from a file.

Parameters:

**kwargs – Keyword arguments are passed to recv calls.

Returns:

The deserialized data object or a list of

deserialized data objects if there is more than one.

Return type:

object

Raises:

SerializationError – If the first recv call fails.

property n_msg_recv

The number of messages in the file.

Type:

int

classmethod new_comm_kwargs(*args, **kwargs)[source]

Initialize communication with new queue.

open()[source]

Open the file.

property open_mode

Mode that should be used to open the file.

Type:

str

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters:

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns:

Keyword arguments for opposite comm object.

Return type:

dict

prepare_message(*args, **kwargs)[source]

Perform actions preparing to send a message.

Parameters:
  • *args – Components of the outgoing message.

  • **kwargs – Keyword arguments are passed to the parent class’s method.

Returns:

Serialized and annotated message.

Return type:

CommMessage

purge()[source]

Purge all messages from the comm.

read_header()[source]

Read header lines from the file and update serializer info.

record_position()[source]

Record the current position in the file/series.

property registry_key

String used to register the socket.

Type:

str

property remaining_bytes

Remaining bytes in the file.

Type:

int

classmethod remove_companion_files(address)[source]

Remove companion files that are created when writing to a file

Parameters:

address (str) – Address for the filename.

remove_file()[source]

Remove the file.

reset_position(truncate=False)[source]

Move to the front of the file and allow header to be read again.

Parameters:

truncate (bool, optional) – If True, the file will be truncated after moving to the beginning, effectively erasing the file. Defaults to False.

serialize(obj, **kwargs)[source]

Serialize a message using the associated serializer.

series_file_size(fname)[source]

int: Size of file in series.

classmethod underlying_comm_class()[source]

str: Name of underlying communication class.

write_header()[source]

Write header lines to the file based on the serializer info.

yggdrasil.communication.FileComm.convert_file(src, dst, src_type=None, dst_type=None, src_kwargs=None, dst_kwargs=None, transform=None)[source]

Convert from one file type to another.

Parameters:
  • src (str) – Path to source file to convert.

  • dst (str) – Path to destination file that should be created.

  • src_type (str, dict, optional) – Name of source file type. If not provided, an attempt will be made to identify the file type from the extension.

  • dst_type (str, dict, optional) – Name of destination file type. If not provided, an attempt will be made to identify the file type from the extension.

  • transform (dict, optional) – Transform parameters for transforming messages between the soruce and destination file.

Raises:
  • IOError – If the source file does not exist.

  • IOError – If the destination file exists.

yggdrasil.communication.FileComm.is_file_like(x)[source]

Check if an object is file-like via duck typing.

Parameters:

x (object) – Object to check.

Returns:

True if file-like, False otherwise.

Return type:

bool

yggdrasil.communication.ForkComm module

class yggdrasil.communication.ForkComm.ForkComm(*args, **kwargs)[source]

Bases: CommBase

Class for receiving/sending messages from/to multiple comms.

Parameters:
  • name (str) – The environment variable where communication address is stored.

  • comm_list (list, optional) – The list of options for the comms that should be bundled. If not provided, the bundle will be empty.

  • pattern (str, optional) –

    The communication pattern that should be used to handle outgoing/incoming messages. Options include:

    ’cycle’: Receive from or send to the next available comm in

    the list (default for receiving comms).

    ’broadcast’: [SEND ONLY] Send the same message to each comm

    (default for sending comms).

    ’scatter’: [SEND ONLY] Send part of message (must be a list)

    to each comm.

    ’gather’: [RECV ONLY] Receive lists of messages from each

    comm where a message is only returned when there is a message from each.

  • **kwargs – Additional keyword arguments are passed to the parent class.

comm_list

Comms included in this fork.

Type:

list

curr_comm_index

Index comm that next receive will be from.

Type:

int

property any_files

True if the comm interfaces with any files.

Type:

bool

bind()[source]

Bind in place of open.

child_keys = ['serializer_class', 'serializer_kwargs', 'format_str', 'field_names', 'field_units', 'as_array', 'partner_copies']
close(*args, **kwargs)[source]

Close the connection.

close_in_thread(*args, **kwargs)[source]

In a new thread, close the comm when it is empty.

coerce_to_dict(msg, key_order, metadata)[source]

Convert a message to a dictionary.

Parameters:
  • msg (object) – Message to convert to a dictionary.

  • key_order (list) – Key order to use for the output dictionary.

  • metadata (dict) – Header data to accompany the message.

Returns:

Converted message.

Return type:

dict

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

property curr_comm

Current comm.

Type:

CommBase

disconnect()[source]

Disconnect attributes that are aliases.

drain_server_signon_messages(**kwargs)[source]

Drain server signon messages. This should only be used for testing purposes.

property empty_obj_recv

Empty message object.

Type:

obj

finalize_message(msg, **kwargs)[source]

Perform actions to decipher a message.

Parameters:
  • msg (CommMessage) – Initial message object to be finalized.

  • **kwargs – Keyword arguments are passed to the forked comm’s finalize_message method.

Returns:

Deserialized and annotated message.

Return type:

CommMessage

property get_response_comm_kwargs

Keyword arguments to use for a response comm.

Type:

dict

get_status_message(**kwargs)[source]

Return lines composing a status message.

Parameters:

**kwargs – Keyword arguments are passed on to the parent class’s method.

Returns:

Lines composing the status message and the

prefix string used for the last message.

Return type:

tuple(list, prefix)

property is_confirmed_recv

True if all received messages have been confirmed.

Type:

bool

property is_confirmed_send

True if all sent messages have been confirmed.

Type:

bool

property is_open

True if the connection is open.

Type:

bool

property last_comm

Last comm that was used.

Type:

CommBase

property maxMsgSize

Maximum size of a single message that should be sent.

Type:

int

property model_env

Mapping between model name and opposite comm environment variables that need to be provided to the model.

Type:

dict

property n_msg_direct

Number of messages currently being routed.

Type:

int

property n_msg_direct_recv

Number of messages currently being routed in recv.

Type:

int

property n_msg_direct_send

Number of messages currently being routed in send.

Type:

int

property n_msg_recv

The number of incoming messages in the connection.

Type:

int

property n_msg_recv_drain

The number of incoming messages in the connection to drain.

Type:

int

property n_msg_send

The number of outgoing messages in the connection.

Type:

int

property n_msg_send_drain

The number of outgoing messages in the connection to drain.

Type:

int

classmethod new_comm_kwargs(name, *args, **kwargs)[source]

Get keyword arguments for new comm.

noprop_keys = ['send_converter', 'recv_converter', 'filter', 'transform']
open()[source]

Open the connection.

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters:

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns:

Keyword arguments for opposite comm object.

Return type:

dict

property opp_comms

Name/address pairs for opposite comms.

Type:

dict

prepare_message(*args, **kwargs)[source]

Perform actions preparing to send a message.

Parameters:
  • *args – Components of the outgoing message.

  • **kwargs – Additional keyword arguments are passed to the prepare_message methods for the forked comms.

Returns:

Serialized and annotated message.

Return type:

CommMessage

purge()[source]

Purge all messages from the comm.

recv_message(*args, **kwargs)[source]

Receive a message.

Parameters:
  • *args – Arguments are passed to the forked comm’s recv_message method.

  • **kwargs – Keyword arguments are passed to the forked comm’s recv_message method.

Returns:

Received message.

Return type:

CommMessage

send_message(msg, **kwargs)[source]

Send a message encapsulated in a CommMessage object.

Parameters:
  • msg (CommMessage) – Message to be sent.

  • **kwargs – Additional keyword arguments are passed to _safe_send.

Returns:

Success or failure of send.

Return type:

bool

property suppress_special_debug
update_serializer_from_message(msg)[source]

Update the serializer based on information stored in a message.

Parameters:

msg (CommMessage) – Outgoing message.

class yggdrasil.communication.ForkComm.ForkedCommMessage(msg, comm_list, pattern='broadcast', **kwargs)[source]

Bases: CommMessage

Class for forked comm messages.

Parameters:
  • msg (CommBase.CommMessage) – Message being distributed.

  • comm_list (list) – List of communicators that the message is being distributed to.

  • **kwargs – Additional keyword arguments are passed to the ‘prepare_message’ method for each communicator.

orig
yggdrasil.communication.ForkComm.get_comm_name(name, i)[source]

Get the name of the ith comm in the series.

Parameters:
  • name (str) – Name of the fork comm.

  • i (int) – Index of comm in fork bundle.

Returns:

Name of ith comm in fork bundle.

Return type:

str

yggdrasil.communication.IPCComm module

class yggdrasil.communication.IPCComm.IPCComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling I/O via IPC message queues.

q

Message queue.

Type:

sysv_ipc.MessageQueue

Developer Notes:

The default size limit for IPC message queues is 2048 bytes on Mac operating systems so it is important that implementation of this communication mechanism properly split and send messages larger than this limit as more than one message.

address_description = 'An IPC message queue key.'
atexit()[source]

Close operations.

bind()[source]

Bind to random queue if address is generate.

classmethod close_registry_entry(value)[source]

Close a registry entry.

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

property is_open

True if the queue is not None.

Type:

bool

property n_msg_recv

Number of messages in the queue to recv.

Type:

int

property n_msg_send

Number of messages in the queue to send.

Type:

int

classmethod new_comm_kwargs(*args, **kwargs)[source]

Initialize communication with new queue.

open()[source]

Open the queue.

open_after_bind()[source]

Open the connection by getting the queue from the bound address.

purge()[source]

Purge all messages from the comm.

class yggdrasil.communication.IPCComm.IPCServer(*args, **kwargs)[source]

Bases: CommServer

IPC server object for cleaning up server queue.

terminate(*args, **kwargs)[source]

Also set break flag.

yggdrasil.communication.IPCComm.get_queue(qid=None)[source]

Create or return a sysv_ipc.MessageQueue and register it.

Parameters:

qid (int, optional) – If provided, ID for existing queue that should be returned. Defaults to None and a new queue is returned.

Returns:

Message queue.

Return type:

sysv_ipc.MessageQueue

yggdrasil.communication.IPCComm.ipc_queues(by_id=False)[source]

Get a list of active IPC queues.

Returns:

List of IPC queues.

Return type:

list

yggdrasil.communication.IPCComm.ipcrm(options=[])[source]

Remove IPC constructs using the ipcrm command.

Parameters:

options (list) – List of flags that should be used. Defaults to an empty list.

yggdrasil.communication.IPCComm.ipcrm_queues(queue_keys=None, by_id=False)[source]

Delete existing IPC queues.

Parameters:

queue_keys (list, str, optional) – A list of keys for queues that should be removed. Defaults to all existing queues.

yggdrasil.communication.IPCComm.ipcs(options=[])[source]

Get the output from running the ipcs command.

Parameters:

options (list) – List of flags that should be used. Defaults to an empty list.

Returns:

Captured output.

Return type:

str

yggdrasil.communication.IPCComm.remove_queue(mq)[source]

Remove a sysv_ipc.MessageQueue and unregister it.

Parameters:

mq (sysv_ipc.MessageQueue) –

Raises:

KeyError – If the provided queue is not registered.

yggdrasil.communication.ImageFileBase module

class yggdrasil.communication.ImageFileBase.BMPFileComm(*args, **kwargs)

Bases: PILFileBase

class yggdrasil.communication.ImageFileBase.EPSFileComm(*args, **kwargs)

Bases: PILFileBase

class yggdrasil.communication.ImageFileBase.GIFFileComm(*args, **kwargs)

Bases: PILFileBase

class yggdrasil.communication.ImageFileBase.JPEGFileComm(*args, **kwargs)

Bases: PILFileBase

class yggdrasil.communication.ImageFileBase.PILFileBase(*args, **kwargs)[source]

Bases: DedicatedFileBase

Class for handling I/O from/to a JPEG file.

static before_registration(cls)[source]

Operations that should be performed to modify class attributes prior to registration.

classmethod get_testing_options(**kwargs)[source]

Method to return a dictionary of testing options for this class.

Returns:

Dictionary of variables to use for testing. Items:
kwargs (dict): Keyword arguments for comms tested with

the provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a

test file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by

sending the messages in ‘send’.

Return type:

dict

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.

Returns:

Is the comm installed.

Return type:

bool

class yggdrasil.communication.ImageFileBase.PNGFileComm(*args, **kwargs)

Bases: PILFileBase

class yggdrasil.communication.ImageFileBase.TIFFFileComm(*args, **kwargs)

Bases: PILFileBase

yggdrasil.communication.JSONFileComm module

class yggdrasil.communication.JSONFileComm.JSONFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a JSON file on disk.

Parameters:
  • name (str) – The environment variable where file path is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.MPIComm module

exception yggdrasil.communication.MPIComm.MPIClosedError[source]

Bases: BaseException

Exception to raise when the MPI connection has been closed.

class yggdrasil.communication.MPIComm.MPIComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling I/O via MPI communicators.

Parameters:
  • tag_start (int, optional) – Tag that MPI messages should start with. Defaults to 0.

  • tag_stride (int, optional) – Amount that tag should be advanced after each message to avoid conflicts w/ other MPIComm communicators. Defaults to 1.

  • partner_mpi_ranks (list, optional) – Rank of MPI processes that partner models are running on. Defaults to None.

tag

Tag that should be used for the next MPI message.

Type:

int

tag_stride

Amount that tag should be advanced after each each message to avoid conflicts w/ other MPIComm communicators.

Type:

int

add_request(on_empty=False, **kwargs)[source]

Add a request to the queue.

address_description = 'The partner communicator ID(s).'
advance_tag(request)[source]

Advance to the next tag.

Parameters:

request (MPIRequest, MPIMultiRequest) – Request advancing the tag.

bind()[source]

Bind to random queue if address is generate.

cache_tag(request)[source]

Store a tag for an uncompleted request.

Parameters:

request (MPIRequest, MPIMultiRequest) – Request to cache.

cancel_requests()[source]

Cancel requests that have not yet been completed.

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

property create_work_comm_kwargs

Keyword arguments for a new work comm.

Type:

dict

classmethod format_address(ranks, tag_start, tag_stride)[source]

Format an MPI address.

Parameters:
  • ranks (tuple) – Ranks of the partner MPI processes.

  • tag_start (int) – Tag that the comm starts at.

  • tag_stride (int) – Tag that the comm advances by.

Returns:

Formatted address.

Return type:

str

property get_response_comm_kwargs

Keyword arguments to use for a response comm.

Type:

dict

get_tag(rank=None)[source]

Get the next tag for a rank.

Parameters:

rank (int) – Rank to get tag for.

Returns:

Tag that should be used next for the rank.

Return type:

int

property is_open

True if the queue is not None.

Type:

bool

property model_env

Mapping between model name and opposite comm environment variables that need to be provided to the model.

Type:

dict

property n_msg_recv

Number of messages in the queue to recv.

Type:

int

property n_msg_send

Number of messages in the queue to send.

Type:

int

next_rank()[source]

Get the rank that should be used next.

open()[source]

Open the queue.

property opp_address

Address for opposite comm.

Type:

str

classmethod parse_address(address)[source]

Parse an MPI address for information about the partner process ranks and how the tags should be iterated.

Parameters:

address (str) – Address to parse.

Returns:

The ranks, starting tag, and tag stride contained in the

address.

Return type:

tuple

purge()[source]

Purge all messages from the comm.

recv_message(*args, **kwargs)[source]

Receive a message.

Parameters:
  • *args – Arguments are passed to the forked comm’s recv_message method.

  • **kwargs – Keyword arguments are passed to the forked comm’s recv_message method.

Returns:

Received message.

Return type:

CommMessage

send_message(msg, **kwargs)[source]

Send a message encapsulated in a CommMessage object.

Parameters:
  • msg (CommMessage) – Message to be sent.

  • **kwargs – Additional keyword arguments are passed to _safe_send.

Returns:

Success or failure of send.

Return type:

bool

property tag

Tag for the next message.

Type:

int

class yggdrasil.communication.MPIComm.MPIMultiRequest(*args, **kwargs)[source]

Bases: MPIRequest

Container for MPI request for multiple partner comms.

cancel()[source]

Cancel a request.

property complete

True if the request has been completed, False otherwise.

Type:

bool

make_request(previous_requests=None)[source]

Complete a request.

remainder
class yggdrasil.communication.MPIComm.MPIRequest(comm, direction, address, tag, **kwargs)[source]

Bases: object

Container for MPI request.

address
cancel()[source]

Cancel a request.

comm
property complete

True if the request has been completed, False otherwise.

Type:

bool

property data

Data returned by a request.

Type:

object

direction
make_request(payload=None)[source]

Complete a request.

req
size
size_req
tag

yggdrasil.communication.MatFileComm module

class yggdrasil.communication.MatFileComm.MatFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a Matlab .mat file on disk.

Parameters:
  • name (str) – The environment variable where file path is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.NetCDFFileComm module

class yggdrasil.communication.NetCDFFileComm.NetCDFFileComm(*args, **kwargs)[source]

Bases: DedicatedFileBase

Class for handling I/O from/to an netCDF file.

Parameters:
  • read_attributes (bool, optional) – If True, the attributes are read in as well as the variables. Defaults to False.

  • variables (list, optional) – List of variables to read in. If not provided, all variables will be read.

  • version (int, optional) – Version of netCDF format that should be used. Defaults to 1. Options are 1 (classic format) and 2 (64-bit offset format).

  • **kwargs – Additional keywords arguments are passed to parent class.

property fd

Associated file identifier.

classmethod get_testing_options(**kwargs)[source]

Method to return a dictionary of testing options for this class.

Returns:

Dictionary of variables to use for testing. Items:
kwargs (dict): Keyword arguments for comms tested with

the provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a

test file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by

sending the messages in ‘send’.

Return type:

dict

transform_type_recv(x)[source]
transform_type_send(x)[source]

yggdrasil.communication.ObjFileComm module

class yggdrasil.communication.ObjFileComm.ObjFileComm(*args, **kwargs)[source]

Bases: PlyFileComm

Class for handling I/O from/to a .obj file on disk.

Parameters:
  • name (str) – The environment variable where communication address is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.PandasFileComm module

class yggdrasil.communication.PandasFileComm.PandasFileComm(*args, **kwargs)[source]

Bases: AsciiTableComm

Class for handling I/O from/to a pandas csv file on disk.

Parameters:
  • name (str) – The environment variable where communication address is stored.

  • delimiter (str, optional) – String that should be used to separate columns. Defaults to ‘t’.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.PickleFileComm module

class yggdrasil.communication.PickleFileComm.PickleFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a pickled file on disk.

Parameters:
  • name (str) – The environment variable where file path is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.PlyFileComm module

class yggdrasil.communication.PlyFileComm.PlyFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a .ply file on disk.

Parameters:
  • name (str) – The environment variable where communication address is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.RESTComm module

class yggdrasil.communication.RESTComm.RESTComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling I/O via a RESTful API. The provided address should be an HTTP address to a server running a flask app that has been equipped to respond to send/receive calls via the add_comm_server_to_app method.

Parameters:
  • params (dict, optional) – Parameters that should be passed via URL. Defaults to None and is ignored.

  • cookies (dict, optional) – Cookies to send to the server. Defaults to None and is ignored.

  • **kwargs – Additional keyword arguments will be passed to the base class.

atexit()[source]

Close operations.

bind(*args, **kwargs)[source]

Bind to address based on information provided.

property is_open

True if the connection is open.

Type:

bool

property n_msg_recv

The number of incoming messages in the connection.

Type:

int

property n_msg_send

The number of outgoing messages in the connection.

Type:

int

open(*args, **kwargs)[source]

Open the connection.

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters:

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns:

Keyword arguments for opposite comm object.

Return type:

dict

purge()[source]

Purge all messages from the comm.

yggdrasil.communication.RESTComm.add_comm_server_to_app(app)[source]

Add methods for handling send/receive calls server-side.

Parameters:

app (flask.Flask) – Flask app to add methods to.

yggdrasil.communication.RMQAsyncComm module

class yggdrasil.communication.RMQAsyncComm.RMQAsyncComm(*args, **kwargs)[source]

Bases: RMQComm

Class for handling asynchronous RabbitMQ communications. It is not recommended to use this class as it can leave hanging threads if not closed propertly. The normal RMQComm will cover most use cases.

Parameters:
  • name (str) – The environment variable where the comm address is stored.

  • dont_open (bool, optional) – If True, the connection will not be opened. Defaults to False.

  • **kwargs – Additional keyword arguments are passed to CommBase.

times_connected

Number of times that this connections has been established.

Type:

int

rmq_thread

Thread used to run IO loop.

Type:

multitasking.YggTask

add_on_cancel_callback()[source]
add_on_channel_close_callback()[source]

This method tells pika to call the on_channel_closed method if RabbitMQ unexpectedly closes the channel.

atexit()[source]

Close operations.

bind()[source]

Declare queue to get random new queue.

check_for_close()[source]

bool: Check if close has been called from the main process.

close_channel()[source]

Close the channel if it exists.

close_connection(*args, **kwargs)[source]

Close the connection.

connect()[source]

Establish the connection.

enable_delivery_confirmations()[source]

Turn on delivery confirmations.

property is_open

True if the connection and channel are open.

Type:

bool

property n_msg_recv

Number of messages in the queue.

Type:

int

property n_msg_send

Number of messages in the queue.

Type:

int

new_run_thread(name=None)[source]

Get a new thread for running.

on_basic_qos_ok(unused_frame)[source]

Actions to perform one the qos is set.

on_bindok(unused_frame, userdata)[source]

Actions to perform once the queue is succesfully bound. Start consuming messages.

on_cancelok(unused_frame, userdata)[source]

Actions to perform after succesfully cancelling consumption. Closes the channel.

on_channel_closed(channel, reason)[source]

Actions to perform when the channel is closed. Close the connection.

on_channel_closeok(_unused_frame)[source]
on_channel_open(channel)[source]

Actions to perform after a channel is opened. Add the channel close callback and setup the exchange.

on_connection_closed(connection, reason)[source]

Actions that must be taken when the connection is closed. Set the channel to None. If the connection is meant to be closing, stop the IO loop. Otherwise, wait 5 seconds and try to reconnect.

on_connection_open(connection)[source]

Actions that must be taken when the connection is opened. Add the close connection callback and open the RabbitMQ channel.

on_connection_open_error(unused_connection, err)[source]

Actions that must be taken when the connection fails to open.

on_consumer_cancelled(method_frame)[source]
on_delivery_confirmation(method_frame)[source]

Actions performed when a sent message is confirmed.

on_exchange_declareok(unused_frame, userdata)[source]

Actions to perform once an exchange is succesfully declared. Set up the queue.

on_message(_unused_channel, basic_deliver, properties, body)[source]

Buffer received messages.

on_queue_declareok(method_frame, userdata)[source]

Actions to perform once the queue is succesfully declared. Bind the queue.

open_channel()[source]

Open a RabbitMQ channel.

publish_message()[source]

Publish the next message in the list.

reconnect()[source]

Try to re-establish a connection and resume a new IO loop.

reset_for_reconnection()[source]

Reset variables in preparation for reconnection.

property rmq_lock

Lock associated with RMQ ioloop thread.

run_thread()[source]

Connect to the connection and begin the IO loop.

set_qos()[source]
setup_exchange(exchange_name)[source]

Setup the exchange.

setup_queue(queue_name)[source]

Set up the message queue.

start_consuming()[source]
start_publishing()[source]

Enable confirmations and begin sending messages.

start_run_thread()[source]

Start the run thread and wait for it to finish.

stop(call_on_thread=False)[source]

Stop the ioloop.

stop_consuming()[source]
class yggdrasil.communication.RMQAsyncComm.RMQTaskLoop(*args, **kwargs)[source]

Bases: YggTaskLoop

Task loop for RMQ consumer.

atexit()[source]

Actions performed when python exits.

yggdrasil.communication.RMQComm module

class yggdrasil.communication.RMQComm.RMQComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling basic RabbitMQ communications.

connection

RabbitMQ connection.

Type:

pika.Connection

channel

RabbitMQ channel.

Type:

pika.Channel

Raises:

RuntimeError – If a connection cannot be established.

Developer Notes:

It is not advised that new language implement a RabbitMQ communication interface. Rather RMQ communication is included explicitly for connections between models that are not co-located on the same machine and are used by the yggdrasil framework connections on the Python side.

address_description = 'AMPQ queue address of the form ``<url>_RMQPARAM_<exchange>_RMQPARAM_<queue>`` where ``url`` is the broker address (see explanation `here <https://pika.readthedocs.io/en/stable/examples/using_urlparameters.html>`_), ``exchange`` is the name of the exchange on the queue that should be used, and ``queue`` is the name of the queue.'
atexit()[source]

Close operations.

bind()[source]

Declare queue to get random new queue.

close_channel()[source]

Close the channel if it exists.

close_connection(*args, **kwargs)[source]

Close the connection.

close_queue(skip_unbind=False)[source]

Close the queue if the channel exists.

property create_work_comm_kwargs

Keyword arguments for a new work comm.

Type:

dict

property exchange

AMQP exchange.

Type:

str

get_queue_result()[source]

Get the fram from passive queue declare.

property get_work_comm_kwargs

Keyword arguments for an existing work comm.

Type:

dict

property is_confirmed_recv

True if all received messages have been confirmed.

Type:

bool

property is_confirmed_send

True if all sent messages have been confirmed.

Type:

bool

property is_open

True if the connection and channel are open.

Type:

bool

property n_msg_recv

Number of messages in the queue.

Type:

int

property n_msg_send

Number of messages in the queue.

Type:

int

classmethod new_comm_kwargs(name, user=None, password=None, host=None, virtual_host=None, port=None, exchange=None, queue='', **kwargs)[source]

Initialize communication with new connection.

Parameters:
  • name (str) – Name of new connection.

  • user (str, optional) – RabbitMQ server username. Defaults to config option ‘user’ in section ‘rmq’ if it exists and ‘guest’ if it does not.

  • password (str, optional) – RabbitMQ server password. Defaults to config option ‘password’ in section ‘rmq’ if it exists and ‘guest’ if it does not.

  • host (str, optional) – RabbitMQ server host. Defaults to config option ‘host’ in section ‘rmq’ if it exists and _localhost if it does not. If _localhost, the output of socket.gethostname() is used.

  • virtual_host (str, optional) – RabbitMQ server virtual host. Defaults to config option ‘vhost’ in section ‘rmq’ if it exists and ‘/’ if it does not.

  • port (str, optional) – Port on host to use. Defaults to config option ‘port’ in section ‘rmq’ if it exists and ‘5672’ if it does not.

  • exchange (str, optional) – RabbitMQ exchange. Defaults to config option ‘namespace’ in section ‘rmq’ if it exits and ‘’ if it does not.

  • queue (str, optional) – Name of the queue that messages will be send to or received from. If an empty string, the queue will be a random string and exclusive to a receiving comm. Defaults to ‘’.

  • **kwargs – Additional keywords arguments are returned as keyword arguments for the new comm.

Returns:

Arguments and keyword arguments for new comm.

Return type:

tuple(tuple, dict)

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters:

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns:

Keyword arguments for opposite comm object.

Return type:

dict

purge()[source]

Remove all messages from the associated queue.

property queue

AMQP queue.

Type:

str

property url

AMQP server address.

Type:

str

class yggdrasil.communication.RMQComm.RMQServer(*args, **kwargs)[source]

Bases: CommServer

RMQ server object for cleaning up server connections.

terminate(*args, **kwargs)[source]

Also set break flag.

yggdrasil.communication.RMQComm.check_rmq_server(url=None, **kwargs)[source]

Check that connection to a RabbitMQ server is possible.

Parameters:
  • url (str, optional) – Address of RMQ server that includes all the necessary information. If this is provided, the remaining arguments are ignored. Defaults to None and the connection parameters are taken from the other arguments.

  • username (str, optional) – RMQ server username. Defaults to config value.

  • password (str, optional) – RMQ server password. Defaults to config value.

  • host (str, optional) – RMQ hostname or IP Address to connect to. Defaults to config value.

  • port (str, optional) – RMQ server TCP port to connect to. Defaults to config value.

  • vhost (str, optional) – RMQ virtual host to use. Defaults to config value.

Returns:

True if connection to RabbitMQ server is possible, False

otherwise.

Return type:

bool

yggdrasil.communication.RMQComm.get_rmq_parameters(url=None, user=None, username=None, password=None, host=None, virtual_host=None, vhost=None, port=None, exchange=None, queue='')[source]

Get RabbitMQ connection parameters.

Parameters:
  • url (str, optional) – Address of RMQ server that includes all the necessary information. If this is provided, the remaining arguments are ignored. Defaults to None and the connection parameters are taken from the other arguments.

  • user (str, optional) – RabbitMQ server username. Defaults to config option ‘user’ in section ‘rmq’ if it exists and ‘guest’ if it does not.

  • username (str, optional) – Alias for user.

  • password (str, optional) – RabbitMQ server password. Defaults to config option ‘password’ in section ‘rmq’ if it exists and ‘guest’ if it does not.

  • host (str, optional) – RabbitMQ server host. Defaults to config option ‘host’ in section ‘rmq’ if it exists and _localhost if it does not. If _localhost, the output of socket.gethostname() is used.

  • virtual_host (str, optional) – RabbitMQ server virtual host. Defaults to config option ‘vhost’ in section ‘rmq’ if it exists and ‘/’ if it does not.

  • vhost (str, optional) – Alias for virtual_host.

  • port (str, optional) – Port on host to use. Defaults to config option ‘port’ in section ‘rmq’ if it exists and ‘5672’ if it does not.

  • exchange (str, optional) – RabbitMQ exchange. Defaults to config option ‘namespace’ in section ‘rmq’ if it exits and ‘’ if it does not.

  • queue (str, optional) – Name of the queue that messages will be send to or received from. If an empty string, the queue will be a random string and exclusive to a receiving comm. Defaults to ‘’.

Returns:

The connection url, exchange, & queue.

Return type:

tuple

yggdrasil.communication.SequenceFileBase module

class yggdrasil.communication.SequenceFileBase.BAMFileComm(*args, **kwargs)

Bases: PySamFileBase

class yggdrasil.communication.SequenceFileBase.BCFFileComm(*args, **kwargs)

Bases: PySamFileBase

class yggdrasil.communication.SequenceFileBase.BioPythonFileBase(*args, **kwargs)[source]

Bases: SequenceFileBase

Base class for nucleotide/protein sequence I/O using biopython.

dict2record(x)[source]

Covert a dictionary to a SeqRecord.

classmethod get_testing_options(piecemeal=False, **kwargs)[source]

Method to return a dictionary of testing options for this class.

Parameters:

piecemeal (bool, optional) – If True, the test data will be piecemeal.

Returns:

Dictionary of variables to use for testing. Items:
kwargs (dict): Keyword arguments for comms tested with

the provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a

test file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by

sending the messages in ‘send’.

Return type:

dict

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.

Returns:

Is the comm installed.

Return type:

bool

record2dict(x)[source]

Convert a SeqRecord to a dictionary.

property records_iterator

SeqRecord iterator.

Type:

iterator

class yggdrasil.communication.SequenceFileBase.CRAMFileComm(*args, **kwargs)

Bases: PySamFileBase

class yggdrasil.communication.SequenceFileBase.FASTAFileComm(*args, **kwargs)

Bases: BioPythonFileBase

class yggdrasil.communication.SequenceFileBase.FASTQFileComm(*args, **kwargs)

Bases: BioPythonFileBase

class yggdrasil.communication.SequenceFileBase.PySamFileBase(*args, **kwargs)[source]

Bases: SequenceFileBase

Base class for nucleotide/protein sequence I/O using pysam.

advance_in_series(series_index=None)[source]

Advance to a certain file in a series.

Parameters:

series_index (int, optional) – Index of file in the series that should be moved to. Defaults to None and call will advance to the next file in the series.

Returns:

True if the file was advanced in the series, False otherwise.

Return type:

bool

concats_as_str = True
create_index(address, overwrite=False)[source]

Create an index file to accompany the file begin written/read if one is required.

property current_region

Current region.

Type:

dict

classmethod dict2header(x)[source]
classmethod dict2region(x, header=None)[source]
classmethod get_testing_options(test_dir=None, **kwargs)[source]

Method to return a dictionary of testing options for this class.

classmethod header2dict(x)[source]
property header_size
classmethod index_filename(address)[source]

Get the name of the index file.

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.

Returns:

Is the comm installed.

Return type:

bool

property open_mode

Mode that should be used to open the file.

Type:

str

classmethod record2dict(x)[source]
classmethod region2dict(x)[source]
classmethod remove_companion_files(address)[source]

Remove companion files that are created when writing to a file

Parameters:

address (str) – Address for the filename.

series_file_size(fname)[source]

int: Size of file in series.

class yggdrasil.communication.SequenceFileBase.SAMFileComm(*args, **kwargs)

Bases: PySamFileBase

class yggdrasil.communication.SequenceFileBase.SequenceFileBase(*args, **kwargs)[source]

Bases: DedicatedFileBase

Base class for nucleotide/protein sequence I/O.

static before_registration(cls)[source]

Operations that should be performed to modify class attributes prior to registration.

class yggdrasil.communication.SequenceFileBase.VCFFileComm(*args, **kwargs)

Bases: PySamFileBase

yggdrasil.communication.ServerComm module

class yggdrasil.communication.ServerComm.Request(response_address, request_id)[source]

Bases: object

request_id
response_address
class yggdrasil.communication.ServerComm.ServerComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling Server side communication.

Parameters:
  • name (str) – The environment variable where communication address is stored.

  • request_commtype (str, optional) – Comm class that should be used for the request comm. Defaults to None.

  • response_kwargs (dict, optional) – Keyword arguments for the response comm. Defaults to empty dict.

  • direct_connection (bool, optional) – If True, the comm will be directly connected to a ServerComm. Defaults to False.

  • **kwargs – Additional keywords arguments are passed to the input comm.

response_kwargs

Keyword arguments for the response comm.

Type:

dict

icomm

Request comm.

Type:

Comm

ocomm

Response comms for each request.

Type:

OrderedDict

property all_clients_connected

True if all expected clients have connected. False otherwise.

Type:

bool

close(*args, **kwargs)[source]

Close the connection.

property close_on_eof_recv

True if the comm will close when EOF is received.

Type:

bool

create_response_comm(header)[source]

Create a response comm based on information from the last header.

disconnect(*args, **kwargs)[source]

Disconnect the comm.

drain_messages(direction='recv', **kwargs)[source]

Sleep while waiting for messages to be drained.

drain_server_signon_messages(**kwargs)[source]

Drain server signon messages. This should only be used for testing purposes.

property filter

filter for the communicator.

Type:

FilterBase

finalize_message(msg, **kwargs)[source]

Perform actions to decipher a message.

Parameters:
  • msg (CommMessage) – Initial message object to be finalized.

  • **kwargs – Keyword arguments are passed to the request comm’s finalize_message method.

Returns:

Deserialized and annotated message.

Return type:

CommMessage

get_status_message(nindent=0, **kwargs)[source]

Return lines composing a status message.

Parameters:
  • nindent (int, optional) – Number of tabs that should be used to indent each line. Defaults to 0.

  • *kwargs – Additional arguments are passed to the parent class’s method.

Returns:

Lines composing the status message and the

prefix string used for the last message.

Return type:

tuple(list, prefix)

property is_closed

True if the connection is closed.

Type:

bool

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked.

Returns:

Is the comm installed.

Return type:

bool

property is_open

True if the connection is open.

Type:

bool

property maxMsgSize

Maximum size of a single message that should be sent.

Type:

int

property n_msg_direct

Number of messages currently being routed.

Type:

int

property n_msg_recv

The number of messages in the connection.

Type:

int

property n_msg_recv_drain

The number of messages in the connection to drain.

Type:

int

classmethod new_comm_kwargs(name, request_commtype=None, **kwargs)[source]

Initialize communication with new comms.

Parameters:
  • name (str) – Name for new comm.

  • request_commtype (str, optional) – Name of class for new input comm. Defaults to None.

open()[source]

Open the connection.

property open_clients

Available open clients.

Type:

list

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters:

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns:

Keyword arguments for opposite comm object.

Return type:

dict

property opp_comms

Name/address pairs for opposite comms.

Type:

dict

prepare_message(*args, **kwargs)[source]

Perform actions preparing to send a message.

Parameters:
  • *args – Components of the outgoing message.

  • **kwargs – Keyword arguments are passed to the request comm’s prepare_message method.

Returns:

Serialized and annotated message.

Return type:

CommMessage

purge()[source]

Purge input and output comms.

recv_from(*args, **kwargs)[source]

Receive a message from the input comm and open a new response comm for output using address from the header, returning the response_id.

Parameters:
  • *args – Arguments are passed to input comm recv method.

  • **kwargs – Keyword arguments are passed to input comm recv method.

Returns:

Success or failure of recv call,

output from input comm recv method, and response_id that response should be sent to.

Return type:

tuple(bool, obj, str)

recv_message(*args, **kwargs)[source]

Receive a message.

Parameters:
  • *args – Arguments are passed to the request comm’s recv_message method.

  • **kwargs – Keyword arguments are passed to the request comm’s recv_message method.

Returns:

Received message.

Return type:

CommMessage

rpcRecv(*args, **kwargs)[source]

Alias for RPCComm.recv

rpcSend(*args, **kwargs)[source]

Alias for RPCComm.send

send_message(msg, **kwargs)[source]

Send a message encapsulated in a CommMessage object.

Parameters:
  • msg (CommMessage) – Message to be sent.

  • **kwargs – Additional keyword arguments are passed to the response comm’s send_message method.

Returns:

Success or failure of send.

Return type:

bool

send_to(response_id, *args, **kwargs)[source]

Send a message to a specific response comm.

Parameters:
  • response_id (str) – ID used to register the response comm.

  • *args – Arguments are passed to output comm send method.

  • **kwargs – Keyword arguments are passed to output comm send method.

Returns:

Output from output comm send method.

Return type:

obj

yggdrasil.communication.ValueComm module

class yggdrasil.communication.ValueComm.ValueComm(*args, **kwargs)[source]

Bases: CommBase

classmethod get_testing_options(**kwargs)[source]

Method to return a dictionary of testing options for this class.

Parameters:

serializer (str, optional) – The name of the serializer that should be used. If not provided, the _default_serializer class attribute will be used.

Returns:

Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the

provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by sending

the messages in ‘send’.

Return type:

dict

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.

Returns:

Is the comm installed.

Return type:

bool

property is_open

True if the connection is open.

Type:

bool

property n_msg_recv

The number of incoming messages in the connection.

Type:

int

no_serialization = True
open()[source]

Open the connection.

purge()[source]

Purge all messages from the comm.

send(*args, **kwargs)[source]

Send a message.

yggdrasil.communication.YAMLFileComm module

class yggdrasil.communication.YAMLFileComm.YAMLFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a YAML file on disk.

Parameters:
  • name (str) – The environment variable where file path is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.ZMQComm module

class yggdrasil.communication.ZMQComm.ZMQComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling I/O using ZeroMQ sockets.

Parameters:
  • name (str) – The environment variable where the socket address is stored.

  • context (zmq.Context, optional) – ZeroMQ context that should be used. Defaults to None and the global context is used.

  • socket_type (str, optional) – The type of socket that should be created. Defaults to _default_socket_type. See zmq for all options.

  • socket_action (str, optional) – The action that the socket should perform. Defaults to action based on the direction (‘connect’ for ‘recv’, ‘bind’ for ‘send’.)

  • topic_filter (str, optional) – Message filter to use when subscribing. This is only used for ‘SUB’ socket types. Defaults to ‘’ which is all messages.

  • dealer_identity (str, optional) – Identity that should be used to route messages to a dealer socket. Defaults to ‘0’.

  • **kwargs – Additional keyword arguments are passed to :class:.CommBase.

context

ZeroMQ context that will be used.

Type:

zmq.Context

socket

ZeroMQ socket.

Type:

zmq.Socket

socket_type_name

The type of socket that should be created.

Type:

str

socket_type

ZeroMQ socket type.

Type:

int

socket_action

The action that the socket should perform.

Type:

str, optional

topic_filter

Message filter to use when subscribing.

Type:

str

dealer_identity

Identity that should be used to route messages to a dealer socket.

Type:

str

Developer Notes:

yggdrasil uses the tcp transport by default with a PAIR socket type. For every connection, yggdrasil establishes a second request/reply connection that is used to confirm messages passed between the primary PAIR of sockets. On the first send, the model should create a REP socket on an open tcp address and send that address in the header of the first message under the key ‘zmq_reply’. Receiving models should check message headers for this key and, on receipt, establish the partner REQ socket with the specified address (receiving comms can receive from more than one source so they can have more than one request addresses at at time for this purpose). Following every message, the sending model should wait for a message on the reply socket and, on receipt, return the message. Following every message, the receiving model should send the message ‘YGG_REPLY’ on the request socket and wait for a reply. When creating worker comms for sending large messages, the sending model should create the reply comm for the worker in advanced and send it in the header with the worker address under the key ‘zmq_reply_worker’.

address_description = 'A ZeroMQ endpoint of the form <transport>://<address>, where the format of address depends on the transport. Additional information can be found `here <http://api.zeromq.org/3-2:zmq-bind>`_.'
property address_param

Address parameters.

Type:

dict

bind()[source]

Bind to address, getting random port as necessary.

check_reply_socket_recv(msg)[source]

Check incoming message for reply address.

Parameters:

msg (str) – Incoming message to check.

Returns:

Messages with reply address removed if present.

Return type:

str

check_reply_socket_send(msg)[source]

Append reply socket address if it

Parameters:

msg (str) – Message that will be piggy backed on.

Returns:

Message with reply address if it has not been sent.

Return type:

str

classmethod close_registry_entry(value)[source]

Close a registry entry.

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

connect()[source]

Connect to address.

property create_work_comm_kwargs

Keyword arguments for a new work comm.

Type:

dict

disconnect_socket(dont_close=False)[source]

Disconnect from address.

drain_server_signon_messages(**kwargs)[source]

Drain server signon messages. This should only be used for testing purposes.

get_status_message(nindent=0, **kwargs)[source]

Return lines composing a status message.

Parameters:
  • nindent (int, optional) – Number of tabs that should be used to indent each line. Defaults to 0.

  • **kwargs – Additional keyword arguments are passed to the parent class’s method.

Returns:

Lines composing the status message and the

prefix string used for the last message.

Return type:

tuple(list, prefix)

property get_work_comm_kwargs

Keyword arguments for an existing work comm.

Type:

dict

header2workcomm(header, **kwargs)[source]

Get a work comm based on header info.

Parameters:
  • header (dict) – Information that will be sent in the message header to the work comm.

  • **kwargs – Additional keyword arguments are passed to the parent method.

Returns:

class:.CommBase: Work comm.

property host

Host that socket is connected to.

Type:

str

property is_confirmed_recv

True if all received messages have been confirmed.

Type:

bool

is_message(flags)[source]

Poll the socket for a message.

Parameters:

flags (int) – ZMQ poll flags.

Returns:

True if there is a message matching the flags, False otherwise.

Return type:

bool

property is_open

True if the socket is open.

Type:

bool

property n_msg_recv

The number of incoming messages in the connection.

Type:

int

property n_msg_send

The number of outgoing messages in the connection.

Type:

int

classmethod new_comm_kwargs(name, protocol=None, host=None, port=None, **kwargs)[source]

Initialize communication with new queue.

Parameters:
  • name (str) – Name of new socket.

  • protocol (str, optional) – The protocol that should be used. Defaults to None and is set to _default_protocol. See zmq for details.

  • host (str, optional) – The host that should be used. Invalid for ‘inproc’ protocol. Defaults to ‘localhost’.

  • port (int, optional) – The port used. Invalid for ‘inproc’ protocol. Defaults to None and a random port is choosen.

  • **kwargs – Additional keywords arguments are returned as keyword arguments for the new comm.

Returns:

Arguments and keyword arguments for new socket.

Return type:

tuple(tuple, dict)

open()[source]

Open connection by binding/connect to the specified socket.

property opp_address

Address for opposite comm.

Type:

str

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters:

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns:

Keyword arguments for opposite comm object.

Return type:

dict

property port

Port that socket is connected to.

Type:

str

prepare_header(header_kwargs)[source]

Prepare header kwargs for the communicator.

property protocol

Protocol that socket uses.

Type:

str

recv(*args, **kwargs)[source]

Receive a message.

property registry_key

String used to register the socket.

Type:

str

property reply_thread

Task that will handle sending or receiving backlogged messages.

Type:

tools.YggTask

send(*args, **kwargs)[source]

Send a message.

server_exists(srv_address)[source]

Determine if a server exists.

Parameters:

srv_address (str) – Address of server comm.

Returns:

True if a server with the provided address exists, False

otherwise.

Return type:

bool

set_reply_socket_recv(address)[source]

Set the recv reply socket if the address dosn’t exist.

set_reply_socket_send()[source]

Set the send reply socket if it dosn’t exist.

unbind(dont_close=False)[source]

Unbind from address.

workcomm2header(work_comm, **kwargs)[source]

Get header information from a comm.

Parameters:
  • ( (work_comm) – class:.CommBase): Work comm that header describes.

  • **kwargs – Additional keyword arguments are added to the header.

Returns:

Header information that will be sent with a message.

Return type:

dict

class yggdrasil.communication.ZMQComm.ZMQProxy(*args, **kwargs)[source]

Bases: CommServer

Start a proxy in a new thread for a server address. A client-side address will be randomly generated.

Parameters:
  • srv_address (str) – Address that should face the server(s).

  • context (zmq.Context, optional) – ZeroMQ context that should be used. Defaults to None and the global context is used.

  • protocol (str, optional) – Protocol that should be used for the sockets. Defaults to None and is set to _default_protocol.

  • host (str, optional) – Host for socket address. Defaults to ‘localhost’.

  • retry_timeout (float, optional) – Time (in seconds) that should be waited before retrying to bind the sockets to the addresses. If negative, a retry will not be attempted and an error will be raised. Defaults to -1.

  • nretry (int, optional) – Number of times to try binding the sockets to the addresses. Defaults to 1.

  • **kwargs – Additional keyword arguments are passed to the parent class.

srv_address

Address that faces the server(s).

Type:

str

cli_address

Address that faces the client(s).

Type:

str

context

ZeroMQ context that will be used.

Type:

zmq.Context

srv_socket

Socket facing client(s).

Type:

zmq.Socket

cli_socket

Socket facing server(s).

Type:

zmq.Socket

cli_count

Number of clients that have connected to this proxy.

Type:

int

after_loop()[source]

Close sockets after the loop finishes.

cleanup()[source]

Clean up sockets on exit.

client_recv()[source]

Receive single message from the client.

client_signon_msg = b'ZMQ_CLIENT_SIGNED_ON::'
close_sockets()[source]

Close the sockets.

poll()[source]

Check if the process is finished and return the return code if it is.

run_loop()[source]

Forward messages from client to server.

server_send(msg)[source]

Send single message to the server.

server_signon_msg = b'ZMQ_SERVER_SIGNING_ON::'
yggdrasil.communication.ZMQComm.bind_socket(socket, address, retry_timeout=-1, nretry=1)[source]

Bind a socket to an address, getting a random port as necessary.

Parameters:
  • socket (zmq.Socket) – Socket that should be bound.

  • address (str) – Address that socket should be bound to.

  • retry_timeout (float, optional) – Time (in seconds) that should be waited before retrying to bind the socket to the address. If negative, a retry will not be attempted and an error will be raised. Defaults to -1.

  • nretry (int, optional) – Number of times to try binding the socket to the addresses. Defaults to 1.

Returns:

Address that socket was bound to, including random port if one

was used.

Return type:

str

yggdrasil.communication.ZMQComm.create_socket(context, socket_type)[source]

Create a socket w/ some default options to improve cleanup.

Parameters:
  • context (zmq.Context) – ZeroMQ context.

  • socket_type (int) – ZeroMQ socket type.

Returns:

New socket.

Return type:

zmq.Socket

yggdrasil.communication.ZMQComm.format_address(protocol, host, port=None)[source]

Format an address based on its parts.

Parameters:
  • protocol (str) – Communication protocol that should be used.

  • host (str) – Host that address should point to.

  • port (int, optional) – Port that address should point to. Defaults to None and is not added to the address.

Returns:

Complete address.

Return type:

str

Raises:

ValueError – If the protocol is not recognized.

yggdrasil.communication.ZMQComm.get_ipc_host()[source]

Get an IPC host using uuid.

Returns:

File path for IPC transport created using uuid.

Return type:

str

yggdrasil.communication.ZMQComm.get_socket_type_mate(t_in)[source]

Find the counterpart socket type.

Parameters:

t_in (str) – Socket type.

Returns:

Counterpart socket type.

Return type:

str

Raises:

ValueError – If t_in is not a recognized socket type.

yggdrasil.communication.ZMQComm.parse_address(address)[source]

Split an address into its parts.

Parameters:

address (str) – Address to be split.

Returns:

Parameters extracted from the address.

Return type:

dict

Raises:
  • ValueError – If the address dosn’t contain ‘://’.

  • ValueError – If the protocol is not supported.

yggdrasil.communication.ZMQComm.set_context_opts(context)[source]

Module contents

yggdrasil.communication.get_comm(name, **kwargs)[source]

Return communicator for existing comm components.

Parameters:
  • name (str) – Communicator name.

  • **kwargs – Additional keyword arguments are passed to new_comm.

Returns:

Communicator of given class.

Return type:

Comm

yggdrasil.communication.new_comm(name, commtype=None, use_async=False, **kwargs)[source]

Return a new communicator, creating necessary components for communication (queues, sockets, channels, etc.).

Parameters:
  • name (str) – Communicator name.

  • commtype (str, list, optional) – Name of communicator type or a list of specifiers for multiple communicators that should be joined. Defaults to None.

  • use_async (bool, optional) – If True, send/recv operations will be performed asynchronously on new threads. Defaults to False.

  • **kwargs – Additional keyword arguments are passed to communicator class method new_comm.

Returns:

Communicator of given class.

Return type:

Comm