import unittest
import uuid
import copy
from cis_interface.communication import new_comm
from cis_interface.communication.tests import test_CommBase
[docs]class TestServerComm(test_CommBase.TestCommBase):
r"""Tests for ServerComm communication class."""
comm = 'ServerComm'
attr_list = (copy.deepcopy(test_CommBase.TestCommBase.attr_list)
+ ['response_kwargs', 'icomm', 'ocomm'])
@property
def send_inst_kwargs(self):
r"""dict: Keyword arguments for send instance."""
return {'comm': 'ClientComm'}
[docs] @unittest.skipIf(True, 'Server')
def test_error_send(self):
r"""Disabled: Test error on send."""
pass # pragma: no cover
[docs] @unittest.skipIf(True, 'Server')
def test_error_recv(self):
r"""Disabled: Test error on recv."""
pass # pragma: no cover
[docs] @unittest.skipIf(True, 'Server')
def test_invalid_direction(self):
r"""Disabled: Test of error on incorrect direction."""
pass # pragma: no cover
[docs] @unittest.skipIf(True, 'Server')
def test_work_comm(self):
r"""Disabled: Test creating/removing a work comm."""
pass # pragma: no cover
[docs] def test_newcomm_server(self):
r"""Test creation of server using newcomm."""
inst = new_comm('testserver_%s' % str(uuid.uuid4()), comm=self.comm)
self.remove_instance(inst)
[docs] def test_eof_no_close(self):
r"""Test send/recv of EOF message with no close."""
# Forwards
self.recv_instance.icomm.close_on_eof_recv = False
self.do_send_recv(send_meth='send_eof', close_on_recv_eof=False)
[docs] def test_call(self):
r"""Test RPC call."""
self.send_instance.sched_task(0.0, self.send_instance.rpcCall,
args=[self.test_msg], store_output=True)
flag, msg_recv = self.recv_instance.rpcRecv(timeout=self.timeout)
assert(flag)
self.assert_equal(msg_recv, self.test_msg)
flag = self.recv_instance.rpcSend(msg_recv)
assert(flag)
T = self.recv_instance.start_timeout()
while (not T.is_out) and (self.send_instance.sched_out is None): # pragma: debug
self.recv_instance.sleep()
self.recv_instance.stop_timeout()
flag, msg_recv = self.send_instance.sched_out
assert(flag)
self.assert_equal(msg_recv, self.test_msg)
[docs] def test_call_alias(self):
r"""Test RPC call aliases."""
# self.send_instance.sched_task(0.0, self.send_instance.rpcSend,
# args=[self.test_msg], store_output=True)
self.recv_instance.sched_task(self.sleeptime, self.recv_instance.rpcRecv,
kwargs=dict(timeout=self.timeout),
store_output=True)
flag = self.send_instance.rpcSend(self.test_msg)
assert(flag)
T = self.recv_instance.start_timeout()
while (not T.is_out) and (self.recv_instance.sched_out is None): # pragma: debug
self.recv_instance.sleep()
self.recv_instance.stop_timeout()
flag, msg_recv = self.recv_instance.sched_out
# flag, msg_recv = self.recv_instance.rpcRecv(timeout=self.timeout)
assert(flag)
self.assert_equal(msg_recv, self.test_msg)
flag = self.recv_instance.rpcSend(msg_recv)
assert(flag)
flag, msg_recv = self.send_instance.rpcRecv(timeout=self.timeout)
assert(flag)
self.assert_equal(msg_recv, self.test_msg)
[docs] def test_call_nolimit(self):
r"""Test RPC nolimit call."""
self.send_instance.sched_task(0.0, self.send_instance.call_nolimit,
args=[self.msg_long], store_output=True)
flag, msg_recv = self.recv_instance.recv_nolimit(timeout=self.timeout)
assert(flag)
self.assert_equal(msg_recv, self.msg_long)
flag = self.recv_instance.send_nolimit(msg_recv)
assert(flag)
T = self.recv_instance.start_timeout()
while (not T.is_out) and (self.send_instance.sched_out is None): # pragma: debug
self.recv_instance.sleep()
self.recv_instance.stop_timeout()
flag, msg_recv = self.send_instance.sched_out
assert(flag)
self.assert_equal(msg_recv, self.msg_long)
[docs] def test_close_in_thread(self):
r"""Test close of comm in thread."""
self.send_instance.close_in_thread()
self.recv_instance.close_in_thread()
# # This dosn't work for comms that are uni-directional
# def test_purge_recv(self):
# r"""Test purging messages from the client comm."""
# # Purge send while open
# if self.comm != 'CommBase':
# flag = self.send_instance.send(self.test_msg)
# assert(flag)
# T = self.recv_instance.start_timeout()
# while (not T.is_out) and (self.recv_instance.n_msg == 0): # pragma: debug
# self.recv_instance.sleep()
# self.recv_instance.stop_timeout()
# self.assert_equal(self.recv_instance.n_msg, 1)
# self.send_instance.purge()
# self.assert_equal(self.send_instance.n_msg, 0)
# self.assert_equal(self.recv_instance.n_msg, 0)
# # Purge send while closed
# self.send_instance.close()
# self.send_instance.purge()