cis_interface.communication.tests package¶
Submodules¶
cis_interface.communication.tests.test_AsciiFileComm module¶
-
class
cis_interface.communication.tests.test_AsciiFileComm.TestAsciiFileComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for AsciiFileComm communication class.
-
attr_list= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'fd', 'read_meth', 'append', 'in_temp', 'open_as_binary', 'newline', 'is_series', 'platform_newline', 'comment']¶
-
comm= 'AsciiFileComm'¶
-
cis_interface.communication.tests.test_AsciiMapComm module¶
-
class
cis_interface.communication.tests.test_AsciiMapComm.TestAsciiMapComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for AsciiMapComm communication class.
-
comm= 'AsciiMapComm'¶
-
cis_interface.communication.tests.test_AsciiTableComm module¶
-
class
cis_interface.communication.tests.test_AsciiTableComm.TestAsciiTableComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_AsciiFileComm.TestAsciiFileComm
Test for AsciiTableComm communication class.
-
comm= 'AsciiTableComm'¶
-
-
class
cis_interface.communication.tests.test_AsciiTableComm.TestAsciiTableComm_AsArray(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_AsciiTableComm.TestAsciiTableComm
Test for AsciiTableComm communication class.
-
testing_option_kws= {'as_array': True}¶
-
cis_interface.communication.tests.test_AsyncComm module¶
-
class
cis_interface.communication.tests.test_AsyncComm.TestAsyncComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_CommBase.TestCommBase
Tests for AsyncComm communication class.
-
attr_list= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'dont_backlog', 'backlog_send_ready', 'backlog_recv_ready']¶
-
comm= 'AsyncComm'¶
-
cis_interface.communication.tests.test_CommBase module¶
-
class
cis_interface.communication.tests.test_CommBase.TestCommBase(*args, **kwargs)[source]¶ Bases:
cis_interface.tests.CisTestClassInfo
Tests for CommBase communication class.
-
attr_list= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize']¶
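The attr_list values appear to name the attributes each test class expects to find on a comm instance. The following is a minimal sketch of that kind of check, not the package's own test code; DummyComm and missing_attrs are hypothetical names introduced only for illustration.

```python
class DummyComm:
    """Hypothetical stand-in for a comm object; not part of cis_interface."""

    def __init__(self, name, address, direction="send"):
        self.name = name
        self.address = address
        self.direction = direction


def missing_attrs(instance, attr_list):
    """Return the names in attr_list that the instance does not define."""
    return [attr for attr in attr_list if not hasattr(instance, attr)]


comm = DummyComm("test", "addr")
# DummyComm defines no 'serializer', so only that name is reported.
print(missing_attrs(comm, ["name", "address", "direction", "serializer"]))
```

A test built on this idea would assert that the returned list is empty for the class under test.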
-
comm= 'CommBase'¶
-
do_send_recv(send_meth='send', recv_meth='recv', msg_send=None, msg_recv=None, n_msg_send_meth='n_msg_send', n_msg_recv_meth='n_msg_recv', reverse_comms=False, send_kwargs=None, recv_kwargs=None, n_send=1, n_recv=1, close_on_send_eof=None, close_on_recv_eof=None)[source]¶ Generic send/recv of a message.
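As a rough illustration of what a generic send/recv check can look like, the sketch below looks up the send and receive methods by name, sends a message a given number of times, and compares what comes back. It is an assumption-laden sketch, not the body of do_send_recv: QueueComm and sketch_send_recv are hypothetical, and only a few of the parameters from the signature above are mirrored.

```python
import queue


class QueueComm:
    """Hypothetical in-memory comm backed by a shared queue (illustration only)."""

    def __init__(self, q):
        self.q = q

    def send(self, msg):
        self.q.put(msg)
        return True

    def recv(self):
        # Return a (success flag, message) pair; fail if nothing arrives in time.
        try:
            return True, self.q.get(timeout=1.0)
        except queue.Empty:
            return False, None


def sketch_send_recv(send_comm, recv_comm, msg_send, msg_recv=None,
                     send_meth="send", recv_meth="recv", n_send=1, n_recv=1):
    """Send msg_send n_send times, then receive n_recv times and compare."""
    if msg_recv is None:
        msg_recv = msg_send
    for _ in range(n_send):
        assert getattr(send_comm, send_meth)(msg_send)
    for _ in range(n_recv):
        flag, received = getattr(recv_comm, recv_meth)()
        assert flag
        assert received == msg_recv
    return True


shared = queue.Queue()
sketch_send_recv(QueueComm(shared), QueueComm(shared), b"hello")
```

Passing method names as strings lets one helper exercise several send/receive variants without duplicating the loop.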
-
recv_instance¶ Alias for instance.
-
send_inst_kwargs Keyword arguments for send instance.
Type: dict
-
cis_interface.communication.tests.test_FileComm module¶
-
class
cis_interface.communication.tests.test_FileComm.TestFileComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_CommBase.TestCommBase
Test for FileComm communication class.
-
attr_list= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'fd', 'read_meth', 'append', 'in_temp', 'open_as_binary', 'newline', 'is_series', 'platform_newline']¶
-
comm= 'FileComm'¶
-
-
class
cis_interface.communication.tests.test_FileComm.TestFileComm_ascii(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for FileComm communication class with open_as_binary = False.
-
testing_option_kws= {'open_as_binary': False}¶
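Several classes on this page differ from their base only in a class attribute such as testing_option_kws, protocol, or socket_type. The sketch below shows that pattern with the standard unittest module, assuming the attribute is forwarded as keyword arguments to the object under test. FakeFileComm, BaseCase, AsciiCase, and run_cases are hypothetical names, and the real test classes may be wired differently.

```python
import io
import unittest


class FakeFileComm:
    """Hypothetical comm; records the option it was built with."""

    def __init__(self, open_as_binary=True):
        self.open_as_binary = open_as_binary


class BaseCase(unittest.TestCase):
    # Options forwarded to the object under test; subclasses override these.
    testing_option_kws = {}
    expected_binary = True

    def setUp(self):
        self.instance = FakeFileComm(**self.testing_option_kws)

    def test_mode(self):
        self.assertEqual(self.instance.open_as_binary, self.expected_binary)


class AsciiCase(BaseCase):
    # Same tests as BaseCase, rerun with a different construction option.
    testing_option_kws = {"open_as_binary": False}
    expected_binary = False


def run_cases():
    """Run both cases quietly and return the unittest result object."""
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite([
        loader.loadTestsFromTestCase(BaseCase),
        loader.loadTestsFromTestCase(AsciiCase),
    ])
    return unittest.TextTestRunner(stream=io.StringIO(), verbosity=0).run(suite)
```

The subclass inherits every test method, so one override reruns the whole suite under the new option.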
-
-
class
cis_interface.communication.tests.test_FileComm.TestFileComm_readline(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for FileComm communication class with read_meth = ‘readline’.
cis_interface.communication.tests.test_ForkComm module¶
-
class
cis_interface.communication.tests.test_ForkComm.TestForkComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_CommBase.TestCommBase
Tests for ForkComm communication class.
-
attr_list= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'comm_list', 'curr_comm_index']¶
-
comm= 'ForkComm'¶
-
ncomm= 2¶
-
-
class
cis_interface.communication.tests.test_ForkComm.TestForkCommList(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ForkComm.TestForkComm
Tests for ForkComm communication class with construction from address.
cis_interface.communication.tests.test_IPCComm module¶
-
class
cis_interface.communication.tests.test_IPCComm.TestIPCComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_AsyncComm.TestAsyncComm
Test for IPCComm communication class.
-
attr_list= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'dont_backlog', 'backlog_send_ready', 'backlog_recv_ready', 'q']¶
-
comm= 'IPCComm'¶
-
-
cis_interface.communication.tests.test_IPCComm.test_ipcrm_not_isntalled()[source]¶ Test ipcrm if IPC library is not installed.
-
cis_interface.communication.tests.test_IPCComm.test_ipcrm_queues()[source]¶ Test removal of ipc queues.
-
cis_interface.communication.tests.test_IPCComm.test_ipcrm_queues_not_isntalled()[source]¶ Test ipcrm_queues if IPC library is not installed.
-
cis_interface.communication.tests.test_IPCComm.test_ipcs_not_isntalled()[source]¶ Test return of ipcs if IPC library is not installed.
-
cis_interface.communication.tests.test_IPCComm.test_not_running()[source]¶ Test raise of an error if an IPC library is not installed.
cis_interface.communication.tests.test_JSONFileComm module¶
-
class
cis_interface.communication.tests.test_JSONFileComm.TestJSONFileComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for JSONFileComm communication class.
-
comm= 'JSONFileComm'¶
-
cis_interface.communication.tests.test_MatFileComm module¶
-
class
cis_interface.communication.tests.test_MatFileComm.TestMatFileComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for MatFileComm communication class.
-
comm= 'MatFileComm'¶
-
cis_interface.communication.tests.test_ObjFileComm module¶
-
class
cis_interface.communication.tests.test_ObjFileComm.TestObjFileComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_PlyFileComm.TestPlyFileComm
Test for ObjFileComm communication class.
-
comm= 'ObjFileComm'¶
-
cis_interface.communication.tests.test_PandasFileComm module¶
-
class
cis_interface.communication.tests.test_PandasFileComm.TestPandasFileComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_AsciiTableComm.TestAsciiTableComm
Test for PandasFileComm communication class.
-
comm= 'PandasFileComm'¶
-
-
class
cis_interface.communication.tests.test_PandasFileComm.TestPandasFileComm_nonames(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_PandasFileComm.TestPandasFileComm
Test for PandasFileComm communication class without field names sent.
-
testing_option_kws= {'no_names': True}¶
-
-
class
cis_interface.communication.tests.test_PandasFileComm.TestPandasFileComm_single(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_PandasFileComm.TestPandasFileComm
Test for PandasFileComm communication class with field names sent.
cis_interface.communication.tests.test_PickleFileComm module¶
-
class
cis_interface.communication.tests.test_PickleFileComm.TestPickleFileComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for PickleFileComm communication class.
-
comm= 'PickleFileComm'¶
-
cis_interface.communication.tests.test_PlyFileComm module¶
-
class
cis_interface.communication.tests.test_PlyFileComm.TestPlyFileComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for PlyFileComm communication class.
-
comm= 'PlyFileComm'¶
-
cis_interface.communication.tests.test_RMQAsyncComm module¶
-
class
cis_interface.communication.tests.test_RMQAsyncComm.TestRMQAsyncComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_RMQComm.TestRMQComm
Test for RMQAsyncComm communication class.
-
attr_list= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'dont_backlog', 'backlog_send_ready', 'backlog_recv_ready', 'connection', 'channel', 'times_connected', 'rmq_thread', 'rmq_lock']¶
-
comm= 'RMQAsyncComm'¶
-
cis_interface.communication.tests.test_RMQComm module¶
-
class
cis_interface.communication.tests.test_RMQComm.TestRMQComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_AsyncComm.TestAsyncComm
Test for RMQComm communication class.
-
attr_list= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'dont_backlog', 'backlog_send_ready', 'backlog_recv_ready', 'connection', 'channel']¶
-
comm= 'RMQComm'¶
-
timeout= 10.0¶
-
cis_interface.communication.tests.test_ServerComm module¶
-
class
cis_interface.communication.tests.test_ServerComm.TestServerComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_CommBase.TestCommBase
Tests for ServerComm communication class.
-
attr_list= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'response_kwargs', 'icomm', 'ocomm']¶
-
comm= 'ServerComm'¶
-
cis_interface.communication.tests.test_YAMLFileComm module¶
-
class
cis_interface.communication.tests.test_YAMLFileComm.TestYAMLFileComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_FileComm.TestFileComm
Test for YAMLFileComm communication class.
-
comm= 'YAMLFileComm'¶
-
cis_interface.communication.tests.test_ZMQComm module¶
-
class
cis_interface.communication.tests.test_ZMQComm.TestZMQComm(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_AsyncComm.TestAsyncComm
Test for ZMQComm communication class.
-
attr_list= ['name', 'address', 'direction', 'serializer', 'recv_timeout', 'close_on_eof_recv', 'opp_address', 'opp_comms', 'maxMsgSize', 'dont_backlog', 'backlog_send_ready', 'backlog_recv_ready', 'context', 'socket', 'socket_type_name', 'socket_type', 'protocol', 'host', 'port']¶
-
comm= 'ZMQComm'¶
-
description_prefix¶ String prefix to prepend docstr test message with.
-
protocol= None¶
-
send_inst_kwargs¶ Keyword arguments for send instance.
-
socket_type= None¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.TestZMQCommINPROC(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with INPROC socket.
-
protocol= 'inproc'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.TestZMQCommIPC(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with IPC socket.
-
protocol= 'ipc'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.TestZMQCommIPC_client(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm_client, cis_interface.communication.tests.test_ZMQComm.TestZMQCommIPC
Test for ZMQComm communication class with IPC socket.
-
class
cis_interface.communication.tests.test_ZMQComm.TestZMQCommPAIR(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with PAIR/PAIR socket.
-
socket_type= 'PAIR'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.TestZMQCommPUB(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with PUB/SUB socket.
-
socket_type= 'PUB'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.TestZMQCommPUSH(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with PUSH/PULL socket.
-
socket_type= 'PUSH'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.TestZMQCommPUSH_INPROC(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQCommINPROC
Test for ZMQComm communication class with INPROC PUSH/PULL socket.
-
socket_type= 'PUSH'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.TestZMQCommREQ(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with REP/REQ socket.
-
socket_type= 'REQ'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.TestZMQCommROUTER(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with DEALER/ROUTER socket.
-
setup(*args, **kwargs)[source]¶ Initialize comm object pair with sleep after setup to ensure dealer has connected.
-
socket_type= 'ROUTER'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.TestZMQCommTCP(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class with TCP socket.
-
protocol= 'tcp'¶
-
-
class
cis_interface.communication.tests.test_ZMQComm.TestZMQComm_client(*args, **kwargs)[source]¶ Bases:
cis_interface.communication.tests.test_ZMQComm.TestZMQComm
Test for ZMQComm communication class for client/server.
-
send_inst_kwargs¶ Keyword arguments for send instance.
-
-
cis_interface.communication.tests.test_ZMQComm.test_error_on_send_open_twice()[source]¶ Test creation of the same send socket twice for an error.
-
cis_interface.communication.tests.test_ZMQComm.test_format_address()[source]¶ Test format/parse of address.
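A format/parse test is typically a round trip: build an address from its parts, parse it back, and check that the parts survive. The sketch below illustrates this for addresses of the form protocol://host or protocol://host:port. The two helpers are hypothetical stand-ins written for this example; they are not the functions that test_format_address exercises, whose signatures are not shown on this page.

```python
def sketch_format_address(protocol, host, port=None):
    """Build 'protocol://host' or 'protocol://host:port' (hypothetical helper)."""
    address = "%s://%s" % (protocol, host)
    if port is not None:
        address += ":%d" % port
    return address


def sketch_parse_address(address):
    """Split an address built by sketch_format_address back into its parts."""
    protocol, rest = address.split("://", 1)
    host, sep, port = rest.rpartition(":")
    if sep and port.isdigit():
        return {"protocol": protocol, "host": host, "port": int(port)}
    # No numeric port suffix: treat everything after '://' as the host.
    return {"protocol": protocol, "host": rest, "port": None}


# Round trip: the parsed parts should match what was formatted.
parts = sketch_parse_address(sketch_format_address("tcp", "127.0.0.1", 5555))
assert parts == {"protocol": "tcp", "host": "127.0.0.1", "port": 5555}
```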