cis_interface.communication package¶
Subpackages¶
- cis_interface.communication.tests package
- Submodules
- cis_interface.communication.tests.test_AsciiFileComm module
- cis_interface.communication.tests.test_AsciiMapComm module
- cis_interface.communication.tests.test_AsciiTableComm module
- cis_interface.communication.tests.test_AsyncComm module
- cis_interface.communication.tests.test_CommBase module
- cis_interface.communication.tests.test_FileComm module
- cis_interface.communication.tests.test_ForkComm module
- cis_interface.communication.tests.test_IPCComm module
- cis_interface.communication.tests.test_JSONFileComm module
- cis_interface.communication.tests.test_MatFileComm module
- cis_interface.communication.tests.test_ObjFileComm module
- cis_interface.communication.tests.test_PandasFileComm module
- cis_interface.communication.tests.test_PickleFileComm module
- cis_interface.communication.tests.test_PlyFileComm module
- cis_interface.communication.tests.test_RMQAsyncComm module
- cis_interface.communication.tests.test_RMQComm module
- cis_interface.communication.tests.test_ServerComm module
- cis_interface.communication.tests.test_YAMLFileComm module
- cis_interface.communication.tests.test_ZMQComm module
- Module contents
Submodules¶
cis_interface.communication.AsciiFileComm module¶
-
class
cis_interface.communication.AsciiFileComm.
AsciiFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.FileComm.FileComm
Class for handling I/O from/to a file on disk.
Parameters: -
classmethod
get_testing_options
(**kwargs)[source]¶ Method to return a dictionary of testing options for this class.
Returns: Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the provided content.
- send (list): List of objects to send to test file.
- recv (list): List of objects that will be received from a test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending the messages in ‘send’.
Return type: dict
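The shape of the returned dictionary can be pictured with a minimal sketch. The function name `example_testing_options` and all values below are hypothetical; they only illustrate the four documented keys and are not what the library returns.

```python
# Hypothetical illustration only: the values are invented and are not
# what AsciiFileComm.get_testing_options actually returns.
def example_testing_options():
    """Return a dictionary with the four documented keys."""
    messages = [b"first line\n", b"second line\n"]
    return {
        "kwargs": {},                    # keyword arguments for the comm under test
        "send": messages,                # objects sent to the test file
        "recv": messages,                # objects expected back when reading the file
        "contents": b"".join(messages),  # raw bytes expected on disk
    }


options = example_testing_options()
```

A test could then send each item in `options["send"]`, compare the file on disk with `options["contents"]`, and compare what is read back with `options["recv"]`.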
cis_interface.communication.AsciiMapComm module¶
-
class
cis_interface.communication.AsciiMapComm.
AsciiMapComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.FileComm.FileComm
Class for handling I/O from/to an ASCII map on disk.
Parameters: - name (str) – The environment variable where file path is stored.
- **kwargs – Additional keyword arguments are passed to parent class.
-
classmethod
get_testing_options
()[source]¶ Method to return a dictionary of testing options for this class.
Returns: Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the provided content.
- send (list): List of objects to send to test file.
- recv (list): List of objects that will be received from a test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending the messages in ‘send’.
Return type: dict
cis_interface.communication.AsciiTableComm module¶
-
class
cis_interface.communication.AsciiTableComm.
AsciiTableComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.AsciiFileComm.AsciiFileComm
Class for handling I/O from/to a file on disk.
Parameters: - name (str) – The environment variable where communication address is stored.
- delimiter (str, optional) – String that should be used to separate columns. If not provided and format_str is not set prior to I/O, this defaults to whitespace.
- use_astropy (bool, optional) – If True and the astropy package is installed, it will be used to read/write the table. Defaults to False.
- **kwargs – Additional keyword arguments are passed to parent class.
-
advance_in_series
(*args, **kwargs)[source]¶ Advance to a certain file in a series.
Parameters: index (int, optional) – Index of file in the series that should be moved to. Defaults to None and call will advance to the next file in the series. Returns: True if the file was advanced in the series, False otherwise. Return type: bool
-
change_position
(file_pos, series_index=None, header_was_read=None, header_was_written=None)[source]¶ Change the position in the file/series.
Parameters: - file_pos (int) – Position that should be moved to in the file.
- series_index (int, optional) – Index of the file in the series that should be moved to. Defaults to None and will be set to the current series index.
- header_was_read (bool, optional) – Status of if header has been read or not. Defaults to None and will be set to the current value.
- header_was_written (bool, optional) – Status of if header has been written or not. Defaults to None and will be set to the current value.
-
classmethod
get_testing_options
(as_array=False, **kwargs)[source]¶ Method to return a dictionary of testing options for this class.
Returns: Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the provided content.
- send (list): List of objects to send to test file.
- recv (list): List of objects that will be received from a test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending the messages in ‘send’.
Return type: dict
cis_interface.communication.AsyncComm module¶
-
class
cis_interface.communication.AsyncComm.
AsyncComm
(name, dont_backlog=False, **kwargs)[source]¶ Bases:
cis_interface.communication.CommBase.CommBase
Class for handling asynchronous I/O.
Parameters: -
dont_backlog
¶ If True, the backlog will not be started and all messages will be sent/received directly to/from the comm.
Type: bool
-
backlog_send_ready
¶ Event set when there is a message in the send backlog.
Type: threading.Event
-
backlog_recv_ready
¶ Event set when there is a message in the recv backlog.
Type: threading.Event
-
add_backlog_recv
(msg)[source]¶ Add a message to the backlog of received messages.
Parameters: msg (str) – Received message that should be backlogged.
-
add_backlog_send
(msg, **kwargs)[source]¶ Add a message to the backlog of messages to be sent.
Parameters: - msg (str) – Message that should be backlogged for sending.
- **kwargs – Additional keyword arguments are added along with the message.
-
backlog_thread
¶ Thread that will handle sending or receiving backlogged messages.
Type: tools.CisThread
-
pop_backlog_recv
()[source]¶ Pop a message from the front of the recv backlog.
Returns: First backlogged recv message. Return type: str
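The backlog behaviour described above (messages queued in order, with an event that is set while the backlog is non-empty) can be sketched as follows. `BacklogSketch` is a hypothetical stand-in written for illustration; the use of a `deque` and a lock is an assumption, not a description of how AsyncComm is implemented.

```python
import threading
from collections import deque


class BacklogSketch:
    """Hypothetical stand-in for the recv-side backlog described above."""

    def __init__(self):
        self._backlog_recv = deque()
        self._lock = threading.Lock()
        # Set whenever at least one message is waiting in the backlog.
        self.backlog_recv_ready = threading.Event()

    def add_backlog_recv(self, msg):
        """Append a received message to the back of the backlog."""
        with self._lock:
            self._backlog_recv.append(msg)
            self.backlog_recv_ready.set()

    def pop_backlog_recv(self):
        """Remove and return the message at the front of the backlog."""
        with self._lock:
            msg = self._backlog_recv.popleft()
            if not self._backlog_recv:
                # Nothing left to read, so clear the ready flag.
                self.backlog_recv_ready.clear()
            return msg


backlog = BacklogSketch()
backlog.add_backlog_recv(b"a")
backlog.add_backlog_recv(b"b")
first = backlog.pop_backlog_recv()
```

A consumer thread could block on `backlog.backlog_recv_ready.wait(timeout)` before calling `pop_backlog_recv`.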
-
cis_interface.communication.ClientComm module¶
-
class
cis_interface.communication.ClientComm.
ClientComm
(name, request_comm=None, response_kwargs=None, dont_open=False, **kwargs)[source]¶ Bases:
cis_interface.communication.CommBase.CommBase
Class for handling Client side communication.
Parameters: - name (str) – The environment variable where communication address is stored.
- request_comm (str, optional) – Comm class that should be used for the request comm. Defaults to None.
- response_kwargs (dict, optional) – Keyword arguments for the response comm. Defaults to empty dict.
- **kwargs – Additional keyword arguments are passed to the output comm.
-
ocomm
¶ Request comm.
Type: Comm
-
call
(*args, **kwargs)[source]¶ Do RPC call. The request message is sent to the output comm and the response is received from the input comm.
Parameters: - *args – Arguments are passed to output comm send method.
- **kwargs – Keyword arguments are passed to output comm send method.
Returns: Output from input comm recv method.
Return type: obj
-
classmethod
is_installed
(language=None)[source]¶ Determine if the necessary libraries are installed for this communication class.
Parameters: language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. Returns: Is the comm installed. Return type: bool
-
classmethod
new_comm_kwargs
(name, request_comm=None, **kwargs)[source]¶ Initialize communication with new comms.
Parameters:
-
opp_comm_kwargs
()[source]¶ Get keyword arguments to initialize communication with opposite comm object.
Returns: Keyword arguments for opposite comm object. Return type: dict
-
recv
(*args, **kwargs)[source]¶ Receive a message from the input comm and open a new response comm for output using address from the header.
Parameters: - *args – Arguments are passed to input comm recv method.
- **kwargs – Keyword arguments are passed to input comm recv method.
Returns: Output from input comm recv method.
Return type: obj
-
send
(*args, **kwargs)[source]¶ Create a response comm and then send a message to the output comm with the response address in the header.
Parameters: - *args – Arguments are passed to output comm send method.
- **kwargs – Keyword arguments are passed to output comm send method.
Returns: Output from output comm send method.
Return type: obj
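The request/response flow described for `send`, `recv` and `call` (the client creates a response comm, puts its address in the header, and reads the reply from it) can be sketched with in-process queues. Everything here is hypothetical: the `channels` dictionary, the `response_address` header key and the function names are assumptions made for illustration and are not the ClientComm API.

```python
import queue

# Hypothetical in-process "addresses": each address maps to a queue.
channels = {"request": queue.Queue()}


def client_send(msg, response_address):
    """Create a response channel, then send the request with its address."""
    channels[response_address] = queue.Queue()
    header = {"response_address": response_address}
    channels["request"].put((header, msg))


def server_handle_one(handler):
    """Receive one request and reply on the address named in its header."""
    header, msg = channels["request"].get_nowait()
    channels[header["response_address"]].put(handler(msg))


def client_recv(response_address):
    """Read the reply from the response channel and discard the channel."""
    return channels.pop(response_address).get_nowait()


def client_call(msg, handler, response_address="response-0"):
    """Round trip: send the request, let the server answer, read the reply."""
    client_send(msg, response_address)
    server_handle_one(handler)
    return client_recv(response_address)


reply = client_call("ping", handler=str.upper)
```

In a real deployment the server would run in another process; it is called inline here only to keep the sketch self-contained.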
cis_interface.communication.CommBase module¶
-
class
cis_interface.communication.CommBase.
CommBase
(name, address=None, direction='send', dont_open=False, is_interface=False, recv_timeout=0.0, close_on_eof_recv=True, close_on_eof_send=False, single_use=False, reverse_names=False, no_suffix=False, is_client=False, is_response_client=False, is_server=False, is_response_server=False, comm=None, matlab=False, **kwargs)[source]¶ Bases:
cis_interface.tools.CisClass
Class for handling I/O.
Parameters: - name (str) – The environment variable where communication address is stored.
- address (str, optional) – Communication info. Defaults to None and address is taken from the environment variable.
- direction (str, optional) – The direction that messages should flow through the connection. ‘send’ if the connection will send messages, ‘recv’ if the connection will receive messages. Defaults to ‘send’.
- serializer (DefaultSerialize, optional) – Class with serialize and deserialize methods that should be used to process sent and received messages. Defaults to None and is constructed using provided ‘serializer_kwargs’.
- serializer_kwargs (dict, optional) – Keyword arguments that should be passed to DefaultSerialize to create the serializer. Defaults to {}.
- format_str (str, optional) – String that should be used to format/parse messages. Defaults to None.
- dont_open (bool, optional) – If True, the connection will not be opened. Defaults to False.
- is_interface (bool, optional) – Set to True if this comm is a Python interface binding. Defaults to False.
- recv_timeout (float, optional) – Time that should be waited for an incoming message before returning None. Defaults to 0 (no wait). A value of False indicates that recv should block.
- close_on_eof_recv (bool, optional) – If True, the comm will be closed when it receives an end-of-file message. Otherwise, it will remain open. Defaults to True.
- close_on_eof_send (bool, optional) – If True, the comm will be closed after it sends an end-of-file message. Otherwise, it will remain open. Defaults to False.
- single_use (bool, optional) – If True, the comm will only be used to send/recv a single message. Defaults to False.
- reverse_names (bool, optional) – If True, the suffix added to the comm will be reversed. Defaults to False.
- no_suffix (bool, optional) – If True, no directional suffix will be added to the comm name. Defaults to False.
- is_client (bool, optional) – If True, the comm is one of many potential clients that will be sending messages to one or more servers. Defaults to False.
- is_response_client (bool, optional) – If True, the comm is a client-side response comm. Defaults to False.
- is_server (bool, optional) – If True, the comm is one of many potential servers that will be receiving messages from one or more clients. Defaults to False.
- is_response_server (bool, optional) – If True, the comm is a server-side response comm. Defaults to False.
- recv_converter (func, optional) – Converter that should be used on received objects. Defaults to None.
- send_converter (func, optional) – Converter that should be used on sent objects. Defaults to None.
- comm (str, optional) – The comm that should be created. This only serves as a check that the correct class is being created. Defaults to None.
- matlab (bool, optional) – True if the comm will be accessed by Matlab code. Defaults to False.
- **kwargs – Additional keyword arguments are passed to parent class.
-
serializer
¶ Object that will be used to serialize/deserialize messages to/from Python objects.
Type: DefaultSerialize
-
close_on_eof_recv
¶ If True, the comm will be closed when it receives an end-of-file message. Otherwise, it will remain open.
Type: bool
-
close_on_eof_send
¶ If True, the comm will be closed after it sends an end-of-file message. Otherwise, it will remain open.
Type: bool
-
is_client
¶ If True, the comm is one of many potential clients that will be sending messages to one or more servers.
Type: bool
-
is_server
¶ If True, the comm is one of many potential servers that will be receiving messages from one or more clients.
Type: bool
-
recv_converter
¶ Converter that should be used on received objects.
Type: func
-
send_converter
¶ Converter that should be used on sent objects.
Type: func
Raises: - RuntimeError – If the comm class is not installed.
- RuntimeError – If there is not an environment variable with the specified name.
- ValueError – If direction is not ‘send’ or ‘recv’.
-
add_work_comm
(comm)[source]¶ Add work comm to dict.
Parameters: comm (CommBase) – Comm that should be added. Raises: KeyError – If there is already a comm associated with the key.
-
apply_recv_converter
(msg_in)[source]¶ Apply recv_converter.
Parameters: msg_in (object) – Message to convert. Returns: Converted message. Return type: object
-
apply_send_converter
(msg_in)[source]¶ Apply send converter.
Parameters: msg_in (object) – Message to convert. Returns: Converted message. Return type: object
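The converter hooks can be pictured as an optional function applied to each message. The helper below is a hypothetical sketch; in particular, passing the message through unchanged when no converter is set is an assumption based on the documented default of None.

```python
def apply_converter(msg_in, converter=None):
    """Return msg_in unchanged when no converter is set, else the converted value."""
    if converter is None:
        return msg_in
    return converter(msg_in)


def to_int_list(msg):
    """Example converter: parse comma-separated bytes into a list of ints."""
    return [int(x) for x in msg.decode("ascii").split(",")]


as_is = apply_converter(b"1,2,3")
as_ints = apply_converter(b"1,2,3", converter=to_int_list)
```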
-
chunk_message
(msg)[source]¶ Yield chunks of the message, each of size maxMsgSize or smaller.
Parameters: msg (str, bytes) – Raw message bytes to be chunked. Returns: Chunks of message. Return type: str
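Chunking a message by a maximum size can be sketched with a short generator. This is an illustrative, standalone function that takes the size limit as an explicit `max_msg_size` argument (an assumption; the documented method reads it from the comm's maxMsgSize attribute).

```python
def chunk_message(msg, max_msg_size):
    """Yield consecutive slices of msg, each at most max_msg_size long."""
    if max_msg_size <= 0:
        raise ValueError("max_msg_size must be positive")
    for start in range(0, len(msg), max_msg_size):
        yield msg[start:start + max_msg_size]


chunks = list(chunk_message(b"abcdefghij", 4))
```

Joining the chunks in order reproduces the original message, which is what a receiver of a multipart message relies on.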
-
close
(linger=False)[source]¶ Close the connection.
Parameters: linger (bool, optional) – If True, drain messages before closing the comm. Defaults to False.
-
close_in_thread
(no_wait=False, timeout=None)[source]¶ In a new thread, close the comm when it is empty.
Parameters:
-
create_work_comm
(work_comm_name=None, **kwargs)[source]¶ Create a temporary work comm.
Parameters: - work_comm_name (str, optional) – Name that should be used for the work comm. If not provided, one is created from the header id and the comm class.
- **kwargs – Keyword arguments for new_comm that should override work_comm_kwargs.
Returns: Work comm. Return type: CommBase
-
drain_messages
(direction=None, timeout=None, variable=None)[source]¶ Sleep while waiting for messages to be drained.
-
empty_obj_recv
¶ Empty message object.
Type: obj
-
classmethod
get_testing_options
(**kwargs)[source]¶ Method to return a dictionary of testing options for this class.
Returns: - Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the
- provided content.
send (list): List of objects to send to test file. recv (list): List of objects that will be received from a test
file that was sent the messages in ‘send’.- contents (bytes): Bytes contents of test file created by sending
- the messages in ‘send’.
Return type: dict
-
get_work_comm
(header, **kwargs)[source]¶ Get temporary work comm, creating as necessary.
Parameters: - header (dict) – Information that will be sent in the message header to the work comm.
- **kwargs – Additional keyword arguments are passed to header2workcomm.
Returns: Work comm. Return type: CommBase
-
header2workcomm
(header, work_comm_name=None, **kwargs)[source]¶ Get a work comm based on header info.
Parameters: - header (dict) – Information that will be sent in the message header to the work comm.
- work_comm_name (str, optional) – Name that should be used for the work comm. If not provided, one is created from the header id and the comm class.
- **kwargs – Additional keyword arguments are added to the returned dictionary.
Returns: Work comm. Return type: CommBase
-
is_empty_recv
(msg)[source]¶ Check if a received message object is empty.
Parameters: msg (obj) – Message object. Returns: True if the object is empty, False otherwise. Return type: bool
-
is_empty_send
(msg)[source]¶ Check if a message object being sent is empty.
Parameters: msg (obj) – Message object. Returns: True if the object is empty, False otherwise. Return type: bool
-
is_eof
(msg)[source]¶ Determine if a message is an EOF.
Parameters: msg (obj) – Message object to be tested. Returns: True if the message indicates an EOF, False otherwise. Return type: bool
-
is_file
= False
-
classmethod
is_installed
(language=None)[source]¶ Determine if the necessary libraries are installed for this communication class.
Parameters: language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. If set to ‘any’, the result will be True if this comm is installed for any of the supported languages. Returns: Is the comm installed. Return type: bool
-
maxMsgSize
Maximum size of a single message that should be sent.
Type: int
-
new_server
(srv_address)[source]¶ Create a new server.
Parameters: srv_address (str) – Address of server comm.
-
on_recv
(s_msg, second_pass=False)[source]¶ Process raw received message including handling deserializing message and handling EOF.
Parameters: Returns: Success or failure, processed message, and header information.
Return type:
-
on_recv_eof
()[source]¶ Actions to perform when EOF received.
Returns: Flag that should be returned for EOF. Return type: bool
-
on_send
(msg, header_kwargs=None)[source]¶ Process message to be sent including handling serializing message and handling EOF.
Parameters: - msg (obj) – Message to be sent
- header_kwargs (dict, optional) – Keyword arguments that should be added to the header.
Returns: Whether the message should be sent, the raw bytes message to send, and the header info contained in the message.
Return type:
-
on_send_eof
()[source]¶ Actions to perform when EOF being sent.
Returns: True if EOF message should be sent, False otherwise. Return type: bool
-
opp_comm_kwargs
()[source]¶ Get keyword arguments to initialize communication with opposite comm object.
Returns: Keyword arguments for opposite comm object. Return type: dict
-
recv
(*args, **kwargs)[source]¶ Receive a message.
Parameters: - *args – All arguments are passed to comm _recv method.
- **kwargs – All keyword arguments are passed to comm _recv method.
Returns: Success or failure of receive and received message.
Return type:
-
recv_dict
(*args, **kwargs)[source]¶ Return a received message as a dictionary of fields. If there are not any fields specified, the fields will have the form ‘f0’, ‘f1’, ‘f2’, …
Parameters: - *args – Arguments are passed to recv.
- **kwargs – Keyword arguments are passed to recv.
Returns: Success/failure of receive and a dictionary of message fields.
Return type: Raises:
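The default field naming (‘f0’, ‘f1’, ‘f2’, …) can be sketched as below. `message_to_dict` is a hypothetical helper written for illustration; it is not part of the documented API, and the length check is an added assumption.

```python
def message_to_dict(values, field_names=None):
    """Pair message values with field names, defaulting to 'f0', 'f1', ..."""
    if field_names is None:
        field_names = ["f%d" % i for i in range(len(values))]
    if len(field_names) != len(values):
        raise ValueError("number of field names does not match number of values")
    return dict(zip(field_names, values))


unnamed = message_to_dict((1.0, "a", 3))
named = message_to_dict((1.0, "a"), field_names=["x", "label"])
```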
-
recv_multipart
(*args, **kwargs)[source]¶ Receive a multipart message. If a message is received without a header, it is assumed to be complete. Otherwise, the message is received in parts from a worker comm initialized by the sender.
Parameters: - *args – All arguments are passed to comm _recv method.
- **kwargs – All keyword arguments are passed to comm _recv method.
Returns: Success or failure of receive and received message.
Return type:
-
remove_work_comm
(key, in_thread=False, linger=False)[source]¶ Close and remove a work comm.
Parameters:
-
send
(*args, **kwargs)[source]¶ Send a message.
Parameters: - *args – All arguments are assumed to be part of the message.
- **kwargs – All keyword arguments are passed to comm _send method.
Returns: Success or failure of send.
Return type:
-
send_dict
(args_dict, **kwargs)[source]¶ Send a message with fields specified in the input dictionary.
Parameters: - args_dict (dict) – Dictionary of arguments to send.
- **kwargs – Additional keyword arguments are passed to send.
Returns: Success/failure of send.
Return type: Raises: RuntimeError – If the field order cannot be determined.
-
send_eof
(*args, **kwargs)[source]¶ Send the EOF message as a short message.
Parameters: - *args – All arguments are passed to comm send.
- **kwargs – All keyword arguments are passed to comm send.
Returns: Success or failure of send.
Return type:
-
send_multipart
(msg, header_kwargs=None, **kwargs)[source]¶ Send a multipart message. If the message is smaller than maxMsgSize, it is sent using _send, otherwise it is sent to a worker comm using _send_multipart_worker.
Parameters: - msg (obj) – Message to be sent.
- header_kwargs (dict, optional) – Keyword arguments that should be added to the header.
- **kwargs – Additional keyword arguments are passed to _send or _send_multipart_worker.
Returns: Success or failure of send.
Return type:
-
server_exists
(srv_address)[source]¶ Determine if a server exists.
Parameters: srv_address (str) – Address of server comm. Returns: True if a server with the provided address exists, False otherwise.
Return type: bool
-
wait_for_confirm
(timeout=None, direction=None, active_confirm=False, noblock=False)[source]¶ Sleep until all messages are confirmed.
-
class
cis_interface.communication.CommBase.
CommServer
(srv_address, cli_address=None, **kwargs)[source]¶ Bases:
cis_interface.tools.CisThreadLoop
Basic server object to keep track of clients.
-
class
cis_interface.communication.CommBase.
CommThreadLoop
(comm, name=None, suffix='CommThread', **kwargs)[source]¶ Bases:
cis_interface.tools.CisThreadLoop
Thread loop for comms to ensure cleanup.
Parameters: - comm (CommBase) – Comm class that thread is for.
- name (str, optional) – Name for the thread. If not provided, one is created by combining the comm name and the provided suffix.
- suffix (str, optional) – Suffix that should be added to comm name to name the thread. Defaults to ‘CommThread’.
- **kwargs – Additional keyword arguments are passed to the parent class.
-
comm
¶ Comm class that thread is for.
Type: CommBase
-
cis_interface.communication.CommBase.
cleanup_comms
(comm_class, close_func=None)[source]¶ Clean up comms of a certain type.
Parameters: comm_class (str) – Comm class that should be cleaned up. Returns: Number of comms closed. Return type: int
-
cis_interface.communication.CommBase.
get_comm_registry
(comm_class)[source]¶ Get the comm registry for a comm class.
Parameters: comm_class (str) – Comm class to get registry for. Returns: Dictionary of registered comm objects. Return type: dict
-
cis_interface.communication.CommBase.
is_registered
(comm_class, key)[source]¶ Determine if a comm object has been registered under the specified key.
Parameters:
-
cis_interface.communication.CommBase.
register_comm
(comm_class, key, value)[source]¶ Add a comm object to the global registry.
Parameters:
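The four registry functions above can be read as operations on a per-class dictionary of comm objects. The sketch below reuses the documented names for readability, but the bodies, the module-level `_registry` dictionary and the return conventions are assumptions and not the library's implementation.

```python
# Hypothetical global registry: {comm_class: {key: comm_object}}.
_registry = {}


def get_comm_registry(comm_class):
    """Return the registry dictionary for a comm class, creating it if needed."""
    return _registry.setdefault(comm_class, {})


def register_comm(comm_class, key, value):
    """Store a comm object under the given key."""
    get_comm_registry(comm_class)[key] = value


def is_registered(comm_class, key):
    """Return True if a comm object is stored under the given key."""
    return key in _registry.get(comm_class, {})


def cleanup_comms(comm_class, close_func=None):
    """Remove every comm of one class, closing each, and return the count."""
    registry = _registry.get(comm_class, {})
    count = 0
    for key in list(registry):
        value = registry.pop(key)
        if close_func is not None:
            close_func(value)
        count += 1
    return count


closed = []
register_comm("IPCComm", "queue-1", "comm object 1")
register_comm("IPCComm", "queue-2", "comm object 2")
was_registered = is_registered("IPCComm", "queue-1")
n_closed = cleanup_comms("IPCComm", close_func=closed.append)
```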
cis_interface.communication.ErrorComm module¶
cis_interface.communication.FileComm module¶
-
class
cis_interface.communication.FileComm.
FileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.CommBase.CommBase
Class for handling I/O from/to a file on disk.
>>> x = FileComm('test_send', address='test_file.txt', direction='send')
>>> x.send('Test message')
True
>>> with open('test_file.txt', 'r') as fd:
...     print(fd.read())
Test message
>>> x = FileComm('test_recv', address='test_file.txt', direction='recv')
>>> x.recv()
(True, b'Test message')
Parameters: - name (str) – The environment variable where communication address is stored.
- read_meth (str, optional) – Method that should be used to read data from the file. Defaults to ‘read’. Ignored if direction is ‘send’.
- append (bool, optional) – If True and writing, file is opened in append mode. Defaults to False.
- in_temp (bool, optional) – If True, the path will be considered relative to the platform temporary directory. Defaults to False.
- open_as_binary (bool, optional) – If True, the file is opened in binary mode. Defaults to True.
- newline (str, optional) – String indicating a new line. Defaults to serialize._default_newline.
- is_series (bool, optional) – If True, input/output will be done to a series of files. If reading, each file will be processed until the end is reached. If writing, each output will be to a new file in the series. The address is assumed to contain a format for the index of the file. Defaults to False.
- wait_for_creation (float, optional) – Time (in seconds) to wait for the file to be created, if it doesn’t exist, before opening it. Defaults to 0 s, in which case an attempt is made to open the file immediately.
- **kwargs – Additional keyword arguments are passed to parent class.
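The effect of `wait_for_creation` can be pictured as polling for the file until a deadline passes. The helper `wait_for_file` and its `poll_interval` argument are hypothetical and only illustrate the idea; how FileComm actually waits is not described in this reference.

```python
import os
import time


def wait_for_file(path, wait_for_creation=0.0, poll_interval=0.01):
    """Return True once path exists, or False if the wait time runs out first."""
    deadline = time.monotonic() + wait_for_creation
    while not os.path.exists(path):
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True


# The current directory exists, so this returns without waiting.
found_existing = wait_for_file(os.getcwd())
# A path that is never created is reported as missing after a short wait.
missing_path = os.path.join(os.getcwd(), "definitely_missing_sketch_file.txt")
found_missing = wait_for_file(missing_path, wait_for_creation=0.03)
```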
-
fd
¶ File that should be read/written.
Type: file
-
in_temp
¶ If True, the path will be considered relative to the platform temporary directory.
Type: bool
-
is_series
¶ If True, input/output will be done to a series of files. If reading, each file will be processed until the end is reached. If writing, each output will be to a new file in the series.
Type: bool
Raises: ValueError – If the read_meth is not one of the supported values.
-
advance_in_file
(file_pos)[source]¶ Advance to a certain position in the current file.
Parameters: file_pos (int) – Position that should be moved to in the current file.
-
advance_in_series
(series_index=None)[source]¶ Advance to a certain file in a series.
Parameters: series_index (int, optional) – Index of file in the series that should be moved to. Defaults to None and call will advance to the next file in the series. Returns: True if the file was advanced in the series, False otherwise. Return type: bool
-
change_position
(file_pos, series_index=None)[source]¶ Change the position in the file/series.
Parameters:
-
fd
Associated file identifier.
-
get_series_address
(index=None)[source]¶ Get the address of a file in the series.
Parameters: index (int, optional) – Index in series to get address for. Defaults to None and the current index is used. Returns: Address for the file in the series. Return type: str
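Since the address of a file series is assumed to contain a format for the file index, resolving one file's address amounts to filling in that format. The sketch below assumes a printf-style placeholder such as `%03d`; the exact placeholder syntax expected by FileComm is not stated in this reference, and `series_address` is a hypothetical helper.

```python
def series_address(address_format, index):
    """Fill the index into an address that contains a printf-style placeholder."""
    return address_format % index


addresses = [series_address("output_%03d.txt", i) for i in range(3)]
```

Advancing in the series then corresponds to incrementing the index and opening the next resolved address.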
-
classmethod
get_testing_options
(read_meth='read', open_as_binary=True, **kwargs)[source]¶ Method to return a dictionary of testing options for this class.
Returns: Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the provided content.
- send (list): List of objects to send to test file.
- recv (list): List of objects that will be received from a test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending the messages in ‘send’.
Return type: dict
-
is_file
= True¶
-
classmethod
is_installed
(language=None)[source]¶ Determine if the necessary libraries are installed for this communication class.
Parameters: language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. Returns: Is the comm installed. Return type: bool
-
on_send_eof
()[source]¶ Close file when EOF to be sent.
Returns: False so that message not sent. Return type: bool
cis_interface.communication.ForkComm module¶
-
class
cis_interface.communication.ForkComm.
ForkComm
(name, comm=None, **kwargs)[source]¶ Bases:
cis_interface.communication.CommBase.CommBase
Class for receiving/sending messages from/to multiple comms.
Parameters: -
opp_comm_kwargs
()[source]¶ Get keyword arguments to initialize communication with opposite comm object.
Returns: Keyword arguments for opposite comm object. Return type: dict
-
cis_interface.communication.IPCComm module¶
-
class
cis_interface.communication.IPCComm.
IPCComm
(name, dont_backlog=False, **kwargs)[source]¶ Bases:
cis_interface.communication.AsyncComm.AsyncComm
Class for handling I/O via IPC message queues.
-
q
¶ Message queue.
Type: sysv_ipc.MessageQueue
-
classmethod
is_installed
(language=None)[source]¶ Determine if the necessary libraries are installed for this communication class.
Parameters: language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. Returns: Is the comm installed. Return type: bool
-
-
class
cis_interface.communication.IPCComm.
IPCServer
(srv_address, cli_address=None, **kwargs)[source]¶ Bases:
cis_interface.communication.CommBase.CommServer
IPC server object for cleaning up server queue.
-
cis_interface.communication.IPCComm.
get_queue
(qid=None)[source]¶ Create or return a sysv_ipc.MessageQueue and register it.
Parameters: qid (int, optional) – If provided, ID for existing queue that should be returned. Defaults to None and a new queue is returned. Returns: Message queue. Return type: sysv_ipc.MessageQueue
-
cis_interface.communication.IPCComm.
ipc_queues
()[source]¶ Get a list of active IPC queues.
Returns: List of IPC queues. Return type: list
-
cis_interface.communication.IPCComm.
ipcrm
(options=[])[source]¶ Remove IPC constructs using the ipcrm command.
Parameters: options (list) – List of flags that should be used. Defaults to an empty list.
-
cis_interface.communication.IPCComm.
ipcrm_queues
(queue_keys=None)[source]¶ Delete existing IPC queues.
Parameters: queue_keys (list, str, optional) – A list of keys for queues that should be removed. Defaults to all existing queues.
cis_interface.communication.JSONFileComm module¶
-
class
cis_interface.communication.JSONFileComm.
JSONFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.FileComm.FileComm
Class for handling I/O from/to a JSON file on disk.
Parameters: - name (str) – The environment variable where file path is stored.
- **kwargs – Additional keyword arguments are passed to parent class.
-
classmethod
get_testing_options
()[source]¶ Method to return a dictionary of testing options for this class.
Returns: Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the provided content.
- send (list): List of objects to send to test file.
- recv (list): List of objects that will be received from a test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending the messages in ‘send’.
Return type: dict
cis_interface.communication.MatFileComm module¶
-
class
cis_interface.communication.MatFileComm.
MatFileComm
(name, **kwargs)[source]¶ Bases:
cis_interface.communication.FileComm.FileComm
Class for handling I/O from/to a Matlab .mat file on disk.
Parameters: - name (str) – The environment variable where file path is stored.
- **kwargs – Additional keyword arguments are passed to parent class.
-
classmethod
get_testing_options
()[source]¶ Method to return a dictionary of testing options for this class.
Returns: Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the provided content.
- send (list): List of objects to send to test file.
- recv (list): List of objects that will be received from a test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending the messages in ‘send’.
Return type: dict
cis_interface.communication.ObjFileComm module¶
-
class
cis_interface.communication.ObjFileComm.
ObjFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.PlyFileComm.PlyFileComm
Class for handling I/O from/to a .obj file on disk.
Parameters: - name (str) – The environment variable where communication address is stored.
- **kwargs – Additional keyword arguments are passed to parent class.
-
classmethod
get_testing_options
()[source]¶ Method to return a dictionary of testing options for this class.
Returns: Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the provided content.
- send (list): List of objects to send to test file.
- recv (list): List of objects that will be received from a test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending the messages in ‘send’.
Return type: dict
cis_interface.communication.PandasFileComm module¶
-
class
cis_interface.communication.PandasFileComm.
PandasFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.AsciiTableComm.AsciiTableComm
Class for handling I/O from/to a pandas csv file on disk.
Parameters: -
classmethod
get_testing_options
(as_frames=False, no_names=False)[source]¶ Method to return a dictionary of testing options for this class.
Parameters:
Returns: Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the provided content.
- send (list): List of objects to send to test file.
- recv (list): List of objects that will be received from a test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending the messages in ‘send’.
Return type:
-
classmethod
-
cis_interface.communication.PandasFileComm.
pandas_recv_converter
(obj)[source]¶ Performs conversion to a limited set of objects from a Pandas data frame for receiving from a file via PandasFileComm. Currently supports converting to lists/tuples of numpy arrays.
Parameters: obj (pandas.DataFrame) – Data frame to convert.
Returns: Converted object (or unmodified input if conversion could not be completed).
Return type: list
-
cis_interface.communication.PandasFileComm.
pandas_send_converter
(obj)[source]¶ Performs conversion from a limited set of objects to a Pandas data frame for sending to a file via PandasFileComm. Currently supports converting from structured numpy arrays, lists/tuples of numpy arrays, and dictionaries.
Parameters: obj (object) – Object to convert.
Returns: Converted data frame (or unmodified input if conversion could not be completed).
Return type: pandas.DataFrame
cis_interface.communication.PickleFileComm module¶
-
class
cis_interface.communication.PickleFileComm.
PickleFileComm
(name, **kwargs)[source]¶ Bases:
cis_interface.communication.FileComm.FileComm
Class for handling I/O from/to a pickled file on disk.
Parameters: - name (str) – The environment variable where file path is stored.
- **kwargs – Additional keyword arguments are passed to parent class.
-
classmethod
get_testing_options
()[source]¶ Method to return a dictionary of testing options for this class.
Returns: Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the provided content.
- send (list): List of objects to send to test file.
- recv (list): List of objects that will be received from a test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending the messages in ‘send’.
Return type: dict
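The shape of the dictionary described above can be illustrated with the standard pickle module. This is only a sketch: the helper names example_testing_options and read_all are hypothetical, and the assumption that the file contents are the back-to-back pickles of each sent object is illustrative. The values actually returned by PickleFileComm.get_testing_options may differ.

```python
import io
import pickle


def example_testing_options():
    """Build a dict with the four documented keys (hypothetical helper)."""
    send = [{"a": 1}, [1, 2, 3], "hello"]
    # Assumption: each message is pickled in turn and appended to the file.
    contents = b"".join(pickle.dumps(msg) for msg in send)
    return {
        "kwargs": {},        # no extra comm keywords in this sketch
        "send": send,
        "recv": list(send),  # these objects survive a pickle round trip unchanged
        "contents": contents,
    }


def read_all(contents):
    """Unpickle every object stored back-to-back in a bytes buffer."""
    buf = io.BytesIO(contents)
    out = []
    while buf.tell() < len(contents):
        out.append(pickle.load(buf))
    return out


opts = example_testing_options()
```

Reading the contents back with read_all(opts["contents"]) should reproduce the list stored under ‘recv’, which is the relationship between the keys that the docstring describes.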
cis_interface.communication.PlyFileComm module¶
-
class
cis_interface.communication.PlyFileComm.
PlyFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.FileComm.FileComm
Class for handling I/O from/to a .ply file on disk.
Parameters: - name (str) – The environment variable where communication address is stored.
- **kwargs – Additional keyword arguments are passed to parent class.
-
classmethod
get_testing_options
()[source]¶ Method to return a dictionary of testing options for this class.
Returns: Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the provided content.
- send (list): List of objects to send to test file.
- recv (list): List of objects that will be received from a test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending the messages in ‘send’.
Return type: dict
cis_interface.communication.RMQAsyncComm module¶
-
class
cis_interface.communication.RMQAsyncComm.
RMQAsyncComm
(name, dont_backlog=False, **kwargs)[source]¶ Bases:
cis_interface.communication.RMQComm.RMQComm
Class for handling asynchronous RabbitMQ communications. Using this class is not recommended, as it can leave hanging threads if not closed properly. The normal RMQComm will cover most use cases.
Parameters: -
rmq_thread
¶ Thread used to run IO loop.
Type: tools.CisThread
-
channel_stable
¶ True if the connection is ready for messages and not about to close. False otherwise.
Type: bool
-
on_bindok
(unused_frame)[source]¶ Actions to perform once the queue is successfully bound. Start consuming messages.
-
on_cancelok
(unused_frame)[source]¶ Actions to perform after successfully cancelling consumption. Closes the channel.
-
on_channel_closed
(channel, *args)[source]¶ Actions to perform when the channel is closed. Close the connection.
-
on_channel_open
(channel)[source]¶ Actions to perform after a channel is opened. Add the channel close callback and setup the exchange.
-
on_connection_closed
(connection, *args)[source]¶ Actions that must be taken when the connection is closed. Set the channel to None. If the connection is meant to be closing, stop the IO loop. Otherwise, wait 5 seconds and try to reconnect.
-
on_connection_open
(connection)[source]¶ Actions that must be taken when the connection is opened. Add the close connection callback and open the RabbitMQ channel.
-
on_connection_open_error
(unused_connection)[source]¶ Actions that must be taken when the connection fails to open.
-
on_exchange_declareok
(unused_frame)[source]¶ Actions to perform once an exchange is successfully declared. Set up the queue.
-
on_queue_declareok
(method_frame)[source]¶ Actions to perform once the queue is successfully declared. Bind the queue.
-
rmq_lock
¶ Lock associated with RMQ ioloop thread.
-
cis_interface.communication.RMQComm module¶
-
class
cis_interface.communication.RMQComm.
RMQComm
(name, dont_backlog=False, **kwargs)[source]¶ Bases:
cis_interface.communication.AsyncComm.AsyncComm
Class for handling basic RabbitMQ communications.
-
connection
¶ RabbitMQ connection.
Type: pika.Connection
-
channel
¶ RabbitMQ channel.
Type: pika.Channel
Raises: RuntimeError – If a connection cannot be established.
-
classmethod
is_installed
(language=None)[source]¶ Determine if the necessary libraries are installed for this communication class.
Parameters: language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. Returns: Is the comm installed. Return type: bool
-
classmethod
new_comm_kwargs
(name, user=None, password=None, host=None, virtual_host=None, port=None, exchange=None, queue='', **kwargs)[source]¶ Initialize communication with new connection.
Parameters: - name (str) – Name of new connection.
- user (str, optional) – RabbitMQ server username. Defaults to config option ‘user’ in section ‘rmq’ if it exists and ‘guest’ if it does not.
- password (str, optional) – RabbitMQ server password. Defaults to config option ‘password’ in section ‘rmq’ if it exists and ‘guest’ if it does not.
- host (str, optional) – RabbitMQ server host. Defaults to config option ‘host’ in section ‘rmq’ if it exists and ‘localhost’ if it does not. If ‘localhost’, the output of socket.gethostname() is used.
- virtual_host (str, optional) – RabbitMQ server virtual host. Defaults to config option ‘vhost’ in section ‘rmq’ if it exists and ‘/’ if it does not.
- port (str, optional) – Port on host to use. Defaults to config option ‘port’ in section ‘rmq’ if it exists and ‘5672’ if it does not.
- exchange (str, optional) – RabbitMQ exchange. Defaults to config option ‘namespace’ in section ‘rmq’ if it exists and ‘’ if it does not.
- queue (str, optional) – Name of the queue that messages will be sent to or received from. If an empty string, the queue will be a random string and exclusive to a receiving comm. Defaults to ‘’.
- **kwargs – Additional keyword arguments are returned as keyword arguments for the new comm.
Returns: Arguments and keyword arguments for new comm.
Return type:
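The fallback rule described for these parameters (explicit argument first, then the ‘rmq’ config section, then a hard-coded default) can be sketched with the standard configparser module. The helper resolve_rmq_option and the table RMQ_DEFAULTS are hypothetical names used for illustration, not part of cis_interface; the default values are the ones quoted in the parameter descriptions. The substitution of socket.gethostname() for ‘localhost’ is not modelled here.

```python
import configparser

# Fallback values quoted from the parameter descriptions above.
RMQ_DEFAULTS = {
    "user": "guest",
    "password": "guest",
    "host": "localhost",
    "vhost": "/",
    "port": "5672",
    "namespace": "",
}


def resolve_rmq_option(config, option, explicit=None):
    """Return the explicit value, else config['rmq'][option], else the default."""
    if explicit is not None:
        return explicit
    # fallback= also covers the case where the 'rmq' section is missing.
    return config.get("rmq", option, fallback=RMQ_DEFAULTS[option])


# Example config that only overrides the host.
config = configparser.ConfigParser()
config.read_string("[rmq]\nhost = rmq.example.org\n")

host = resolve_rmq_option(config, "host")                    # from the config file
port = resolve_rmq_option(config, "port")                    # hard-coded default
user = resolve_rmq_option(config, "user", explicit="alice")  # explicit argument wins
```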
-
-
class
cis_interface.communication.RMQComm.
RMQServer
(srv_address, cli_address=None, **kwargs)[source]¶ Bases:
cis_interface.communication.CommBase.CommServer
RMQ server object for cleaning up server connections.
-
cis_interface.communication.RMQComm.
check_rmq_server
(url=None, **kwargs)[source]¶ Check that connection to a RabbitMQ server is possible.
Parameters: - url (str, optional) – Address of RMQ server that includes all the necessary information. If this is provided, the remaining arguments are ignored. Defaults to None and the connection parameters are taken from the other arguments.
- username (str, optional) – RMQ server username. Defaults to config value.
- password (str, optional) – RMQ server password. Defaults to config value.
- host (str, optional) – RMQ hostname or IP Address to connect to. Defaults to config value.
- port (str, optional) – RMQ server TCP port to connect to. Defaults to config value.
- vhost (str, optional) – RMQ virtual host to use. Defaults to config value.
Returns: True if connection to RabbitMQ server is possible, False otherwise.
Return type:
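For readers unsure what a url that "includes all the necessary information" might look like, the sketch below assembles an AMQP URI from the individual connection parameters. The helper build_amqp_url is hypothetical and the default values are assumptions for illustration; check_rmq_server itself takes its defaults from the config file, and how it builds or parses the URL internally is not documented here.

```python
from urllib.parse import quote


def build_amqp_url(username="guest", password="guest", host="localhost",
                   port="5672", vhost="/"):
    """Assemble an AMQP URI from its parts (hypothetical helper)."""
    # Credentials and virtual host are percent-encoded, so the default
    # virtual host '/' becomes '%2F' in the URI path.
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(username, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),
    )


url = build_amqp_url()
```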
cis_interface.communication.ServerComm module¶
-
class
cis_interface.communication.ServerComm.
ServerComm
(name, request_comm=None, response_kwargs=None, dont_open=False, **kwargs)[source]¶ Bases:
cis_interface.communication.CommBase.CommBase
Class for handling Server side communication.
Parameters: - name (str) – The environment variable where communication address is stored.
- request_comm (str, optional) – Comm class that should be used for the request comm. Defaults to None.
- response_kwargs (dict, optional) – Keyword arguments for the response comm. Defaults to empty dict.
- **kwargs – Additional keyword arguments are passed to the input comm.
-
icomm
¶ Request comm.
Type: Comm
-
ocomm
¶ Response comm for last request.
Type: Comm
-
classmethod
is_installed
(language=None)[source]¶ Determine if the necessary libraries are installed for this communication class.
Parameters: language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. Returns: Is the comm installed. Return type: bool
-
classmethod
new_comm_kwargs
(name, request_comm=None, **kwargs)[source]¶ Initialize communication with new comms.
Parameters:
-
opp_comm_kwargs
()[source]¶ Get keyword arguments to initialize communication with opposite comm object.
Returns: Keyword arguments for opposite comm object. Return type: dict
-
recv
(*args, **kwargs)[source]¶ Receive a message from the input comm and open a new response comm for output using address from the header.
Parameters: - *args – Arguments are passed to input comm recv method.
- **kwargs – Keyword arguments are passed to input comm recv method.
Returns: Output from input comm recv method.
Return type: obj
cis_interface.communication.YAMLFileComm module¶
-
class
cis_interface.communication.YAMLFileComm.
YAMLFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.FileComm.FileComm
Class for handling I/O from/to a YAML file on disk.
Parameters: - name (str) – The environment variable where file path is stored.
- **kwargs – Additional keyword arguments are passed to parent class.
-
classmethod
get_testing_options
()[source]¶ Method to return a dictionary of testing options for this class.
Returns: Dictionary of variables to use for testing. Key/value pairs:
- kwargs (dict): Keyword arguments for comms tested with the provided content.
- send (list): List of objects to send to test file.
- recv (list): List of objects that will be received from a test file that was sent the messages in ‘send’.
- contents (bytes): Bytes contents of test file created by sending the messages in ‘send’.
Return type: dict
cis_interface.communication.ZMQComm module¶
-
class
cis_interface.communication.ZMQComm.
ZMQComm
(name, dont_backlog=False, **kwargs)[source]¶ Bases:
cis_interface.communication.AsyncComm.AsyncComm
Class for handling I/O using ZeroMQ sockets.
Parameters: - name (str) – The environment variable where the socket address is stored.
- context (zmq.Context, optional) – ZeroMQ context that should be used. Defaults to None and the global context is used.
- socket_type (str, optional) – The type of socket that should be created. Defaults to _default_socket_type. See zmq for all options.
- socket_action (str, optional) – The action that the socket should perform. Defaults to action based on the direction (‘connect’ for ‘recv’, ‘bind’ for ‘send’.)
- topic_filter (str, optional) – Message filter to use when subscribing. This is only used for ‘SUB’ socket types. Defaults to ‘’ which is all messages.
- dealer_identity (str, optional) – Identity that should be used to route messages to a dealer socket. Defaults to ‘0’.
- **kwargs – Additional keyword arguments are passed to CommBase.
-
context
¶ ZeroMQ context that will be used.
Type: zmq.Context
-
socket
¶ ZeroMQ socket.
Type: zmq.Socket
-
check_reply_socket_recv
(msg)[source]¶ Check incoming message for reply address.
Parameters: msg (str) – Incoming message to check. Returns: Messages with reply address removed if present. Return type: str
-
check_reply_socket_send
(msg)[source]¶ Append reply socket address if it has not been sent.
Parameters: msg (str) – Message that will be piggy backed on. Returns: Message with reply address if it has not been sent. Return type: str
-
header2workcomm
(header, **kwargs)[source]¶ Get a work comm based on header info.
Parameters: - header (dict) – Information that will be sent in the message header to the work comm.
- **kwargs – Additional keyword arguments are passed to the parent method.
Returns: Work comm. Return type: CommBase
-
classmethod
is_installed
(language=None)[source]¶ Determine if the necessary libraries are installed for this communication class.
Parameters: language (str, optional) – Specific language that should be checked for compatibility. Defaults to None and all languages supported on the current platform will be checked. Returns: Is the comm installed. Return type: bool
-
is_message
(flags)[source]¶ Poll the socket for a message.
Parameters: flags (int) – ZMQ poll flags. Returns: True if there is a message matching the flags, False otherwise. Return type: bool
-
classmethod
new_comm_kwargs
(name, protocol=None, host=None, port=None, **kwargs)[source]¶ Initialize communication with new queue.
Parameters: - name (str) – Name of new socket.
- protocol (str, optional) – The protocol that should be used. Defaults to None and is set to _default_protocol. See zmq for details.
- host (str, optional) – The host that should be used. Invalid for ‘inproc’ protocol. Defaults to ‘localhost’.
- port (int, optional) – The port used. Invalid for ‘inproc’ protocol. Defaults to None and a random port is chosen.
- **kwargs – Additional keyword arguments are returned as keyword arguments for the new comm.
Returns: Arguments and keyword arguments for new socket.
Return type:
-
on_send
(msg, header_kwargs=None)[source]¶ Process message to be sent including handling serializing message and handling EOF.
Parameters: - msg (obj) – Message to be sent
- header_kwargs (dict, optional) – Keyword arguments that should be added to the header.
Returns: Whether the message should be sent, the raw bytes message to send, and the header info contained in the message.
Return type:
-
on_send_eof
()[source]¶ Actions to perform when EOF being sent.
Returns: True if EOF message should be sent, False otherwise. Return type: bool
-
opp_comm_kwargs
()[source]¶ Get keyword arguments to initialize communication with opposite comm object.
Returns: Keyword arguments for opposite comm object. Return type: dict
-
server_exists
(srv_address)[source]¶ Determine if a server exists.
Parameters: srv_address (str) – Address of server comm. Returns: True if a server with the provided address exists, False otherwise.
Return type: bool
-
class
cis_interface.communication.ZMQComm.
ZMQProxy
(srv_address, context=None, retry_timeout=-1, nretry=1, **kwargs)[source]¶ Bases:
cis_interface.communication.CommBase.CommServer
Start a proxy in a new thread for a server address. A client-side address will be randomly generated.
Parameters: - srv_address (str) – Address that should face the server(s).
- context (zmq.Context, optional) – ZeroMQ context that should be used. Defaults to None and the global context is used.
- protocol (str, optional) – Protocol that should be used for the sockets. Defaults to None and is set to _default_protocol.
- host (str, optional) – Host for socket address. Defaults to ‘localhost’.
- retry_timeout (float, optional) – Time (in seconds) that should be waited before retrying to bind the sockets to the addresses. If negative, a retry will not be attempted and an error will be raised. Defaults to -1.
- nretry (int, optional) – Number of times to try binding the sockets to the addresses. Defaults to 1.
- **kwargs – Additional keyword arguments are passed to the parent class.
-
context
¶ ZeroMQ context that will be used.
Type: zmq.Context
-
srv_socket
¶ Socket facing client(s).
Type: zmq.Socket
-
cli_socket
¶ Socket facing server(s).
Type: zmq.Socket
-
cis_interface.communication.ZMQComm.
bind_socket
(socket, address, retry_timeout=-1, nretry=1)[source]¶ Bind a socket to an address, getting a random port as necessary.
Parameters: - socket (zmq.Socket) – Socket that should be bound.
- address (str) – Address that socket should be bound to.
- retry_timeout (float, optional) – Time (in seconds) that should be waited before retrying to bind the socket to the address. If negative, a retry will not be attempted and an error will be raised. Defaults to -1.
- nretry (int, optional) – Number of times to try binding the socket to the addresses. Defaults to 1.
Returns: Address that socket was bound to, including random port if one was used.
Return type:
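The retry behaviour described by retry_timeout and nretry can be sketched as a small generic helper. This is not the implementation of bind_socket: the name call_with_retry is hypothetical, OSError stands in for whatever error a real bind raises, and treating nretry as the total number of attempts is an assumption drawn from the parameter description.

```python
import time


def call_with_retry(action, retry_timeout=-1, nretry=1):
    """Call action(), retrying on OSError (hypothetical stand-in for a bind)."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return action()
        except OSError:
            # A negative timeout means no retry: re-raise immediately.
            # Otherwise give up once nretry attempts have been made.
            if retry_timeout < 0 or attempts >= nretry:
                raise
            time.sleep(retry_timeout)


# Example: an action that fails twice before succeeding.
calls = []


def flaky_bind():
    calls.append(1)
    if len(calls) < 3:
        raise OSError("address in use")
    return "tcp://127.0.0.1:5555"


address = call_with_retry(flaky_bind, retry_timeout=0.0, nretry=3)
```

With the defaults (retry_timeout=-1, nretry=1) the first failure is raised straight to the caller, matching the documented default behaviour.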
-
cis_interface.communication.ZMQComm.
check_czmq
()[source]¶ Determine if the necessary C/C++ libraries are installed for ZeroMQ.
Returns: True if the libraries are installed, False otherwise. Return type: bool
-
cis_interface.communication.ZMQComm.
format_address
(protocol, host, port=None)[source]¶ Format an address based on its parts.
Parameters: Returns: Complete address.
Return type:
Raises: ValueError – If the protocol is not recognized.
-
cis_interface.communication.ZMQComm.
get_ipc_host
()[source]¶ Get an IPC host using uuid.
Returns: File path for IPC transport created using uuid. Return type: str
-
cis_interface.communication.ZMQComm.
get_socket_type_mate
(t_in)[source]¶ Find the counterpart socket type.
Parameters: t_in (str) – Socket type. Returns: Counterpart socket type. Return type: str
Raises: ValueError – If t_in is not a recognized socket type.
-
cis_interface.communication.ZMQComm.
parse_address
(address)[source]¶ Split an address into its parts.
Parameters: address (str) – Address to be split.
Returns: Parameters extracted from the address.
Return type:
Raises: ValueError – If the address doesn’t contain ‘://’.
ValueError – If the protocol is not supported.
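The relationship between format_address and parse_address can be illustrated with a pair of stand-alone functions that round-trip an address. These are sketches, not the library's code: the names carry a _sketch suffix to mark them as hypothetical, the list of supported protocols is an assumption, and the dictionary keys returned by the parser are chosen for illustration only.

```python
SUPPORTED_PROTOCOLS = ("tcp", "inproc", "ipc")  # assumed list


def format_address_sketch(protocol, host, port=None):
    """Join protocol, host and optional port into a single address string."""
    if protocol not in SUPPORTED_PROTOCOLS:
        raise ValueError("Unrecognized protocol: %r" % (protocol,))
    address = "%s://%s" % (protocol, host)
    # Only TCP addresses carry a port in this sketch.
    if protocol == "tcp" and port is not None:
        address += ":%s" % port
    return address


def parse_address_sketch(address):
    """Split an address into protocol, host and port."""
    if "://" not in address:
        raise ValueError("Address must contain '://': %r" % (address,))
    protocol, rest = address.split("://", 1)
    if protocol not in SUPPORTED_PROTOCOLS:
        raise ValueError("Unsupported protocol: %r" % (protocol,))
    host, port = rest, None
    if protocol == "tcp" and ":" in rest:
        host, port_str = rest.rsplit(":", 1)
        # Keep non-numeric ports (such as a wildcard) as strings.
        port = int(port_str) if port_str.isdigit() else port_str
    return {"protocol": protocol, "host": host, "port": port}


parts = parse_address_sketch(format_address_sketch("tcp", "127.0.0.1", 5555))
```

Both error cases named in the Raises entries above are covered: an address without ‘://’ and an address whose protocol is not in the supported list.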
Module contents¶
-
cis_interface.communication.
new_comm
(name, comm=None, **kwargs)[source]¶ Return a new communicator, creating necessary components for communication (queues, sockets, channels, etc.).
Parameters: Returns: Communicator of given class.
Return type: Comm
-
cis_interface.communication.
get_comm
(name, **kwargs)[source]¶ Return communicator for existing comm components.
Parameters: - name (str) – Communicator name.
- **kwargs – Additional keyword arguments are passed to new_comm.
Returns: Communicator of given class.
Return type: Comm
-
cis_interface.communication.
get_comm_class
(comm=None)[source]¶ Return a communication class given its name.
Parameters: comm (str, optional) – Name of communicator class. Defaults to tools.get_default_comm() if not provided. Returns: Communicator class. Return type: class