cis_interface.communication.tests package¶
Submodules¶
cis_interface.communication.tests.test_AsciiFileComm module¶
-
class
cis_interface.communication.tests.test_AsciiFileComm.
TestAsciiFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for AsciiFileComm communication class.
-
attr_list
= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'fd', 'read_meth', 'append', 'in_temp', 'open_as_binary', 'newline', 'is_series', 'platform_newline', 'comment']¶
-
comm
= 'AsciiFileComm'¶
-
cis_interface.communication.tests.test_AsciiMapComm module¶
-
class
cis_interface.communication.tests.test_AsciiMapComm.
TestAsciiMapComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for AsciiMapComm communication class.
-
comm
= 'AsciiMapComm'¶
-
cis_interface.communication.tests.test_AsciiTableComm module¶
-
class
cis_interface.communication.tests.test_AsciiTableComm.
TestAsciiTableComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_AsciiFileComm.TestAsciiFileComm
Test for AsciiTableComm communication class.
-
comm
= 'AsciiTableComm'¶
-
-
class
cis_interface.communication.tests.test_AsciiTableComm.
TestAsciiTableComm_AsArray
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_AsciiTableComm.TestAsciiTableComm
Test for AsciiTableComm communication class.
-
testing_option_kws
= {'as_array': True}¶
-
cis_interface.communication.tests.test_AsyncComm module¶
-
class
cis_interface.communication.tests.test_AsyncComm.
TestAsyncComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_CommBase.TestCommBase
Tests for AsyncComm communication class.
-
attr_list
= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'dont_backlog', 'backlog_send_ready', 'backlog_recv_ready']¶
-
comm
= 'AsyncComm'¶
-
cis_interface.communication.tests.test_CommBase module¶
-
class
cis_interface.communication.tests.test_CommBase.
TestCommBase
(*args, **kwargs)[source]¶ Bases:
cis_interface.tests.CisTestClassInfo
Tests for CommBase communication class.
-
attr_list
= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize']¶
-
comm
= 'CommBase'¶
-
do_send_recv
(send_meth='send', recv_meth='recv', msg_send=None, msg_recv=None, n_msg_send_meth='n_msg_send', n_msg_recv_meth='n_msg_recv', reverse_comms=False, send_kwargs=None, recv_kwargs=None, n_send=1, n_recv=1, close_on_send_eof=None, close_on_recv_eof=None)[source]¶ Generic send/recv of a message.
-
recv_instance
¶ Alias for instance.
-
send_inst_kwargs
Keyword arguments for send instance.
Type: dict
-
cis_interface.communication.tests.test_FileComm module¶
-
class
cis_interface.communication.tests.test_FileComm.
TestFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_CommBase.TestCommBase
Test for FileComm communication class.
-
attr_list
= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'fd', 'read_meth', 'append', 'in_temp', 'open_as_binary', 'newline', 'is_series', 'platform_newline']¶
-
comm
= 'FileComm'¶
-
-
class
cis_interface.communication.tests.test_FileComm.
TestFileComm_ascii
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for FileComm communication class with open_as_binary = False.
-
testing_option_kws
= {'open_as_binary': False}¶
-
-
class
cis_interface.communication.tests.test_FileComm.
TestFileComm_readline
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for FileComm communication class with read_meth = ‘readline’.
cis_interface.communication.tests.test_ForkComm module¶
-
class
cis_interface.communication.tests.test_ForkComm.
TestForkComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_CommBase.TestCommBase
Tests for ForkComm communication class.
-
attr_list
= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'comm_list', 'curr_comm_index']¶
-
comm
= 'ForkComm'¶
-
ncomm
= 2¶
-
-
class
cis_interface.communication.tests.test_ForkComm.
TestForkCommList
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ForkComm.TestForkComm
Tests for ForkComm communication class with construction from address.
cis_interface.communication.tests.test_IPCComm module¶
-
class
cis_interface.communication.tests.test_IPCComm.
TestIPCComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_AsyncComm.TestAsyncComm
Test for IPCComm communication class.
-
attr_list
= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'dont_backlog', 'backlog_send_ready', 'backlog_recv_ready', 'q']¶
-
comm
= 'IPCComm'¶
-
-
cis_interface.communication.tests.test_IPCComm.
test_ipcrm_not_isntalled
()[source]¶ Test ipcrm if IPC library is not installed.
-
cis_interface.communication.tests.test_IPCComm.
test_ipcrm_queues
()[source]¶ Test removal of ipc queues.
-
cis_interface.communication.tests.test_IPCComm.
test_ipcrm_queues_not_isntalled
()[source]¶ Test ipcrm_queues if IPC library is not installed.
-
cis_interface.communication.tests.test_IPCComm.
test_ipcs_not_isntalled
()[source]¶ Test return of ipcs if IPC library is not installed.
-
cis_interface.communication.tests.test_IPCComm.
test_not_running
()[source]¶ Test raise of an error if a IPC library is not installed.
cis_interface.communication.tests.test_JSONFileComm module¶
-
class
cis_interface.communication.tests.test_JSONFileComm.
TestJSONFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for JSONFileComm communication class.
-
comm
= 'JSONFileComm'¶
-
cis_interface.communication.tests.test_MatFileComm module¶
-
class
cis_interface.communication.tests.test_MatFileComm.
TestMatFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for MatFileComm communication class.
-
comm
= 'MatFileComm'¶
-
cis_interface.communication.tests.test_ObjFileComm module¶
-
class
cis_interface.communication.tests.test_ObjFileComm.
TestObjFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_PlyFileComm.TestPlyFileComm
Test for ObjFileComm communication class.
-
comm
= 'ObjFileComm'¶
-
cis_interface.communication.tests.test_PandasFileComm module¶
-
class
cis_interface.communication.tests.test_PandasFileComm.
TestPandasFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_AsciiTableComm.TestAsciiTableComm
Test for PandasFileComm communication class.
-
comm
= 'PandasFileComm'¶
-
-
class
cis_interface.communication.tests.test_PandasFileComm.
TestPandasFileComm_nonames
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_PandasFileComm.TestPandasFileComm
Test for PandasFileComm communication class without field names sent.
-
testing_option_kws
= {'no_names': True}¶
-
-
class
cis_interface.communication.tests.test_PandasFileComm.
TestPandasFileComm_single
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_PandasFileComm.TestPandasFileComm
Test for PandasFileComm communication class with field names sent.
cis_interface.communication.tests.test_PickleFileComm module¶
-
class
cis_interface.communication.tests.test_PickleFileComm.
TestPickleFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for PickleFileComm communication class.
-
comm
= 'PickleFileComm'¶
-
cis_interface.communication.tests.test_PlyFileComm module¶
-
class
cis_interface.communication.tests.test_PlyFileComm.
TestPlyFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for PlyFileComm communication class.
-
comm
= 'PlyFileComm'¶
-
cis_interface.communication.tests.test_RMQAsyncComm module¶
-
class
cis_interface.communication.tests.test_RMQAsyncComm.
TestRMQAsyncComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_RMQComm.TestRMQComm
Test for RMQAsyncComm communication class.
-
attr_list
= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'dont_backlog', 'backlog_send_ready', 'backlog_recv_ready', 'connection', 'channel', 'times_connected', 'rmq_thread', 'rmq_lock']¶
-
comm
= 'RMQAsyncComm'¶
-
cis_interface.communication.tests.test_RMQComm module¶
-
class
cis_interface.communication.tests.test_RMQComm.
TestRMQComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_AsyncComm.TestAsyncComm
Test for RMQComm communication class.
-
attr_list
= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'dont_backlog', 'backlog_send_ready', 'backlog_recv_ready', 'connection', 'channel']¶
-
comm
= 'RMQComm'¶
-
timeout
= 10.0¶
-
cis_interface.communication.tests.test_ServerComm module¶
-
class
cis_interface.communication.tests.test_ServerComm.
TestServerComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_CommBase.TestCommBase
Tests for ServerComm communication class.
-
attr_list
= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'response_kwargs', 'icomm', 'ocomm']¶
-
comm
= 'ServerComm'¶
-
cis_interface.communication.tests.test_YAMLFileComm module¶
-
class
cis_interface.communication.tests.test_YAMLFileComm.
TestYAMLFileComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for YAMLFileComm communication class.
-
comm
= 'YAMLFileComm'¶
-
cis_interface.communication.tests.test_ZMQComm module¶
-
class
cis_interface.communication.tests.test_ZMQComm.
TestZMQComm
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_AsyncComm.TestAsyncComm
Test for ZMQComm communication class.
-
attr_list
= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'dont_backlog', 'backlog_send_ready', 'backlog_recv_ready', 'context', 'socket', 'socket_type_name', 'socket_type', 'protocol', 'host', 'port']¶
-
comm
= 'ZMQComm'¶
-
description_prefix
¶ String prefix to prepend docstr test message with.
-
protocol
= None¶
-
send_inst_kwargs
¶ Keyword arguments for send instance.
-
socket_type
= None¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.
TestZMQCommINPROC
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with INPROC socket.
-
protocol
= 'inproc'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.
TestZMQCommIPC
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with IPC socket.
-
protocol
= 'ipc'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.
TestZMQCommIPC_client
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm_client
,cis_interface.communication.tests.test_ZMQComm.TestZMQCommIPC
Test for ZMQComm communication class with IPC socket.
-
class
cis_interface.communication.tests.test_ZMQComm.
TestZMQCommPAIR
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with PAIR/PAIR socket.
-
socket_type
= 'PAIR'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.
TestZMQCommPUB
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with PUB/SUB socket.
-
socket_type
= 'PUB'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.
TestZMQCommPUSH
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with PUSH/PULL socket.
-
socket_type
= 'PUSH'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.
TestZMQCommPUSH_INPROC
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQCommINPROC
Test for ZMQComm communication class with INPROC PUSH/PULL socket.
-
socket_type
= 'PUSH'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.
TestZMQCommREQ
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with REP/REQ socket.
-
socket_type
= 'REQ'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.
TestZMQCommROUTER
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with DEALER/ROUTER socket.
-
setup
(*args, **kwargs)[source]¶ Initialize comm object pair with sleep after setup to ensure dealer has connected.
-
socket_type
= 'ROUTER'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.
TestZMQCommTCP
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with TCP socket.
-
protocol
= 'tcp'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.
TestZMQComm_client
(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class for client/server.
-
send_inst_kwargs
¶ Keyword arguments for send instance.
-
-
cis_interface.communication.tests.test_ZMQComm.
test_error_on_send_open_twice
()[source]¶ Test creation of the same send socket twice for an error.
-
cis_interface.communication.tests.test_ZMQComm.
test_format_address
()[source]¶ Test format/parse of address.