cis_interface.communication package

Submodules

cis_interface.communication.AsciiFileComm module

class cis_interface.communication.AsciiFileComm.AsciiFileComm(*args, **kwargs)[source]

Bases: cis_interface.communication.FileComm.FileComm

Class for handling I/O from/to a file on disk.

Parameters:
  • name (str) – The environment variable where communication address is stored.
  • comment (str, optional) – String indicating a comment. If ‘read_meth’ is ‘readline’ and this is provided, lines starting with a comment will be skipped.
  • **kwargs – Additional keywords arguments are passed to parent class.
comment

String indicating a comment.

Type:str
classmethod get_testing_options(**kwargs)[source]

Method to return a dictionary of testing options for this class.

Returns:
Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the
provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.
contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
Return type:dict
opp_comm_kwargs()[source]

Get keyword arguments to initialize communication with opposite comm object.

Returns:Keyword arguments for opposite comm object.
Return type:dict

cis_interface.communication.AsciiMapComm module

class cis_interface.communication.AsciiMapComm.AsciiMapComm(*args, **kwargs)[source]

Bases: cis_interface.communication.FileComm.FileComm

Class for handling I/O from/to a ASCII map on disk.

Parameters:
  • name (str) – The environment variable where file path is stored.
  • **kwargs – Additional keywords arguments are passed to parent class.
classmethod get_testing_options()[source]

Method to return a dictionary of testing options for this class.

Returns:
Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the
provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.
contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
Return type:dict

cis_interface.communication.AsciiTableComm module

class cis_interface.communication.AsciiTableComm.AsciiTableComm(*args, **kwargs)[source]

Bases: cis_interface.communication.AsciiFileComm.AsciiFileComm

Class for handling I/O from/to a file on disk.

Parameters:
  • name (str) – The environment variable where communication address is stored.
  • delimiter (str, optional) – String that should be used to separate columns. If not provided and format_str is not set prior to I/O, this defaults to whitespace.
  • use_astropy (bool, optional) – If True and the astropy package is installed, it will be used to read/write the table. Defaults to False.
  • **kwargs – Additional keywords arguments are passed to parent class.
advance_in_series(*args, **kwargs)[source]

Advance to a certain file in a series.

Parameters:index (int, optional) – Index of file in the series that should be moved to. Defaults to None and call will advance to the next file in the series.
Returns:True if the file was advanced in the series, False otherwise.
Return type:bool
change_position(file_pos, series_index=None, header_was_read=None, header_was_written=None)[source]

Change the position in the file/series.

Parameters:
  • file_pos (int) – Position that should be moved to in the file.
  • series_index (int, optinal) – Index of the file in the series that should be moved to. Defaults to None and will be set to the current series index.
  • header_was_read (bool, optional) – Status of if header has been read or not. Defaults to None and will be set to the current value.
  • header_was_written (bool, optional) – Status of if header has been written or not. Defaults to None and will be set to the current value.
classmethod get_testing_options(as_array=False, **kwargs)[source]

Method to return a dictionary of testing options for this class.

Returns:
Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the
provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.
contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
Return type:dict
read_header()[source]

Read header lines from the file and update serializer info.

record_position()[source]

Record the current position in the file/series.

write_header()[source]

Write header lines to the file based on the serializer info.

cis_interface.communication.AsyncComm module

class cis_interface.communication.AsyncComm.AsyncComm(name, dont_backlog=False, **kwargs)[source]

Bases: cis_interface.communication.CommBase.CommBase

Class for handling asynchronous I/O.

Parameters:
  • name (str) – The name of the message queue.
  • dont_backlog (bool, optional) – If True, the backlog will not be started and all messages will be sent/received directly to/from the comm. Defaults to False.
  • **kwargs – Additional keyword arguments are passed to CommBase.
dont_backlog

If True, the backlog will not be started and all messages will be sent/received directly to/from the comm.

Type:bool
backlog_send_ready

Event set when there is a message in the send backlog.

Type:threading.Event
backlog_recv_ready

Event set when there is a message in the recv backlog.

Type:threading.Event
add_backlog_recv(msg)[source]

Add a message to the backlog of received messages.

Parameters:msg (str) – Received message that should be backlogged.
add_backlog_send(msg, **kwargs)[source]

Add a message to the backlog of messages to be sent.

Parameters:
  • msg (str) – Message that should be backlogged for sending.
  • **kwargs – Additional keyword arguments are added along with the message.
backlog_recv

Messages that have been received.

Type:list
backlog_send

Messages that should be sent.

Type:list
backlog_thread

Thread that will handle sinding or receiving backlogged messages.

Type:tools.CisThread
is_confirmed_recv

True if all received messages have been confirmed.

Type:bool
is_confirmed_send

True if all sent messages have been confirmed.

Type:bool
is_open

True if the backlog is open.

Type:bool
is_open_backlog

True if the backlog thread is running.

Type:bool
is_open_direct

True if the direct comm is not None.

Type:bool
n_msg_backlog

Number of messages in the backlog.

Type:int
n_msg_backlog_recv

Number of messages in the receive backlog.

Type:int
n_msg_backlog_send

Number of messages in the send backlog.

Type:int
n_msg_direct

Number of messages currently being routed.

Type:int
n_msg_direct_recv

Number of messages currently being routed in recv.

Type:int
n_msg_direct_send

Number of messages currently being routed in send.

Type:int
n_msg_recv

Number of messages in the receive backlog.

Type:int
n_msg_recv_drain

Number of messages in the receive backlog and direct comm.

Type:int
n_msg_send

Number of messages in the send backlog.

Type:int
n_msg_send_drain

Number of messages in the send backlog and direct comm.

Type:int
open()[source]

Open the connection by connecting to the queue.

pop_backlog_recv()[source]

Pop a message from the front of the recv backlog.

Returns:First backlogged recv message.
Return type:str
pop_backlog_send()[source]

Pop a message from the front of the send backlog.

Returns:
First backlogged send message and
keyword arguments.
Return type:tuple (str, dict)
printStatus(nindent=0)[source]

Print status of the communicator.

purge()[source]

Purge all messages from the comm.

recv_backlog()[source]

Check for any messages in the queue and add them to the recv backlog.

run_backlog_recv()[source]

Continue buffering received messages.

run_backlog_send()[source]

Continue trying to send buffered messages.

send_backlog()[source]

Send a message from the send backlog to the queue.

stop_backlog()[source]

Stop the asynchronous backlog, turning this into a direct comm.

exception cis_interface.communication.AsyncComm.AsyncTryAgain[source]

Bases: Exception

Exception raised when comm open, but send should be attempted again.

cis_interface.communication.ClientComm module

class cis_interface.communication.ClientComm.ClientComm(name, request_comm=None, response_kwargs=None, dont_open=False, **kwargs)[source]

Bases: cis_interface.communication.CommBase.CommBase

Class for handling Client side communication.

Parameters:
  • name (str) – The environment variable where communication address is stored.
  • request_comm (str, optional) – Comm class that should be used for the request comm. Defaults to None.
  • response_kwargs (dict, optional) – Keyword arguments for the response comm. Defaults to empty dict.
  • **kwargs – Additional keywords arguments are passed to the output comm.
response_kwargs

Keyword arguments for the response comm.

Type:dict
icomm

Response comms keyed to the ID of the associated request.

Type:dict
icomm_order

Response comm keys in the order or the requests.

Type:list
ocomm

Request comm.

Type:Comm
call(*args, **kwargs)[source]

Do RPC call. The request message is sent to the output comm and the response is received from the input comm.

Parameters:
  • *args – Arguments are passed to output comm send method.
  • **kwargs – Keyword arguments are passed to output comm send method
Returns:

Output from input comm recv method.

Return type:

obj

call_nolimit(*args, **kwargs)[source]

Alias for call.

close(*args, **kwargs)[source]

Close the connection.

classmethod comm_count()[source]

int: Number of communication connections.

create_response_comm()[source]

Create a response comm based on information from the last header.

drain_messages(direction='send', **kwargs)[source]

Sleep while waiting for messages to be drained.

is_closed

True if the connection is closed.

Type:bool
classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked.
Returns:Is the comm installed.
Return type:bool
is_open

True if the connection is open.

Type:bool
maxMsgSize

Maximum size of a single message that should be sent.

Type:int
n_msg_send

The number of messages in the connection.

Type:int
n_msg_send_drain

The number of outgoing messages in the connection to drain.

Type:int
classmethod new_comm_kwargs(name, request_comm=None, **kwargs)[source]

Initialize communication with new comms.

Parameters:
  • name (str) – Name for new comm.
  • request_comm (str, optional) – Name of class for new output comm. Defaults to None.
open()[source]

Open the connection.

opp_comm_kwargs()[source]

Get keyword arguments to initialize communication with opposite comm object.

Returns:Keyword arguments for opposite comm object.
Return type:dict
opp_comms

Name/address pairs for opposite comms.

Type:dict
recv(*args, **kwargs)[source]

Receive a message from the input comm and open a new response comm for output using address from the header.

Parameters:
  • *args – Arguments are passed to input comm recv method.
  • **kwargs – Keyword arguments are passed to input comm recv method.
Returns:

Output from input comm recv method.

Return type:

obj

remove_response_comm()[source]

Remove response comm.

rpcCall(*args, **kwargs)[source]

Alias for RPCComm.call

rpcRecv(*args, **kwargs)[source]

Alias for RPCComm.recv

rpcSend(*args, **kwargs)[source]

Alias for RPCComm.send

send(*args, **kwargs)[source]

Create a response comm and then send a message to the output comm with the response address in the header.

Parameters:
  • *args – Arguments are passed to output comm send method.
  • **kwargs – Keyword arguments are passed to output comm send method.
Returns:

Output from output comm send method.

Return type:

obj

classmethod underlying_comm_class()[source]

str: Name of underlying communication class.

cis_interface.communication.CommBase module

class cis_interface.communication.CommBase.CommBase(name, address=None, direction='send', dont_open=False, is_interface=False, recv_timeout=0.0, close_on_eof_recv=True, close_on_eof_send=False, single_use=False, reverse_names=False, no_suffix=False, is_client=False, is_response_client=False, is_server=False, is_response_server=False, comm=None, matlab=False, **kwargs)[source]

Bases: cis_interface.tools.CisClass

Class for handling I/O.

Parameters:
  • name (str) – The environment variable where communication address is stored.
  • address (str, optional) – Communication info. Default to None and address is taken from the environment variable.
  • direction (str, optional) – The direction that messages should flow through the connection. ‘send’ if the connection will send messages, ‘recv’ if the connecton will receive messages. Defaults to ‘send’.
  • ( (serializer) – class:.DefaultSerialize, optional): Class with serialize and deserialize methods that should be used to process sent and received messages. Defaults to None and is constructed using provided ‘serializer_kwargs’.
  • serializer_kwargs (dict, optional) – Keyword arguments that should be passed to :class:.DefaultSerialize to create serializer. Defaults to {}.
  • format_str (str, optional) – String that should be used to format/parse messages. Default to None.
  • dont_open (bool, optional) – If True, the connection will not be opened. Defaults to False.
  • is_interface (bool, optional) – Set to True if this comm is a Python interface binding. Defaults to False.
  • recv_timeout (float, optional) – Time that should be waited for an incoming message before returning None. Defaults to 0 (no wait). A value of False indicates that recv should block.
  • close_on_eof_recv (bool, optional) – If True, the comm will be closed when it receives an end-of-file messages. Otherwise, it will remain open. Defaults to True.
  • close_on_eof_send (bool, optional) – If True, the comm will be closed after it sends an end-of-file messages. Otherwise, it will remain open. Defaults to False.
  • single_use (bool, optional) – If True, the comm will only be used to send/recv a single message. Defaults to False.
  • reverse_names (bool, optional) – If True, the suffix added to the comm with be reversed. Defaults to False.
  • no_suffix (bool, optional) – If True, no directional suffix will be added to the comm name. Defaults to False.
  • is_client (bool, optional) – If True, the comm is one of many potential clients that will be sending messages to one or more servers. Defaults to False.
  • is_response_client (bool, optional) – If True, the comm is a client-side response comm. Defaults to False.
  • is_server (bool, optional) – If True, the commis one of many potential servers that will be receiving messages from one or more clients. Defaults to False.
  • is_response_server (bool, optional) – If True, the comm is a server-side response comm. Defaults to False.
  • recv_converter (func, optional) – Converter that should be used on received objects. Defaults to None.
  • send_converter (func, optional) – Converter that should be used on sent objects. Defaults to None.
  • comm (str, optional) – The comm that should be created. This only serves as a check that the correct class is being created. Defaults to None.
  • matlab (bool, optional) – True if the comm will be accessed by Matlab code. Defaults to False.
  • **kwargs – Additional keywords arguments are passed to parent class.
name

The environment variable where communication address is stored.

Type:str
address

Communication info.

Type:str
direction

The direction that messages should flow through the connection.

Type:str
serializer (

class:.DefaultSerialize): Object that will be used to serialize/deserialize messages to/from python objects.

is_interface

True if this comm is a Python interface binding.

Type:bool
recv_timeout

Time that should be waited for an incoming message before returning None.

Type:float
close_on_eof_recv

If True, the comm will be closed when it receives an end-of-file messages. Otherwise, it will remain open.

Type:bool
close_on_eof_send

If True, the comm will be closed after it sends an end-of-file messages. Otherwise, it will remain open.

Type:bool
single_use

If True, the comm will only be used to send/recv a single message.

Type:bool
is_client

If True, the comm is one of many potential clients that will be sending messages to one or more servers.

Type:bool
is_response_client

If True, the comm is a client-side response comm.

Type:bool
is_server

If True, the comm is one of many potential servers that will be receiving messages from one or more clients.

Type:bool
is_response_server

If True, the comm is a server-side response comm.

Type:bool
is_file

True if the comm accesses a file.

Type:bool
recv_converter

Converter that should be used on received objects.

Type:func
send_converter

Converter that should be used on sent objects.

Type:func
matlab

True if the comm will be accessed by Matlab code.

Type:bool
maxMsgSize

Maximum size of a single message that should be sent.

Type:int
Raises:
  • RuntimeError – If the comm class is not installed.
  • RuntimeError – If there is not an environment variable with the specified name.
  • ValueError – If directions is not ‘send’ or ‘recv’.
add_work_comm(comm)[source]

Add work comm to dict.

Parameters:( (comm) – class:.CommBase): Comm that should be added.
Raises:KeyError – If there is already a comm associated with the key.
apply_recv_converter(msg_in)[source]

Apply recv_converter.

Parameters:msg_in (object) – Message to convert.
Returns:Converted message.
Return type:object
apply_send_converter(msg_in)[source]

Apply send converter.

Parameters:msg_in (object) – Message to convert.
Returns:Converted message.
Return type:object
atexit()[source]

Close operations.

bind()[source]

Bind in place of open.

chunk_message(msg)[source]

Yield chunks of message of size maxMsgSize

Parameters:msg (str, bytes) – Raw message bytes to be chunked.
Returns:Chunks of message.
Return type:str
classmethod cleanup_comms()[source]

Cleanup registered comms of this class.

close(linger=False)[source]

Close the connection.

Parameters:linger (bool, optional) – If True, drain messages before closing the comm. Defaults to False.
close_in_thread(no_wait=False, timeout=None)[source]

In a new thread, close the comm when it is empty.

Parameters:
  • no_wait (bool, optional) – If True, don’t wait for closing thread to stop.
  • timeout (float, optional) – Time that should be waited for the comm to close. Defaults to None and is set to self.timeout. If False, this will block until the comm is closed.
classmethod close_registry_entry(value)[source]

Close a registry entry.

comm_class

Name of communication class.

Type:str
classmethod comm_count()[source]

int: Number of communication connections.

classmethod comm_registry()[source]

dict: Registry of comms of this class.

confirm(direction=None, noblock=False)[source]

Confirm message.

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

create_work_comm(work_comm_name=None, **kwargs)[source]

Create a temporary work comm.

Parameters:
  • work_comm_name (str, optional) – Name that should be used for the work comm. If not provided, one is created from the header id and the comm class.
  • **kwargs – Keyword arguments for new_comm that should override work_comm_kwargs.
Returns:

class:.CommBase: Work comm.

create_work_comm_kwargs

Keyword arguments for a new work comm.

Type:dict
deserialize(*args, **kwargs)[source]

Deserialize a message using the associated deserializer.

drain_messages(direction=None, timeout=None, variable=None)[source]

Sleep while waiting for messages to be drained.

empty_bytes_msg

Empty serialized message.

Type:str
empty_obj_recv

Empty message object.

Type:obj
eof_msg

Message indicating EOF.

Type:str
classmethod get_testing_options(**kwargs)[source]

Method to return a dictionary of testing options for this class.

Returns:
Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the
provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.
contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
Return type:dict
get_work_comm(header, **kwargs)[source]

Get temporary work comm, creating as necessary.

Parameters:
  • header (dict) – Information that will be sent in the message header to the work comm.
  • **kwargs – Additional keyword arguments are passed to header2workcomm.
Returns:

class:.CommBase: Work comm.

get_work_comm_kwargs

Keyword arguments for an existing work comm.

Type:dict
header2workcomm(header, work_comm_name=None, **kwargs)[source]

Get a work comm based on header info.

Parameters:
  • header (dict) – Information that will be sent in the message header to the work comm.
  • work_comm_name (str, optional) – Name that should be used for the work comm. If not provided, one is created from the header id and the comm class.
  • **kwargs – Additional keyword arguments are added to the returned dictionary.
Returns:

class:.CommBase: Work comm.

is_closed

True if the connection is closed.

Type:bool
is_confirmed

True if all messages have been confirmed.

Type:bool
is_confirmed_recv

True if all received messages have been confirmed.

Type:bool
is_confirmed_send

True if all sent messages have been confirmed.

Type:bool
is_empty_recv(msg)[source]

Check if a received message object is empty.

Parameters:msg (obj) – Message object.
Returns:True if the object is empty, False otherwise.
Return type:bool
is_empty_send(msg)[source]

Check if a message object being sent is empty.

Parameters:msg (obj) – Message object.
Returns:True if the object is empty, False otherwise.
Return type:bool
is_eof(msg)[source]

Determine if a message is an EOF.

Parameters:msg (obj) – Message object to be tested.
Returns:True if the message indicates an EOF, False otherwise.
Return type:bool
is_file = False
classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages.
Returns:Is the comm installed.
Return type:bool
is_open

True if the connection is open.

Type:bool
linger()[source]

Wait for messages to drain.

linger_close()[source]

Wait for messages to drain, then close.

matlab_atexit()[source]

Close operations including draining receive.

maxMsgSize

Maximum size of a single message that should be sent.

Type:int
n_msg

The number of messages in the connection.

Type:int
n_msg_recv

The number of incoming messages in the connection.

Type:int
n_msg_recv_drain

The number of incoming messages in the connection to drain.

Type:int
n_msg_send

The number of outgoing messages in the connection.

Type:int
n_msg_send_drain

The number of outgoing messages in the connection to drain.

Type:int
classmethod new_comm(name, *args, **kwargs)[source]

Initialize communication with new queue.

classmethod new_comm_kwargs(*args, **kwargs)[source]

Get keyword arguments for new comm.

new_server(srv_address)[source]

Create a new server.

Parameters:srv_address (str) – Address of server comm.
on_recv(s_msg, second_pass=False)[source]

Process raw received message including handling deserializing message and handling EOF.

Parameters:
  • s_msg (bytes, str) – Raw bytes message.
  • second_pass (bool, optional) – If True, this is the second pass for a message and _last_header will not be set. Defaults to False.
Returns:

Success or failure, processed message, and

header information.

Return type:

tuple (bool, str, dict)

on_recv_eof()[source]

Actions to perform when EOF received.

Returns:Flag that should be returned for EOF.
Return type:bool
on_send(msg, header_kwargs=None)[source]

Process message to be sent including handling serializing message and handling EOF.

Parameters:
  • msg (obj) – Message to be sent
  • header_kwargs (dict, optional) – Keyword arguments that should be added to the header.
Returns:

Truth of if message should be sent, raw

bytes message to send, and header info contained in the message.

Return type:

tuple (bool, str, dict)

on_send_eof()[source]

Actions to perform when EOF being sent.

Returns:True if EOF message should be sent, False otherwise.
Return type:bool
open()[source]

Open the connection.

opp_address

Address for opposite comm.

Type:str
opp_comm_kwargs()[source]

Get keyword arguments to initialize communication with opposite comm object.

Returns:Keyword arguments for opposite comm object.
Return type:dict
opp_comms

Name/address pairs for opposite comms.

Type:dict
printStatus(nindent=0)[source]

Print status of the communicator.

purge()[source]

Purge all messages from the comm.

recv(*args, **kwargs)[source]

Receive a message.

Parameters:
  • *args – All arguments are passed to comm _recv method.
  • **kwargs – All keywords arguments are passed to comm _recv method.
Returns:

Success or failure of receive and received

message.

Return type:

tuple (bool, obj)

recv_array(*args, **kwargs)[source]

Alias for recv.

recv_dict(*args, **kwargs)[source]

Return a received message as a dictionary of fields. If there are not any fields specified, the fields will have the form ‘f0’, ‘f1’, ‘f2’, …

Parameters:
  • *args – Arguments are passed to recv.
  • **kwargs – Keyword arguments are passed to recv.
Returns:

Success/failure of receive and a dictionar of

message fields.

Return type:

tuple(bool, dict)

Raises:

recv_multipart(*args, **kwargs)[source]

Receive a multipart message. If a message is received without a header, it assumed to be complete. Otherwise, the message is received in parts from a worker comm initialized by the sender.

Parameters:
  • *args – All arguments are passed to comm _recv method.
  • **kwargs – All keywords arguments are passed to comm _recv method.
Returns:

Success or failure of receive and received

message.

Return type:

tuple (bool, str)

recv_nolimit(*args, **kwargs)[source]

Alias for recv.

register_comm(key, value)[source]

Register a comm.

remove_work_comm(key, in_thread=False, linger=False)[source]

Close and remove a work comm.

Parameters:
  • key (str) – Key of comm that should be removed.
  • in_thread (bool, optional) – If True, close the work comm in a thread. Defaults to False.
  • linger (bool, optional) – If True, drain messages before closing the comm. Defaults to False.
send(*args, **kwargs)[source]

Send a message.

Parameters:
  • *args – All arguments are assumed to be part of the message.
  • **kwargs – All keywords arguments are passed to comm _send method.
Returns:

Success or failure of send.

Return type:

bool

send_array(*args, **kwargs)[source]

Alias for send.

send_dict(args_dict, **kwargs)[source]

Send a message with fields specified in the input dictionary.

Parameters:
  • args_dict (dict) – Dictionary of arguments to send.
  • **kwargs – Additiona keyword arguments are passed to send.
Returns:

Success/failure of send.

Return type:

bool

Raises:

RuntimeError – If the field order can not be determined.

send_eof(*args, **kwargs)[source]

Send the EOF message as a short message.

Parameters:
  • *args – All arguments are passed to comm send.
  • **kwargs – All keywords arguments are passed to comm send.
Returns:

Success or failure of send.

Return type:

bool

send_multipart(msg, header_kwargs=None, **kwargs)[source]

Send a multipart message. If the message is smaller than maxMsgSize, it is sent using _send, otherwise it is sent to a worker comm using _send_multipart_worker.

Parameters:
  • msg (obj) – Message to be sent.
  • header_kwargs (dict, optional) – Keyword arguments that should be added to the header.
  • **kwargs – Additional keyword arguments are passed to _send or _send_multipart_worker.
Returns:

Success or failure of send.

Return type:

bool

send_nolimit(*args, **kwargs)[source]

Alias for send.

serialize(*args, **kwargs)[source]

Serialize a message using the associated serializer.

server_exists(srv_address)[source]

Determine if a server exists.

Parameters:srv_address (str) – Address of server comm.
Returns:
True if a server with the provided address exists, False
otherwise.
Return type:bool
signoff_from_server()[source]

Remove a client from the server.

signon_to_server()[source]

Add a client to an existing server or create one.

classmethod underlying_comm_class()[source]

str: Name of underlying communication class.

unregister_comm(key, dont_close=False)[source]

Unregister a comm.

wait_for_confirm(timeout=None, direction=None, active_confirm=False, noblock=False)[source]

Sleep until all messages are confirmed.

workcomm2header(work_comm, **kwargs)[source]

Get header information from a comm.

Parameters:
  • ( (work_comm) – class:.CommBase): Work comm that header describes.
  • **kwargs – Additional keyword arguments are added to the header.
Returns:

Header information that will be sent with a message.

Return type:

dict

class cis_interface.communication.CommBase.CommServer(srv_address, cli_address=None, **kwargs)[source]

Bases: cis_interface.tools.CisThreadLoop

Basic server object to keep track of clients.

cli_count

Number of clients that have connected to this server.

Type:int
add_client()[source]

Increment the client count.

remove_client()[source]

Decrement the client count, closing the server if all clients done.

class cis_interface.communication.CommBase.CommThreadLoop(comm, name=None, suffix='CommThread', **kwargs)[source]

Bases: cis_interface.tools.CisThreadLoop

Thread loop for comms to ensure cleanup.

Parameters:
  • ( (comm) – class:.CommBase): Comm class that thread is for.
  • name (str, optional) – Name for the thread. If not provided, one is created by combining the comm name and the provided suffix.
  • suffix (str, optional) – Suffix that should be added to comm name to name the thread. Defaults to ‘CommThread’.
  • **kwargs – Additional keyword arguments are passed to the parent class.
comm (

class:.CommBase): Comm class that thread is for.

on_main_terminated()[source]

Actions taken on the backlog thread when the main thread stops.

cis_interface.communication.CommBase.cleanup_comms(comm_class, close_func=None)[source]

Clean up comms of a certain type.

Parameters:comm_class (str) – Comm class that should be cleaned up.
Returns:Number of comms closed.
Return type:int
cis_interface.communication.CommBase.get_comm_registry(comm_class)[source]

Get the comm registry for a comm class.

Parameters:comm_class (str) – Comm class to get registry for.
Returns:Dictionary of registered comm objects.
Return type:dict
cis_interface.communication.CommBase.is_registered(comm_class, key)[source]

Determine if a comm object has been registered under the specified key.

Parameters:
  • comm_class (str) – Comm class to check for the key under.
  • key (str) – Key that should be checked.
cis_interface.communication.CommBase.register_comm(comm_class, key, value)[source]

Add a comm object to the global registry.

Parameters:
  • comm_class (str) – Comm class to register the object under.
  • key (str) – Key that should be used to register the object.
  • value (obj) – Object being registered.
cis_interface.communication.CommBase.unregister_comm(comm_class, key, dont_close=False)[source]

Remove a comm object from the global registry and close it.

Parameters:
  • comm_class (str) – Comm class to check for key under.
  • key (str) – Key for object that should be removed from the registry.
  • dont_close (bool, optional) – If True, the comm will be removed from the registry, but it won’t be closed. Defaults to False.
Returns:

True if an object was closed.

Return type:

bool

cis_interface.communication.ErrorComm module

cis_interface.communication.ErrorComm.ErrorComm(name, base_comm='CommBase', **kwargs)[source]

Wrapper to return errored version of a comm class.

Parameters:
  • name (str) – The environment variable where communication address is stored.
  • base_comm (str, optional) – Name of the base comm that should be used. Defaults to ‘CommBase’.
  • **kwargs – Additional keyword arguments are passed to the class constructor.
Returns:

Instance of a comm class that will raise an error at the

requested locaiton.

Return type:

ErrorClass

cis_interface.communication.FileComm module

class cis_interface.communication.FileComm.FileComm(*args, **kwargs)[source]

Bases: cis_interface.communication.CommBase.CommBase

Class for handling I/O from/to a file on disk.

>>> x = FileComm('test_send', address='test_file.txt', direction='send')
>>> x.send('Test message')
True
>>> with open('test_file.txt', 'r') as fd:
...     print(fd.read())
Test message
>>> x = FileComm('test_recv', address='test_file.txt', direction='recv')
>>> x.recv()
(True, b'Test message')
Parameters:
  • name (str) – The environment variable where communication address is stored.
  • read_meth (str, optional) – Method that should be used to read data from the file. Defaults to ‘read’. Ignored if direction is ‘send’.
  • append (bool, optional) – If True and writing, file is openned in append mode. Defaults to False.
  • in_temp (bool, optional) – If True, the path will be considered relative to the platform temporary directory. Defaults to False.
  • open_as_binary (bool, optional) – If True, the file is opened in binary mode. Defaults to True.
  • newline (str, optional) – String indicating a new line. Defaults to serialize._default_newline.
  • is_series (bool, optional) – If True, input/output will be done to a series of files. If reading, each file will be processed until the end is reached. If writing, each output will be to a new file in the series. The addressed is assumed to contain a format for the index of the file. Defaults to False.
  • wait_for_creation (float, optional) – Time (in seconds) that should be waited before opening for the file to be created if it dosn’t exist. Defaults to 0 s and file will attempt to be opened immediately.
  • **kwargs – Additional keywords arguments are passed to parent class.
fd

File that should be read/written.

Type:file
read_meth

Method that should be used to read data from the file.

Type:str
append

If True and writing, file is openned in append mode.

Type:bool
in_temp

If True, the path will be considered relative to the platform temporary directory.

Type:bool
open_as_binary

If True, the file is opened in binary mode.

Type:bool
newline

String indicating a new line.

Type:str
is_series

If True, input/output will be done to a series of files. If reading, each file will be processed until the end is reached. If writing, each output will be to a new file in the series.

Type:bool
platform_newline

String indicating a newline on the current platform.

Type:str
Raises:ValueError – If the read_meth is not one of the supported values.
advance_in_file(file_pos)[source]

Advance to a certain position in the current file.

Parameters:file_pos (int) – Position that should be moved to in the current. file.
advance_in_series(series_index=None)[source]

Advance to a certain file in a series.

Parameters:series_index (int, optional) – Index of file in the series that should be moved to. Defaults to None and call will advance to the next file in the series.
Returns:True if the file was advanced in the series, False otherwise.
Return type:bool
change_position(file_pos, series_index=None)[source]

Change the position in the file/series.

Parameters:
  • file_pos (int) – Position that should be moved to in the file.
  • series_index (int, optinal) – Index of the file in the series that should be moved to. Defaults to None and will be set to the current series index.
classmethod close_registry_entry(value)[source]

Close a registry entry.

current_address

Address of file currently being used.

Type:str
fd

Associated file identifier.

get_series_address(index=None)[source]

Get the address of a file in the series.

Parameters:index (int, optional) – Index in series to get address for. Defaults to None and the current index is used.
Returns:Address for the file in the series.
Return type:str
classmethod get_testing_options(read_meth='read', open_as_binary=True, **kwargs)[source]

Method to return a dictionary of testing options for this class.

Returns:
Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the
provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.
contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
Return type:dict
is_file = True
classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked.
Returns:Is the comm installed.
Return type:bool
is_open

True if the connection is open.

Type:bool
n_msg_recv

The number of messages in the file.

Type:int
classmethod new_comm_kwargs(*args, **kwargs)[source]

Initialize communication with new queue.

on_send_eof()[source]

Close file when EOF to be sent.

Returns:False so that message not sent.
Return type:bool
open()[source]

Open the file.

open_mode

Mode that should be used to open the file.

Type:str
opp_comm_kwargs()[source]

Get keyword arguments to initialize communication with opposite comm object.

Returns:Keyword arguments for opposite comm object.
Return type:dict
purge()[source]

Purge all messages from the comm.

record_position()[source]

Record the current position in the file/series.

registry_key

String used to register the socket.

Type:str
remaining_bytes

Remaining bytes in the file.

Type:int
remove_file()[source]

Remove the file.

classmethod underlying_comm_class()[source]

str: Name of underlying communication class.

cis_interface.communication.ForkComm module

class cis_interface.communication.ForkComm.ForkComm(name, comm=None, **kwargs)[source]

Bases: cis_interface.communication.CommBase.CommBase

Class for receiving/sending messages from/to multiple comms.

Parameters:
  • name (str) – The environment variable where communication address is stored.
  • comm (list, optional) – The list of options for the comms that should be bundled. If not provided, the bundle will be empty.
  • **kwargs – Additional keyword arguments are passed to the parent class.
comm_list

Comms included in this fork.

Type:list
curr_comm_index

Index comm that next receive will be from.

Type:int
bind()[source]

Bind in place of open.

close(*args, **kwargs)[source]

Close the connection.

close_in_thread(*args, **kwargs)[source]

In a new thread, close the comm when it is empty.

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

curr_comm

Current comm.

Type:CommBase
is_confirmed_recv

True if all received messages have been confirmed.

Type:bool
is_confirmed_send

True if all sent messages have been confirmed.

Type:bool
is_open

True if the connection is open.

Type:bool
maxMsgSize

Maximum size of a single message that should be sent.

Type:int
n_msg_recv

The number of incoming messages in the connection.

Type:int
n_msg_recv_drain

The number of incoming messages in the connection to drain.

Type:int
n_msg_send

The number of outgoing messages in the connection.

Type:int
n_msg_send_drain

The number of outgoing messages in the connection to drain.

Type:int
classmethod new_comm_kwargs(name, *args, **kwargs)[source]

Get keyword arguments for new comm.

open()[source]

Open the connection.

opp_comm_kwargs()[source]

Get keyword arguments to initialize communication with opposite comm object.

Returns:Keyword arguments for opposite comm object.
Return type:dict
opp_comms

Name/address pairs for opposite comms.

Type:dict
printStatus(nindent=0)[source]

Print status of the communicator.

purge()[source]

Purge all messages from the comm.

recv(*args, **kwargs)[source]

Receive a message.

Parameters:
  • *args – All arguments are passed to comm _recv method.
  • **kwargs – All keywords arguments are passed to comm _recv method.
Returns:

Success or failure of receive and received

message.

Return type:

tuple (bool, obj)

send(*args, **kwargs)[source]

Send a message.

Parameters:
  • *args – All arguments are assumed to be part of the message.
  • **kwargs – All keywords arguments are passed to comm _send method.
Returns:

Success or failure of send.

Return type:

bool

cis_interface.communication.ForkComm.get_comm_name(name, i)[source]

Get the name of the ith comm in the series.

Parameters:
  • name (str) – Name of the fork comm.
  • i (int) – Index of comm in fork bundle.
Returns:

Name of ith comm in fork bundle.

Return type:

str

cis_interface.communication.IPCComm module

class cis_interface.communication.IPCComm.IPCComm(name, dont_backlog=False, **kwargs)[source]

Bases: cis_interface.communication.AsyncComm.AsyncComm

Class for handling I/O via IPC message queues.

q

Message queue.

Type:sysv_ipc.MessageQueue
bind()[source]

Bind to random queue if address is generate.

classmethod close_registry_entry(value)[source]

Close a registry entry.

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked.
Returns:Is the comm installed.
Return type:bool
is_open_direct

True if the queue is not None.

Type:bool
n_msg_direct_recv

Number of messages in the queue to recv.

Type:int
n_msg_direct_send

Number of messages in the queue to send.

Type:int
classmethod new_comm_kwargs(*args, **kwargs)[source]

Initialize communication with new queue.

open_after_bind()[source]

Open the connection by getting the queue from the bound address.

purge()[source]

Purge all messages from the comm.

classmethod underlying_comm_class()[source]

str: Name of underlying communication class.

class cis_interface.communication.IPCComm.IPCServer(srv_address, cli_address=None, **kwargs)[source]

Bases: cis_interface.communication.CommBase.CommServer

IPC server object for cleaning up server queue.

terminate(*args, **kwargs)[source]

Also set break flag.

cis_interface.communication.IPCComm.get_queue(qid=None)[source]

Create or return a sysv_ipc.MessageQueue and register it.

Parameters:qid (int, optional) – If provided, ID for existing queue that should be returned. Defaults to None and a new queue is returned.
Returns:Message queue.
Return type:sysv_ipc.MessageQueue
cis_interface.communication.IPCComm.ipc_queues()[source]

Get a list of active IPC queues.

Returns:List of IPC queues.
Return type:list
cis_interface.communication.IPCComm.ipcrm(options=[])[source]

Remove IPC constructs using the ipcrm command.

Parameters:options (list) – List of flags that should be used. Defaults to an empty list.
cis_interface.communication.IPCComm.ipcrm_queues(queue_keys=None)[source]

Delete existing IPC queues.

Parameters:queue_keys (list, str, optional) – A list of keys for queues that should be removed. Defaults to all existing queues.
cis_interface.communication.IPCComm.ipcs(options=[])[source]

Get the output from running the ipcs command.

Parameters:options (list) – List of flags that should be used. Defaults to an empty list.
Returns:Captured output.
Return type:str
cis_interface.communication.IPCComm.remove_queue(mq)[source]

Remove a sysv_ipc.MessageQueue and unregister it.

Parameters:mq (sysv_ipc.MessageQueue) –
Raises:KeyError – If the provided queue is not registered.

cis_interface.communication.JSONFileComm module

class cis_interface.communication.JSONFileComm.JSONFileComm(*args, **kwargs)[source]

Bases: cis_interface.communication.FileComm.FileComm

Class for handling I/O from/to a JSON file on disk.

Parameters:
  • name (str) – The environment variable where file path is stored.
  • **kwargs – Additional keywords arguments are passed to parent class.
classmethod get_testing_options()[source]

Method to return a dictionary of testing options for this class.

Returns:
Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the
provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.
contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
Return type:dict

cis_interface.communication.MatFileComm module

class cis_interface.communication.MatFileComm.MatFileComm(name, **kwargs)[source]

Bases: cis_interface.communication.FileComm.FileComm

Class for handling I/O from/to a Matlab .mat file on disk.

Parameters:
  • name (str) – The environment variable where file path is stored.
  • **kwargs – Additional keywords arguments are passed to parent class.
classmethod get_testing_options()[source]

Method to return a dictionary of testing options for this class.

Returns:
Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the
provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.
contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
Return type:dict

cis_interface.communication.ObjFileComm module

class cis_interface.communication.ObjFileComm.ObjFileComm(*args, **kwargs)[source]

Bases: cis_interface.communication.PlyFileComm.PlyFileComm

Class for handling I/O from/to a .obj file on disk.

Parameters:
  • name (str) – The environment variable where communication address is stored.
  • **kwargs – Additional keywords arguments are passed to parent class.
classmethod get_testing_options()[source]

Method to return a dictionary of testing options for this class.

Returns:
Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the
provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.
contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
Return type:dict

cis_interface.communication.PandasFileComm module

class cis_interface.communication.PandasFileComm.PandasFileComm(*args, **kwargs)[source]

Bases: cis_interface.communication.AsciiTableComm.AsciiTableComm

Class for handling I/O from/to a pandas csv file on disk.

Parameters:
  • name (str) – The environment variable where communication address is stored.
  • delimiter (str, optional) – String that should be used to separate columns. Defaults to ‘t’.
  • **kwargs – Additional keywords arguments are passed to parent class.
classmethod get_testing_options(as_frames=False, no_names=False)[source]

Method to return a dictionary of testing options for this class.

Parameters:
  • as_frames (bool, optional) – If True, the test objects will be Pandas data frames. Defaults to False.
  • no_names (bool, optional) – If True, an example is returned where the names are not provided to the deserializer. Defaults to False.
Returns:

Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the

provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.

contents (bytes): Bytes contents of test file created by sending

the messages in ‘send’.

Return type:

dict

header_was_written

True if head has been written to the current file.

Type:bool
read_header()[source]

Read header lines from the file and update serializer info.

write_header()[source]

Write header lines to the file based on the serializer info.

cis_interface.communication.PandasFileComm.pandas_recv_converter(obj)[source]

Performs conversion to a limited set of objects from a Pandas data frame for receiving from a file via PandasFileComm. Currently supports converting to lists/tuples of numpy arrays.

Parameters:obj (pandas.DataFrame) – Data frame to convert.
Returns:
pandas.DataFrame: Converted data frame (or unmodified input if conversion
could not be completed.
Return type:list
cis_interface.communication.PandasFileComm.pandas_send_converter(obj)[source]

Performs conversion from a limited set of objects to a Pandas data frame for sending to a file via PandasFileComm. Currently supports converting from structured numpy arrays, lists/tuples of numpy arrays, and dictionaries.

Parameters:obj (object) – Object to convert.
Returns:
Converted data frame (or unmodified input if conversion
could not be completed.
Return type:pandas.DataFrame

cis_interface.communication.PickleFileComm module

class cis_interface.communication.PickleFileComm.PickleFileComm(name, **kwargs)[source]

Bases: cis_interface.communication.FileComm.FileComm

Class for handling I/O from/to a pickled file on disk.

Parameters:
  • name (str) – The environment variable where file path is stored.
  • **kwargs – Additional keywords arguments are passed to parent class.
classmethod get_testing_options()[source]

Method to return a dictionary of testing options for this class.

Returns:
Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the
provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.
contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
Return type:dict

cis_interface.communication.PlyFileComm module

class cis_interface.communication.PlyFileComm.PlyFileComm(*args, **kwargs)[source]

Bases: cis_interface.communication.FileComm.FileComm

Class for handling I/O from/to a .ply file on disk.

Parameters:
  • name (str) – The environment variable where communication address is stored.
  • **kwargs – Additional keywords arguments are passed to parent class.
classmethod get_testing_options()[source]

Method to return a dictionary of testing options for this class.

Returns:
Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the
provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.
contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
Return type:dict

cis_interface.communication.RMQAsyncComm module

class cis_interface.communication.RMQAsyncComm.RMQAsyncComm(name, dont_backlog=False, **kwargs)[source]

Bases: cis_interface.communication.RMQComm.RMQComm

Class for handling asynchronous RabbitMQ communications. It is not recommended to use this class as it can leave hanging threads if not closed propertly. The normal RMQComm will cover most use cases.

Parameters:
  • name (str) – The environment variable where the comm address is stored.
  • dont_open (bool, optional) – If True, the connection will not be opened. Defaults to False.
  • **kwargs – Additional keyword arguments are passed to CommBase.
times_connected

Number of times that this connections has been established.

Type:int
rmq_thread

Thread used to run IO loop.

Type:tools.CisThread
bind()[source]

Declare queue to get random new queue.

channel_open

True if connection ready for messages, False otherwise.

Type:bool
channel_stable

True if the connection ready for messages and not about to close. False otherwise.

Type:bool
close_connection()[source]

Stop the ioloop and close the connection.

connect()[source]

Establish the connection.

force_close()[source]

Force stop by removing the queue and stopping the IO loop.

get_queue_result()[source]

Get the fram from passive queue declare.

n_msg_direct_recv

Number of messages in the queue.

Type:int
new_run_thread(name=None)[source]

Get a new thread for running.

on_bindok(unused_frame)[source]

Actions to perform once the queue is succesfully bound. Start consuming messages.

on_cancelok(unused_frame)[source]

Actions to perform after succesfully cancelling consumption. Closes the channel.

on_channel_closed(channel, *args)[source]

Actions to perform when the channel is closed. Close the connection.

on_channel_open(channel)[source]

Actions to perform after a channel is opened. Add the channel close callback and setup the exchange.

on_connection_closed(connection, *args)[source]

Actions that must be taken when the connection is closed. Set the channel to None. If the connection is meant to be closing, stop the IO loop. Otherwise, wait 5 seconds and try to reconnect.

on_connection_open(connection)[source]

Actions that must be taken when the connection is opened. Add the close connection callback and open the RabbitMQ channel.

on_connection_open_error(unused_connection)[source]

Actions that must be taken when the connection fails to open.

on_exchange_declareok(unused_frame)[source]

Actions to perform once an exchange is succesfully declared. Set up the queue.

on_message(ch, method, props, body)[source]

Buffer received messages.

on_queue_declareok(method_frame)[source]

Actions to perform once the queue is succesfully declared. Bind the queue.

open_channel()[source]

Open a RabbitMQ channel.

purge()[source]

Remove all messages from the associated queue.

reconnect()[source]

Try to re-establish a connection and resume a new IO loop.

rmq_lock

Lock associated with RMQ ioloop thread.

run_thread()[source]

Connect to the connection and begin the IO loop.

setup_exchange(exchange_name)[source]

Setup the exchange.

setup_queue()[source]

Set up the message queue.

start_run_thread()[source]

Start the run thread and wait for it to finish.

cis_interface.communication.RMQComm module

class cis_interface.communication.RMQComm.RMQComm(name, dont_backlog=False, **kwargs)[source]

Bases: cis_interface.communication.AsyncComm.AsyncComm

Class for handling basic RabbitMQ communications.

connection

RabbitMQ connection.

Type:pika.Connection
channel

RabbitMQ channel.

Type:pika.Channel
Raises:RuntimeError – If a connection cannot be established.
bind()[source]

Declare queue to get random new queue.

close_channel()[source]

Close the channel if it exists.

close_connection()[source]

Close the connection.

close_queue()[source]

Close the queue if the channel exists.

create_work_comm_kwargs

Keyword arguments for a new work comm.

Type:dict
exchange

AMQP exchange.

Type:str
get_queue_result()[source]

Get the fram from passive queue declare.

get_work_comm_kwargs

Keyword arguments for an existing work comm.

Type:dict
is_confirmed_recv

True if all received messages have been confirmed.

Type:bool
is_confirmed_send

True if all sent messages have been confirmed.

Type:bool
classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked.
Returns:Is the comm installed.
Return type:bool
is_open_direct

True if the connection and channel are open.

Type:bool
n_msg_direct_recv

Number of messages in the queue.

Type:int
n_msg_direct_send

Number of messages in the queue.

Type:int
classmethod new_comm_kwargs(name, user=None, password=None, host=None, virtual_host=None, port=None, exchange=None, queue='', **kwargs)[source]

Initialize communication with new connection.

Parameters:
  • name (str) – Name of new connection.
  • user (str, optional) – RabbitMQ server username. Defaults to config option ‘user’ in section ‘rmq’ if it exists and ‘guest’ if it does not.
  • password (str, optional) – RabbitMQ server password. Defaults to config option ‘password’ in section ‘rmq’ if it exists and ‘guest’ if it does not.
  • host (str, optional) – RabbitMQ server host. Defaults to config option ‘host’ in section ‘rmq’ if it exists and ‘localhost’ if it does not. If ‘localhost’, the output of socket.gethostname() is used.
  • virtual_host (str, optional) – RabbitMQ server virtual host. Defaults to config option ‘vhost’ in section ‘rmq’ if it exists and ‘/’ if it does not.
  • port (str, optional) – Port on host to use. Defaults to config option ‘port’ in section ‘rmq’ if it exists and ‘5672’ if it does not.
  • exchange (str, optional) – RabbitMQ exchange. Defaults to config option ‘namespace’ in section ‘rmq’ if it exits and ‘’ if it does not.
  • queue (str, optional) – Name of the queue that messages will be send to or received from. If an empty string, the queue will be a random string and exclusive to a receiving comm. Defaults to ‘’.
  • **kwargs – Additional keywords arguments are returned as keyword arguments for the new comm.
Returns:

Arguments and keyword arguments for new comm.

Return type:

tuple(tuple, dict)

opp_comm_kwargs()[source]

Get keyword arguments to initialize communication with opposite comm object.

Returns:Keyword arguments for opposite comm object.
Return type:dict
purge()[source]

Remove all messages from the associated queue.

queue

AMQP queue.

Type:str
classmethod underlying_comm_class()[source]

str: Name of underlying communication class.

url

AMQP server address.

Type:str
class cis_interface.communication.RMQComm.RMQServer(srv_address, cli_address=None, **kwargs)[source]

Bases: cis_interface.communication.CommBase.CommServer

RMQ server object for cleaning up server connections.

terminate(*args, **kwargs)[source]

Also set break flag.

cis_interface.communication.RMQComm.check_rmq_server(url=None, **kwargs)[source]

Check that connection to a RabbitMQ server is possible.

Parameters:
  • url (str, optional) – Address of RMQ server that includes all the necessary information. If this is provided, the remaining arguments are ignored. Defaults to None and the connection parameters are taken from the other arguments.
  • username (str, optional) – RMQ server username. Defaults to config value.
  • password (str, optional) – RMQ server password. Defaults to config value.
  • host (str, optional) – RMQ hostname or IP Address to connect to. Defaults to config value.
  • port (str, optional) – RMQ server TCP port to connect to. Defaults to config value.
  • vhost (str, optional) – RMQ virtual host to use. Defaults to config value.
Returns:

True if connection to RabbitMQ server is possible, False

otherwise.

Return type:

bool

cis_interface.communication.ServerComm module

class cis_interface.communication.ServerComm.ServerComm(name, request_comm=None, response_kwargs=None, dont_open=False, **kwargs)[source]

Bases: cis_interface.communication.CommBase.CommBase

Class for handling Server side communication.

Parameters:
  • name (str) – The environment variable where communication address is stored.
  • request_comm (str, optional) – Comm class that should be used for the request comm. Defaults to None.
  • response_kwargs (dict, optional) – Keyword arguments for the response comm. Defaults to empty dict.
  • **kwargs – Additional keywords arguments are passed to the input comm.
response_kwargs

Keyword arguments for the response comm.

Type:dict
icomm

Request comm.

Type:Comm
ocomm

Response comm for last request.

Type:Comm
close(*args, **kwargs)[source]

Close the connection.

classmethod comm_count()[source]

int: Number of communication connections.

create_response_comm()[source]

Create a response comm based on information from the last header.

drain_messages(direction='recv', **kwargs)[source]

Sleep while waiting for messages to be drained.

is_closed

True if the connection is closed.

Type:bool
classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked.
Returns:Is the comm installed.
Return type:bool
is_open

True if the connection is open.

Type:bool
maxMsgSize

Maximum size of a single message that should be sent.

Type:int
n_msg_recv

The number of messages in the connection.

Type:int
n_msg_recv_drain

The number of messages in the connection to drain.

Type:int
classmethod new_comm_kwargs(name, request_comm=None, **kwargs)[source]

Initialize communication with new comms.

Parameters:
  • name (str) – Name for new comm.
  • request_comm (str, optional) – Name of class for new input comm. Defaults to None.
open()[source]

Open the connection.

opp_comm_kwargs()[source]

Get keyword arguments to initialize communication with opposite comm object.

Returns:Keyword arguments for opposite comm object.
Return type:dict
opp_comms

Name/address pairs for opposite comms.

Type:dict
purge()[source]

Purge input and output comms.

recv(*args, **kwargs)[source]

Receive a message from the input comm and open a new response comm for output using address from the header.

Parameters:
  • *args – Arguments are passed to input comm recv method.
  • **kwargs – Keyword arguments are passed to input comm recv method.
Returns:

Output from input comm recv method.

Return type:

obj

remove_response_comm()[source]

Remove response comm.

rpcRecv(*args, **kwargs)[source]

Alias for RPCComm.recv

rpcSend(*args, **kwargs)[source]

Alias for RPCComm.send

send(*args, **kwargs)[source]

Send a message to the output comm.

Parameters:
  • *args – Arguments are passed to output comm send method.
  • **kwargs – Keyword arguments are passed to output comm send method.
Returns:

Output from output comm send method.

Return type:

obj

classmethod underlying_comm_class()[source]

str: Name of underlying communication class.

cis_interface.communication.YAMLFileComm module

class cis_interface.communication.YAMLFileComm.YAMLFileComm(*args, **kwargs)[source]

Bases: cis_interface.communication.FileComm.FileComm

Class for handling I/O from/to a YAML file on disk.

Parameters:
  • name (str) – The environment variable where file path is stored.
  • **kwargs – Additional keywords arguments are passed to parent class.
classmethod get_testing_options()[source]

Method to return a dictionary of testing options for this class.

Returns:
Dictionary of variables to use for testing. Key/value pairs:
kwargs (dict): Keyword arguments for comms tested with the
provided content.

send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test

file that was sent the messages in ‘send’.
contents (bytes): Bytes contents of test file created by sending
the messages in ‘send’.
Return type:dict

cis_interface.communication.ZMQComm module

class cis_interface.communication.ZMQComm.ZMQComm(name, dont_backlog=False, **kwargs)[source]

Bases: cis_interface.communication.AsyncComm.AsyncComm

Class for handling I/O using ZeroMQ sockets.

Parameters:
  • name (str) – The environment variable where the socket address is stored.
  • context (zmq.Context, optional) – ZeroMQ context that should be used. Defaults to None and the global context is used.
  • socket_type (str, optional) – The type of socket that should be created. Defaults to _default_socket_type. See zmq for all options.
  • socket_action (str, optional) – The action that the socket should perform. Defaults to action based on the direction (‘connect’ for ‘recv’, ‘bind’ for ‘send’.)
  • topic_filter (str, optional) – Message filter to use when subscribing. This is only used for ‘SUB’ socket types. Defaults to ‘’ which is all messages.
  • dealer_identity (str, optional) – Identity that should be used to route messages to a dealer socket. Defaults to ‘0’.
  • **kwargs – Additional keyword arguments are passed to :class:.CommBase.
context

ZeroMQ context that will be used.

Type:zmq.Context
socket

ZeroMQ socket.

Type:zmq.Socket
socket_type_name

The type of socket that should be created.

Type:str
socket_type

ZeroMQ socket type.

Type:int
socket_action

The action that the socket should perform.

Type:str, optional
topic_filter

Message filter to use when subscribing.

Type:str
dealer_identity

Identity that should be used to route messages to a dealer socket.

Type:str
address_param

Address parameters.

Type:dict
bind()[source]

Bind to address, getting random port as necessary.

check_reply_socket_recv(msg)[source]

Check incoming message for reply address.

Parameters:msg (str) – Incoming message to check.
Returns:Messages with reply address removed if present.
Return type:str
check_reply_socket_send(msg)[source]

Append reply socket address if it

Parameters:msg (str) – Message that will be piggy backed on.
Returns:Message with reply address if it has not been sent.
Return type:str
classmethod close_registry_entry(value)[source]

Close a registry entry.

confirm_recv(noblock=False)[source]

Confirm that message was received.

confirm_send(noblock=False)[source]

Confirm that sent message was received.

connect()[source]

Connect to address.

create_work_comm_kwargs

Keyword arguments for a new work comm.

Type:dict
get_work_comm_kwargs

Keyword arguments for an existing work comm.

Type:dict
header2workcomm(header, **kwargs)[source]

Get a work comm based on header info.

Parameters:
  • header (dict) – Information that will be sent in the message header to the work comm.
  • **kwargs – Additional keyword arguments are passed to the parent method.
Returns:

class:.CommBase: Work comm.

host

Host that socket is connected to.

Type:str
is_confirmed_recv

True if all received messages have been confirmed.

Type:bool
classmethod is_installed(language=None)[source]

Determine if the necessary libraries are installed for this communication class.

Parameters:language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked.
Returns:Is the comm installed.
Return type:bool
is_message(flags)[source]

Poll the socket for a message.

Parameters:flags (int) – ZMQ poll flags.
Returns:True if there is a message matching the flags, False otherwise.
Return type:bool
is_open_direct

True if the socket is open.

Type:bool
n_msg_direct_recv

Number of messages currently being routed from recv.

Type:int
n_msg_direct_send

Number of messages currently being routed.

Type:int
classmethod new_comm_kwargs(name, protocol=None, host=None, port=None, **kwargs)[source]

Initialize communication with new queue.

Parameters:
  • name (str) – Name of new socket.
  • protocol (str, optional) – The protocol that should be used. Defaults to None and is set to _default_protocol. See zmq for details.
  • host (str, optional) – The host that should be used. Invalid for ‘inproc’ protocol. Defaults to ‘localhost’.
  • port (int, optional) – The port used. Invalid for ‘inproc’ protocol. Defaults to None and a random port is choosen.
  • **kwargs – Additional keywords arguments are returned as keyword arguments for the new comm.
Returns:

Arguments and keyword arguments for new socket.

Return type:

tuple(tuple, dict)

on_send(msg, header_kwargs=None)[source]

Process message to be sent including handling serializing message and handling EOF.

Parameters:
  • msg (obj) – Message to be sent
  • header_kwargs (dict, optional) – Keyword arguments that should be added to the header.
Returns:

Truth of if message should be sent, raw

bytes message to send, and header info contained in the message.

Return type:

tuple (bool, str, dict)

on_send_eof()[source]

Actions to perform when EOF being sent.

Returns:True if EOF message should be sent, False otherwise.
Return type:bool
opp_address

Address for opposite comm.

Type:str
opp_comm_kwargs()[source]

Get keyword arguments to initialize communication with opposite comm object.

Returns:Keyword arguments for opposite comm object.
Return type:dict
port

Port that socket is connected to.

Type:str
printStatus(nindent=0)[source]

Print status of the communicator.

protocol

Protocol that socket uses.

Type:str
registry_key

String used to register the socket.

Type:str
server_exists(srv_address)[source]

Determine if a server exists.

Parameters:srv_address (str) – Address of server comm.
Returns:
True if a server with the provided address exists, False
otherwise.
Return type:bool
set_reply_socket_recv(address)[source]

Set the recv reply socket if the address dosn’t exist.

set_reply_socket_send()[source]

Set the send reply socket if it dosn’t exist.

unbind(dont_close=False)[source]

Unbind from address.

classmethod underlying_comm_class()[source]

str: Name of underlying communication class.

workcomm2header(work_comm, **kwargs)[source]

Get header information from a comm.

Parameters:
  • ( (work_comm) – class:.CommBase): Work comm that header describes.
  • **kwargs – Additional keyword arguments are added to the header.
Returns:

Header information that will be sent with a message.

Return type:

dict

class cis_interface.communication.ZMQComm.ZMQProxy(srv_address, context=None, retry_timeout=-1, nretry=1, **kwargs)[source]

Bases: cis_interface.communication.CommBase.CommServer

Start a proxy in a new thread for a server address. A client-side address will be randomly generated.

Parameters:
  • srv_address (str) – Address that should face the server(s).
  • context (zmq.Context, optional) – ZeroMQ context that should be used. Defaults to None and the global context is used.
  • protocol (str, optional) – Protocol that should be used for the sockets. Defaults to None and is set to _default_protocol.
  • host (str, optional) – Host for socket address. Defaults to ‘localhost’.
  • retry_timeout (float, optional) – Time (in seconds) that should be waited before retrying to bind the sockets to the addresses. If negative, a retry will not be attempted and an error will be raised. Defaults to -1.
  • nretry (int, optional) – Number of times to try binding the sockets to the addresses. Defaults to 1.
  • **kwargs – Additional keyword arguments are passed to the parent class.
srv_address

Address that faces the server(s).

Type:str
cli_address

Address that faces the client(s).

Type:str
context

ZeroMQ context that will be used.

Type:zmq.Context
srv_socket

Socket facing client(s).

Type:zmq.Socket
cli_socket

Socket facing server(s).

Type:zmq.Socket
cli_count

Number of clients that have connected to this proxy.

Type:int
after_loop()[source]

Close sockets after the loop finishes.

cleanup()[source]

Clean up sockets on exit.

client_recv()[source]

Receive single message from the client.

close_sockets()[source]

Close the sockets.

poll()[source]
run_loop()[source]

Forward messages from client to server.

server_send(msg)[source]

Send single message to the server.

cis_interface.communication.ZMQComm.bind_socket(socket, address, retry_timeout=-1, nretry=1)[source]

Bind a socket to an address, getting a random port as necessary.

Parameters:
  • socket (zmq.Socket) – Socket that should be bound.
  • address (str) – Address that socket should be bound to.
  • retry_timeout (float, optional) – Time (in seconds) that should be waited before retrying to bind the socket to the address. If negative, a retry will not be attempted and an error will be raised. Defaults to -1.
  • nretry (int, optional) – Number of times to try binding the socket to the addresses. Defaults to 1.
Returns:

Address that socket was bound to, including random port if one

was used.

Return type:

str

cis_interface.communication.ZMQComm.check_czmq()[source]

Determine if the necessary C/C++ libraries are installed for ZeroMQ.

Returns:True if the libraries are installed, False otherwise.
Return type:bool
cis_interface.communication.ZMQComm.format_address(protocol, host, port=None)[source]

Format an address based on its parts.

Parameters:
  • protocol (str) – Communication protocol that should be used.
  • host (str) – Host that address should point to.
  • port (int, optional) – Port that address should point to. Defaults to None and is not added to the address.
Returns:

Complete address.

Return type:

str

Raises:

ValueError – If the protocol is not recognized.

cis_interface.communication.ZMQComm.get_ipc_host()[source]

Get an IPC host using uuid.

Returns:File path for IPC transport created using uuid.
Return type:str
cis_interface.communication.ZMQComm.get_socket_type_mate(t_in)[source]

Find the counterpart socket type.

Parameters:t_in (str) – Socket type.
Returns:Counterpart socket type.
Return type:str
Raises:ValueError – If t_in is not a recognized socket type.
cis_interface.communication.ZMQComm.parse_address(address)[source]

Split an address into its parts.

Parameters:

address (str) – Address to be split.

Returns:

Parameters extracted from the address.

Return type:

dict

Raises:
  • ValueError – If the address dosn’t contain ‘://’.
  • ValueError – If the protocol is not supported.

Module contents

cis_interface.communication.new_comm(name, comm=None, **kwargs)[source]

Return a new communicator, creating necessary components for communication (queues, sockets, channels, etc.).

Parameters:
  • name (str) – Communicator name.
  • comm (str, optional) – Name of communicator class.
  • **kwargs – Additional keyword arguments are passed to communicator class method new_comm.
Returns:

Communicator of given class.

Return type:

Comm

cis_interface.communication.get_comm(name, **kwargs)[source]

Return communicator for existing comm components.

Parameters:
  • name (str) – Communicator name.
  • **kwargs – Additional keyword arguments are passed to new_comm.
Returns:

Communicator of given class.

Return type:

Comm

cis_interface.communication.get_comm_class(comm=None)[source]

Return a communication class given it’s name.

Parameters:comm (str, optional) – Name of communicator class. Defaults to tools.get_default_comm() if not provided.
Returns:Communicator class.
Return type:class
cis_interface.communication.cleanup_comms(comm=None)[source]

Call cleanup_comms for the appropriate communicator class.

Parameters:comm (str, optional) – Name of communicator class. Defaults to tools.get_default_comm() if not provided.
Returns:Number of comms closed.
Return type:int
cis_interface.communication.DefaultComm(*args, **kwargs)[source]

Construct a comm object of the default type.