diff options
Diffstat (limited to 'test/integration/test_xmpp_transports_nb.py')
-rw-r--r-- | test/integration/test_xmpp_transports_nb.py | 277 |
1 files changed, 277 insertions, 0 deletions
diff --git a/test/integration/test_xmpp_transports_nb.py b/test/integration/test_xmpp_transports_nb.py new file mode 100644 index 000000000..f6cbab510 --- /dev/null +++ b/test/integration/test_xmpp_transports_nb.py @@ -0,0 +1,277 @@ +''' +Integration test for tranports classes. See unit for the ordinary +unit tests of this module. +''' + +import unittest +import socket + +import lib +lib.setup_env() + +from xmpp_mocks import IdleQueueThread, IdleMock +from common.xmpp import transports_nb + + +class AbstractTransportTest(unittest.TestCase): + ''' Encapsulates Idlequeue instantiation for transports and more...''' + + def setUp(self): + ''' IdleQueue thread is run and dummy connection is created. ''' + self.idlequeue_thread = IdleQueueThread() + self.idlequeue_thread.start() + self._setup_hook() + + def tearDown(self): + ''' IdleQueue thread is stopped. ''' + self._teardown_hook() + self.idlequeue_thread.stop_thread() + self.idlequeue_thread.join() + + def _setup_hook(self): + pass + + def _teardown_hook(self): + pass + + def expect_receive(self, expected, count=1, msg=None): + ''' + Returns a callback function that will assert whether the data passed to + it equals the one specified when calling this function. + + Can be used to make sure transport dispatch correct data. + ''' + def receive(data, *args, **kwargs): + self.assertEqual(data, expected, msg=msg) + self._expected_count -= 1 + self._expected_count = count + return receive + + def have_received_expected(self): + ''' + Plays together with expect_receive(). Will return true if expected_rcv + callback was called as often as specified + ''' + return self._expected_count == 0 + + +class TestNonBlockingTCP(AbstractTransportTest): + ''' + Test class for NonBlockingTCP. Will actually try to connect to an existing + XMPP server. + ''' + class MockClient(IdleMock): + ''' Simple client to test transport functionality ''' + def __init__(self, idlequeue, testcase): + self.idlequeue = idlequeue + self.testcase = testcase + IdleMock.__init__(self) + + def do_connect(self, establish_tls=False, proxy_dict=None): + try: + ips = socket.getaddrinfo('gajim.org', 5222, + socket.AF_UNSPEC,socket.SOCK_STREAM) + ip = ips[0] + except socket.error, e: + self.testcase.fail(msg=str(e)) + + self.socket = transports_nb.NonBlockingTCP( + raise_event=lambda event_type, data: self.testcase.assertTrue( + event_type and data), + on_disconnect=lambda: self.on_success(mode='SocketDisconnect'), + idlequeue=self.idlequeue, + estabilish_tls=establish_tls, + certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'), + proxy_dict=proxy_dict) + + self.socket.PlugIn(self) + + self.socket.connect(conn_5tuple=ip, + on_connect=lambda: self.on_success(mode='TCPconnect'), + on_connect_failure=self.on_failure) + self.testcase.assertTrue(self.wait(), msg='Connection timed out') + + def do_disconnect(self): + self.socket.disconnect() + self.testcase.assertTrue(self.wait(), msg='Disconnect timed out') + + def on_failure(self, err_message): + self.set_event() + self.testcase.fail(msg=err_message) + + def on_success(self, mode, data=None): + if mode == "TCPconnect": + pass + if mode == "SocketDisconnect": + pass + self.set_event() + + def _setup_hook(self): + self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq, + testcase=self) + + def _teardown_hook(self): + if self.client.socket.state == 'CONNECTED': + self.client.do_disconnect() + + def test_connect_disconnect_plain(self): + ''' Establish plain connection ''' + self.client.do_connect(establish_tls=False) + self.assertEquals(self.client.socket.state, 'CONNECTED') + self.client.do_disconnect() + self.assertEquals(self.client.socket.state, 'DISCONNECTED') + +# def test_connect_disconnect_ssl(self): +# ''' Establish SSL (not TLS) connection ''' +# self.client.do_connect(establish_tls=True) +# self.assertEquals(self.client.socket.state, 'CONNECTED') +# self.client.do_disconnect() +# self.assertEquals(self.client.socket.state, 'DISCONNECTED') + + def test_do_receive(self): + ''' Test _do_receive method by overwriting socket.recv ''' + self.client.do_connect() + sock = self.client.socket + + # transport shall receive data + data = "Please don't fail" + sock._recv = lambda buffer: data + sock.onreceive(self.expect_receive(data)) + sock._do_receive() + self.assertTrue(self.have_received_expected(), msg='Did not receive data') + self.assert_(self.client.socket.state == 'CONNECTED') + + # transport shall do nothing as an non-fatal SSL is simulated + sock._recv = lambda buffer: None + sock.onreceive(self.assertFalse) # we did not receive anything... + sock._do_receive() + self.assert_(self.client.socket.state == 'CONNECTED') + + # transport shall disconnect as remote side closed the connection + sock._recv = lambda buffer: '' + sock.onreceive(self.assertFalse) # we did not receive anything... + sock._do_receive() + self.assert_(self.client.socket.state == 'DISCONNECTED') + + def test_do_send(self): + ''' Test _do_send method by overwriting socket.send ''' + self.client.do_connect() + sock = self.client.socket + + outgoing = [] # what we have actually send to our socket.socket + data_part1 = "Please don't " + data_part2 = "fail!" + data_complete = data_part1 + data_part2 + + # Simulate everything could be send in one go + def _send_all(data): + outgoing.append(data) + return len(data) + sock._send = _send_all + sock.send(data_part1) + sock.send(data_part2) + sock._do_send() + sock._do_send() + self.assertTrue(self.client.socket.state == 'CONNECTED') + self.assertTrue(data_part1 in outgoing and data_part2 in outgoing) + self.assertFalse(sock.sendqueue and sock.sendbuff, + msg='There is still unsend data in buffers') + + # Simulate data could only be sent in chunks + self.chunk_count = 0 + outgoing = [] + def _send_chunks(data): + if self.chunk_count == 0: + outgoing.append(data_part1) + self.chunk_count += 1 + return len(data_part1) + else: + outgoing.append(data_part2) + return len(data_part2) + sock._send = _send_chunks + sock.send(data_complete) + sock._do_send() # process first chunk + sock._do_send() # process the second one + self.assertTrue(self.client.socket.state == 'CONNECTED') + self.assertTrue(data_part1 in outgoing and data_part2 in outgoing) + self.assertFalse(sock.sendqueue and sock.sendbuff, + msg='There is still unsend data in buffers') + + +class TestNonBlockingHTTP(AbstractTransportTest): + ''' Test class for NonBlockingHTTP transport''' + + bosh_http_dict = { + 'http_uri': 'http://gajim.org:5280/http-bind', + 'http_version': 'HTTP/1.1', + 'http_persistent': True, + 'add_proxy_headers': False + } + + def _get_transport(self, http_dict, proxy_dict=None): + return transports_nb.NonBlockingHTTP( + raise_event=None, + on_disconnect=None, + idlequeue=self.idlequeue_thread.iq, + estabilish_tls=False, + certs=None, + on_http_request_possible=lambda: None, + on_persistent_fallback=None, + http_dict=http_dict, + proxy_dict=proxy_dict, + ) + + def test_parse_own_http_message(self): + ''' Build a HTTP message and try to parse it afterwards ''' + transport = self._get_transport(self.bosh_http_dict) + + data = "<test>Please don't fail!</test>" + http_message = transport.build_http_message(data) + statusline, headers, http_body = transport.parse_http_message( + http_message) + + self.assertTrue(statusline and isinstance(statusline, list)) + self.assertTrue(headers and isinstance(headers, dict)) + self.assertEqual(data, http_body, msg='Input and output are different') + + def test_receive_http_message(self): + ''' Let _on_receive handle some http messages ''' + transport = self._get_transport(self.bosh_http_dict) + + header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" + + "Content-Length: 88\r\n\r\n") + payload = "<test>Please don't fail!</test>" + body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \ + % payload + message = "%s%s" % (header, body) + + # try to receive in one go + transport.onreceive(self.expect_receive(body, msg='Failed: In one go')) + transport._on_receive(message) + self.assertTrue(self.have_received_expected(), msg='Failed: In one go') + +# FIXME: Not yet implemented. +# def test_receive_http_message_in_chunks(self): +# ''' Let _on_receive handle some chunked http messages ''' +# transport = self._get_transport(self.bosh_http_dict) +# +# header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" + +# "Content-Length: 88\r\n\r\n") +# payload = "<test>Please don't fail!</test>" +# body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \ +# % payload +# message = "%s%s" % (header, body) +# +# chunk1, chunk2, chunk3 = message[:20], message[20:73], message[73:] +# nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x" +# chunks = (chunk1, chunk2, chunk3, nextmessage_chunk) +# +# transport.onreceive(self.expect_receive(body, msg='Failed: In chunks')) +# for chunk in chunks: +# transport._on_receive(chunk) +# self.assertTrue(self.have_received_expected(), msg='Failed: In chunks') + +if __name__ == '__main__': + unittest.main() + +# vim: se ts=3: |