Source code for cis_interface.communication.tests.test_RMQAsyncComm

import unittest
import copy
from cis_interface.tests import assert_raises
from cis_interface.communication import new_comm
from cis_interface.communication.RMQComm import _rmq_server_running
from cis_interface.communication.tests import test_RMQComm as parent

    
[docs]@unittest.skipIf(not _rmq_server_running, "RMQ Server not running") class TestRMQAsyncComm(parent.TestRMQComm): r"""Test for RMQAsyncComm communication class.""" comm = 'RMQAsyncComm' attr_list = (copy.deepcopy(parent.TestRMQComm.attr_list) + ['times_connected', 'rmq_thread', 'rmq_lock'])
[docs] def test_reconnect(self): r"""Test reconnect after unexpected disconnect.""" self.recv_instance.connection.close(reply_code=100, reply_text="Test shutdown") T = self.recv_instance.start_timeout(5.0) while (not T.is_out) and (self.recv_instance.times_connected == 1): self.instance.sleep() self.instance.stop_timeout()
# def test_send_recv_direct(self): # r"""Disabled: Test send/recv direct.""" # pass
[docs]@unittest.skipIf(_rmq_server_running, "RMQ Server running") def test_not_running(): r"""Test raise of an error if a RMQ server is not running.""" comm_kwargs = dict(comm='RMQAsyncComm', direction='send', reverse_names=True) assert_raises(RuntimeError, new_comm, 'test', **comm_kwargs)