yggdrasil.communication package

Subpackages

Submodules

yggdrasil.communication.AsciiFileComm module

class yggdrasil.communication.AsciiFileComm.AsciiFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a file on disk.

Parameters
  • name (str) – The environment variable where communication address is stored.

  • comment (str, optional) – String indicating a comment. If ‘read_meth’ is ‘readline’ and this is provided, lines starting with a comment will be skipped.

  • **kwargs – Additional keywords arguments are passed to parent class.

classmethod get_testing_options(**kwargs)[source]

Method to return a dictionary of testing options for this class.

Returns

Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the

provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by sending

the messages in ‘send’.

Return type

dict

yggdrasil.communication.AsciiMapComm module

class yggdrasil.communication.AsciiMapComm.AsciiMapComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a ASCII map on disk.

Parameters
  • name (str) – The environment variable where file path is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.AsciiTableComm module

class yggdrasil.communication.AsciiTableComm.AsciiTableComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a file on disk.

yggdrasil.communication.AsyncComm module

class yggdrasil.communication.AsyncComm.AsyncComm(wrapped, daemon=False, async_recv_method='recv', async_send_method='send_message', async_recv_kwargs=None, async_send_kwargs=None)[source]

Bases: ProxyObject, ComponentBaseUnregistered

Class for handling asynchronous I/O.

Parameters
  • name (str) – The name of the message queue.

  • async_recv_method (str, optional) – The method that should be used to receive message into the backlog. Defaults to ‘recv’.

  • async_send_method (dict, optional) – The method that should be used to send message in the backlog. Defaults to ‘send_message’.

  • async_recv_kwargs (dict, optional) – Keyword arguments to pass to calls receiving messages into the backlog. Defaults to {}.

  • async_send_method – Keyword arguments to pass to calls sending message from the backlog. Defaults to {}.

  • **kwargs – Additional keyword arguments are passed to CommBase.

backlog_ready

Event set when there is a message in the recv backlog.

Type

multitasking.Event

add_backlog(payload)[source]

Add a message to the backlog of messages.

Parameters

payload (tuple) – Arguments and keyword argumetns for send or data from receive.

async_recv_kwargs
async_recv_method
async_send_kwargs
async_send_method
atexit()[source]

Close operations.

property backlog_buffer

Messages that have been received.

Type

list

backlog_ready
property backlog_thread

Task that will handle sinding or receiving backlogged messages.

Type

tools.YggTask

close(linger=False)[source]

Close the connection.

Parameters

linger (bool, optional) – If True, drain messages before closing the comm. Defaults to False.

close_on_eof_recv
daemon
property errors

Errors raised by the wrapped comm or async thread.

Type

list

finalize_message(msg, **kwargs)[source]

Perform actions to decipher a message.

Parameters
  • msg (CommMessage) – Initial message object to be finalized.

  • **kwargs – Keyword arguments are passed to the request comm’s finalize_message method.

Returns

Deserialized and annotated message.

Return type

CommMessage

property is_closed

True if the connection is closed.

Type

bool

property is_confirmed_send

True if all sent messages have been confirmed.

Type

bool

property is_open

True if the backlog is open.

Type

bool

property is_open_backlog

True if the backlog thread is running.

Type

bool

property is_open_direct

True if the direct comm is not None.

Type

bool

property n_msg

The number of messages in the connection.

Type

int

property n_msg_backlog

Number of messages in the backlog.

Type

int

property n_msg_backlog_recv

Number of messages in the receive backlog.

Type

int

property n_msg_backlog_send

Number of messages in the send backlog.

Type

int

property n_msg_direct

Number of messages currently being routed.

Type

int

property n_msg_direct_recv

Number of messages currently being routed in recv.

Type

int

property n_msg_direct_send

Number of messages currently being routed in send.

Type

int

property n_msg_recv

Number of messages in the receive backlog.

Type

int

property n_msg_recv_drain

Number of messages in the receive backlog and direct comm.

Type

int

property n_msg_send

Number of messages in the send backlog.

Type

int

property n_msg_send_drain

Number of messages in the send backlog and direct comm.

Type

int

next(*args, **kwargs)
open()[source]

Open the connection by connecting to the queue.

pop_backlog()[source]

Pop a message from the front of the backlog.

Returns

First backlogged send arguments/keyword arguments

or received data.

Return type

tuple

precheck(*args, **kwargs)[source]
printStatus(*args, **kwargs)[source]

Print status of the communicator.

purge()[source]

Purge all messages from the comm.

recv(timeout=None, return_message_object=False, dont_finalize=False, **kwargs)[source]

Receive a message.

Parameters
  • *args – All arguments are passed to comm _recv method.

  • return_message_object (bool, optional) – If True, the full wrapped CommMessage message object is returned instead of the tuple. Defaults to False.

  • dont_finalize (bool, optional) – If True, finalize_message will not be called even if async_recv_method is ‘recv_message’. Defaults to False.

  • **kwargs – All keywords arguments are passed to comm _recv method.

Returns

Success or failure of receive and received

message.

Return type

tuple (bool, obj)

recv_array(*args, **kwargs)[source]

Alias for recv_array on wrapped comm.

recv_backlog()[source]

Check for any messages in the queue and add them to the recv backlog.

recv_dict(*args, **kwargs)[source]

Alias for recv_dict on wrapped comm.

recv_direct()[source]

Receive a message directly from the underlying comm.

recv_message(timeout=None, **kwargs)[source]

Receive a message.

Parameters
  • *args – Arguments are passed to the response comm’s recv_message method.

  • **kwargs – Keyword arguments are passed to the response comm’s recv_message method.

Returns

Received message.

Return type

CommMessage

recv_nolimit(*args, **kwargs)[source]

Alias for recv_nolimit on wrapped comm.

run_backlog_recv()[source]

Continue buffering received messages.

run_backlog_send()[source]

Continue trying to send buffered messages.

send(*args, dont_prepare=False, **kwargs)[source]

Send a message to the backlog.

Parameters
  • *args – All arguments are assumed to be part of the message.

  • **kwargs – All keywords arguments are passed to comm _send method.

Returns

Success or failure of sending the message.

Return type

bool

send_array(*args, **kwargs)[source]

Alias for send_array on wrapped comm.

send_backlog()[source]

Send a message from the send backlog to the queue.

send_dict(*args, **kwargs)[source]

Alias for send_dict on wrapped comm.

send_direct(*args, **kwargs)[source]

Send a message directly to the underlying comm.

send_eof(*args, **kwargs)[source]

Send the EOF message as a short message.

Parameters
  • *args – All arguments are passed to comm send.

  • **kwargs – All keywords arguments are passed to comm send.

Returns

Success or failure of send.

Return type

bool

send_message(msg, **kwargs)[source]

Send a message encapsulated in a CommMessage object.

Parameters
  • msg (CommMessage) – Message to be sent.

  • **kwargs – Additional keyword arguments are passed to _safe_send.

Returns

Success or failure of send.

Return type

bool

send_nolimit(*args, **kwargs)[source]

Alias for send_nolimit on wrapped comm.

yggdrasil.communication.BufferComm module

class yggdrasil.communication.BufferComm.BufferComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling I/O to an in-memory buffer.

Parameters
  • name (str) – The name of the message queue.

  • address (LockedBuffer, optional) – Existing buffer that should be used. If not provided, a new buffer is created.

  • buffer_task_method (str, optional) – Type of tasks that buffer will be used to share information between. Defaults to ‘thread’.

  • **kwargs – Additional keyword arguments are passed to CommBase.

address

Buffer containing messages.

Type

LockedBuffer

buffer_task_method

Type of tasks that buffer will be used to share information between.

Type

str

bind()[source]

Bind to new buffer.

property is_confirmed_recv

True if all received messages have been confirmed.

Type

bool

property is_confirmed_send

True if all sent messages have been confirmed.

Type

bool

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.

Returns

Is the comm installed.

Return type

bool

property is_open

True if the connection is open.

Type

bool

property n_msg_recv

Number of messages in the receive backlog.

Type

int

property n_msg_send

Number of messages in the send backlog.

Type

int

no_serialization = True
open()[source]

Open the buffer.

purge()[source]

Purge all messages from the comm.

class yggdrasil.communication.BufferComm.LockedBuffer(*args, **kwargs)[source]

Bases: Queue

Buffer intended to be shared between threads/processes.

append(x)[source]

Add an element to the queue.

clear()[source]

Remove all elements from the queue.

close(join=False)[source]

Close the buffer.

property closed

True if the queue is closed, False otherwise.

Type

bool

disconnect()[source]

Disconnect from the aliased object by replacing it with a dummy object.

pop(index=0, default=None)[source]

Remove the first element from the queue.

yggdrasil.communication.ClientComm module

class yggdrasil.communication.ClientComm.ClientComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling Client side communication.

Parameters
  • name (str) – The environment variable where communication address is stored.

  • request_commtype (str, optional) – Comm class that should be used for the request comm. Defaults to None.

  • response_kwargs (dict, optional) – Keyword arguments for the response comm. Defaults to empty dict.

  • direct_connection (bool, optional) – If True, the comm will be directly connected to a ServerComm. Defaults to False.

  • **kwargs – Additional keywords arguments are passed to the output comm.

response_kwargs

Keyword arguments for the response comm.

Type

dict

request_order

Order of request IDs.

Type

list

responses

Mapping between request IDs and response messages.

Type

dict

ocomm

Request comm.

Type

Comm

icomm

Response comm.

Type

Comm

atexit()[source]

Close operations.

call(*args, **kwargs)[source]

Do RPC call. The request message is sent to the output comm and the response is received from the input comm.

Parameters
  • *args – Arguments are passed to output comm send method.

  • **kwargs – Keyword arguments are passed to output comm send method

Returns

Output from input comm recv method.

Return type

obj

call_nolimit(*args, **kwargs)[source]

Alias for call.

close(*args, **kwargs)[source]

Close the connection.

create_response_comm()[source]

Create a response comm based on information from the last header.

disconnect(*args, **kwargs)[source]

Disconnect the comm.

drain_messages(direction='send', **kwargs)[source]

Sleep while waiting for messages to be drained.

property filter

filter for the communicator.

Type

FilterBase

finalize_message(msg, **kwargs)[source]

Perform actions to decipher a message.

Parameters
  • msg (CommMessage) – Initial message object to be finalized.

  • **kwargs – Keyword arguments are passed to the response comm’s finalize_message method.

Returns

Deserialized and annotated message.

Return type

CommMessage

get_status_message(nindent=0, **kwargs)[source]

Return lines composing a status message.

Parameters
  • nindent (int, optional) – Number of tabs that should be used to indent each line. Defaults to 0.

  • *kwargs – Additional arguments are passed to the parent class’s method.

Returns

Lines composing the status message and the

prefix string used for the last message.

Return type

tuple(list, prefix)

property is_closed

True if the connection is closed.

Type

bool

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked.

Returns

Is the comm installed.

Return type

bool

property is_open

True if the connection is open.

Type

bool

property maxMsgSize

Maximum size of a single message that should be sent.

Type

int

property n_msg_direct

Number of messages currently being routed.

Type

int

property n_msg_send

The number of messages in the connection.

Type

int

property n_msg_send_drain

The number of outgoing messages in the connection to drain.

Type

int

classmethod new_comm_kwargs(name, request_commtype=None, **kwargs)[source]

Initialize communication with new comms.

Parameters
  • name (str) – Name for new comm.

  • request_commtype (str, optional) – Name of class for new output comm. Defaults to None.

open()[source]

Open the connection.

property opp_address

Address for opposite comm.

Type

str

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns

Keyword arguments for opposite comm object.

Return type

dict

property opp_comms

Name/address pairs for opposite comms.

Type

dict

prepare_message(*args, **kwargs)[source]

Perform actions preparing to send a message.

Parameters
  • *args – Components of the outgoing message.

  • **kwargs – Keyword arguments are passed to the request comm’s prepare_message method.

Returns

Serialized and annotated message.

Return type

CommMessage

recv_message(*args, **kwargs)[source]

Receive a message.

Parameters
  • *args – Arguments are passed to the response comm’s recv_message method.

  • **kwargs – Keyword arguments are passed to the response comm’s recv_message method.

Returns

Received message.

Return type

CommMessage

rpcCall(*args, **kwargs)[source]

Alias for RPCComm.call

rpcRecv(*args, **kwargs)[source]

Alias for RPCComm.recv

rpcSend(*args, **kwargs)[source]

Alias for RPCComm.send

send_message(msg, **kwargs)[source]

Send a message encapsulated in a CommMessage object.

Parameters
  • msg (CommMessage) – Message to be sent.

  • **kwargs – Additional keyword arguments are passed to the request comm’s send_message method.

Returns

Success or failure of send.

Return type

bool

yggdrasil.communication.CommBase module

class yggdrasil.communication.CommBase.CommBase(*args, **kwargs)[source]

Bases: YggClass

Class for handling I/O.

Parameters
  • name (str) – The environment variable where communication address is stored.

  • address (str, optional) – Communication info. Default to None and address is taken from the environment variable.

  • direction (str, optional) – The direction that messages should flow through the connection. ‘send’ if the connection will send messages, ‘recv’ if the connecton will receive messages. Defaults to ‘send’.

  • is_interface (bool, optional) – Set to True if this comm is a Python interface binding. Defaults to False.

  • language (str, optional) – Programming language of the calling model. Defaults to ‘python’.

  • env (dict, optional) – Environment variable that should be used. Defaults to os.environ if not provided.

  • partner_model (str, optional) – Name of model that this comm is partnered with. Default to None, indicating that the partner is not a model.

  • partner_language (str, optional) – Programming language of this comm’s partner comm. Defaults to ‘python’.

  • partner_mpi_ranks (list, optional) – Ranks of processes of this comm’s partner comm(s). Defaults to [].

  • datatype (schema, optional) – JSON schema (with expanded core types defined by yggdrasil) that constrains the type of data that should be sent/received by this object. Defaults to {‘type’: ‘bytes’}. Additional information on specifying datatypes can be found here.

  • field_names (list, optional) – [DEPRECATED] Field names that should be used to label fields in sent/received tables. This keyword is only valid for table-like datatypes. If not provided, field names are created based on the field order.

  • field_units (list, optional) – [DEPRECATED] Field units that should be used to convert fields in sent/received tables. This keyword is only valid for table-like datatypes. If not provided, all fields are assumed to be unitless.

  • as_array (bool, optional) – [DEPRECATED] If True and the datatype is table-like, tables are sent/recieved with either columns rather than row by row. Defaults to False.

  • ( (default_file) – class:.DefaultSerialize, optional): Class with serialize and deserialize methods that should be used to process sent and received messages. Defaults to None and is constructed using provided ‘serializer_kwargs’.

  • serializer_kwargs (dict, optional) – Keyword arguments that should be passed to :class:.DefaultSerialize to create serializer. Defaults to {}.

  • format_str (str, optional) – String that should be used to format/parse messages. Default to None.

  • dont_open (bool, optional) – If True, the connection will not be opened. Defaults to False.

  • is_interface – Set to True if this comm is a Python interface binding. Defaults to False.

  • recv_timeout (float, optional) – Time that should be waited for an incoming message before returning None. Defaults to 0 (no wait). A value of False indicates that recv should block.

  • close_on_eof_recv (bool, optional) – If True, the comm will be closed when it receives an end-of-file messages. Otherwise, it will remain open. Defaults to True.

  • close_on_eof_send (bool, optional) – If True, the comm will be closed after it sends an end-of-file messages. Otherwise, it will remain open. Defaults to False.

  • single_use (bool, optional) – If True, the comm will only be used to send/recv a single message. Defaults to False.

  • reverse_names (bool, optional) – If True, the suffix added to the comm with be reversed. Defaults to False.

  • no_suffix (bool, optional) – If True, no directional suffix will be added to the comm name. Defaults to False.

  • allow_multiple_comms (bool, optional) – If True, initialize the comm such that mulitiple comms can connect to the same address. Defaults to False.

  • is_client (bool, optional) – If True, the comm is one of many potential clients that will be sending messages to one or more servers. Defaults to False.

  • is_response_client (bool, optional) – If True, the comm is a client-side response comm. Defaults to False.

  • is_server (bool, optional) – If True, the commis one of many potential servers that will be receiving messages from one or more clients. Defaults to False.

  • is_response_server (bool, optional) – If True, the comm is a server-side response comm. Defaults to False.

  • recv_converter (func, optional) – Converter that should be used on received objects. Defaults to None.

  • send_converter (func, optional) – Converter that should be used on sent objects. Defaults to None.

  • vars (list, optional) – Names of variables to be sent/received by this comm. Defaults to [].

  • length_map (dict, optional) – Map from pointer variable names to the names of variables where their length will be stored. Defaults to {}.

  • comm (str, optional) – The comm that should be created. This only serves as a check that the correct class is being created. Defaults to None.

  • ( – class:.FilterBase, optional): Filter that will be used to determine when messages should be sent/received. Ignored if not provided.

  • ( – class:.TransformBase, optional): One or more transformations that will be applied to messages that are sent/received. Ignored if not provided.

  • is_default (bool, optional) – If True, this comm was created to handle all input/output variables to/from a model. Defaults to False. This variable is used internally and should not be set explicitly in the YAML.

  • outside_loop (bool, optional) – If True, and the comm is an input/outputs to/from a model being wrapped. The receive/send calls for this comm will be outside the loop for the model. Defaults to False.

  • dont_copy (bool, optional) – If True, the comm will not be duplicated in the even a model is duplicated via the ‘copies’ parameter. Defaults to False except for in the case that a model is wrapped and the comm is inside the loop or that a model is a RPC input to a model server.

  • ( – class:FileComm, optional): Comm information for a file that input should be drawn from (for input comms) or that output should be sent to (for output comms) in the event that a yaml does not pair the comm with another model comm or a file.

  • default_value (object, optional) – Value that should be returned in the event that a yaml does not pair the comm with another model comm or a file.

  • for_service (bool, optional) – If True, this comm bridges the gap to an integration running as a service, possibly on a remote machine. Defaults to False.

  • **kwargs – Additional keywords arguments are passed to parent class.

Class Attributes:

is_file (bool): True if the comm accesses a file. _maxMsgSize (int): Maximum size of a single message that should be sent. address_description (str): Description of the information constituting

an address for this communication mechanism.

name

The environment variable where communication address is stored.

Type

str

address

Communication info.

Type

str

direction

The direction that messages should flow through the connection.

Type

str

is_interface

True if this comm is a Python interface binding.

Type

bool

language

Language that this comm is being called from.

Type

str

env

Environment variable that should be used.

Type

dict

partner_model

Name of model that this comm is partnered with.

Type

str

partner_language

Programming language of this comm’s partner comm.

Type

str

partner_mpi_ranks

Ranks of processes of this comm’s partner comm(s).

Type

list

serializer (

class:.DefaultSerialize): Object that will be used to serialize/deserialize messages to/from python objects.

recv_timeout

Time that should be waited for an incoming message before returning None.

Type

float

close_on_eof_recv

If True, the comm will be closed when it receives an end-of-file messages. Otherwise, it will remain open.

Type

bool

close_on_eof_send

If True, the comm will be closed after it sends an end-of-file messages. Otherwise, it will remain open.

Type

bool

single_use

If True, the comm will only be used to send/recv a single message.

Type

bool

allow_multiple_comms

If True, initialize the comm such that mulitiple comms can connect to the same address.

Type

bool

is_client

If True, the comm is one of many potential clients that will be sending messages to one or more servers.

Type

bool

is_response_client

If True, the comm is a client-side response comm.

Type

bool

is_server

If True, the comm is one of many potential servers that will be receiving messages from one or more clients.

Type

bool

is_response_server

If True, the comm is a server-side response comm.

Type

bool

recv_converter

Converter that should be used on received objects.

Type

func

send_converter

Converter that should be used on sent objects.

Type

func

filter (

class:.FilterBase): Callable class that will be used to determine when messages should be sent/received.

Raises
  • RuntimeError – If the comm class is not installed.

  • RuntimeError – If there is not an environment variable with the specified name.

  • ValueError – If directions is not ‘send’ or ‘recv’.

add_work_comm(comm)[source]

Add work comm to dict.

Parameters

( (comm) – class:.CommBase): Comm that should be added.

Raises

KeyError – If there is already a comm associated with the key.

address_description = None
property any_files

True if the comm interfaces with any files.

Type

bool

apply_transform(msg_in, for_empty=False, header=False)[source]

Evaluate the transform to alter the emssage being sent/received.

Parameters
  • msg_in (object) – Message being transformed.

  • for_empty (bool, optional) – If True, the transformation is being used to check for an empty message and errors will be caught. Defaults to False.

  • header (dict, optional) – Header keyword arguments associated with a message. Defaults to False and is ignored.

  • typedef (dict, optiona) – Type to transform. Default to None and will be determined by the serializer if receiving.

Returns

Transformed message.

Return type

object

apply_transform_to_type(typedef)[source]

Evaluate the transform to alter the type definition.

Parameters

typedef (dict) – Type definition to transform.

Returns

Transformed type definition.

Return type

dict

atexit()[source]

Close operations.

bind()[source]

Bind in place of open.

chunk_message(msg)[source]

Yield chunks of message of size maxMsgSize

Parameters

msg (str, bytes) – Raw message bytes to be chunked.

Returns

Chunks of message.

Return type

str

classmethod cleanup_comms()[source]

Cleanup registered comms of this class.

close(linger=False, **kwargs)[source]

Close the connection.

Parameters
  • linger (bool, optional) – If True, drain messages before closing the comm. Defaults to False.

  • **kwargs – Additional keyword arguments are passed to linger method if linger is True.

close_in_thread(no_wait=False, timeout=None)[source]

In a new thread, close the comm when it is empty.

Parameters
  • no_wait (bool, optional) – If True, don’t wait for closing thread to stop.

  • timeout (float, optional) – Time that should be waited for the comm to close. Defaults to None and is set to self.timeout. If False, this will block until the comm is closed.

classmethod close_registry_entry(value)[source]

Close a registry entry.

coerce_to_dict(msg, key_order, metadata)[source]

Convert a message to a dictionary.

Parameters
  • msg (object) – Message to convert to a dictionary.

  • key_order (list) – Key order to use for the output dictionary.

  • metadata (dict) – Header data to accompany the message.

Returns

Converted message.

Return type

dict

classmethod comm_count()[source]

int: Number of communication connections.

classmethod comm_registry()[source]

dict: Registry of comms of this class.

confirm(direction=None, noblock=False)[source]

Confirm message.

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

create_work_comm(work_comm_name=None, **kwargs)[source]

Create a temporary work comm.

Parameters
  • work_comm_name (str, optional) – Name that should be used for the work comm. If not provided, one is created from the header id and the comm class.

  • **kwargs – Keyword arguments for new_comm that should override work_comm_kwargs.

Returns

class:.CommBase: Work comm.

property create_work_comm_kwargs

Keyword arguments for a new work comm.

Type

dict

deserialize(*args, **kwargs)[source]

Deserialize a message using the associated deserializer.

drain_messages(direction=None, timeout=None, variable=None)[source]

Sleep while waiting for messages to be drained.

drain_server_signon_messages(**kwargs)[source]

Drain server signon messages. This should only be used for testing purposes.

property empty_bytes_msg

Empty serialized message.

Type

str

property empty_obj_recv

Empty message object.

Type

obj

property eof_msg

Message indicating EOF.

Type

str

evaluate_filter(*msg_in)[source]

Evaluate the filter to determine how the message should be handled.

Parameters

*msg_in (object) – Parts of message being evaluated.

Returns

True if the filter evaluates to True, False otherwise.

Return type

bool

extract_key_order(kwargs)[source]

Extract the key order from keyword arguments.

Parameters

kwargs (dict) – Keyword arguments.

Returns

Key order.

Return type

list

finalize_message(msg, skip_processing=False, skip_python2language=False, after_finalize_message=None)[source]

Perform actions to decipher a message. The order of steps is

  1. Transform the message

  2. Filter

  3. python2language

  4. Close comm on EOF if close_on_eof_recv set

  5. Check for empty recv after processing

  6. Mark comm as used and close if single use

  7. Apply after_finalize_message functions

Parameters
  • msg (CommMessage) – Initial message object to be finalized.

  • skip_processing (bool, optional) – If True, filters, transformations, and after_finalize_message funciton applications will not be performed. Defaults to False.

  • skip_python2language (bool, optional) – If True, python2language will not be applied. Defaults to False.

  • after_finalize_message (list, optional) – A set of function that should be applied to received CommMessage objects following the standard finalization. Defaults to None and is ignored.

Returns

Deserialized and annotated message.

Return type

CommMessage

property full_model_name

Name of the model using the comm w/ copy suffix.

Type

str

property get_response_comm_kwargs

Keyword arguments to use for a response comm.

Type

dict

get_status_message(nindent=0, extra_lines_before=None, extra_lines_after=None)[source]

Return lines composing a status message.

Parameters
  • nindent (int, optional) – Number of tabs that should be used to indent each line. Defaults to 0.

  • extra_lines_before (list, optional) – Additional lines that should be added to the beginning of the default print message. Defaults to empty list if not provided.

  • extra_lines_after (list, optional) – Additional lines that should be added to the end of the default print message. Defaults to empty list if not provided.

Returns

Lines composing the status message and the

prefix string used for the last message.

Return type

tuple(list, prefix)

classmethod get_testing_options(serializer=None, **kwargs)[source]

Method to return a dictionary of testing options for this class.

Parameters

serializer (str, optional) – The name of the serializer that should be used. If not provided, the _default_serializer class attribute will be used.

Returns

Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the

provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by sending

the messages in ‘send’.

Return type

dict

get_work_comm(header, **kwargs)[source]

Get temporary work comm, creating as necessary.

Parameters
  • header (dict) – Information that will be sent in the message header to the work comm.

  • **kwargs – Additional keyword arguments are passed to header2workcomm.

Returns

class:.CommBase: Work comm.

property get_work_comm_kwargs

Keyword arguments for an existing work comm.

Type

dict

header2workcomm(header, work_comm_name=None, **kwargs)[source]

Get a work comm based on header info.

Parameters
  • header (dict) – Information that will be sent in the message header to the work comm.

  • work_comm_name (str, optional) – Name that should be used for the work comm. If not provided, one is created from the header id and the comm class.

  • **kwargs – Additional keyword arguments are added to the returned dictionary.

Returns

class:.CommBase: Work comm.

property is_closed

True if the connection is closed.

Type

bool

property is_confirmed

True if all messages have been confirmed.

Type

bool

property is_confirmed_recv

True if all received messages have been confirmed.

Type

bool

property is_confirmed_send

True if all sent messages have been confirmed.

Type

bool

is_empty(msg, emsg)[source]

Check that a message matches an empty message object.

Parameters
  • msg (object) – Message object.

  • emsg (object) – Empty message object.

Returns

True if the object is empty, False otherwise.

Return type

bool

is_empty_recv(msg)[source]

Check if a received message object is empty.

Parameters

msg (obj) – Message object.

Returns

True if the object is empty, False otherwise.

Return type

bool

is_eof(msg)[source]

Determine if a message is an EOF.

Parameters

msg (obj) – Message object to be tested.

Returns

True if the message indicates an EOF, False otherwise.

Return type

bool

is_file = False
classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.

Returns

Is the comm installed.

Return type

bool

property is_open

True if the connection is open.

Type

bool

classmethod is_registered(key)[source]

bool: True if the comm is registered, False otherwise.

language_atexit()[source]

Close operations specific to the language.

linger(active_confirm=False)[source]

Wait for messages to drain.

linger_close(**kwargs)[source]

Wait for messages to drain, then close.

property maxMsgSize

Maximum size of a single message that should be sent.

Type

int

property model_copies

Number of copies of the model using the comm.

Type

int

property model_env

Mapping between model name and opposite comm environment variables that need to be provided to the model.

Type

dict

property model_name

Name of the model using the comm.

Type

str

property n_msg

The number of messages in the connection.

Type

int

property n_msg_recv

The number of incoming messages in the connection.

Type

int

property n_msg_recv_drain

The number of incoming messages in the connection to drain.

Type

int

property n_msg_send

The number of outgoing messages in the connection.

Type

int

property n_msg_send_drain

The number of outgoing messages in the connection to drain.

Type

int

classmethod new_comm(name, *args, **kwargs)[source]

Initialize communication with new queue.

classmethod new_comm_kwargs(*args, **kwargs)[source]

Get keyword arguments for new comm.

new_server(srv_address)[source]

Create a new server.

Parameters

srv_address (str) – Address of server comm.

no_serialization = False
open()[source]

Open the connection.

property opp_address

Address for opposite comm.

Type

str

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns

Keyword arguments for opposite comm object.

Return type

dict

property opp_comms

Name/address pairs for opposite comms.

Type

dict

property opp_name

Name that should be used for the opposite comm.

Type

str

precheck(direction)[source]

Check that comm is ready for action in specified direction, raising errors if it is not.

Parameters

direction (str) – Check that comm is ready to perform this action.

prepare_message(*args, header_kwargs=None, skip_serialization=False, skip_processing=False, skip_language2python=False, after_prepare_message=None, flag=None)[source]

Perform actions preparing to send a message. The order of steps is

  1. Convert the message based on the language

  2. Isolate the message if there is only one

  3. Check if the message is EOF

  4. Check if the message should be filtered

  5. Transform the message

  6. Apply after_prepare_message functions

  7. Serialize the message

  8. Create a work comm if the message is too large to be sent all at once

Parameters
  • *args – Components of the outgoing message.

  • header_kwargs (dict, optional) – Header options that should be set.

  • skip_serialization (bool, optional) – If True, serialization will not be performed. Defaults to False.

  • skip_processing (bool, optional) – If True, filters, transformations, and after_prepare_message function applications will not be performed. Defaults to False.

  • skip_language2python (bool, optional) – If True, language2python will be skipped. Defaults to False.

  • after_prepare_message (list, optional) – Functions that should be applied after transformation, but before serialization. Defaults to None and is ignored.

  • flag (int, optional) – Flag that should be added to the message before any additional processing is performed. Defaults to None and is ignored.

Returns

Serialized and annotated message.

Return type

CommMessage

printStatus(*args, level='info', return_str=False, **kwargs)[source]

Print status of the communicator.

purge()[source]

Purge all messages from the comm.

recv(*args, return_message_object=False, **kwargs)[source]

Receive a message.

Parameters
  • *args – All arguments are passed to comm _recv method.

  • return_message_object (bool, optional) – If True, the full wrapped CommMessage message object is returned instead of the tuple. Defaults to False.

  • **kwargs – All keywords arguments are passed to comm _recv method.

Returns

Success or failure of receive and received

message. If return_message_object is True, the CommMessage object will be returned instead.

Return type

tuple (bool, obj)

recv_array(*args, **kwargs)[source]

Alias for recv.

recv_dict(*args, **kwargs)[source]

Return a received message as a dictionary of fields. If there are not any fields specified, the fields will have the form ‘f0’, ‘f1’, ‘f2’, …

Parameters
  • *args – Arguments are passed to recv.

  • **kwargs – Keyword arguments are passed to recv.

Returns

Success/failure of receive and a dictionar of

message fields.

Return type

tuple(bool, dict)

Raises:

recv_message(*args, skip_deserialization=False, **kwargs)[source]

Receive a message.

Parameters
  • *args – Arguments are passed to _safe_recv.

  • skip_deserialization (bool, optional) – If True, deserialization is not performed. Defaults to False.

  • **kwargs – Additional keyword arguments are passed to _safe_recv.

Returns

Received message.

Return type

CommMessage

recv_nolimit(*args, **kwargs)[source]

Alias for recv.

classmethod register_comm(key, value)[source]

Register a comm.

remove_work_comm(key, in_thread=False, linger=False)[source]

Close and remove a work comm.

Parameters
  • key (str) – Key of comm that should be removed.

  • in_thread (bool, optional) – If True, close the work comm in a thread. Defaults to False.

  • linger (bool, optional) – If True, drain messages before closing the comm. Defaults to False.

send(*args, **kwargs)[source]

Send a message.

Parameters
  • *args – All arguments are assumed to be part of the message.

  • **kwargs – All keywords arguments are passed to prepare_message or send_message.

Returns

Success or failure of send.

Return type

bool

send_array(*args, **kwargs)[source]

Alias for send.

send_dict(args_dict, **kwargs)[source]

Send a message with fields specified in the input dictionary.

Parameters
  • args_dict (dict) – Dictionary of arguments to send.

  • **kwargs – Additiona keyword arguments are passed to send.

Returns

Success/failure of send.

Return type

bool

Raises

RuntimeError – If the field order can not be determined.

send_eof(*args, **kwargs)[source]

Send the EOF message as a short message.

Parameters
  • *args – All arguments are passed to comm send.

  • **kwargs – All keywords arguments are passed to comm send.

Returns

Success or failure of send.

Return type

bool

send_message(msg, skip_safe_send=False, **kwargs)[source]

Send a message encapsulated in a CommMessage object.

Parameters
  • msg (CommMessage) – Message to be sent.

  • skip_safe_send (bool, optional) – If True, no actual send will take place. Defaults to False.

  • **kwargs – Additional keyword arguments are passed to _safe_send.

Returns

Success or failure of send.

Return type

bool

send_nolimit(*args, **kwargs)[source]

Alias for send.

serialize(*args, **kwargs)[source]

Serialize a message using the associated serializer.

server_exists(srv_address)[source]

Determine if a server exists.

Parameters

srv_address (str) – Address of server comm.

Returns

True if a server with the provided address exists, False

otherwise.

Return type

bool

signoff_from_server()[source]

Remove a client from the server.

signon_to_server()[source]

Add a client to an existing server or create one.

classmethod underlying_comm_class()[source]

str: Name of underlying communication class.

classmethod unregister_comm(key, dont_close=False)[source]

Unregister a comm.

update_message_from_serializer(msg)[source]

Update a message with information about the serializer.

Parameters

msg (CommMessage) – Incoming message.

update_serializer_from_message(msg)[source]

Update the serializer based on information stored in a message.

Parameters

msg (CommMessage) – Outgoing message.

wait_for_confirm(timeout=None, direction=None, active_confirm=False, noblock=False)[source]

Sleep until all messages are confirmed.

wait_for_workers(timeout=None)[source]

Sleep until all workers are closed or have been used.

workcomm2header(work_comm, **kwargs)[source]

Get header information from a comm.

Parameters
  • ( (work_comm) – class:.CommBase): Work comm that header describes.

  • **kwargs – Additional keyword arguments are added to the header.

Returns

Header information that will be sent with a message.

Return type

dict

class yggdrasil.communication.CommBase.CommMessage(msg=None, length=0, flag=None, args=None, header=None)[source]

Bases: object

Class for passing messages around with additional information required to send/receive them.

msg

The serialized message including the header.

Type

bytes

length

The size of the message.

Type

int

flag

Indicates the result of processing the message. Values are: FLAG_FAILURE: Processing was unsuccessful. FLAG_SUCCESS: Processing was successful. FLAG_SKIP: The message should be skipped. FLAG_EOF: The message indicates that there will be no more messages.

Type

int

args

The unserialized message (post-transformation).

Type

object

header

Parameters sent in the header of the message.

Type

dict

additional_messages

Messages that should be sent along with this message as in the case that the message was an iterator.

Type

list

worker

Worker communicator that should be used to send worker messages in the case that the original message had to be split.

Type

CommBase

worker_messages

Messages that should be sent via the worker comm comm as the original message had to be split due to its size.

Type

list

sent

True if the message has been sent, False otherwise.

Type

bool

singular

True if there was only one argument.

Type

bool

add_message(*args, **kwargs)[source]

Add a message to the list of additional messages that should be sent following this one.

Parameters
  • *args – Arguments are passed to the CommMessage constructor.

  • *kwargs – Keyword arguments are passed to the CommMessage constructor.

add_worker_message(*args, **kwargs)[source]

Add a message to the list of messages that should be sent via work comm following this one.

Parameters
  • *args – Arguments are passed to the CommMessage constructor.

  • *kwargs – Keyword arguments are passed to the CommMessage constructor.

additional_messages
apply_function(x)[source]

Apply a function to the message.

Parameters

x (function) – Function to apply.

args
finalized
flag
header
length
msg
send_worker_messages(**kwargs)[source]

Send the worker messages via the worker comm.

Parameters

**kwargs – Keyword arguments are passed to the send_message method of the worker comm for each message.

Returns

Success of the send operations.

Return type

bool

sent
sinfo
singular
stype
property tuple_args

Form that arguments were originally supplied.

Type

tuple

worker
worker_messages
class yggdrasil.communication.CommBase.CommServer(*args, **kwargs)[source]

Bases: YggTaskLoop

Basic server object to keep track of clients.

cli_count

Number of clients that have connected to this server.

Type

int

add_client()[source]

Increment the client count.

remove_client()[source]

Decrement the client count, closing the server if all clients done.

class yggdrasil.communication.CommBase.CommTaskLoop(*args, **kwargs)[source]

Bases: YggTaskLoop

Task loop for comms to ensure cleanup.

Parameters
  • ( (comm) – class:.CommBase): Comm class that thread is for.

  • name (str, optional) – Name for the thread. If not provided, one is created by combining the comm name and the provided suffix.

  • suffix (str, optional) – Suffix that should be added to comm name to name the thread. Defaults to ‘CommTask’.

  • **kwargs – Additional keyword arguments are passed to the parent class.

comm (

class:.CommBase): Comm class that thread is for.

on_main_terminated()[source]

Actions taken on the backlog thread when the main thread stops.

exception yggdrasil.communication.CommBase.IncompleteBaseComm[source]

Bases: Exception

An exception class for methods that are incomplete for base classes.

exception yggdrasil.communication.CommBase.NeverMatch[source]

Bases: Exception

An exception class that is never raised by any code anywhere

yggdrasil.communication.CommBase.cleanup_comms(commtype, close_func=None)[source]

Clean up comms of a certain type.

Parameters

commtype (str) – Comm class that should be cleaned up.

Returns

Number of comms closed.

Return type

int

yggdrasil.communication.CommBase.get_comm_registry(commtype)[source]

Get the comm registry for a comm class.

Parameters

commtype (str) – Comm class to get registry for.

Returns

Dictionary of registered comm objects.

Return type

dict

yggdrasil.communication.CommBase.is_registered(commtype, key)[source]

Determine if a comm object has been registered under the specified key.

Parameters
  • commtype (str) – Comm class to check for the key under.

  • key (str) – Key that should be checked.

yggdrasil.communication.CommBase.register_comm(commtype, key, value)[source]

Add a comm object to the global registry.

Parameters
  • commtype (str) – Comm class to register the object under.

  • key (str) – Key that should be used to register the object.

  • value (obj) – Object being registered.

yggdrasil.communication.CommBase.unregister_comm(commtype, key, dont_close=False)[source]

Remove a comm object from the global registry and close it.

Parameters
  • commtype (str) – Comm class to check for key under.

  • key (str) – Key for object that should be removed from the registry.

  • dont_close (bool, optional) – If True, the comm will be removed from the registry, but it won’t be closed. Defaults to False.

Returns

True if an object was closed.

Return type

bool

yggdrasil.communication.DefaultComm module

class yggdrasil.communication.DefaultComm.DefaultComm(*args, **kwargs)[source]

Bases: CommBase

Simple wrapper for default class that allows it to be registered.

yggdrasil.communication.FileComm module

class yggdrasil.communication.FileComm.FileComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling I/O from/to a file on disk.

>>> x = FileComm('test_send', address='test_file.txt', direction='send')
>>> x.send('Test message')
True
>>> with open('test_file.txt', 'r') as fd:
...     print(fd.read())
Test message
>>> x = FileComm('test_recv', address='test_file.txt', direction='recv')
>>> x.recv()
(True, b'Test message')
Parameters
  • name (str) – The environment variable where communication address is stored.

  • read_meth (str, optional) – Method that should be used to read data from the file. Defaults to ‘read’. Ignored if direction is ‘send’.

  • append (bool, optional) – If True and writing, file is openned in append mode. If True and reading, file is kept open even if the end of the file is reached to allow for another process to write to the file in append mode. Defaults to False.

  • in_temp (bool, optional) – If True, the path will be considered relative to the platform temporary directory. Defaults to False.

  • is_series (bool, optional) – If True, input/output will be done to a series of files. If reading, each file will be processed until the end is reached. If writing, each output will be to a new file in the series. The addressed is assumed to contain a format for the index of the file. Defaults to False.

  • wait_for_creation (float, optional) – Time (in seconds) that should be waited before opening for the file to be created if it dosn’t exist. Defaults to 0 s and file will attempt to be opened immediately.

  • **kwargs – Additional keywords arguments are passed to parent class.

fd

File that should be read/written.

Type

file

read_meth

Method that should be used to read data from the file.

Type

str

append

If True and writing, file is openned in append mode.

Type

bool

in_temp

If True, the path will be considered relative to the platform temporary directory.

Type

bool

is_series

If True, input/output will be done to a series of files. If reading, each file will be processed until the end is reached. If writing, each output will be to a new file in the series.

Type

bool

platform_newline

String indicating a newline on the current platform.

Type

str

Raises

ValueError – If the read_meth is not one of the supported values.

advance_in_file(file_pos)[source]

Advance to a certain position in the current file.

Parameters

file_pos (int) – Position that should be moved to in the current. file.

advance_in_series(series_index=None)[source]

Advance to a certain file in a series.

Parameters

series_index (int, optional) – Index of file in the series that should be moved to. Defaults to None and call will advance to the next file in the series.

Returns

True if the file was advanced in the series, False otherwise.

Return type

bool

static before_registration(cls)[source]

Operations that should be performed to modify class attributes prior to registration.

change_position(file_pos, series_index=None, header_was_read=None, header_was_written=None)[source]

Change the position in the file/series.

Parameters
  • file_pos (int) – Position that should be moved to in the file.

  • series_index (int, optinal) – Index of the file in the series that should be moved to. Defaults to None and will be set to the current series index.

  • header_was_read (bool, optional) – Status of if header has been read or not. Defaults to None and will be set to the current value.

  • header_was_written (bool, optional) – Status of if header has been written or not. Defaults to None and will be set to the current value.

classmethod close_registry_entry(value)[source]

Close a registry entry.

property concats_as_str

True if concatenating file contents result in a valid file.

Type

bool

property current_address

Address of file currently being used.

Type

str

disable_header()[source]

Turn off header so that it will not be written.

enable_header()[source]

Turn on header so that it will be written.

property fd

Associated file identifier.

file_flush()[source]

Flush the file.

file_seek(pos, whence=0)[source]

Move in the file to the specified position.

Parameters
  • pos (int) – Position (in bytes) to move file to.

  • whence (int, optional) – Flag indicating position that pos is relative to. 0 for the beginning of the file, 1 for from the current location, and 2 from the end of the file.

property file_size

Current size of file.

Type

int

file_tell()[source]

int: Current position in the file.

get_series_address(index=None)[source]

Get the address of a file in the series.

Parameters

index (int, optional) – Index in series to get address for. Defaults to None and the current index is used.

Returns

Address for the file in the series.

Return type

str

classmethod get_testing_options(read_meth=None, **kwargs)[source]

Method to return a dictionary of testing options for this class.

Parameters
  • read_meth (str, optional) – Read method that will be used by the test class. Defaults to None and will be set by the serialier.

  • **kwargs – Additional keyword arguments are passed to the parent class’s method and the serializers methods for determining the default read_meth and concatenating the sent objects into the objects that are expected to be received.

Returns

Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the

provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by sending

the messages in ‘send’.

Return type

dict

is_file = True
classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.

Returns

Is the comm installed.

Return type

bool

property is_open

True if the connection is open.

Type

bool

property n_msg_recv

The number of messages in the file.

Type

int

classmethod new_comm_kwargs(*args, **kwargs)[source]

Initialize communication with new queue.

open()[source]

Open the file.

property open_mode

Mode that should be used to open the file.

Type

str

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns

Keyword arguments for opposite comm object.

Return type

dict

prepare_message(*args, **kwargs)[source]

Perform actions preparing to send a message.

Parameters
  • *args – Components of the outgoing message.

  • **kwargs – Keyword arguments are passed to the parent class’s method.

Returns

Serialized and annotated message.

Return type

CommMessage

purge()[source]

Purge all messages from the comm.

read_header()[source]

Read header lines from the file and update serializer info.

record_position()[source]

Record the current position in the file/series.

property registry_key

String used to register the socket.

Type

str

property remaining_bytes

Remaining bytes in the file.

Type

int

remove_file()[source]

Remove the file.

reset_position(truncate=False)[source]

Move to the front of the file and allow header to be read again.

Parameters

truncate (bool, optional) – If True, the file will be truncated after moving to the beginning, effectively erasing the file. Defaults to False.

serialize(obj, **kwargs)[source]

Serialize a message using the associated serializer.

classmethod underlying_comm_class()[source]

str: Name of underlying communication class.

write_header()[source]

Write header lines to the file based on the serializer info.

yggdrasil.communication.ForkComm module

class yggdrasil.communication.ForkComm.ForkComm(*args, **kwargs)[source]

Bases: CommBase

Class for receiving/sending messages from/to multiple comms.

Parameters
  • name (str) – The environment variable where communication address is stored.

  • comm_list (list, optional) – The list of options for the comms that should be bundled. If not provided, the bundle will be empty.

  • pattern (str, optional) –

    The communication pattern that should be used to handle outgoing/incoming messages. Options include:

    ’cycle’: Receive from or send to the next available comm in

    the list (default for receiving comms).

    ’broadcast’: [SEND ONLY] Send the same message to each comm

    (default for sending comms).

    ’scatter’: [SEND ONLY] Send part of message (must be a list)

    to each comm.

    ’gather’: [RECV ONLY] Receive lists of messages from each

    comm where a message is only returned when there is a message from each.

  • **kwargs – Additional keyword arguments are passed to the parent class.

comm_list

Comms included in this fork.

Type

list

curr_comm_index

Index comm that next receive will be from.

Type

int

property any_files

True if the comm interfaces with any files.

Type

bool

bind()[source]

Bind in place of open.

child_keys = ['serializer_class', 'serializer_kwargs', 'format_str', 'field_names', 'field_units', 'as_array', 'partner_copies']
close(*args, **kwargs)[source]

Close the connection.

close_in_thread(*args, **kwargs)[source]

In a new thread, close the comm when it is empty.

coerce_to_dict(msg, key_order, metadata)[source]

Convert a message to a dictionary.

Parameters
  • msg (object) – Message to convert to a dictionary.

  • key_order (list) – Key order to use for the output dictionary.

  • metadata (dict) – Header data to accompany the message.

Returns

Converted message.

Return type

dict

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

property curr_comm

Current comm.

Type

CommBase

disconnect()[source]

Disconnect attributes that are aliases.

drain_server_signon_messages(**kwargs)[source]

Drain server signon messages. This should only be used for testing purposes.

property empty_obj_recv

Empty message object.

Type

obj

finalize_message(msg, **kwargs)[source]

Perform actions to decipher a message.

Parameters
  • msg (CommMessage) – Initial message object to be finalized.

  • **kwargs – Keyword arguments are passed to the forked comm’s finalize_message method.

Returns

Deserialized and annotated message.

Return type

CommMessage

property get_response_comm_kwargs

Keyword arguments to use for a response comm.

Type

dict

get_status_message(**kwargs)[source]

Return lines composing a status message.

Parameters

**kwargs – Keyword arguments are passed on to the parent class’s method.

Returns

Lines composing the status message and the

prefix string used for the last message.

Return type

tuple(list, prefix)

property is_confirmed_recv

True if all received messages have been confirmed.

Type

bool

property is_confirmed_send

True if all sent messages have been confirmed.

Type

bool

property is_open

True if the connection is open.

Type

bool

property last_comm

Last comm that was used.

Type

CommBase

property maxMsgSize

Maximum size of a single message that should be sent.

Type

int

property model_env

Mapping between model name and opposite comm environment variables that need to be provided to the model.

Type

dict

property n_msg_direct

Number of messages currently being routed.

Type

int

property n_msg_direct_recv

Number of messages currently being routed in recv.

Type

int

property n_msg_direct_send

Number of messages currently being routed in send.

Type

int

property n_msg_recv

The number of incoming messages in the connection.

Type

int

property n_msg_recv_drain

The number of incoming messages in the connection to drain.

Type

int

property n_msg_send

The number of outgoing messages in the connection.

Type

int

property n_msg_send_drain

The number of outgoing messages in the connection to drain.

Type

int

classmethod new_comm_kwargs(name, *args, **kwargs)[source]

Get keyword arguments for new comm.

noprop_keys = ['send_converter', 'recv_converter', 'filter', 'transform']
open()[source]

Open the connection.

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns

Keyword arguments for opposite comm object.

Return type

dict

property opp_comms

Name/address pairs for opposite comms.

Type

dict

prepare_message(*args, **kwargs)[source]

Perform actions preparing to send a message.

Parameters
  • *args – Components of the outgoing message.

  • **kwargs – Additional keyword arguments are passed to the prepare_message methods for the forked comms.

Returns

Serialized and annotated message.

Return type

CommMessage

purge()[source]

Purge all messages from the comm.

recv_message(*args, **kwargs)[source]

Receive a message.

Parameters
  • *args – Arguments are passed to the forked comm’s recv_message method.

  • **kwargs – Keyword arguments are passed to the forked comm’s recv_message method.

Returns

Received message.

Return type

CommMessage

send_message(msg, **kwargs)[source]

Send a message encapsulated in a CommMessage object.

Parameters
  • msg (CommMessage) – Message to be sent.

  • **kwargs – Additional keyword arguments are passed to _safe_send.

Returns

Success or failure of send.

Return type

bool

property suppress_special_debug
update_serializer_from_message(msg)[source]

Update the serializer based on information stored in a message.

Parameters

msg (CommMessage) – Outgoing message.

class yggdrasil.communication.ForkComm.ForkedCommMessage(msg, comm_list, pattern='broadcast', **kwargs)[source]

Bases: CommMessage

Class for forked comm messages.

Parameters
  • msg (CommBase.CommMessage) – Message being distributed.

  • comm_list (list) – List of communicators that the message is being distributed to.

  • **kwargs – Additional keyword arguments are passed to the ‘prepare_message’ method for each communicator.

orig
yggdrasil.communication.ForkComm.get_comm_name(name, i)[source]

Get the name of the ith comm in the series.

Parameters
  • name (str) – Name of the fork comm.

  • i (int) – Index of comm in fork bundle.

Returns

Name of ith comm in fork bundle.

Return type

str

yggdrasil.communication.IPCComm module

class yggdrasil.communication.IPCComm.IPCComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling I/O via IPC message queues.

q

Message queue.

Type

sysv_ipc.MessageQueue

Developer Notes:

The default size limit for IPC message queues is 2048 bytes on Mac operating systems so it is important that implementation of this communication mechanism properly split and send messages larger than this limit as more than one message.

address_description = 'An IPC message queue key.'
atexit()[source]

Close operations.

bind()[source]

Bind to random queue if address is generate.

classmethod close_registry_entry(value)[source]

Close a registry entry.

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

property is_open

True if the queue is not None.

Type

bool

property n_msg_recv

Number of messages in the queue to recv.

Type

int

property n_msg_send

Number of messages in the queue to send.

Type

int

classmethod new_comm_kwargs(*args, **kwargs)[source]

Initialize communication with new queue.

open()[source]

Open the queue.

open_after_bind()[source]

Open the connection by getting the queue from the bound address.

purge()[source]

Purge all messages from the comm.

class yggdrasil.communication.IPCComm.IPCServer(*args, **kwargs)[source]

Bases: CommServer

IPC server object for cleaning up server queue.

terminate(*args, **kwargs)[source]

Also set break flag.

yggdrasil.communication.IPCComm.get_queue(qid=None)[source]

Create or return a sysv_ipc.MessageQueue and register it.

Parameters

qid (int, optional) – If provided, ID for existing queue that should be returned. Defaults to None and a new queue is returned.

Returns

Message queue.

Return type

sysv_ipc.MessageQueue

yggdrasil.communication.IPCComm.ipc_queues()[source]

Get a list of active IPC queues.

Returns

List of IPC queues.

Return type

list

yggdrasil.communication.IPCComm.ipcrm(options=[])[source]

Remove IPC constructs using the ipcrm command.

Parameters

options (list) – List of flags that should be used. Defaults to an empty list.

yggdrasil.communication.IPCComm.ipcrm_queues(queue_keys=None)[source]

Delete existing IPC queues.

Parameters

queue_keys (list, str, optional) – A list of keys for queues that should be removed. Defaults to all existing queues.

yggdrasil.communication.IPCComm.ipcs(options=[])[source]

Get the output from running the ipcs command.

Parameters

options (list) – List of flags that should be used. Defaults to an empty list.

Returns

Captured output.

Return type

str

yggdrasil.communication.IPCComm.remove_queue(mq)[source]

Remove a sysv_ipc.MessageQueue and unregister it.

Parameters

mq (sysv_ipc.MessageQueue) –

Raises

KeyError – If the provided queue is not registered.

yggdrasil.communication.JSONFileComm module

class yggdrasil.communication.JSONFileComm.JSONFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a JSON file on disk.

Parameters
  • name (str) – The environment variable where file path is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.MPIComm module

exception yggdrasil.communication.MPIComm.MPIClosedError[source]

Bases: BaseException

Exception to raise when the MPI connection has been closed.

class yggdrasil.communication.MPIComm.MPIComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling I/O via MPI communicators.

Parameters
  • tag_start (int, optional) – Tag that MPI messages should start with. Defaults to 0.

  • tag_stride (int, optional) – Amount that tag should be advanced after each message to avoid conflicts w/ other MPIComm communicators. Defaults to 1.

  • partner_mpi_ranks (list, optional) – Rank of MPI processes that partner models are running on. Defaults to None.

tag

Tag that should be used for the next MPI message.

Type

int

tag_stride

Amount that tag should be advanced after each each message to avoid conflicts w/ other MPIComm communicators.

Type

int

add_request(on_empty=False, **kwargs)[source]

Add a request to the queue.

address_description = 'The partner communicator ID(s).'
advance_tag(request)[source]

Advance to the next tag.

Parameters

request (MPIRequest, MPIMultiRequest) – Request advancing the tag.

bind()[source]

Bind to random queue if address is generate.

cache_tag(request)[source]

Store a tag for an uncompleted request.

Parameters

request (MPIRequest, MPIMultiRequest) – Request to cache.

cancel_requests()[source]

Cancel requests that have not yet been completed.

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

property create_work_comm_kwargs

Keyword arguments for a new work comm.

Type

dict

classmethod format_address(ranks, tag_start, tag_stride)[source]

Format an MPI address.

Parameters
  • ranks (tuple) – Ranks of the partner MPI processes.

  • tag_start (int) – Tag that the comm starts at.

  • tag_stride (int) – Tag that the comm advances by.

Returns

Formatted address.

Return type

str

property get_response_comm_kwargs

Keyword arguments to use for a response comm.

Type

dict

get_tag(rank=None)[source]

Get the next tag for a rank.

Parameters

rank (int) – Rank to get tag for.

Returns

Tag that should be used next for the rank.

Return type

int

property is_open

True if the queue is not None.

Type

bool

property model_env

Mapping between model name and opposite comm environment variables that need to be provided to the model.

Type

dict

property n_msg_recv

Number of messages in the queue to recv.

Type

int

property n_msg_send

Number of messages in the queue to send.

Type

int

next_rank()[source]

Get the rank that should be used next.

open()[source]

Open the queue.

property opp_address

Address for opposite comm.

Type

str

classmethod parse_address(address)[source]

Parse an MPI address for information about the partner process ranks and how the tags should be iterated.

Parameters

address (str) – Address to parse.

Returns

The ranks, starting tag, and tag stride contained in the

address.

Return type

tuple

purge()[source]

Purge all messages from the comm.

recv_message(*args, **kwargs)[source]

Receive a message.

Parameters
  • *args – Arguments are passed to the forked comm’s recv_message method.

  • **kwargs – Keyword arguments are passed to the forked comm’s recv_message method.

Returns

Received message.

Return type

CommMessage

send_message(msg, **kwargs)[source]

Send a message encapsulated in a CommMessage object.

Parameters
  • msg (CommMessage) – Message to be sent.

  • **kwargs – Additional keyword arguments are passed to _safe_send.

Returns

Success or failure of send.

Return type

bool

property tag

Tag for the next message.

Type

int

class yggdrasil.communication.MPIComm.MPIMultiRequest(*args, **kwargs)[source]

Bases: MPIRequest

Container for MPI request for multiple partner comms.

cancel()[source]

Cancel a request.

property complete

True if the request has been completed, False otherwise.

Type

bool

make_request(previous_requests=None)[source]

Complete a request.

remainder
class yggdrasil.communication.MPIComm.MPIRequest(comm, direction, address, tag, **kwargs)[source]

Bases: object

Container for MPI request.

address
cancel()[source]

Cancel a request.

comm
property complete

True if the request has been completed, False otherwise.

Type

bool

property data

Data returned by a request.

Type

object

direction
make_request(payload=None)[source]

Complete a request.

req
size
size_req
tag

yggdrasil.communication.MatFileComm module

class yggdrasil.communication.MatFileComm.MatFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a Matlab .mat file on disk.

Parameters
  • name (str) – The environment variable where file path is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.NetCDFFileComm module

class yggdrasil.communication.NetCDFFileComm.NetCDFFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to an netCDF file.

Parameters
  • read_attributes (bool, optional) – If True, the attributes are read in as well as the variables. Defaults to False.

  • variables (list, optional) – List of variables to read in. If not provided, all variables will be read.

  • version (int, optional) – Version of netCDF format that should be used. Defaults to 1. Options are 1 (classic format) and 2 (64-bit offset format).

  • **kwargs – Additional keywords arguments are passed to parent class.

property concats_as_str

True if concatenating file contents result in a valid file.

Type

bool

deserialize(msg, **kwargs)[source]

Don’t deserialize for netCDF since using a serializer is inefficient.

file_flush()[source]

Flush the file.

file_seek(pos, whence=0)[source]

Move in the file to the specified position.

Parameters
  • pos (int) – Position (in bytes) to move file to.

  • whence (int, optional) – Flag indicating position that pos is relative to. 0 for the beginning of the file, 1 for from the current location, and 2 from the end of the file.

property file_size

Current size of file.

Type

int

file_tell()[source]

int: Current position in the file.

classmethod get_testing_options()[source]

Method to return a dictionary of testing options for this class.

Returns

Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the

provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by sending

the messages in ‘send’.

Return type

dict

property remaining_bytes

Remaining bytes in the file.

Type

int

serialize(obj, **kwargs)[source]

Don’t serialize for netCDF since using a serializer is inefficient.

transform_type_recv(x)[source]
transform_type_send(x)[source]

yggdrasil.communication.ObjFileComm module

class yggdrasil.communication.ObjFileComm.ObjFileComm(*args, **kwargs)[source]

Bases: PlyFileComm

Class for handling I/O from/to a .obj file on disk.

Parameters
  • name (str) – The environment variable where communication address is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.PandasFileComm module

class yggdrasil.communication.PandasFileComm.PandasFileComm(*args, **kwargs)[source]

Bases: AsciiTableComm

Class for handling I/O from/to a pandas csv file on disk.

Parameters
  • name (str) – The environment variable where communication address is stored.

  • delimiter (str, optional) – String that should be used to separate columns. Defaults to ‘t’.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.PickleFileComm module

class yggdrasil.communication.PickleFileComm.PickleFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a pickled file on disk.

Parameters
  • name (str) – The environment variable where file path is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.PlyFileComm module

class yggdrasil.communication.PlyFileComm.PlyFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a .ply file on disk.

Parameters
  • name (str) – The environment variable where communication address is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.RESTComm module

class yggdrasil.communication.RESTComm.RESTComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling I/O via a RESTful API. The provided address should be an HTTP address to a server running a flask app that has been equipped to respond to send/receive calls via the add_comm_server_to_app method.

Parameters
  • params (dict, optional) – Parameters that should be passed via URL. Defaults to None and is ignored.

  • cookies (dict, optional) – Cookies to send to the server. Defaults to None and is ignored.

  • **kwargs – Additional keyword arguments will be passed to the base class.

atexit()[source]

Close operations.

bind(*args, **kwargs)[source]

Bind to address based on information provided.

property is_open

True if the connection is open.

Type

bool

property n_msg_recv

The number of incoming messages in the connection.

Type

int

property n_msg_send

The number of outgoing messages in the connection.

Type

int

open(*args, **kwargs)[source]

Open the connection.

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns

Keyword arguments for opposite comm object.

Return type

dict

purge()[source]

Purge all messages from the comm.

yggdrasil.communication.RESTComm.add_comm_server_to_app(app)[source]

Add methods for handling send/receive calls server-side.

Parameters

app (flask.Flask) – Flask app to add methods to.

yggdrasil.communication.RMQAsyncComm module

class yggdrasil.communication.RMQAsyncComm.RMQAsyncComm(*args, **kwargs)[source]

Bases: RMQComm

Class for handling asynchronous RabbitMQ communications. It is not recommended to use this class as it can leave hanging threads if not closed propertly. The normal RMQComm will cover most use cases.

Parameters
  • name (str) – The environment variable where the comm address is stored.

  • dont_open (bool, optional) – If True, the connection will not be opened. Defaults to False.

  • **kwargs – Additional keyword arguments are passed to CommBase.

times_connected

Number of times that this connections has been established.

Type

int

rmq_thread

Thread used to run IO loop.

Type

multitasking.YggTask

add_on_cancel_callback()[source]
add_on_channel_close_callback()[source]

This method tells pika to call the on_channel_closed method if RabbitMQ unexpectedly closes the channel.

atexit()[source]

Close operations.

bind()[source]

Declare queue to get random new queue.

check_for_close()[source]

bool: Check if close has been called from the main process.

close_channel()[source]

Close the channel if it exists.

close_connection(*args, **kwargs)[source]

Close the connection.

connect()[source]

Establish the connection.

enable_delivery_confirmations()[source]

Turn on delivery confirmations.

property is_open

True if the connection and channel are open.

Type

bool

property n_msg_recv

Number of messages in the queue.

Type

int

property n_msg_send

Number of messages in the queue.

Type

int

new_run_thread(name=None)[source]

Get a new thread for running.

on_basic_qos_ok(unused_frame)[source]

Actions to perform one the qos is set.

on_bindok(unused_frame, userdata)[source]

Actions to perform once the queue is succesfully bound. Start consuming messages.

on_cancelok(unused_frame, userdata)[source]

Actions to perform after succesfully cancelling consumption. Closes the channel.

on_channel_closed(channel, reason)[source]

Actions to perform when the channel is closed. Close the connection.

on_channel_closeok(_unused_frame)[source]
on_channel_open(channel)[source]

Actions to perform after a channel is opened. Add the channel close callback and setup the exchange.

on_connection_closed(connection, reason)[source]

Actions that must be taken when the connection is closed. Set the channel to None. If the connection is meant to be closing, stop the IO loop. Otherwise, wait 5 seconds and try to reconnect.

on_connection_open(connection)[source]

Actions that must be taken when the connection is opened. Add the close connection callback and open the RabbitMQ channel.

on_connection_open_error(unused_connection, err)[source]

Actions that must be taken when the connection fails to open.

on_consumer_cancelled(method_frame)[source]
on_delivery_confirmation(method_frame)[source]

Actions performed when a sent message is confirmed.

on_exchange_declareok(unused_frame, userdata)[source]

Actions to perform once an exchange is succesfully declared. Set up the queue.

on_message(_unused_channel, basic_deliver, properties, body)[source]

Buffer received messages.

on_queue_declareok(method_frame, userdata)[source]

Actions to perform once the queue is succesfully declared. Bind the queue.

open_channel()[source]

Open a RabbitMQ channel.

publish_message()[source]

Publish the next message in the list.

reconnect()[source]

Try to re-establish a connection and resume a new IO loop.

reset_for_reconnection()[source]

Reset variables in preparation for reconnection.

property rmq_lock

Lock associated with RMQ ioloop thread.

run_thread()[source]

Connect to the connection and begin the IO loop.

set_qos()[source]
setup_exchange(exchange_name)[source]

Setup the exchange.

setup_queue(queue_name)[source]

Set up the message queue.

start_consuming()[source]
start_publishing()[source]

Enable confirmations and begin sending messages.

start_run_thread()[source]

Start the run thread and wait for it to finish.

stop(call_on_thread=False)[source]

Stop the ioloop.

stop_consuming()[source]
class yggdrasil.communication.RMQAsyncComm.RMQTaskLoop(*args, **kwargs)[source]

Bases: YggTaskLoop

Task loop for RMQ consumer.

atexit()[source]

Actions performed when python exits.

yggdrasil.communication.RMQComm module

class yggdrasil.communication.RMQComm.RMQComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling basic RabbitMQ communications.

connection

RabbitMQ connection.

Type

pika.Connection

channel

RabbitMQ channel.

Type

pika.Channel

Raises

RuntimeError – If a connection cannot be established.

Developer Notes:

It is not advised that new language implement a RabbitMQ communication interface. Rather RMQ communication is included explicitly for connections between models that are not co-located on the same machine and are used by the yggdrasil framework connections on the Python side.

address_description = 'AMPQ queue address of the form ``<url>_RMQPARAM_<exchange>_RMQPARAM_<queue>`` where ``url`` is the broker address (see explanation `here <https://pika.readthedocs.io/en/stable/examples/using_urlparameters.html>`_), ``exchange`` is the name of the exchange on the queue that should be used, and ``queue`` is the name of the queue.'
atexit()[source]

Close operations.

bind()[source]

Declare queue to get random new queue.

close_channel()[source]

Close the channel if it exists.

close_connection(*args, **kwargs)[source]

Close the connection.

close_queue(skip_unbind=False)[source]

Close the queue if the channel exists.

property create_work_comm_kwargs

Keyword arguments for a new work comm.

Type

dict

property exchange

AMQP exchange.

Type

str

get_queue_result()[source]

Get the fram from passive queue declare.

property get_work_comm_kwargs

Keyword arguments for an existing work comm.

Type

dict

property is_confirmed_recv

True if all received messages have been confirmed.

Type

bool

property is_confirmed_send

True if all sent messages have been confirmed.

Type

bool

property is_open

True if the connection and channel are open.

Type

bool

property n_msg_recv

Number of messages in the queue.

Type

int

property n_msg_send

Number of messages in the queue.

Type

int

classmethod new_comm_kwargs(name, user=None, password=None, host=None, virtual_host=None, port=None, exchange=None, queue='', **kwargs)[source]

Initialize communication with new connection.

Parameters
  • name (str) – Name of new connection.

  • user (str, optional) – RabbitMQ server username. Defaults to config option ‘user’ in section ‘rmq’ if it exists and ‘guest’ if it does not.

  • password (str, optional) – RabbitMQ server password. Defaults to config option ‘password’ in section ‘rmq’ if it exists and ‘guest’ if it does not.

  • host (str, optional) – RabbitMQ server host. Defaults to config option ‘host’ in section ‘rmq’ if it exists and _localhost if it does not. If _localhost, the output of socket.gethostname() is used.

  • virtual_host (str, optional) – RabbitMQ server virtual host. Defaults to config option ‘vhost’ in section ‘rmq’ if it exists and ‘/’ if it does not.

  • port (str, optional) – Port on host to use. Defaults to config option ‘port’ in section ‘rmq’ if it exists and ‘5672’ if it does not.

  • exchange (str, optional) – RabbitMQ exchange. Defaults to config option ‘namespace’ in section ‘rmq’ if it exits and ‘’ if it does not.

  • queue (str, optional) – Name of the queue that messages will be send to or received from. If an empty string, the queue will be a random string and exclusive to a receiving comm. Defaults to ‘’.

  • **kwargs – Additional keywords arguments are returned as keyword arguments for the new comm.

Returns

Arguments and keyword arguments for new comm.

Return type

tuple(tuple, dict)

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns

Keyword arguments for opposite comm object.

Return type

dict

purge()[source]

Remove all messages from the associated queue.

property queue

AMQP queue.

Type

str

property url

AMQP server address.

Type

str

class yggdrasil.communication.RMQComm.RMQServer(*args, **kwargs)[source]

Bases: CommServer

RMQ server object for cleaning up server connections.

terminate(*args, **kwargs)[source]

Also set break flag.

yggdrasil.communication.RMQComm.check_rmq_server(url=None, **kwargs)[source]

Check that connection to a RabbitMQ server is possible.

Parameters
  • url (str, optional) – Address of RMQ server that includes all the necessary information. If this is provided, the remaining arguments are ignored. Defaults to None and the connection parameters are taken from the other arguments.

  • username (str, optional) – RMQ server username. Defaults to config value.

  • password (str, optional) – RMQ server password. Defaults to config value.

  • host (str, optional) – RMQ hostname or IP Address to connect to. Defaults to config value.

  • port (str, optional) – RMQ server TCP port to connect to. Defaults to config value.

  • vhost (str, optional) – RMQ virtual host to use. Defaults to config value.

Returns

True if connection to RabbitMQ server is possible, False

otherwise.

Return type

bool

yggdrasil.communication.RMQComm.get_rmq_parameters(url=None, user=None, username=None, password=None, host=None, virtual_host=None, vhost=None, port=None, exchange=None, queue='')[source]

Get RabbitMQ connection parameters.

Parameters
  • url (str, optional) – Address of RMQ server that includes all the necessary information. If this is provided, the remaining arguments are ignored. Defaults to None and the connection parameters are taken from the other arguments.

  • user (str, optional) – RabbitMQ server username. Defaults to config option ‘user’ in section ‘rmq’ if it exists and ‘guest’ if it does not.

  • username (str, optional) – Alias for user.

  • password (str, optional) – RabbitMQ server password. Defaults to config option ‘password’ in section ‘rmq’ if it exists and ‘guest’ if it does not.

  • host (str, optional) – RabbitMQ server host. Defaults to config option ‘host’ in section ‘rmq’ if it exists and _localhost if it does not. If _localhost, the output of socket.gethostname() is used.

  • virtual_host (str, optional) – RabbitMQ server virtual host. Defaults to config option ‘vhost’ in section ‘rmq’ if it exists and ‘/’ if it does not.

  • vhost (str, optional) – Alias for virtual_host.

  • port (str, optional) – Port on host to use. Defaults to config option ‘port’ in section ‘rmq’ if it exists and ‘5672’ if it does not.

  • exchange (str, optional) – RabbitMQ exchange. Defaults to config option ‘namespace’ in section ‘rmq’ if it exits and ‘’ if it does not.

  • queue (str, optional) – Name of the queue that messages will be send to or received from. If an empty string, the queue will be a random string and exclusive to a receiving comm. Defaults to ‘’.

Returns

The connection url, exchange, & queue.

Return type

tuple

yggdrasil.communication.ServerComm module

class yggdrasil.communication.ServerComm.Request(response_address, request_id)[source]

Bases: object

request_id
response_address
class yggdrasil.communication.ServerComm.ServerComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling Server side communication.

Parameters
  • name (str) – The environment variable where communication address is stored.

  • request_commtype (str, optional) – Comm class that should be used for the request comm. Defaults to None.

  • response_kwargs (dict, optional) – Keyword arguments for the response comm. Defaults to empty dict.

  • direct_connection (bool, optional) – If True, the comm will be directly connected to a ServerComm. Defaults to False.

  • **kwargs – Additional keywords arguments are passed to the input comm.

response_kwargs

Keyword arguments for the response comm.

Type

dict

icomm

Request comm.

Type

Comm

ocomm

Response comms for each request.

Type

OrderedDict

property all_clients_connected

True if all expected clients have connected. False otherwise.

Type

bool

close(*args, **kwargs)[source]

Close the connection.

property close_on_eof_recv

True if the comm will close when EOF is received.

Type

bool

create_response_comm(header)[source]

Create a response comm based on information from the last header.

disconnect(*args, **kwargs)[source]

Disconnect the comm.

drain_messages(direction='recv', **kwargs)[source]

Sleep while waiting for messages to be drained.

drain_server_signon_messages(**kwargs)[source]

Drain server signon messages. This should only be used for testing purposes.

property filter

filter for the communicator.

Type

FilterBase

finalize_message(msg, **kwargs)[source]

Perform actions to decipher a message.

Parameters
  • msg (CommMessage) – Initial message object to be finalized.

  • **kwargs – Keyword arguments are passed to the request comm’s finalize_message method.

Returns

Deserialized and annotated message.

Return type

CommMessage

get_status_message(nindent=0, **kwargs)[source]

Return lines composing a status message.

Parameters
  • nindent (int, optional) – Number of tabs that should be used to indent each line. Defaults to 0.

  • *kwargs – Additional arguments are passed to the parent class’s method.

Returns

Lines composing the status message and the

prefix string used for the last message.

Return type

tuple(list, prefix)

property is_closed

True if the connection is closed.

Type

bool

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked.

Returns

Is the comm installed.

Return type

bool

property is_open

True if the connection is open.

Type

bool

property maxMsgSize

Maximum size of a single message that should be sent.

Type

int

property n_msg_direct

Number of messages currently being routed.

Type

int

property n_msg_recv

The number of messages in the connection.

Type

int

property n_msg_recv_drain

The number of messages in the connection to drain.

Type

int

classmethod new_comm_kwargs(name, request_commtype=None, **kwargs)[source]

Initialize communication with new comms.

Parameters
  • name (str) – Name for new comm.

  • request_commtype (str, optional) – Name of class for new input comm. Defaults to None.

open()[source]

Open the connection.

property open_clients

Available open clients.

Type

list

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns

Keyword arguments for opposite comm object.

Return type

dict

property opp_comms

Name/address pairs for opposite comms.

Type

dict

prepare_message(*args, **kwargs)[source]

Perform actions preparing to send a message.

Parameters
  • *args – Components of the outgoing message.

  • **kwargs – Keyword arguments are passed to the request comm’s prepare_message method.

Returns

Serialized and annotated message.

Return type

CommMessage

purge()[source]

Purge input and output comms.

recv_from(*args, **kwargs)[source]

Receive a message from the input comm and open a new response comm for output using address from the header, returning the response_id.

Parameters
  • *args – Arguments are passed to input comm recv method.

  • **kwargs – Keyword arguments are passed to input comm recv method.

Returns

Success or failure of recv call,

output from input comm recv method, and response_id that response should be sent to.

Return type

tuple(bool, obj, str)

recv_message(*args, **kwargs)[source]

Receive a message.

Parameters
  • *args – Arguments are passed to the request comm’s recv_message method.

  • **kwargs – Keyword arguments are passed to the request comm’s recv_message method.

Returns

Received message.

Return type

CommMessage

rpcRecv(*args, **kwargs)[source]

Alias for RPCComm.recv

rpcSend(*args, **kwargs)[source]

Alias for RPCComm.send

send_message(msg, **kwargs)[source]

Send a message encapsulated in a CommMessage object.

Parameters
  • msg (CommMessage) – Message to be sent.

  • **kwargs – Additional keyword arguments are passed to the response comm’s send_message method.

Returns

Success or failure of send.

Return type

bool

send_to(response_id, *args, **kwargs)[source]

Send a message to a specific response comm.

Parameters
  • response_id (str) – ID used to register the response comm.

  • *args – Arguments are passed to output comm send method.

  • **kwargs – Keyword arguments are passed to output comm send method.

Returns

Output from output comm send method.

Return type

obj

yggdrasil.communication.ValueComm module

class yggdrasil.communication.ValueComm.ValueComm(*args, **kwargs)[source]

Bases: CommBase

classmethod get_testing_options(**kwargs)[source]

Method to return a dictionary of testing options for this class.

Parameters

serializer (str, optional) – The name of the serializer that should be used. If not provided, the _default_serializer class attribute will be used.

Returns

Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the

provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by sending

the messages in ‘send’.

Return type

dict

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters

language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.

Returns

Is the comm installed.

Return type

bool

property is_open

True if the connection is open.

Type

bool

property n_msg_recv

The number of incoming messages in the connection.

Type

int

no_serialization = True
open()[source]

Open the connection.

purge()[source]

Purge all messages from the comm.

send(*args, **kwargs)[source]

Send a message.

yggdrasil.communication.WOFOSTParamFileComm module

class yggdrasil.communication.WOFOSTParamFileComm.WOFOSTParamFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a WOFOST parameter file on disk.

yggdrasil.communication.YAMLFileComm module

class yggdrasil.communication.YAMLFileComm.YAMLFileComm(*args, **kwargs)[source]

Bases: FileComm

Class for handling I/O from/to a YAML file on disk.

Parameters
  • name (str) – The environment variable where file path is stored.

  • **kwargs – Additional keywords arguments are passed to parent class.

yggdrasil.communication.ZMQComm module

class yggdrasil.communication.ZMQComm.ZMQComm(*args, **kwargs)[source]

Bases: CommBase

Class for handling I/O using ZeroMQ sockets.

Parameters
  • name (str) – The environment variable where the socket address is stored.

  • context (zmq.Context, optional) – ZeroMQ context that should be used. Defaults to None and the global context is used.

  • socket_type (str, optional) – The type of socket that should be created. Defaults to _default_socket_type. See zmq for all options.

  • socket_action (str, optional) – The action that the socket should perform. Defaults to action based on the direction (‘connect’ for ‘recv’, ‘bind’ for ‘send’.)

  • topic_filter (str, optional) – Message filter to use when subscribing. This is only used for ‘SUB’ socket types. Defaults to ‘’ which is all messages.

  • dealer_identity (str, optional) – Identity that should be used to route messages to a dealer socket. Defaults to ‘0’.

  • **kwargs – Additional keyword arguments are passed to :class:.CommBase.

context

ZeroMQ context that will be used.

Type

zmq.Context

socket

ZeroMQ socket.

Type

zmq.Socket

socket_type_name

The type of socket that should be created.

Type

str

socket_type

ZeroMQ socket type.

Type

int

socket_action

The action that the socket should perform.

Type

str, optional

topic_filter

Message filter to use when subscribing.

Type

str

dealer_identity

Identity that should be used to route messages to a dealer socket.

Type

str

Developer Notes:

yggdrasil uses the tcp transport by default with a PAIR socket type. For every connection, yggdrasil establishes a second request/reply connection that is used to confirm messages passed between the primary PAIR of sockets. On the first send, the model should create a REP socket on an open tcp address and send that address in the header of the first message under the key ‘zmq_reply’. Receiving models should check message headers for this key and, on receipt, establish the partner REQ socket with the specified address (receiving comms can receive from more than one source so they can have more than one request addresses at at time for this purpose). Following every message, the sending model should wait for a message on the reply socket and, on receipt, return the message. Following every message, the receiving model should send the message ‘YGG_REPLY’ on the request socket and wait for a reply. When creating worker comms for sending large messages, the sending model should create the reply comm for the worker in advanced and send it in the header with the worker address under the key ‘zmq_reply_worker’.

address_description = 'A ZeroMQ endpoint of the form <transport>://<address>, where the format of address depends on the transport. Additional information can be found `here <http://api.zeromq.org/3-2:zmq-bind>`_.'
property address_param

Address parameters.

Type

dict

bind()[source]

Bind to address, getting random port as necessary.

check_reply_socket_recv(msg)[source]

Check incoming message for reply address.

Parameters

msg (str) – Incoming message to check.

Returns

Messages with reply address removed if present.

Return type

str

check_reply_socket_send(msg)[source]

Append reply socket address if it

Parameters

msg (str) – Message that will be piggy backed on.

Returns

Message with reply address if it has not been sent.

Return type

str

classmethod close_registry_entry(value)[source]

Close a registry entry.

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

connect()[source]

Connect to address.

property create_work_comm_kwargs

Keyword arguments for a new work comm.

Type

dict

disconnect_socket(dont_close=False)[source]

Disconnect from address.

drain_server_signon_messages(**kwargs)[source]

Drain server signon messages. This should only be used for testing purposes.

get_status_message(nindent=0, **kwargs)[source]

Return lines composing a status message.

Parameters
  • nindent (int, optional) – Number of tabs that should be used to indent each line. Defaults to 0.

  • **kwargs – Additional keyword arguments are passed to the parent class’s method.

Returns

Lines composing the status message and the

prefix string used for the last message.

Return type

tuple(list, prefix)

property get_work_comm_kwargs

Keyword arguments for an existing work comm.

Type

dict

header2workcomm(header, **kwargs)[source]

Get a work comm based on header info.

Parameters
  • header (dict) – Information that will be sent in the message header to the work comm.

  • **kwargs – Additional keyword arguments are passed to the parent method.

Returns

class:.CommBase: Work comm.

property host

Host that socket is connected to.

Type

str

property is_confirmed_recv

True if all received messages have been confirmed.

Type

bool

is_message(flags)[source]

Poll the socket for a message.

Parameters

flags (int) – ZMQ poll flags.

Returns

True if there is a message matching the flags, False otherwise.

Return type

bool

property is_open

True if the socket is open.

Type

bool

property n_msg_recv

The number of incoming messages in the connection.

Type

int

property n_msg_send

The number of outgoing messages in the connection.

Type

int

classmethod new_comm_kwargs(name, protocol=None, host=None, port=None, **kwargs)[source]

Initialize communication with new queue.

Parameters
  • name (str) – Name of new socket.

  • protocol (str, optional) – The protocol that should be used. Defaults to None and is set to _default_protocol. See zmq for details.

  • host (str, optional) – The host that should be used. Invalid for ‘inproc’ protocol. Defaults to ‘localhost’.

  • port (int, optional) – The port used. Invalid for ‘inproc’ protocol. Defaults to None and a random port is choosen.

  • **kwargs – Additional keywords arguments are returned as keyword arguments for the new comm.

Returns

Arguments and keyword arguments for new socket.

Return type

tuple(tuple, dict)

open()[source]

Open connection by binding/connect to the specified socket.

property opp_address

Address for opposite comm.

Type

str

opp_comm_kwargs(for_yaml=False)[source]

Get keyword arguments to initialize communication with opposite comm object.

Parameters

for_yaml (bool, optional) – If True, the returned dict will only contain values that can be specified in a YAML file. Defaults to False.

Returns

Keyword arguments for opposite comm object.

Return type

dict

property port

Port that socket is connected to.

Type

str

prepare_message(*args, **kwargs)[source]

Perform actions preparing to send a message.

Parameters
  • *args – Components of the outgoing message.

  • **kwargs – Keyword arguments are passed to the parent class’s method.

Returns

Serialized and annotated message.

Return type

CommMessage

property protocol

Protocol that socket uses.

Type

str

recv(*args, **kwargs)[source]

Receive a message.

property registry_key

String used to register the socket.

Type

str

property reply_thread

Task that will handle sending or receiving backlogged messages.

Type

tools.YggTask

send(*args, **kwargs)[source]

Send a message.

server_exists(srv_address)[source]

Determine if a server exists.

Parameters

srv_address (str) – Address of server comm.

Returns

True if a server with the provided address exists, False

otherwise.

Return type

bool

set_reply_socket_recv(address)[source]

Set the recv reply socket if the address dosn’t exist.

set_reply_socket_send()[source]

Set the send reply socket if it dosn’t exist.

unbind(dont_close=False)[source]

Unbind from address.

workcomm2header(work_comm, **kwargs)[source]

Get header information from a comm.

Parameters
  • ( (work_comm) – class:.CommBase): Work comm that header describes.

  • **kwargs – Additional keyword arguments are added to the header.

Returns

Header information that will be sent with a message.

Return type

dict

class yggdrasil.communication.ZMQComm.ZMQProxy(*args, **kwargs)[source]

Bases: CommServer

Start a proxy in a new thread for a server address. A client-side address will be randomly generated.

Parameters
  • srv_address (str) – Address that should face the server(s).

  • context (zmq.Context, optional) – ZeroMQ context that should be used. Defaults to None and the global context is used.

  • protocol (str, optional) – Protocol that should be used for the sockets. Defaults to None and is set to _default_protocol.

  • host (str, optional) – Host for socket address. Defaults to ‘localhost’.

  • retry_timeout (float, optional) – Time (in seconds) that should be waited before retrying to bind the sockets to the addresses. If negative, a retry will not be attempted and an error will be raised. Defaults to -1.

  • nretry (int, optional) – Number of times to try binding the sockets to the addresses. Defaults to 1.

  • **kwargs – Additional keyword arguments are passed to the parent class.

srv_address

Address that faces the server(s).

Type

str

cli_address

Address that faces the client(s).

Type

str

context

ZeroMQ context that will be used.

Type

zmq.Context

srv_socket

Socket facing client(s).

Type

zmq.Socket

cli_socket

Socket facing server(s).

Type

zmq.Socket

cli_count

Number of clients that have connected to this proxy.

Type

int

after_loop()[source]

Close sockets after the loop finishes.

cleanup()[source]

Clean up sockets on exit.

client_recv()[source]

Receive single message from the client.

close_sockets()[source]

Close the sockets.

poll()[source]

Check if the process is finished and return the return code if it is.

run_loop()[source]

Forward messages from client to server.

server_send(msg)[source]

Send single message to the server.

server_signon_msg = b'ZMQ_SERVER_SIGNING_ON::'
yggdrasil.communication.ZMQComm.bind_socket(socket, address, retry_timeout=- 1, nretry=1)[source]

Bind a socket to an address, getting a random port as necessary.

Parameters
  • socket (zmq.Socket) – Socket that should be bound.

  • address (str) – Address that socket should be bound to.

  • retry_timeout (float, optional) – Time (in seconds) that should be waited before retrying to bind the socket to the address. If negative, a retry will not be attempted and an error will be raised. Defaults to -1.

  • nretry (int, optional) – Number of times to try binding the socket to the addresses. Defaults to 1.

Returns

Address that socket was bound to, including random port if one

was used.

Return type

str

yggdrasil.communication.ZMQComm.create_socket(context, socket_type)[source]

Create a socket w/ some default options to improve cleanup.

Parameters
  • context (zmq.Context) – ZeroMQ context.

  • socket_type (int) – ZeroMQ socket type.

Returns

New socket.

Return type

zmq.Socket

yggdrasil.communication.ZMQComm.format_address(protocol, host, port=None)[source]

Format an address based on its parts.

Parameters
  • protocol (str) – Communication protocol that should be used.

  • host (str) – Host that address should point to.

  • port (int, optional) – Port that address should point to. Defaults to None and is not added to the address.

Returns

Complete address.

Return type

str

Raises

ValueError – If the protocol is not recognized.

yggdrasil.communication.ZMQComm.get_ipc_host()[source]

Get an IPC host using uuid.

Returns

File path for IPC transport created using uuid.

Return type

str

yggdrasil.communication.ZMQComm.get_socket_type_mate(t_in)[source]

Find the counterpart socket type.

Parameters

t_in (str) – Socket type.

Returns

Counterpart socket type.

Return type

str

Raises

ValueError – If t_in is not a recognized socket type.

yggdrasil.communication.ZMQComm.parse_address(address)[source]

Split an address into its parts.

Parameters

address (str) – Address to be split.

Returns

Parameters extracted from the address.

Return type

dict

Raises
  • ValueError – If the address dosn’t contain ‘://’.

  • ValueError – If the protocol is not supported.

yggdrasil.communication.ZMQComm.set_context_opts(context)[source]

Module contents

yggdrasil.communication.get_comm(name, **kwargs)[source]

Return communicator for existing comm components.

Parameters
  • name (str) – Communicator name.

  • **kwargs – Additional keyword arguments are passed to new_comm.

Returns

Communicator of given class.

Return type

Comm

yggdrasil.communication.new_comm(name, commtype=None, use_async=False, **kwargs)[source]

Return a new communicator, creating necessary components for communication (queues, sockets, channels, etc.).

Parameters
  • name (str) – Communicator name.

  • commtype (str, list, optional) – Name of communicator type or a list of specifiers for multiple communicators that should be joined. Defaults to None.

  • use_async (bool, optional) – If True, send/recv operations will be performed asynchronously on new threads. Defaults to False.

  • **kwargs – Additional keyword arguments are passed to communicator class method new_comm.

Returns

Communicator of given class.

Return type

Comm