yggdrasil.communication package¶
Subpackages¶
- yggdrasil.communication.filters package
- yggdrasil.communication.transforms package
- Submodules
- yggdrasil.communication.transforms.ArrayTransform module
ArrayTransform
ArrayTransform.check_array_items()
ArrayTransform.check_element()
ArrayTransform.check_summary()
ArrayTransform.evaluate_transform()
ArrayTransform.get_summary()
ArrayTransform.get_testing_options()
ArrayTransform.set_original_datatype()
ArrayTransform.transform_array_items()
ArrayTransform.transform_datatype()
ArrayTransform.validate_datatype()
- yggdrasil.communication.transforms.DirectTransform module
- yggdrasil.communication.transforms.FilterTransform module
- yggdrasil.communication.transforms.FunctionTransform module
- yggdrasil.communication.transforms.IterateTransform module
- yggdrasil.communication.transforms.MapFieldsTransform module
- yggdrasil.communication.transforms.MapTransform module
- yggdrasil.communication.transforms.PandasTransform module
- yggdrasil.communication.transforms.SelectFieldsTransform module
- yggdrasil.communication.transforms.SelectScalarTransform module
- yggdrasil.communication.transforms.StatementTransform module
- yggdrasil.communication.transforms.TransformBase module
TransformBase
TransformBase.__call__()
TransformBase.call_transform()
TransformBase.evaluate_transform()
TransformBase.set_original_datatype()
TransformBase.set_original_datatype_from_data()
TransformBase.set_transformed_datatype()
TransformBase.set_transformed_datatype_from_data()
TransformBase.transform_datatype()
TransformBase.transformed_datatype
TransformBase.validate_datatype()
- Module contents
Submodules¶
yggdrasil.communication.AsciiFileComm module¶
- class yggdrasil.communication.AsciiFileComm.AsciiFileComm(*args, **kwargs)[source]¶
Bases:
FileComm
Class for handling I/O from/to a file on disk.
- Parameters:
- classmethod get_testing_options(**kwargs)[source]¶
Method to return a dictionary of testing options for this class.
- Returns:
- Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the
provided content.
send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test
file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
- Return type:
yggdrasil.communication.AsciiMapComm module¶
yggdrasil.communication.AsciiTableComm module¶
yggdrasil.communication.AsyncComm module¶
- class yggdrasil.communication.AsyncComm.AsyncComm(wrapped, daemon=False, async_recv_method='recv', async_send_method='send_message', async_recv_kwargs=None, async_send_kwargs=None)[source]¶
Bases:
ProxyObject
,ComponentBaseUnregistered
Class for handling asynchronous I/O.
- Parameters:
name (str) – The name of the message queue.
async_recv_method (str, optional) – The method that should be used to receive message into the backlog. Defaults to ‘recv’.
async_send_method (dict, optional) – The method that should be used to send message in the backlog. Defaults to ‘send_message’.
async_recv_kwargs (dict, optional) – Keyword arguments to pass to calls receiving messages into the backlog. Defaults to {}.
async_send_method – Keyword arguments to pass to calls sending message from the backlog. Defaults to {}.
**kwargs – Additional keyword arguments are passed to CommBase.
- backlog_ready¶
Event set when there is a message in the recv backlog.
- Type:
- add_backlog(payload)[source]¶
Add a message to the backlog of messages.
- Parameters:
payload (tuple) – Arguments and keyword argumetns for send or data from receive.
- async_recv_kwargs¶
- async_recv_method¶
- async_send_kwargs¶
- async_send_method¶
- backlog_ready¶
- property backlog_thread¶
Task that will handle sinding or receiving backlogged messages.
- Type:
tools.YggTask
- close(linger=False)[source]¶
Close the connection.
- Parameters:
linger (bool, optional) – If True, drain messages before closing the comm. Defaults to False.
- close_on_eof_recv¶
- daemon¶
- finalize_message(msg, **kwargs)[source]¶
Perform actions to decipher a message.
- Parameters:
msg (CommMessage) – Initial message object to be finalized.
**kwargs – Keyword arguments are passed to the request comm’s finalize_message method.
- Returns:
Deserialized and annotated message.
- Return type:
- next(*args, **kwargs)¶
- pop_backlog()[source]¶
Pop a message from the front of the backlog.
- Returns:
- First backlogged send arguments/keyword arguments
or received data.
- Return type:
- recv(timeout=None, return_message_object=False, dont_finalize=False, **kwargs)[source]¶
Receive a message.
- Parameters:
*args – All arguments are passed to comm _recv method.
return_message_object (bool, optional) – If True, the full wrapped CommMessage message object is returned instead of the tuple. Defaults to False.
dont_finalize (bool, optional) – If True, finalize_message will not be called even if async_recv_method is ‘recv_message’. Defaults to False.
**kwargs – All keywords arguments are passed to comm _recv method.
- Returns:
- Success or failure of receive and received
message.
- Return type:
- recv_message(timeout=None, **kwargs)[source]¶
Receive a message.
- Parameters:
*args – Arguments are passed to the response comm’s recv_message method.
**kwargs – Keyword arguments are passed to the response comm’s recv_message method.
- Returns:
Received message.
- Return type:
- send(*args, dont_prepare=False, **kwargs)[source]¶
Send a message to the backlog.
- Parameters:
*args – All arguments are assumed to be part of the message.
**kwargs – All keywords arguments are passed to comm _send method.
- Returns:
Success or failure of sending the message.
- Return type:
- send_eof(*args, **kwargs)[source]¶
Send the EOF message as a short message.
- Parameters:
*args – All arguments are passed to comm send.
**kwargs – All keywords arguments are passed to comm send.
- Returns:
Success or failure of send.
- Return type:
- send_message(msg, **kwargs)[source]¶
Send a message encapsulated in a CommMessage object.
- Parameters:
msg (CommMessage) – Message to be sent.
**kwargs – Additional keyword arguments are passed to _safe_send.
- Returns:
Success or failure of send.
- Return type:
yggdrasil.communication.BufferComm module¶
- class yggdrasil.communication.BufferComm.BufferComm(*args, **kwargs)[source]¶
Bases:
CommBase
Class for handling I/O to an in-memory buffer.
- Parameters:
name (str) – The name of the message queue.
address (LockedBuffer, optional) – Existing buffer that should be used. If not provided, a new buffer is created.
buffer_task_method (str, optional) – Type of tasks that buffer will be used to share information between. Defaults to ‘thread’.
**kwargs – Additional keyword arguments are passed to CommBase.
- address¶
Buffer containing messages.
- Type:
- classmethod is_installed(language=None)[source]¶
Determine if the necessary libraries are installed for this communication class.
- Parameters:
language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.
- Returns:
Is the comm installed.
- Return type:
- no_serialization = True¶
yggdrasil.communication.CABOFileComm module¶
yggdrasil.communication.ClientComm module¶
- class yggdrasil.communication.ClientComm.ClientComm(*args, **kwargs)[source]¶
Bases:
CommBase
Class for handling Client side communication.
- Parameters:
name (str) – The environment variable where communication address is stored.
request_commtype (str, optional) – Comm class that should be used for the request comm. Defaults to None.
response_kwargs (dict, optional) – Keyword arguments for the response comm. Defaults to empty dict.
direct_connection (bool, optional) – If True, the comm will be directly connected to a ServerComm. Defaults to False.
**kwargs – Additional keywords arguments are passed to the output comm.
- ocomm¶
Request comm.
- Type:
Comm
- icomm¶
Response comm.
- Type:
Comm
- call(*args, **kwargs)[source]¶
Do RPC call. The request message is sent to the output comm and the response is received from the input comm.
- Parameters:
*args – Arguments are passed to output comm send method.
**kwargs – Keyword arguments are passed to output comm send method
- Returns:
Output from input comm recv method.
- Return type:
obj
- property filter¶
filter for the communicator.
- Type:
- finalize_message(msg, **kwargs)[source]¶
Perform actions to decipher a message.
- Parameters:
msg (CommMessage) – Initial message object to be finalized.
**kwargs – Keyword arguments are passed to the response comm’s finalize_message method.
- Returns:
Deserialized and annotated message.
- Return type:
- get_status_message(nindent=0, **kwargs)[source]¶
Return lines composing a status message.
- Parameters:
nindent (int, optional) – Number of tabs that should be used to indent each line. Defaults to 0.
*kwargs – Additional arguments are passed to the parent class’s method.
- Returns:
- Lines composing the status message and the
prefix string used for the last message.
- Return type:
- classmethod is_installed(language=None)[source]¶
Determine if the necessary libraries are installed for this communication class.
- classmethod new_comm_kwargs(name, request_commtype=None, **kwargs)[source]¶
Initialize communication with new comms.
- opp_comm_kwargs(for_yaml=False)[source]¶
Get keyword arguments to initialize communication with opposite comm object.
- prepare_message(*args, **kwargs)[source]¶
Perform actions preparing to send a message.
- Parameters:
*args – Components of the outgoing message.
**kwargs – Keyword arguments are passed to the request comm’s prepare_message method.
- Returns:
Serialized and annotated message.
- Return type:
- recv_message(*args, **kwargs)[source]¶
Receive a message.
- Parameters:
*args – Arguments are passed to the response comm’s recv_message method.
**kwargs – Keyword arguments are passed to the response comm’s recv_message method.
- Returns:
Received message.
- Return type:
- send_message(msg, **kwargs)[source]¶
Send a message encapsulated in a CommMessage object.
- Parameters:
msg (CommMessage) – Message to be sent.
**kwargs – Additional keyword arguments are passed to the request comm’s send_message method.
- Returns:
Success or failure of send.
- Return type:
yggdrasil.communication.CommBase module¶
- class yggdrasil.communication.CommBase.CommBase(*args, **kwargs)[source]¶
Bases:
YggClass
Class for handling I/O.
- Parameters:
name (str) – The environment variable where communication address is stored.
address (str, optional) – Communication info. Default to None and address is taken from the environment variable.
direction (str, optional) – The direction that messages should flow through the connection. ‘send’ if the connection will send messages, ‘recv’ if the connecton will receive messages. Defaults to ‘send’.
is_interface (bool, optional) – Set to True if this comm is a Python interface binding. Defaults to False.
language (str, optional) – Programming language of the calling model. Defaults to ‘python’.
env (dict, optional) – Environment variable that should be used. Defaults to os.environ if not provided.
partner_model (str, optional) – Name of model that this comm is partnered with. Default to None, indicating that the partner is not a model.
partner_language (str, optional) – Programming language of this comm’s partner comm. Defaults to ‘python’.
partner_mpi_ranks (list, optional) – Ranks of processes of this comm’s partner comm(s). Defaults to [].
datatype (schema, optional) – JSON schema (with expanded core types defined by yggdrasil) that constrains the type of data that should be sent/received by this object. Defaults to {‘type’: ‘bytes’}. Additional information on specifying datatypes can be found here.
field_names (list, optional) – [DEPRECATED] Field names that should be used to label fields in sent/received tables. This keyword is only valid for table-like datatypes. If not provided, field names are created based on the field order.
field_units (list, optional) – [DEPRECATED] Field units that should be used to convert fields in sent/received tables. This keyword is only valid for table-like datatypes. If not provided, all fields are assumed to be unitless.
as_array (bool, optional) – [DEPRECATED] If True and the datatype is table-like, tables are sent/recieved with either columns rather than row by row. Defaults to False.
( (default_file) – class:.DefaultSerialize, optional): Class with serialize and deserialize methods that should be used to process sent and received messages or a dictionary describing a serializer that obeys the serializer schema.
format_str (str, optional) – String that should be used to format/parse messages. Default to None.
dont_open (bool, optional) – If True, the connection will not be opened. Defaults to False.
is_interface – Set to True if this comm is a Python interface binding. Defaults to False.
recv_timeout (float, optional) – Time that should be waited for an incoming message before returning None. Defaults to 0 (no wait). A value of False indicates that recv should block.
close_on_eof_recv (bool, optional) – If True, the comm will be closed when it receives an end-of-file messages. Otherwise, it will remain open. Defaults to True.
close_on_eof_send (bool, optional) – If True, the comm will be closed after it sends an end-of-file messages. Otherwise, it will remain open. Defaults to False.
single_use (bool, optional) – If True, the comm will only be used to send/recv a single message. Defaults to False.
reverse_names (bool, optional) – If True, the suffix added to the comm with be reversed. Defaults to False.
no_suffix (bool, optional) – If True, no directional suffix will be added to the comm name. Defaults to False.
allow_multiple_comms (bool, optional) – If True, initialize the comm such that mulitiple comms can connect to the same address. Defaults to False.
is_client (bool, optional) – If True, the comm is one of many potential clients that will be sending messages to one or more servers. Defaults to False.
is_response_client (bool, optional) – If True, the comm is a client-side response comm. Defaults to False.
is_server (bool, optional) – If True, the commis one of many potential servers that will be receiving messages from one or more clients. Defaults to False.
is_response_server (bool, optional) – If True, the comm is a server-side response comm. Defaults to False.
recv_converter (func, optional) – Converter that should be used on received objects. Defaults to None.
send_converter (func, optional) – Converter that should be used on sent objects. Defaults to None.
vars (list, optional) – Names of variables to be sent/received by this comm. Defaults to [].
length_map (dict, optional) – Map from pointer variable names to the names of variables where their length will be stored. Defaults to {}.
comm (str, optional) – The comm that should be created. This only serves as a check that the correct class is being created. Defaults to None.
( – class:.FilterBase, optional): Filter that will be used to determine when messages should be sent/received. Ignored if not provided.
( – class:.TransformBase, optional): One or more transformations that will be applied to messages that are sent/received. Ignored if not provided.
is_default (bool, optional) – If True, this comm was created to handle all input/output variables to/from a model. Defaults to False. This variable is used internally and should not be set explicitly in the YAML.
outside_loop (bool, optional) – If True, and the comm is an input/outputs to/from a model being wrapped. The receive/send calls for this comm will be outside the loop for the model. Defaults to False.
dont_copy (bool, optional) – If True, the comm will not be duplicated in the even a model is duplicated via the ‘copies’ parameter. Defaults to False except for in the case that a model is wrapped and the comm is inside the loop or that a model is a RPC input to a model server.
( – class:FileComm, optional): Comm information for a file that input should be drawn from (for input comms) or that output should be sent to (for output comms) in the event that a yaml does not pair the comm with another model comm or a file.
default_value (object, optional) – Value that should be returned in the event that a yaml does not pair the comm with another model comm or a file.
for_service (bool, optional) – If True, this comm bridges the gap to an integration running as a service, possibly on a remote machine. Defaults to False.
**kwargs – Additional keywords arguments are passed to parent class.
- Class Attributes:
is_file (bool): True if the comm accesses a file. _maxMsgSize (int): Maximum size of a single message that should be sent. address_description (str): Description of the information constituting
an address for this communication mechanism.
- serializer (
class:.DefaultSerialize): Object that will be used to serialize/deserialize messages to/from python objects.
- close_on_eof_recv¶
If True, the comm will be closed when it receives an end-of-file messages. Otherwise, it will remain open.
- Type:
- close_on_eof_send¶
If True, the comm will be closed after it sends an end-of-file messages. Otherwise, it will remain open.
- Type:
- allow_multiple_comms¶
If True, initialize the comm such that mulitiple comms can connect to the same address.
- Type:
- is_client¶
If True, the comm is one of many potential clients that will be sending messages to one or more servers.
- Type:
- is_server¶
If True, the comm is one of many potential servers that will be receiving messages from one or more clients.
- Type:
- recv_converter¶
Converter that should be used on received objects.
- Type:
func
- send_converter¶
Converter that should be used on sent objects.
- Type:
func
- filter (
class:.FilterBase): Callable class that will be used to determine when messages should be sent/received.
- Raises:
RuntimeError – If the comm class is not installed.
AddressError – If there is not an environment variable with the specified name.
ValueError – If directions is not ‘send’ or ‘recv’.
- add_work_comm(comm)[source]¶
Add work comm to dict.
- Parameters:
( (comm) – class:.CommBase): Comm that should be added.
- Raises:
KeyError – If there is already a comm associated with the key.
- address_description = None¶
- apply_transform(msg_in, for_empty=False, header=False)[source]¶
Evaluate the transform to alter the emssage being sent/received.
- Parameters:
msg_in (object) – Message being transformed.
for_empty (bool, optional) – If True, the transformation is being used to check for an empty message and errors will be caught. Defaults to False.
header (dict, optional) – Header keyword arguments associated with a message. Defaults to False and is ignored.
typedef (dict, optiona) – Type to transform. Default to None and will be determined by the serializer if receiving.
- Returns:
Transformed message.
- Return type:
- close(linger=False, **kwargs)[source]¶
Close the connection.
- Parameters:
linger (bool, optional) – If True, drain messages before closing the comm. Defaults to False.
**kwargs – Additional keyword arguments are passed to linger method if linger is True.
- close_in_thread(no_wait=False, timeout=None)[source]¶
In a new thread, close the comm when it is empty.
- create_work_comm(work_comm_name=None, **kwargs)[source]¶
Create a temporary work comm.
- Parameters:
work_comm_name (str, optional) – Name that should be used for the work comm. If not provided, one is created from the header id and the comm class.
**kwargs – Keyword arguments for new_comm that should override work_comm_kwargs.
- Returns:
class:.CommBase: Work comm.
- drain_messages(direction=None, timeout=None, variable=None)[source]¶
Sleep while waiting for messages to be drained.
- drain_server_signon_messages(**kwargs)[source]¶
Drain server signon messages. This should only be used for testing purposes.
- property empty_obj_recv¶
Empty message object.
- Type:
obj
- evaluate_filter(*msg_in)[source]¶
Evaluate the filter to determine how the message should be handled.
- finalize_message(msg, skip_processing=False, skip_python2language=False, after_finalize_message=None)[source]¶
Perform actions to decipher a message. The order of steps is
Transform the message
Filter
python2language
Close comm on EOF if close_on_eof_recv set
Check for empty recv after processing
Mark comm as used and close if single use
Apply after_finalize_message functions
- Parameters:
msg (CommMessage) – Initial message object to be finalized.
skip_processing (bool, optional) – If True, filters, transformations, and after_finalize_message funciton applications will not be performed. Defaults to False.
skip_python2language (bool, optional) – If True, python2language will not be applied. Defaults to False.
after_finalize_message (list, optional) – A set of function that should be applied to received CommMessage objects following the standard finalization. Defaults to None and is ignored.
- Returns:
Deserialized and annotated message.
- Return type:
- get_status_message(nindent=0, extra_lines_before=None, extra_lines_after=None)[source]¶
Return lines composing a status message.
- Parameters:
nindent (int, optional) – Number of tabs that should be used to indent each line. Defaults to 0.
extra_lines_before (list, optional) – Additional lines that should be added to the beginning of the default print message. Defaults to empty list if not provided.
extra_lines_after (list, optional) – Additional lines that should be added to the end of the default print message. Defaults to empty list if not provided.
- Returns:
- Lines composing the status message and the
prefix string used for the last message.
- Return type:
- classmethod get_testing_options(serializer=None, test_dir=None, **kwargs)[source]¶
Method to return a dictionary of testing options for this class.
- Parameters:
serializer (str, optional) – The name of the serializer that should be used. If not provided, the _default_serializer class attribute will be used.
- Returns:
- Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the
provided content.
send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test
file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
- Return type:
- get_work_comm(header, **kwargs)[source]¶
Get temporary work comm, creating as necessary.
- Parameters:
header (dict) – Information that will be sent in the message header to the work comm.
**kwargs – Additional keyword arguments are passed to header2workcomm.
- Returns:
class:.CommBase: Work comm.
- header2workcomm(header, work_comm_name=None, **kwargs)[source]¶
Get a work comm based on header info.
- Parameters:
header (dict) – Information that will be sent in the message header to the work comm.
work_comm_name (str, optional) – Name that should be used for the work comm. If not provided, one is created from the header id and the comm class.
**kwargs – Additional keyword arguments are added to the returned dictionary.
- Returns:
class:.CommBase: Work comm.
- is_empty_recv(msg)[source]¶
Check if a received message object is empty.
- Parameters:
msg (obj) – Message object.
- Returns:
True if the object is empty, False otherwise.
- Return type:
- is_eof(msg)[source]¶
Determine if a message is an EOF.
- Parameters:
msg (obj) – Message object to be tested.
- Returns:
True if the message indicates an EOF, False otherwise.
- Return type:
- is_file = False¶
- classmethod is_installed(language=None)[source]¶
Determine if the necessary libraries are installed for this communication class.
- Parameters:
language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.
- Returns:
Is the comm installed.
- Return type:
- property model_env¶
Mapping between model name and opposite comm environment variables that need to be provided to the model.
- Type:
- new_server(srv_address)[source]¶
Create a new server.
- Parameters:
srv_address (str) – Address of server comm.
- no_serialization = False¶
- opp_comm_kwargs(for_yaml=False)[source]¶
Get keyword arguments to initialize communication with opposite comm object.
- precheck(direction)[source]¶
Check that comm is ready for action in specified direction, raising errors if it is not.
- Parameters:
direction (str) – Check that comm is ready to perform this action.
- prepare_message(*args, header_kwargs=None, skip_serialization=False, skip_processing=False, skip_language2python=False, after_prepare_message=None, flag=None)[source]¶
Perform actions preparing to send a message. The order of steps is
Convert the message based on the language
Isolate the message if there is only one
Check if the message is EOF
Check if the message should be filtered
Transform the message
Apply after_prepare_message functions
Serialize the message
Create a work comm if the message is too large to be sent all at once
- Parameters:
*args – Components of the outgoing message.
header_kwargs (dict, optional) – Header options that should be set.
skip_serialization (bool, optional) – If True, serialization will not be performed. Defaults to False.
skip_processing (bool, optional) – If True, filters, transformations, and after_prepare_message function applications will not be performed. Defaults to False.
skip_language2python (bool, optional) – If True, language2python will be skipped. Defaults to False.
after_prepare_message (list, optional) – Functions that should be applied after transformation, but before serialization. Defaults to None and is ignored.
flag (int, optional) – Flag that should be added to the message before any additional processing is performed. Defaults to None and is ignored.
- Returns:
Serialized and annotated message.
- Return type:
- printStatus(*args, level='info', return_str=False, **kwargs)[source]¶
Print status of the communicator.
- recv(*args, return_message_object=False, **kwargs)[source]¶
Receive a message.
- Parameters:
*args – All arguments are passed to comm _recv method.
return_message_object (bool, optional) – If True, the full wrapped CommMessage message object is returned instead of the tuple. Defaults to False.
**kwargs – All keywords arguments are passed to comm _recv method.
- Returns:
- Success or failure of receive and received
message. If return_message_object is True, the CommMessage object will be returned instead.
- Return type:
- recv_dict(*args, **kwargs)[source]¶
Return a received message as a dictionary of fields. If there are not any fields specified, the fields will have the form ‘f0’, ‘f1’, ‘f2’, …
- Parameters:
*args – Arguments are passed to recv.
**kwargs – Keyword arguments are passed to recv.
- Returns:
- Success/failure of receive and a dictionar of
message fields.
- Return type:
Raises:
- recv_message(*args, skip_deserialization=False, **kwargs)[source]¶
Receive a message.
- Parameters:
*args – Arguments are passed to _safe_recv.
skip_deserialization (bool, optional) – If True, deserialization is not performed. Defaults to False.
**kwargs – Additional keyword arguments are passed to _safe_recv.
- Returns:
Received message.
- Return type:
- send(*args, **kwargs)[source]¶
Send a message.
- Parameters:
*args – All arguments are assumed to be part of the message.
**kwargs – All keywords arguments are passed to prepare_message or send_message.
- Returns:
Success or failure of send.
- Return type:
- send_dict(args_dict, **kwargs)[source]¶
Send a message with fields specified in the input dictionary.
- Parameters:
args_dict (dict) – Dictionary of arguments to send.
**kwargs – Additiona keyword arguments are passed to send.
- Returns:
Success/failure of send.
- Return type:
- Raises:
RuntimeError – If the field order can not be determined.
- send_eof(*args, **kwargs)[source]¶
Send the EOF message as a short message.
- Parameters:
*args – All arguments are passed to comm send.
**kwargs – All keywords arguments are passed to comm send.
- Returns:
Success or failure of send.
- Return type:
- send_message(msg, skip_safe_send=False, **kwargs)[source]¶
Send a message encapsulated in a CommMessage object.
- Parameters:
msg (CommMessage) – Message to be sent.
skip_safe_send (bool, optional) – If True, no actual send will take place. Defaults to False.
**kwargs – Additional keyword arguments are passed to _safe_send.
- Returns:
Success or failure of send.
- Return type:
- update_message_from_serializer(msg)[source]¶
Update a message with information about the serializer.
- Parameters:
msg (CommMessage) – Incoming message.
- update_serializer_from_message(msg)[source]¶
Update the serializer based on information stored in a message.
- Parameters:
msg (CommMessage) – Outgoing message.
- wait_for_confirm(timeout=None, direction=None, active_confirm=False, noblock=False)[source]¶
Sleep until all messages are confirmed.
- workcomm2header(work_comm, **kwargs)[source]¶
Get header information from a comm.
- Parameters:
( (work_comm) – class:.CommBase): Work comm that header describes.
**kwargs – Additional keyword arguments are added to the header.
- Returns:
Header information that will be sent with a message.
- Return type:
- class yggdrasil.communication.CommBase.CommMessage(msg=None, length=0, flag=None, args=None, header=None)[source]¶
Bases:
object
Class for passing messages around with additional information required to send/receive them.
- flag¶
Indicates the result of processing the message. Values are: FLAG_FAILURE: Processing was unsuccessful. FLAG_SUCCESS: Processing was successful. FLAG_SKIP: The message should be skipped. FLAG_EOF: The message indicates that there will be no more messages.
- Type:
- additional_messages¶
Messages that should be sent along with this message as in the case that the message was an iterator.
- Type:
- worker¶
Worker communicator that should be used to send worker messages in the case that the original message had to be split.
- Type:
- worker_messages¶
Messages that should be sent via the worker comm comm as the original message had to be split due to its size.
- Type:
- add_message(*args, **kwargs)[source]¶
Add a message to the list of additional messages that should be sent following this one.
- Parameters:
*args – Arguments are passed to the CommMessage constructor.
*kwargs – Keyword arguments are passed to the CommMessage constructor.
- add_worker_message(*args, **kwargs)[source]¶
Add a message to the list of messages that should be sent via work comm following this one.
- Parameters:
*args – Arguments are passed to the CommMessage constructor.
*kwargs – Keyword arguments are passed to the CommMessage constructor.
- additional_messages¶
- apply_function(x)[source]¶
Apply a function to the message.
- Parameters:
x (function) – Function to apply.
- args¶
- finalized¶
- flag¶
- header¶
- length¶
- msg¶
- send_worker_messages(**kwargs)[source]¶
Send the worker messages via the worker comm.
- Parameters:
**kwargs – Keyword arguments are passed to the send_message method of the worker comm for each message.
- Returns:
Success of the send operations.
- Return type:
- sent¶
- sinfo¶
- singular¶
- stype¶
- worker¶
- worker_messages¶
- class yggdrasil.communication.CommBase.CommServer(*args, **kwargs)[source]¶
Bases:
YggTaskLoop
Basic server object to keep track of clients.
- class yggdrasil.communication.CommBase.CommTaskLoop(*args, **kwargs)[source]¶
Bases:
YggTaskLoop
Task loop for comms to ensure cleanup.
- Parameters:
( (comm) – class:.CommBase): Comm class that thread is for.
name (str, optional) – Name for the thread. If not provided, one is created by combining the comm name and the provided suffix.
suffix (str, optional) – Suffix that should be added to comm name to name the thread. Defaults to ‘CommTask’.
**kwargs – Additional keyword arguments are passed to the parent class.
- comm (
class:.CommBase): Comm class that thread is for.
- exception yggdrasil.communication.CommBase.IncompleteBaseComm[source]¶
Bases:
Exception
An exception class for methods that are incomplete for base classes.
- exception yggdrasil.communication.CommBase.NeverMatch[source]¶
Bases:
Exception
An exception class that is never raised by any code anywhere
- yggdrasil.communication.CommBase.cleanup_comms(commtype, close_func=None)[source]¶
Clean up comms of a certain type.
- yggdrasil.communication.CommBase.get_comm_registry(commtype)[source]¶
Get the comm registry for a comm class.
- yggdrasil.communication.CommBase.is_registered(commtype, key)[source]¶
Determine if a comm object has been registered under the specified key.
- yggdrasil.communication.CommBase.register_comm(commtype, key, value)[source]¶
Add a comm object to the global registry.
yggdrasil.communication.DedicatedFileBase module¶
- class yggdrasil.communication.DedicatedFileBase.DedicatedFileBase(*args, **kwargs)[source]¶
Bases:
FileComm
Base class for handling I/O via a dedicated library.
- deserialize(msg, **kwargs)[source]¶
Don’t deserialize for dedicated comms since using a serializer is inefficient.
- classmethod get_testing_options(**kwargs)[source]¶
Method to return a dictionary of testing options for this class.
- Returns:
- Dictionary of variables to use for testing. Items:
- kwargs (dict): Keyword arguments for comms tested with
the provided content.
send (list): List of objects to send to test file. recv (list): List of objects that will be received from a
test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by
sending the messages in ‘send’.
- Return type:
yggdrasil.communication.DefaultComm module¶
yggdrasil.communication.ExcelFileComm module¶
- class yggdrasil.communication.ExcelFileComm.ExcelFileComm(*args, **kwargs)[source]¶
Bases:
DedicatedFileBase
Class for handling I/O from/to an Excel file.
- concats_as_str = True¶
- classmethod get_testing_options(sheets=None, columns=None, **kwargs)[source]¶
Method to return a dictionary of testing options for this class.
- Parameters:
- Returns:
- Dictionary of variables to use for testing. Items:
- kwargs (dict): Keyword arguments for comms tested with
the provided content.
send (list): List of objects to send to test file. recv (list): List of objects that will be received from a
test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by
sending the messages in ‘send’.
- Return type:
- classmethod is_installed(language=None)[source]¶
Determine if the necessary libraries are installed for this communication class.
- Parameters:
language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.
- Returns:
Is the comm installed.
- Return type:
- no_serialization = True¶
yggdrasil.communication.FileComm module¶
- class yggdrasil.communication.FileComm.FileComm(*args, **kwargs)[source]¶
Bases:
CommBase
Class for handling I/O from/to a file on disk.
>>> x = FileComm('test_send', address='test_file.txt', direction='send') >>> x.send('Test message') True >>> with open('test_file.txt', 'r') as fd: ... print(fd.read()) Test message >>> x = FileComm('test_recv', address='test_file.txt', direction='recv') >>> x.recv() (True, b'Test message')
- Parameters:
name (str) – The environment variable where communication address is stored.
read_meth (str, optional) – Method that should be used to read data from the file. Defaults to ‘read’. Ignored if direction is ‘send’.
append (bool, optional) – If True and writing, file is openned in append mode. If True and reading, file is kept open even if the end of the file is reached to allow for another process to write to the file in append mode. Defaults to False.
in_temp (bool, optional) – If True, the path will be considered relative to the platform temporary directory. Defaults to False.
is_series (bool, optional) – If True, input/output will be done to a series of files. If reading, each file will be processed until the end is reached. If writing, each output will be to a new file in the series. The addressed is assumed to contain a format for the index of the file. Defaults to False.
count (int, optional) – When reading a file, read the file this many of times. Defaults to 0.
wait_for_creation (float, optional) – Time (in seconds) that should be waited before opening for the file to be created if it dosn’t exist. Defaults to 0 s and file will attempt to be opened immediately.
**kwargs – Additional keywords arguments are passed to parent class.
- fd¶
File that should be read/written.
- Type:
file
- in_temp¶
If True, the path will be considered relative to the platform temporary directory.
- Type:
- is_series¶
If True, input/output will be done to a series of files. If reading, each file will be processed until the end is reached. If writing, each output will be to a new file in the series.
- Type:
- Raises:
ValueError – If the read_meth is not one of the supported values.
- advance_in_file(file_pos)[source]¶
Advance to a certain position in the current file.
- Parameters:
file_pos (int) – Position that should be moved to in the current. file.
- static before_registration(cls)[source]¶
Operations that should be performed to modify class attributes prior to registration.
- change_position(file_pos, series_index=None, header_was_read=None, header_was_written=None)[source]¶
Change the position in the file/series.
- Parameters:
file_pos (int) – Position that should be moved to in the file.
series_index (int, optinal) – Index of the file in the series that should be moved to. Defaults to None and will be set to the current series index.
header_was_read (bool, optional) – Status of if header has been read or not. Defaults to None and will be set to the current value.
header_was_written (bool, optional) – Status of if header has been written or not. Defaults to None and will be set to the current value.
- dump(obj, **kwargs)[source]¶
Serialize to a file.
- Parameters:
**kwargs – Keyword arguments are passed to the send call.
- Raises:
SerializationError – If the send call fails.
- property fd¶
Associated file identifier.
- classmethod get_test_contents(data, **kwargs)[source]¶
Method for returning the serialized form of a set of test data.
- classmethod get_testing_options(read_meth=None, serializer=None, **kwargs)[source]¶
Method to return a dictionary of testing options for this class.
- Parameters:
read_meth (str, optional) – Read method that will be used by the test class. Defaults to None and will be set by the serialier.
**kwargs – Additional keyword arguments are passed to the parent class’s method and the serializers methods for determining the default read_meth and concatenating the sent objects into the objects that are expected to be received.
- Returns:
- Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the
provided content.
send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test
file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
- Return type:
- is_file = True¶
- classmethod is_installed(language=None)[source]¶
Determine if the necessary libraries are installed for this communication class.
- Parameters:
language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.
- Returns:
Is the comm installed.
- Return type:
- load(return_message_object=False, **kwargs)[source]¶
Deserialize all contents from a file.
- Parameters:
**kwargs – Keyword arguments are passed to recv calls.
- Returns:
- The deserialized data object or a list of
deserialized data objects if there is more than one.
- Return type:
- Raises:
SerializationError – If the first recv call fails.
- opp_comm_kwargs(for_yaml=False)[source]¶
Get keyword arguments to initialize communication with opposite comm object.
- prepare_message(*args, **kwargs)[source]¶
Perform actions preparing to send a message.
- Parameters:
*args – Components of the outgoing message.
**kwargs – Keyword arguments are passed to the parent class’s method.
- Returns:
Serialized and annotated message.
- Return type:
- classmethod remove_companion_files(address)[source]¶
Remove companion files that are created when writing to a file
- Parameters:
address (str) – Address for the filename.
- yggdrasil.communication.FileComm.convert_file(src, dst, src_type=None, dst_type=None, src_kwargs=None, dst_kwargs=None, transform=None)[source]¶
Convert from one file type to another.
- Parameters:
src (str) – Path to source file to convert.
dst (str) – Path to destination file that should be created.
src_type (str, dict, optional) – Name of source file type. If not provided, an attempt will be made to identify the file type from the extension.
dst_type (str, dict, optional) – Name of destination file type. If not provided, an attempt will be made to identify the file type from the extension.
transform (dict, optional) – Transform parameters for transforming messages between the soruce and destination file.
- Raises:
yggdrasil.communication.ForkComm module¶
- class yggdrasil.communication.ForkComm.ForkComm(*args, **kwargs)[source]¶
Bases:
CommBase
Class for receiving/sending messages from/to multiple comms.
- Parameters:
name (str) – The environment variable where communication address is stored.
comm_list (list, optional) – The list of options for the comms that should be bundled. If not provided, the bundle will be empty.
pattern (str, optional) –
The communication pattern that should be used to handle outgoing/incoming messages. Options include:
- ’cycle’: Receive from or send to the next available comm in
the list (default for receiving comms).
- ’broadcast’: [SEND ONLY] Send the same message to each comm
(default for sending comms).
- ’scatter’: [SEND ONLY] Send part of message (must be a list)
to each comm.
- ’gather’: [RECV ONLY] Receive lists of messages from each
comm where a message is only returned when there is a message from each.
**kwargs – Additional keyword arguments are passed to the parent class.
- child_keys = ['serializer_class', 'serializer_kwargs', 'format_str', 'field_names', 'field_units', 'as_array', 'partner_copies']¶
- drain_server_signon_messages(**kwargs)[source]¶
Drain server signon messages. This should only be used for testing purposes.
- property empty_obj_recv¶
Empty message object.
- Type:
obj
- finalize_message(msg, **kwargs)[source]¶
Perform actions to decipher a message.
- Parameters:
msg (CommMessage) – Initial message object to be finalized.
**kwargs – Keyword arguments are passed to the forked comm’s finalize_message method.
- Returns:
Deserialized and annotated message.
- Return type:
- property model_env¶
Mapping between model name and opposite comm environment variables that need to be provided to the model.
- Type:
- noprop_keys = ['send_converter', 'recv_converter', 'filter', 'transform']¶
- opp_comm_kwargs(for_yaml=False)[source]¶
Get keyword arguments to initialize communication with opposite comm object.
- prepare_message(*args, **kwargs)[source]¶
Perform actions preparing to send a message.
- Parameters:
*args – Components of the outgoing message.
**kwargs – Additional keyword arguments are passed to the prepare_message methods for the forked comms.
- Returns:
Serialized and annotated message.
- Return type:
- recv_message(*args, **kwargs)[source]¶
Receive a message.
- Parameters:
*args – Arguments are passed to the forked comm’s recv_message method.
**kwargs – Keyword arguments are passed to the forked comm’s recv_message method.
- Returns:
Received message.
- Return type:
- send_message(msg, **kwargs)[source]¶
Send a message encapsulated in a CommMessage object.
- Parameters:
msg (CommMessage) – Message to be sent.
**kwargs – Additional keyword arguments are passed to _safe_send.
- Returns:
Success or failure of send.
- Return type:
- property suppress_special_debug¶
- update_serializer_from_message(msg)[source]¶
Update the serializer based on information stored in a message.
- Parameters:
msg (CommMessage) – Outgoing message.
- class yggdrasil.communication.ForkComm.ForkedCommMessage(msg, comm_list, pattern='broadcast', **kwargs)[source]¶
Bases:
CommMessage
Class for forked comm messages.
- Parameters:
msg (CommBase.CommMessage) – Message being distributed.
comm_list (list) – List of communicators that the message is being distributed to.
**kwargs – Additional keyword arguments are passed to the ‘prepare_message’ method for each communicator.
- orig¶
yggdrasil.communication.IPCComm module¶
- class yggdrasil.communication.IPCComm.IPCComm(*args, **kwargs)[source]¶
Bases:
CommBase
Class for handling I/O via IPC message queues.
- q¶
Message queue.
- Type:
sysv_ipc.MessageQueue
- Developer Notes:
The default size limit for IPC message queues is 2048 bytes on Mac operating systems so it is important that implementation of this communication mechanism properly split and send messages larger than this limit as more than one message.
- address_description = 'An IPC message queue key.'¶
- class yggdrasil.communication.IPCComm.IPCServer(*args, **kwargs)[source]¶
Bases:
CommServer
IPC server object for cleaning up server queue.
- yggdrasil.communication.IPCComm.get_queue(qid=None)[source]¶
Create or return a sysv_ipc.MessageQueue and register it.
- Parameters:
qid (int, optional) – If provided, ID for existing queue that should be returned. Defaults to None and a new queue is returned.
- Returns:
Message queue.
- Return type:
sysv_ipc.MessageQueue
- yggdrasil.communication.IPCComm.ipc_queues(by_id=False)[source]¶
Get a list of active IPC queues.
- Returns:
List of IPC queues.
- Return type:
- yggdrasil.communication.IPCComm.ipcrm(options=[])[source]¶
Remove IPC constructs using the ipcrm command.
- Parameters:
options (list) – List of flags that should be used. Defaults to an empty list.
- yggdrasil.communication.IPCComm.ipcrm_queues(queue_keys=None, by_id=False)[source]¶
Delete existing IPC queues.
yggdrasil.communication.ImageFileBase module¶
- class yggdrasil.communication.ImageFileBase.BMPFileComm(*args, **kwargs)¶
Bases:
PILFileBase
- class yggdrasil.communication.ImageFileBase.EPSFileComm(*args, **kwargs)¶
Bases:
PILFileBase
- class yggdrasil.communication.ImageFileBase.GIFFileComm(*args, **kwargs)¶
Bases:
PILFileBase
- class yggdrasil.communication.ImageFileBase.JPEGFileComm(*args, **kwargs)¶
Bases:
PILFileBase
- class yggdrasil.communication.ImageFileBase.PILFileBase(*args, **kwargs)[source]¶
Bases:
DedicatedFileBase
Class for handling I/O from/to a JPEG file.
- static before_registration(cls)[source]¶
Operations that should be performed to modify class attributes prior to registration.
- classmethod get_testing_options(**kwargs)[source]¶
Method to return a dictionary of testing options for this class.
- Returns:
- Dictionary of variables to use for testing. Items:
- kwargs (dict): Keyword arguments for comms tested with
the provided content.
send (list): List of objects to send to test file. recv (list): List of objects that will be received from a
test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by
sending the messages in ‘send’.
- Return type:
- classmethod is_installed(language=None)[source]¶
Determine if the necessary libraries are installed for this communication class.
- Parameters:
language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.
- Returns:
Is the comm installed.
- Return type:
- class yggdrasil.communication.ImageFileBase.PNGFileComm(*args, **kwargs)¶
Bases:
PILFileBase
- class yggdrasil.communication.ImageFileBase.TIFFFileComm(*args, **kwargs)¶
Bases:
PILFileBase
yggdrasil.communication.JSONFileComm module¶
yggdrasil.communication.MPIComm module¶
- exception yggdrasil.communication.MPIComm.MPIClosedError[source]¶
Bases:
BaseException
Exception to raise when the MPI connection has been closed.
- class yggdrasil.communication.MPIComm.MPIComm(*args, **kwargs)[source]¶
Bases:
CommBase
Class for handling I/O via MPI communicators.
- Parameters:
tag_start (int, optional) – Tag that MPI messages should start with. Defaults to 0.
tag_stride (int, optional) – Amount that tag should be advanced after each message to avoid conflicts w/ other MPIComm communicators. Defaults to 1.
partner_mpi_ranks (list, optional) – Rank of MPI processes that partner models are running on. Defaults to None.
- tag_stride¶
Amount that tag should be advanced after each each message to avoid conflicts w/ other MPIComm communicators.
- Type:
- address_description = 'The partner communicator ID(s).'¶
- advance_tag(request)[source]¶
Advance to the next tag.
- Parameters:
request (MPIRequest, MPIMultiRequest) – Request advancing the tag.
- cache_tag(request)[source]¶
Store a tag for an uncompleted request.
- Parameters:
request (MPIRequest, MPIMultiRequest) – Request to cache.
- property model_env¶
Mapping between model name and opposite comm environment variables that need to be provided to the model.
- Type:
- classmethod parse_address(address)[source]¶
Parse an MPI address for information about the partner process ranks and how the tags should be iterated.
- recv_message(*args, **kwargs)[source]¶
Receive a message.
- Parameters:
*args – Arguments are passed to the forked comm’s recv_message method.
**kwargs – Keyword arguments are passed to the forked comm’s recv_message method.
- Returns:
Received message.
- Return type:
- send_message(msg, **kwargs)[source]¶
Send a message encapsulated in a CommMessage object.
- Parameters:
msg (CommMessage) – Message to be sent.
**kwargs – Additional keyword arguments are passed to _safe_send.
- Returns:
Success or failure of send.
- Return type:
- class yggdrasil.communication.MPIComm.MPIMultiRequest(*args, **kwargs)[source]¶
Bases:
MPIRequest
Container for MPI request for multiple partner comms.
- remainder¶
yggdrasil.communication.MatFileComm module¶
- class yggdrasil.communication.MatFileComm.MatFileComm(*args, **kwargs)[source]¶
Bases:
FileComm
Class for handling I/O from/to a Matlab .mat file on disk.
- Parameters:
name (str) – The environment variable where file path is stored.
**kwargs – Additional keywords arguments are passed to parent class.
yggdrasil.communication.NetCDFFileComm module¶
- class yggdrasil.communication.NetCDFFileComm.NetCDFFileComm(*args, **kwargs)[source]¶
Bases:
DedicatedFileBase
Class for handling I/O from/to an netCDF file.
- Parameters:
read_attributes (bool, optional) – If True, the attributes are read in as well as the variables. Defaults to False.
variables (list, optional) – List of variables to read in. If not provided, all variables will be read.
version (int, optional) – Version of netCDF format that should be used. Defaults to 1. Options are 1 (classic format) and 2 (64-bit offset format).
**kwargs – Additional keywords arguments are passed to parent class.
- property fd¶
Associated file identifier.
- classmethod get_testing_options(**kwargs)[source]¶
Method to return a dictionary of testing options for this class.
- Returns:
- Dictionary of variables to use for testing. Items:
- kwargs (dict): Keyword arguments for comms tested with
the provided content.
send (list): List of objects to send to test file. recv (list): List of objects that will be received from a
test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by
sending the messages in ‘send’.
- Return type:
yggdrasil.communication.ObjFileComm module¶
- class yggdrasil.communication.ObjFileComm.ObjFileComm(*args, **kwargs)[source]¶
Bases:
PlyFileComm
Class for handling I/O from/to a .obj file on disk.
- Parameters:
name (str) – The environment variable where communication address is stored.
**kwargs – Additional keywords arguments are passed to parent class.
yggdrasil.communication.PandasFileComm module¶
- class yggdrasil.communication.PandasFileComm.PandasFileComm(*args, **kwargs)[source]¶
Bases:
AsciiTableComm
Class for handling I/O from/to a pandas csv file on disk.
yggdrasil.communication.PickleFileComm module¶
- class yggdrasil.communication.PickleFileComm.PickleFileComm(*args, **kwargs)[source]¶
Bases:
FileComm
Class for handling I/O from/to a pickled file on disk.
- Parameters:
name (str) – The environment variable where file path is stored.
**kwargs – Additional keywords arguments are passed to parent class.
yggdrasil.communication.PlyFileComm module¶
- class yggdrasil.communication.PlyFileComm.PlyFileComm(*args, **kwargs)[source]¶
Bases:
FileComm
Class for handling I/O from/to a .ply file on disk.
- Parameters:
name (str) – The environment variable where communication address is stored.
**kwargs – Additional keywords arguments are passed to parent class.
yggdrasil.communication.RESTComm module¶
- class yggdrasil.communication.RESTComm.RESTComm(*args, **kwargs)[source]¶
Bases:
CommBase
Class for handling I/O via a RESTful API. The provided address should be an HTTP address to a server running a flask app that has been equipped to respond to send/receive calls via the add_comm_server_to_app method.
- Parameters:
yggdrasil.communication.RMQAsyncComm module¶
- class yggdrasil.communication.RMQAsyncComm.RMQAsyncComm(*args, **kwargs)[source]¶
Bases:
RMQComm
Class for handling asynchronous RabbitMQ communications. It is not recommended to use this class as it can leave hanging threads if not closed propertly. The normal RMQComm will cover most use cases.
- Parameters:
- rmq_thread¶
Thread used to run IO loop.
- Type:
- add_on_channel_close_callback()[source]¶
This method tells pika to call the on_channel_closed method if RabbitMQ unexpectedly closes the channel.
- on_bindok(unused_frame, userdata)[source]¶
Actions to perform once the queue is succesfully bound. Start consuming messages.
- on_cancelok(unused_frame, userdata)[source]¶
Actions to perform after succesfully cancelling consumption. Closes the channel.
- on_channel_closed(channel, reason)[source]¶
Actions to perform when the channel is closed. Close the connection.
- on_channel_open(channel)[source]¶
Actions to perform after a channel is opened. Add the channel close callback and setup the exchange.
- on_connection_closed(connection, reason)[source]¶
Actions that must be taken when the connection is closed. Set the channel to None. If the connection is meant to be closing, stop the IO loop. Otherwise, wait 5 seconds and try to reconnect.
- on_connection_open(connection)[source]¶
Actions that must be taken when the connection is opened. Add the close connection callback and open the RabbitMQ channel.
- on_connection_open_error(unused_connection, err)[source]¶
Actions that must be taken when the connection fails to open.
- on_exchange_declareok(unused_frame, userdata)[source]¶
Actions to perform once an exchange is succesfully declared. Set up the queue.
- on_queue_declareok(method_frame, userdata)[source]¶
Actions to perform once the queue is succesfully declared. Bind the queue.
- property rmq_lock¶
Lock associated with RMQ ioloop thread.
- class yggdrasil.communication.RMQAsyncComm.RMQTaskLoop(*args, **kwargs)[source]¶
Bases:
YggTaskLoop
Task loop for RMQ consumer.
yggdrasil.communication.RMQComm module¶
- class yggdrasil.communication.RMQComm.RMQComm(*args, **kwargs)[source]¶
Bases:
CommBase
Class for handling basic RabbitMQ communications.
- connection¶
RabbitMQ connection.
- Type:
pika.Connection
- channel¶
RabbitMQ channel.
- Type:
pika.Channel
- Raises:
RuntimeError – If a connection cannot be established.
- Developer Notes:
It is not advised that new language implement a RabbitMQ communication interface. Rather RMQ communication is included explicitly for connections between models that are not co-located on the same machine and are used by the yggdrasil framework connections on the Python side.
- address_description = 'AMPQ queue address of the form ``<url>_RMQPARAM_<exchange>_RMQPARAM_<queue>`` where ``url`` is the broker address (see explanation `here <https://pika.readthedocs.io/en/stable/examples/using_urlparameters.html>`_), ``exchange`` is the name of the exchange on the queue that should be used, and ``queue`` is the name of the queue.'¶
- classmethod new_comm_kwargs(name, user=None, password=None, host=None, virtual_host=None, port=None, exchange=None, queue='', **kwargs)[source]¶
Initialize communication with new connection.
- Parameters:
name (str) – Name of new connection.
user (str, optional) – RabbitMQ server username. Defaults to config option ‘user’ in section ‘rmq’ if it exists and ‘guest’ if it does not.
password (str, optional) – RabbitMQ server password. Defaults to config option ‘password’ in section ‘rmq’ if it exists and ‘guest’ if it does not.
host (str, optional) – RabbitMQ server host. Defaults to config option ‘host’ in section ‘rmq’ if it exists and _localhost if it does not. If _localhost, the output of socket.gethostname() is used.
virtual_host (str, optional) – RabbitMQ server virtual host. Defaults to config option ‘vhost’ in section ‘rmq’ if it exists and ‘/’ if it does not.
port (str, optional) – Port on host to use. Defaults to config option ‘port’ in section ‘rmq’ if it exists and ‘5672’ if it does not.
exchange (str, optional) – RabbitMQ exchange. Defaults to config option ‘namespace’ in section ‘rmq’ if it exits and ‘’ if it does not.
queue (str, optional) – Name of the queue that messages will be send to or received from. If an empty string, the queue will be a random string and exclusive to a receiving comm. Defaults to ‘’.
**kwargs – Additional keywords arguments are returned as keyword arguments for the new comm.
- Returns:
Arguments and keyword arguments for new comm.
- Return type:
- class yggdrasil.communication.RMQComm.RMQServer(*args, **kwargs)[source]¶
Bases:
CommServer
RMQ server object for cleaning up server connections.
- yggdrasil.communication.RMQComm.check_rmq_server(url=None, **kwargs)[source]¶
Check that connection to a RabbitMQ server is possible.
- Parameters:
url (str, optional) – Address of RMQ server that includes all the necessary information. If this is provided, the remaining arguments are ignored. Defaults to None and the connection parameters are taken from the other arguments.
username (str, optional) – RMQ server username. Defaults to config value.
password (str, optional) – RMQ server password. Defaults to config value.
host (str, optional) – RMQ hostname or IP Address to connect to. Defaults to config value.
port (str, optional) – RMQ server TCP port to connect to. Defaults to config value.
vhost (str, optional) – RMQ virtual host to use. Defaults to config value.
- Returns:
- True if connection to RabbitMQ server is possible, False
otherwise.
- Return type:
- yggdrasil.communication.RMQComm.get_rmq_parameters(url=None, user=None, username=None, password=None, host=None, virtual_host=None, vhost=None, port=None, exchange=None, queue='')[source]¶
Get RabbitMQ connection parameters.
- Parameters:
url (str, optional) – Address of RMQ server that includes all the necessary information. If this is provided, the remaining arguments are ignored. Defaults to None and the connection parameters are taken from the other arguments.
user (str, optional) – RabbitMQ server username. Defaults to config option ‘user’ in section ‘rmq’ if it exists and ‘guest’ if it does not.
username (str, optional) – Alias for user.
password (str, optional) – RabbitMQ server password. Defaults to config option ‘password’ in section ‘rmq’ if it exists and ‘guest’ if it does not.
host (str, optional) – RabbitMQ server host. Defaults to config option ‘host’ in section ‘rmq’ if it exists and _localhost if it does not. If _localhost, the output of socket.gethostname() is used.
virtual_host (str, optional) – RabbitMQ server virtual host. Defaults to config option ‘vhost’ in section ‘rmq’ if it exists and ‘/’ if it does not.
vhost (str, optional) – Alias for virtual_host.
port (str, optional) – Port on host to use. Defaults to config option ‘port’ in section ‘rmq’ if it exists and ‘5672’ if it does not.
exchange (str, optional) – RabbitMQ exchange. Defaults to config option ‘namespace’ in section ‘rmq’ if it exits and ‘’ if it does not.
queue (str, optional) – Name of the queue that messages will be send to or received from. If an empty string, the queue will be a random string and exclusive to a receiving comm. Defaults to ‘’.
- Returns:
The connection url, exchange, & queue.
- Return type:
yggdrasil.communication.SequenceFileBase module¶
- class yggdrasil.communication.SequenceFileBase.BAMFileComm(*args, **kwargs)¶
Bases:
PySamFileBase
- class yggdrasil.communication.SequenceFileBase.BCFFileComm(*args, **kwargs)¶
Bases:
PySamFileBase
- class yggdrasil.communication.SequenceFileBase.BioPythonFileBase(*args, **kwargs)[source]¶
Bases:
SequenceFileBase
Base class for nucleotide/protein sequence I/O using biopython.
- classmethod get_testing_options(piecemeal=False, **kwargs)[source]¶
Method to return a dictionary of testing options for this class.
- Parameters:
piecemeal (bool, optional) – If True, the test data will be piecemeal.
- Returns:
- Dictionary of variables to use for testing. Items:
- kwargs (dict): Keyword arguments for comms tested with
the provided content.
send (list): List of objects to send to test file. recv (list): List of objects that will be received from a
test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by
sending the messages in ‘send’.
- Return type:
- classmethod is_installed(language=None)[source]¶
Determine if the necessary libraries are installed for this communication class.
- Parameters:
language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.
- Returns:
Is the comm installed.
- Return type:
- property records_iterator¶
SeqRecord iterator.
- Type:
iterator
- class yggdrasil.communication.SequenceFileBase.CRAMFileComm(*args, **kwargs)¶
Bases:
PySamFileBase
- class yggdrasil.communication.SequenceFileBase.FASTAFileComm(*args, **kwargs)¶
Bases:
BioPythonFileBase
- class yggdrasil.communication.SequenceFileBase.FASTQFileComm(*args, **kwargs)¶
Bases:
BioPythonFileBase
- class yggdrasil.communication.SequenceFileBase.PySamFileBase(*args, **kwargs)[source]¶
Bases:
SequenceFileBase
Base class for nucleotide/protein sequence I/O using pysam.
- concats_as_str = True¶
- create_index(address, overwrite=False)[source]¶
Create an index file to accompany the file begin written/read if one is required.
- classmethod get_testing_options(test_dir=None, **kwargs)[source]¶
Method to return a dictionary of testing options for this class.
- property header_size¶
- classmethod is_installed(language=None)[source]¶
Determine if the necessary libraries are installed for this communication class.
- Parameters:
language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.
- Returns:
Is the comm installed.
- Return type:
- class yggdrasil.communication.SequenceFileBase.SAMFileComm(*args, **kwargs)¶
Bases:
PySamFileBase
- class yggdrasil.communication.SequenceFileBase.SequenceFileBase(*args, **kwargs)[source]¶
Bases:
DedicatedFileBase
Base class for nucleotide/protein sequence I/O.
- class yggdrasil.communication.SequenceFileBase.VCFFileComm(*args, **kwargs)¶
Bases:
PySamFileBase
yggdrasil.communication.ServerComm module¶
- class yggdrasil.communication.ServerComm.Request(response_address, request_id)[source]¶
Bases:
object
- request_id¶
- response_address¶
- class yggdrasil.communication.ServerComm.ServerComm(*args, **kwargs)[source]¶
Bases:
CommBase
Class for handling Server side communication.
- Parameters:
name (str) – The environment variable where communication address is stored.
request_commtype (str, optional) – Comm class that should be used for the request comm. Defaults to None.
response_kwargs (dict, optional) – Keyword arguments for the response comm. Defaults to empty dict.
direct_connection (bool, optional) – If True, the comm will be directly connected to a ServerComm. Defaults to False.
**kwargs – Additional keywords arguments are passed to the input comm.
- icomm¶
Request comm.
- Type:
Comm
- ocomm¶
Response comms for each request.
- Type:
OrderedDict
- property all_clients_connected¶
True if all expected clients have connected. False otherwise.
- Type:
- create_response_comm(header)[source]¶
Create a response comm based on information from the last header.
- drain_server_signon_messages(**kwargs)[source]¶
Drain server signon messages. This should only be used for testing purposes.
- property filter¶
filter for the communicator.
- Type:
- finalize_message(msg, **kwargs)[source]¶
Perform actions to decipher a message.
- Parameters:
msg (CommMessage) – Initial message object to be finalized.
**kwargs – Keyword arguments are passed to the request comm’s finalize_message method.
- Returns:
Deserialized and annotated message.
- Return type:
- get_status_message(nindent=0, **kwargs)[source]¶
Return lines composing a status message.
- Parameters:
nindent (int, optional) – Number of tabs that should be used to indent each line. Defaults to 0.
*kwargs – Additional arguments are passed to the parent class’s method.
- Returns:
- Lines composing the status message and the
prefix string used for the last message.
- Return type:
- classmethod is_installed(language=None)[source]¶
Determine if the necessary libraries are installed for this communication class.
- classmethod new_comm_kwargs(name, request_commtype=None, **kwargs)[source]¶
Initialize communication with new comms.
- opp_comm_kwargs(for_yaml=False)[source]¶
Get keyword arguments to initialize communication with opposite comm object.
- prepare_message(*args, **kwargs)[source]¶
Perform actions preparing to send a message.
- Parameters:
*args – Components of the outgoing message.
**kwargs – Keyword arguments are passed to the request comm’s prepare_message method.
- Returns:
Serialized and annotated message.
- Return type:
- recv_from(*args, **kwargs)[source]¶
Receive a message from the input comm and open a new response comm for output using address from the header, returning the response_id.
- recv_message(*args, **kwargs)[source]¶
Receive a message.
- Parameters:
*args – Arguments are passed to the request comm’s recv_message method.
**kwargs – Keyword arguments are passed to the request comm’s recv_message method.
- Returns:
Received message.
- Return type:
- send_message(msg, **kwargs)[source]¶
Send a message encapsulated in a CommMessage object.
- Parameters:
msg (CommMessage) – Message to be sent.
**kwargs – Additional keyword arguments are passed to the response comm’s send_message method.
- Returns:
Success or failure of send.
- Return type:
- send_to(response_id, *args, **kwargs)[source]¶
Send a message to a specific response comm.
- Parameters:
response_id (str) – ID used to register the response comm.
*args – Arguments are passed to output comm send method.
**kwargs – Keyword arguments are passed to output comm send method.
- Returns:
Output from output comm send method.
- Return type:
obj
yggdrasil.communication.ValueComm module¶
- class yggdrasil.communication.ValueComm.ValueComm(*args, **kwargs)[source]¶
Bases:
CommBase
- classmethod get_testing_options(**kwargs)[source]¶
Method to return a dictionary of testing options for this class.
- Parameters:
serializer (str, optional) – The name of the serializer that should be used. If not provided, the _default_serializer class attribute will be used.
- Returns:
- Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the
provided content.
send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test
file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
- Return type:
- classmethod is_installed(language=None)[source]¶
Determine if the necessary libraries are installed for this communication class.
- Parameters:
language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.
- Returns:
Is the comm installed.
- Return type:
- no_serialization = True¶
yggdrasil.communication.YAMLFileComm module¶
yggdrasil.communication.ZMQComm module¶
- class yggdrasil.communication.ZMQComm.ZMQComm(*args, **kwargs)[source]¶
Bases:
CommBase
Class for handling I/O using ZeroMQ sockets.
- Parameters:
name (str) – The environment variable where the socket address is stored.
context (zmq.Context, optional) – ZeroMQ context that should be used. Defaults to None and the global context is used.
socket_type (str, optional) – The type of socket that should be created. Defaults to _default_socket_type. See zmq for all options.
socket_action (str, optional) – The action that the socket should perform. Defaults to action based on the direction (‘connect’ for ‘recv’, ‘bind’ for ‘send’.)
topic_filter (str, optional) – Message filter to use when subscribing. This is only used for ‘SUB’ socket types. Defaults to ‘’ which is all messages.
dealer_identity (str, optional) – Identity that should be used to route messages to a dealer socket. Defaults to ‘0’.
**kwargs – Additional keyword arguments are passed to :class:.CommBase.
- context¶
ZeroMQ context that will be used.
- Type:
zmq.Context
- socket¶
ZeroMQ socket.
- Type:
zmq.Socket
- Developer Notes:
yggdrasil uses the tcp transport by default with a PAIR socket type. For every connection, yggdrasil establishes a second request/reply connection that is used to confirm messages passed between the primary PAIR of sockets. On the first send, the model should create a REP socket on an open tcp address and send that address in the header of the first message under the key ‘zmq_reply’. Receiving models should check message headers for this key and, on receipt, establish the partner REQ socket with the specified address (receiving comms can receive from more than one source so they can have more than one request addresses at at time for this purpose). Following every message, the sending model should wait for a message on the reply socket and, on receipt, return the message. Following every message, the receiving model should send the message ‘YGG_REPLY’ on the request socket and wait for a reply. When creating worker comms for sending large messages, the sending model should create the reply comm for the worker in advanced and send it in the header with the worker address under the key ‘zmq_reply_worker’.
- address_description = 'A ZeroMQ endpoint of the form <transport>://<address>, where the format of address depends on the transport. Additional information can be found `here <http://api.zeromq.org/3-2:zmq-bind>`_.'¶
- drain_server_signon_messages(**kwargs)[source]¶
Drain server signon messages. This should only be used for testing purposes.
- get_status_message(nindent=0, **kwargs)[source]¶
Return lines composing a status message.
- Parameters:
nindent (int, optional) – Number of tabs that should be used to indent each line. Defaults to 0.
**kwargs – Additional keyword arguments are passed to the parent class’s method.
- Returns:
- Lines composing the status message and the
prefix string used for the last message.
- Return type:
- header2workcomm(header, **kwargs)[source]¶
Get a work comm based on header info.
- Parameters:
header (dict) – Information that will be sent in the message header to the work comm.
**kwargs – Additional keyword arguments are passed to the parent method.
- Returns:
class:.CommBase: Work comm.
- classmethod new_comm_kwargs(name, protocol=None, host=None, port=None, **kwargs)[source]¶
Initialize communication with new queue.
- Parameters:
name (str) – Name of new socket.
protocol (str, optional) – The protocol that should be used. Defaults to None and is set to _default_protocol. See zmq for details.
host (str, optional) – The host that should be used. Invalid for ‘inproc’ protocol. Defaults to ‘localhost’.
port (int, optional) – The port used. Invalid for ‘inproc’ protocol. Defaults to None and a random port is choosen.
**kwargs – Additional keywords arguments are returned as keyword arguments for the new comm.
- Returns:
Arguments and keyword arguments for new socket.
- Return type:
- opp_comm_kwargs(for_yaml=False)[source]¶
Get keyword arguments to initialize communication with opposite comm object.
- property reply_thread¶
Task that will handle sending or receiving backlogged messages.
- Type:
tools.YggTask
- workcomm2header(work_comm, **kwargs)[source]¶
Get header information from a comm.
- Parameters:
( (work_comm) – class:.CommBase): Work comm that header describes.
**kwargs – Additional keyword arguments are added to the header.
- Returns:
Header information that will be sent with a message.
- Return type:
- class yggdrasil.communication.ZMQComm.ZMQProxy(*args, **kwargs)[source]¶
Bases:
CommServer
Start a proxy in a new thread for a server address. A client-side address will be randomly generated.
- Parameters:
srv_address (str) – Address that should face the server(s).
context (zmq.Context, optional) – ZeroMQ context that should be used. Defaults to None and the global context is used.
protocol (str, optional) – Protocol that should be used for the sockets. Defaults to None and is set to _default_protocol.
host (str, optional) – Host for socket address. Defaults to ‘localhost’.
retry_timeout (float, optional) – Time (in seconds) that should be waited before retrying to bind the sockets to the addresses. If negative, a retry will not be attempted and an error will be raised. Defaults to -1.
nretry (int, optional) – Number of times to try binding the sockets to the addresses. Defaults to 1.
**kwargs – Additional keyword arguments are passed to the parent class.
- context¶
ZeroMQ context that will be used.
- Type:
zmq.Context
- srv_socket¶
Socket facing client(s).
- Type:
zmq.Socket
- cli_socket¶
Socket facing server(s).
- Type:
zmq.Socket
- client_signon_msg = b'ZMQ_CLIENT_SIGNED_ON::'¶
- server_signon_msg = b'ZMQ_SERVER_SIGNING_ON::'¶
- yggdrasil.communication.ZMQComm.bind_socket(socket, address, retry_timeout=-1, nretry=1)[source]¶
Bind a socket to an address, getting a random port as necessary.
- Parameters:
socket (zmq.Socket) – Socket that should be bound.
address (str) – Address that socket should be bound to.
retry_timeout (float, optional) – Time (in seconds) that should be waited before retrying to bind the socket to the address. If negative, a retry will not be attempted and an error will be raised. Defaults to -1.
nretry (int, optional) – Number of times to try binding the socket to the addresses. Defaults to 1.
- Returns:
- Address that socket was bound to, including random port if one
was used.
- Return type:
- yggdrasil.communication.ZMQComm.create_socket(context, socket_type)[source]¶
Create a socket w/ some default options to improve cleanup.
- Parameters:
context (zmq.Context) – ZeroMQ context.
socket_type (int) – ZeroMQ socket type.
- Returns:
New socket.
- Return type:
zmq.Socket
- yggdrasil.communication.ZMQComm.format_address(protocol, host, port=None)[source]¶
Format an address based on its parts.
- Parameters:
- Returns:
Complete address.
- Return type:
- Raises:
ValueError – If the protocol is not recognized.
- yggdrasil.communication.ZMQComm.get_ipc_host()[source]¶
Get an IPC host using uuid.
- Returns:
File path for IPC transport created using uuid.
- Return type:
- yggdrasil.communication.ZMQComm.get_socket_type_mate(t_in)[source]¶
Find the counterpart socket type.
- Parameters:
t_in (str) – Socket type.
- Returns:
Counterpart socket type.
- Return type:
- Raises:
ValueError – If t_in is not a recognized socket type.
- yggdrasil.communication.ZMQComm.parse_address(address)[source]¶
Split an address into its parts.
- Parameters:
address (str) – Address to be split.
- Returns:
Parameters extracted from the address.
- Return type:
- Raises:
ValueError – If the address dosn’t contain ‘://’.
ValueError – If the protocol is not supported.
Module contents¶
- yggdrasil.communication.get_comm(name, **kwargs)[source]¶
Return communicator for existing comm components.
- Parameters:
name (str) – Communicator name.
**kwargs – Additional keyword arguments are passed to new_comm.
- Returns:
Communicator of given class.
- Return type:
Comm
- yggdrasil.communication.new_comm(name, commtype=None, use_async=False, **kwargs)[source]¶
Return a new communicator, creating necessary components for communication (queues, sockets, channels, etc.).
- Parameters:
name (str) – Communicator name.
commtype (str, list, optional) – Name of communicator type or a list of specifiers for multiple communicators that should be joined. Defaults to None.
use_async (bool, optional) – If True, send/recv operations will be performed asynchronously on new threads. Defaults to False.
**kwargs – Additional keyword arguments are passed to communicator class method new_comm.
- Returns:
Communicator of given class.
- Return type:
Comm