From 2e5bf4d0d2b205cc76221cbe253176bdbc56d3a3 Mon Sep 17 00:00:00 2001 From: Stephan Erb Date: Thu, 5 Nov 2009 09:06:46 +0100 Subject: Organize tests into unit and integration tests. Integration tests can depend on UI, network or both. Unittests use neither. --- test/integration/__init__.py | 6 + test/integration/test_gui_event_integration.py | 196 +++++++++++++++++ test/integration/test_resolver.py | 102 +++++++++ test/integration/test_roster.py | 209 +++++++++++++++++++ test/integration/test_xmpp_client_nb.py | 171 +++++++++++++++ test/integration/test_xmpp_transports_nb.py | 277 +++++++++++++++++++++++++ 6 files changed, 961 insertions(+) create mode 100644 test/integration/__init__.py create mode 100644 test/integration/test_gui_event_integration.py create mode 100644 test/integration/test_resolver.py create mode 100644 test/integration/test_roster.py create mode 100644 test/integration/test_xmpp_client_nb.py create mode 100644 test/integration/test_xmpp_transports_nb.py (limited to 'test/integration') diff --git a/test/integration/__init__.py b/test/integration/__init__.py new file mode 100644 index 000000000..0adf844f1 --- /dev/null +++ b/test/integration/__init__.py @@ -0,0 +1,6 @@ +''' + +This package contains integration tests. Integration tests are tests +which require or include UI, network or both. + +''' \ No newline at end of file diff --git a/test/integration/test_gui_event_integration.py b/test/integration/test_gui_event_integration.py new file mode 100644 index 000000000..00ed5c696 --- /dev/null +++ b/test/integration/test_gui_event_integration.py @@ -0,0 +1,196 @@ +''' +Tests for the miscellaneous functions scattered throughout src/gajim.py +''' +import unittest + +import lib +lib.setup_env() + +from common import gajim +from gajim import Interface + +from gajim_mocks import * +gajim.logger = MockLogger() + +Interface() + +import time +from data import * + +import roster_window + +import notify + +class TestStatusChange(unittest.TestCase): + '''tests gajim.py's incredibly complex handle_event_notify''' + + def setUp(self): + gajim.interface.roster = roster_window.RosterWindow() + + for acc in contacts: + gajim.connections[acc] = MockConnection(acc) + + gajim.interface.roster.fill_contacts_and_groups_dicts(contacts[acc], + acc) + gajim.interface.roster.add_account(acc) + gajim.interface.roster.add_account_contacts(acc) + + self.assertEqual(0, len(notify.notifications)) + + def tearDown(self): + gajim.interface.roster.model.clear() + + for acc in contacts: + gajim.contacts.clear_contacts(acc) + + del gajim.interface.roster + + notify.notifications = [] + + def contact_comes_online(self, account, jid, resource, prio): + '''a remote contact comes online''' + gajim.interface.handle_event_notify(account, (jid, 'online', "I'm back!", + resource, prio, None, time.time(), None)) + + contact = None + for c in gajim.contacts.get_contacts(account, jid): + if c.resource == resource: + contact = c + break + + self.assertEqual('online', contact.show) + self.assertEqual("I'm back!", contact.status) + self.assertEqual(prio, contact.priority) + + # the most recent notification is that the contact connected + self.assertEqual('contact_connected', notify.notifications[-1][0]) + + def contact_goes_offline(self, account, jid, resource, prio, + still_exists = True): + '''a remote contact goes offline.''' + gajim.interface.handle_event_notify(account, (jid, 'offline', 'Goodbye!', + resource, prio, None, time.time(), None)) + + contact = None + for c in gajim.contacts.get_contacts(account, jid): + if c.resource == resource: + contact = c + break + + if not still_exists: + self.assert_(contact is None) + return + + self.assertEqual('offline', contact.show) + self.assertEqual('Goodbye!', contact.status) + self.assertEqual(prio, contact.priority) + + self.assertEqual('contact_disconnected', notify.notifications[-1][0]) + + def user_starts_chatting(self, jid, account, resource=None): + '''the user opens a chat window and starts talking''' + ctrl = MockChatControl(jid, account) + win = MockWindow() + win.new_tab(ctrl) + gajim.interface.msg_win_mgr._windows['test'] = win + + if resource: + jid = jid + '/' + resource + + # a basic session is started + session = gajim.connections[account1].make_new_session(jid, + '01234567890abcdef', cls=MockSession) + ctrl.set_session(session) + + return ctrl + + def user_starts_esession(self, jid, resource, account): + '''the user opens a chat window and starts an encrypted session''' + ctrl = self.user_starts_chatting(jid, account, resource) + ctrl.session.status = 'active' + ctrl.session.enable_encryption = True + + return ctrl + + def test_contact_comes_online(self): + jid = 'default1@gajim.org' + + # contact is offline initially + contacts = gajim.contacts.get_contacts(account1, jid) + self.assertEqual(1, len(contacts)) + self.assertEqual('offline', contacts[0].show) + self.assertEqual('', contacts[0].status) + + self.contact_comes_online(account1, jid, 'lowprio', 1) + + def test_contact_goes_offline(self): + jid = 'default1@gajim.org' + + self.contact_comes_online(account1, jid, 'lowprio', 1) + + ctrl = self.user_starts_chatting(jid, account1) + orig_sess = ctrl.session + + self.contact_goes_offline(account1, jid, 'lowprio', 1) + + # session hasn't changed since we were talking to the bare jid + self.assertEqual(orig_sess, ctrl.session) + + def test_two_resources_higher_comes_online(self): + jid = 'default1@gajim.org' + + self.contact_comes_online(account1, jid, 'lowprio', 1) + + ctrl = self.user_starts_chatting(jid, account1) + + self.contact_comes_online(account1, jid, 'highprio', 50) + + # old session was dropped + self.assertEqual(None, ctrl.session) + + def test_two_resources_higher_goes_offline(self): + jid = 'default1@gajim.org' + + self.contact_comes_online(account1, jid, 'lowprio', 1) + self.contact_comes_online(account1, jid, 'highprio', 50) + + ctrl = self.user_starts_chatting(jid, account1) + + self.contact_goes_offline(account1, jid, 'highprio', 50, + still_exists=False) + + # old session was dropped + self.assertEqual(None, ctrl.session) + + def test_two_resources_higher_comes_online_with_esession(self): + jid = 'default1@gajim.org' + + self.contact_comes_online(account1, jid, 'lowprio', 1) + + ctrl = self.user_starts_esession(jid, 'lowprio', account1) + + self.contact_comes_online(account1, jid, 'highprio', 50) + + # session was associated with the low priority full jid, so it should + # have been removed from the control + self.assertEqual(None, ctrl.session) + + def test_two_resources_higher_goes_offline_with_esession(self): + jid = 'default1@gajim.org' + + self.contact_comes_online(account1, jid, 'lowprio', 1) + self.contact_comes_online(account1, jid, 'highprio', 50) + + ctrl = self.user_starts_esession(jid, 'highprio', account1) + + self.contact_goes_offline(account1, jid, 'highprio', 50, + still_exists=False) + + # session was associated with the high priority full jid, so it should + # have been removed from the control + self.assertEqual(None, ctrl.session) + +if __name__ == '__main__': + unittest.main() + +# vim: se ts=3: diff --git a/test/integration/test_resolver.py b/test/integration/test_resolver.py new file mode 100644 index 000000000..1836d3c7b --- /dev/null +++ b/test/integration/test_resolver.py @@ -0,0 +1,102 @@ +import unittest + +import time + +import lib +lib.setup_env() + +from common import resolver + +from mock import Mock, expectParams +from gajim_mocks import * +from xmpp_mocks import IdleQueueThread + +GMAIL_SRV_NAME = '_xmpp-client._tcp.gmail.com' +NONSENSE_NAME = 'sfsdfsdfsdf.sdfs.fsd' +JABBERCZ_TXT_NAME = '_xmppconnect.jabber.cz' +JABBERCZ_SRV_NAME = '_xmpp-client._tcp.jabber.cz' + +TEST_LIST = [(GMAIL_SRV_NAME, 'srv', True), + (NONSENSE_NAME, 'srv', False), + (JABBERCZ_SRV_NAME, 'srv', True)] + +class TestResolver(unittest.TestCase): + ''' + Test for LibAsyncNSResolver and NSLookupResolver. Requires working + network connection. + ''' + def setUp(self): + self.idlequeue_thread = IdleQueueThread() + self.idlequeue_thread.start() + + self.iq = self.idlequeue_thread.iq + self._reset() + self.resolver = None + + def tearDown(self): + self.idlequeue_thread.stop_thread() + self.idlequeue_thread.join() + + def _reset(self): + self.flag = False + self.expect_results = False + self.nslookup = False + self.resolver = None + + def testLibAsyncNSResolver(self): + self._reset() + if not resolver.USE_LIBASYNCNS: + print 'testLibAsyncResolver: libasyncns-python not installed' + return + self.resolver = resolver.LibAsyncNSResolver() + + for name, type, expect_results in TEST_LIST: + self.expect_results = expect_results + self._runLANSR(name, type) + self.flag = False + + def _runLANSR(self, name, type): + self.resolver.resolve( + host = name, + type = type, + on_ready = self._myonready) + while not self.flag: + time.sleep(1) + self.resolver.process() + + def _myonready(self, name, result_set): + if __name__ == '__main__': + from pprint import pprint + pprint('on_ready called ...') + pprint('hostname: %s' % name) + pprint('result set: %s' % result_set) + pprint('res.resolved_hosts: %s' % self.resolver.resolved_hosts) + pprint('') + if self.expect_results: + self.assert_(len(result_set) > 0) + else: + self.assert_(result_set == []) + self.flag = True + if self.nslookup: + self._testNSLR() + + def testNSLookupResolver(self): + self._reset() + self.nslookup = True + self.resolver = resolver.NSLookupResolver(self.iq) + self.test_list = TEST_LIST + self._testNSLR() + + def _testNSLR(self): + if self.test_list == []: + return + name, type, self.expect_results = self.test_list.pop() + self.resolver.resolve( + host = name, + type = type, + on_ready = self._myonready) + +if __name__ == '__main__': + unittest.main() + +# vim: se ts=3: diff --git a/test/integration/test_roster.py b/test/integration/test_roster.py new file mode 100644 index 000000000..10d1ab488 --- /dev/null +++ b/test/integration/test_roster.py @@ -0,0 +1,209 @@ +import unittest + +import lib +lib.setup_env() + +from data import * + +from mock import Mock, expectParams +from gajim_mocks import * + +from common import gajim +import roster_window + +gajim.get_jid_from_account = lambda acc: 'myjid@' + acc + +class TestRosterWindow(unittest.TestCase): + + def setUp(self): + gajim.interface = MockInterface() + self.roster = roster_window.RosterWindow() + + self.C_NAME = roster_window.C_NAME + self.C_TYPE = roster_window.C_TYPE + self.C_JID = roster_window.C_JID + self.C_ACCOUNT = roster_window.C_ACCOUNT + + # Add after creating RosterWindow + # We want to test the filling explicitly + for acc in contacts: + gajim.connections[acc] = MockConnection(acc) + + def tearDown(self): + self.roster.model.clear() + for acc in gajim.contacts.get_accounts(): + gajim.contacts.clear_contacts(acc) + + ### Custom assertions + def assert_all_contacts_are_in_roster(self, acc): + for jid in contacts[acc]: + self.assert_contact_is_in_roster(jid, acc) + + def assert_contact_is_in_roster(self, jid, account): + contacts = gajim.contacts.get_contacts(account, jid) + # check for all resources + for contact in contacts: + iters = self.roster._get_contact_iter(jid, account, + model=self.roster.model) + + if jid != gajim.get_jid_from_account(account): + # We don't care for groups of SelfContact + self.assertTrue(len(iters) == len(contact.get_shown_groups()), + msg='Contact is not in all his groups') + + # Are we big brother? + bb_jid = None + bb_account = None + family = gajim.contacts.get_metacontacts_family(account, jid) + if family: + nearby_family, bb_jid, bb_account = \ + self.roster._get_nearby_family_and_big_brother(family, account) + + is_in_nearby_family = (jid, account) in ( + (data['jid'], data['account']) for data in nearby_family) + self.assertTrue(is_in_nearby_family, + msg='Contact not in his own nearby family') + + is_big_brother = (bb_jid, bb_account) == (jid, account) + + # check for each group tag + for titerC in iters: + self.assertTrue(self.roster.model.iter_is_valid(titerC), + msg='Contact iter invalid') + + c_model = self.roster.model[titerC] + self.assertEquals(contact.get_shown_name(), c_model[self.C_NAME], + msg='Contact name missmatch') + self.assertEquals(contact.jid, c_model[self.C_JID], + msg='Jid missmatch') + + if not self.roster.regroup: + self.assertEquals(account, c_model[self.C_ACCOUNT], + msg='Account missmatch') + + # Check for correct nesting + parent_iter = self.roster.model.iter_parent(titerC) + p_model = self.roster.model[parent_iter] + if family: + if is_big_brother: + self.assertTrue(p_model[self.C_TYPE] == 'group', + msg='Big Brother is not on top') + else: + self.assertTrue(p_model[self.C_TYPE] == 'contact', + msg='Little Brother brother has no BigB') + else: + if jid == gajim.get_jid_from_account(account): + self.assertTrue(p_model[self.C_TYPE] == 'account', + msg='SelfContact is not on top') + else: + self.assertTrue(p_model[self.C_TYPE] == 'group', + msg='Contact not found in a group') + + def assert_group_is_in_roster(self, group, account): + #TODO + pass + + def assert_account_is_in_roster(self, acc): + titerA = self.roster._get_account_iter(acc, model=self.roster.model) + self.assertTrue(self.roster.model.iter_is_valid(titerA), + msg='Account iter is invalid') + + acc_model = self.roster.model[titerA] + self.assertEquals(acc_model[self.C_TYPE], 'account', + msg='No account found') + + if not self.roster.regroup: + self.assertEquals(acc_model[self.C_ACCOUNT], acc, + msg='Account not found') + + self_jid = gajim.get_jid_from_account(acc) + self.assertEquals(acc_model[self.C_JID], self_jid, + msg='Account JID not found in account row') + + def assert_model_is_in_sync(self): + #TODO: check that iter_n_children returns the correct numbers + pass + + # tests + def test_fill_contacts_and_groups_dicts(self): + for acc in contacts: + self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc) + + for jid in contacts[acc]: + instances = gajim.contacts.get_contacts(acc, jid) + + # Created a contact for each single jid? + self.assertTrue(len(instances) == 1) + + # Contacts kept their info + contact = instances[0] + self.assertEquals(contact.groups, contacts[acc][jid]['groups'], + msg='Group Missmatch') + + groups = contacts[acc][jid]['groups'] or ['General',] + + # cleanup + self.roster.model.clear() + for acc in contacts: + gajim.contacts.clear_contacts(acc) + + def test_fill_roster_model(self): + for acc in contacts: + self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc) + + self.roster.add_account(acc) + self.assert_account_is_in_roster(acc) + + self.roster.add_account_contacts(acc) + self.assert_all_contacts_are_in_roster(acc) + + self.assert_model_is_in_sync() + + +class TestRosterWindowRegrouped(TestRosterWindow): + + def setUp(self): + gajim.config.set('mergeaccounts', True) + TestRosterWindow.setUp(self) + + def test_toggle_regroup(self): + self.roster.regroup = not self.roster.regroup + self.roster.setup_and_draw_roster() + self.roster.regroup = not self.roster.regroup + self.roster.setup_and_draw_roster() + + +class TestRosterWindowMetaContacts(TestRosterWindowRegrouped): + + def test_receive_metacontact_data(self): + for complete_data in metacontact_data: + t_acc = complete_data[0]['account'] + t_jid = complete_data[0]['jid'] + data = complete_data[1:] + for brother in data: + acc = brother['account'] + jid = brother['jid'] + gajim.contacts.add_metacontact(t_acc, t_jid, acc, jid) + self.roster.setup_and_draw_roster() + + def test_connect_new_metacontact(self): + self.test_fill_roster_model() + + jid = u'coolstuff@gajim.org' + contact = gajim.contacts.create_contact(jid) + gajim.contacts.add_contact(account1, contact) + self.roster.add_contact(jid, account1) + self.roster.chg_contact_status(contact, 'offline', '', account1) + + gajim.contacts.add_metacontact(account1, u'samejid@gajim.org', + account1, jid) + self.roster.chg_contact_status(contact, 'online', '', account1) + + self.assert_model_is_in_sync() + + + +if __name__ == '__main__': + unittest.main() + +# vim: se ts=3: diff --git a/test/integration/test_xmpp_client_nb.py b/test/integration/test_xmpp_client_nb.py new file mode 100644 index 000000000..24a54a3ca --- /dev/null +++ b/test/integration/test_xmpp_client_nb.py @@ -0,0 +1,171 @@ +''' +Testing script for NonBlockingClient class (src/common/xmpp/client_nb.py) + +It actually connects to a xmpp server. +''' + +import unittest + +import lib +lib.setup_env() + +from xmpp_mocks import MockConnection, IdleQueueThread +from mock import Mock +from common.xmpp import client_nb + +#import logging +#log = logging.getLogger('gajim') +#log.setLevel(logging.DEBUG) + +# (XMPP server hostname, c2s port). Script will connect to the machine. +xmpp_server_port = ('gajim.org', 5222) + +# [username, password, passphrase]. Script will authenticate to server above +credentials = ['unittest', 'testtest', 'res'] + +class TestNonBlockingClient(unittest.TestCase): + ''' + Test Cases class for NonBlockingClient. + ''' + def setUp(self): + ''' IdleQueue thread is run and dummy connection is created. ''' + self.idlequeue_thread = IdleQueueThread() + self.connection = MockConnection() # for dummy callbacks + self.idlequeue_thread.start() + + def tearDown(self): + ''' IdleQueue thread is stopped. ''' + self.idlequeue_thread.stop_thread() + self.idlequeue_thread.join() + + self.client = None + + def open_stream(self, server_port, wrong_pass=False): + ''' + Method opening the XMPP connection. It returns when + is received from server. + + :param server_port: tuple of (hostname, port) for where the client should + connect. + ''' + + class TempConnection(): + def get_password(self, cb): + if wrong_pass: + cb('wrong pass') + else: + cb(credentials[1]) + def on_connect_failure(self): + pass + + self.client = client_nb.NonBlockingClient( + domain=server_port[0], + idlequeue=self.idlequeue_thread.iq, + caller=Mock(realClass=TempConnection)) + + self.client.connect( + hostname=server_port[0], + port=server_port[1], + on_connect=lambda *args: self.connection.on_connect(True, *args), + on_connect_failure=lambda *args: self.connection.on_connect( + False, *args)) + + self.assert_(self.connection.wait(), + msg='waiting for callback from client constructor') + + # if on_connect was called, client has to be connected and vice versa + if self.connection.connect_succeeded: + self.assert_(self.client.get_connect_type()) + else: + self.assert_(not self.client.get_connect_type()) + + def client_auth(self, username, password, resource, sasl): + ''' + Method authenticating connected client with supplied credentials. Returns + when authentication is over. + + :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication + :todo: to check and be more specific about when it returns + (bind, session..) + ''' + self.client.auth(username, password, resource, sasl, + on_auth=self.connection.on_auth) + + self.assert_(self.connection.wait(), msg='waiting for authentication') + + def do_disconnect(self): + ''' + Does disconnecting of connected client. Returns when TCP connection is + closed. + ''' + self.client.RegisterDisconnectHandler(self.connection.set_event) + self.client.disconnect() + + self.assertTrue(self.connection.wait(), msg='waiting for disconnecting') + + def test_proper_connect_sasl(self): + ''' + The ideal testcase - client is connected, authenticated with SASL and + then disconnected. + ''' + self.open_stream(xmpp_server_port) + + # if client is not connected, lets raise the AssertionError + self.assert_(self.client.get_connect_type()) + # client.disconnect() is already called from NBClient via + # _on_connected_failure, no need to call it here + + self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1) + self.assert_(self.connection.con) + self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL') + + self.do_disconnect() + + def test_proper_connect_oldauth(self): + ''' + The ideal testcase - client is connected, authenticated with old auth and + then disconnected. + ''' + self.open_stream(xmpp_server_port) + self.assert_(self.client.get_connect_type()) + self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0) + self.assert_(self.connection.con) + features = self.client.Dispatcher.Stream.features + if not features.getTag('auth'): + print "Server doesn't support old authentication type, ignoring test" + else: + self.assert_(self.connection.auth=='old_auth', + msg='Unable to auth via old_auth') + self.do_disconnect() + + def test_connect_to_nonexisting_host(self): + ''' + Connect to nonexisting host. DNS request for A records should return + nothing. + ''' + self.open_stream(('fdsfsdf.fdsf.fss', 5222)) + self.assert_(not self.client.get_connect_type()) + + def test_connect_to_wrong_port(self): + ''' + Connect to nonexisting server. DNS request for A records should return an + IP but there shouldn't be XMPP server running on specified port. + ''' + self.open_stream((xmpp_server_port[0], 31337)) + self.assert_(not self.client.get_connect_type()) + + def test_connect_with_wrong_creds(self): + ''' + Connecting with invalid password. + ''' + self.open_stream(xmpp_server_port, wrong_pass=True) + self.assert_(self.client.get_connect_type()) + self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1) + self.assert_(self.connection.auth is None) + self.do_disconnect() + + +if __name__ == '__main__': + unittest.main() + +# vim: se ts=3: diff --git a/test/integration/test_xmpp_transports_nb.py b/test/integration/test_xmpp_transports_nb.py new file mode 100644 index 000000000..f6cbab510 --- /dev/null +++ b/test/integration/test_xmpp_transports_nb.py @@ -0,0 +1,277 @@ +''' +Integration test for tranports classes. See unit for the ordinary +unit tests of this module. +''' + +import unittest +import socket + +import lib +lib.setup_env() + +from xmpp_mocks import IdleQueueThread, IdleMock +from common.xmpp import transports_nb + + +class AbstractTransportTest(unittest.TestCase): + ''' Encapsulates Idlequeue instantiation for transports and more...''' + + def setUp(self): + ''' IdleQueue thread is run and dummy connection is created. ''' + self.idlequeue_thread = IdleQueueThread() + self.idlequeue_thread.start() + self._setup_hook() + + def tearDown(self): + ''' IdleQueue thread is stopped. ''' + self._teardown_hook() + self.idlequeue_thread.stop_thread() + self.idlequeue_thread.join() + + def _setup_hook(self): + pass + + def _teardown_hook(self): + pass + + def expect_receive(self, expected, count=1, msg=None): + ''' + Returns a callback function that will assert whether the data passed to + it equals the one specified when calling this function. + + Can be used to make sure transport dispatch correct data. + ''' + def receive(data, *args, **kwargs): + self.assertEqual(data, expected, msg=msg) + self._expected_count -= 1 + self._expected_count = count + return receive + + def have_received_expected(self): + ''' + Plays together with expect_receive(). Will return true if expected_rcv + callback was called as often as specified + ''' + return self._expected_count == 0 + + +class TestNonBlockingTCP(AbstractTransportTest): + ''' + Test class for NonBlockingTCP. Will actually try to connect to an existing + XMPP server. + ''' + class MockClient(IdleMock): + ''' Simple client to test transport functionality ''' + def __init__(self, idlequeue, testcase): + self.idlequeue = idlequeue + self.testcase = testcase + IdleMock.__init__(self) + + def do_connect(self, establish_tls=False, proxy_dict=None): + try: + ips = socket.getaddrinfo('gajim.org', 5222, + socket.AF_UNSPEC,socket.SOCK_STREAM) + ip = ips[0] + except socket.error, e: + self.testcase.fail(msg=str(e)) + + self.socket = transports_nb.NonBlockingTCP( + raise_event=lambda event_type, data: self.testcase.assertTrue( + event_type and data), + on_disconnect=lambda: self.on_success(mode='SocketDisconnect'), + idlequeue=self.idlequeue, + estabilish_tls=establish_tls, + certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'), + proxy_dict=proxy_dict) + + self.socket.PlugIn(self) + + self.socket.connect(conn_5tuple=ip, + on_connect=lambda: self.on_success(mode='TCPconnect'), + on_connect_failure=self.on_failure) + self.testcase.assertTrue(self.wait(), msg='Connection timed out') + + def do_disconnect(self): + self.socket.disconnect() + self.testcase.assertTrue(self.wait(), msg='Disconnect timed out') + + def on_failure(self, err_message): + self.set_event() + self.testcase.fail(msg=err_message) + + def on_success(self, mode, data=None): + if mode == "TCPconnect": + pass + if mode == "SocketDisconnect": + pass + self.set_event() + + def _setup_hook(self): + self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq, + testcase=self) + + def _teardown_hook(self): + if self.client.socket.state == 'CONNECTED': + self.client.do_disconnect() + + def test_connect_disconnect_plain(self): + ''' Establish plain connection ''' + self.client.do_connect(establish_tls=False) + self.assertEquals(self.client.socket.state, 'CONNECTED') + self.client.do_disconnect() + self.assertEquals(self.client.socket.state, 'DISCONNECTED') + +# def test_connect_disconnect_ssl(self): +# ''' Establish SSL (not TLS) connection ''' +# self.client.do_connect(establish_tls=True) +# self.assertEquals(self.client.socket.state, 'CONNECTED') +# self.client.do_disconnect() +# self.assertEquals(self.client.socket.state, 'DISCONNECTED') + + def test_do_receive(self): + ''' Test _do_receive method by overwriting socket.recv ''' + self.client.do_connect() + sock = self.client.socket + + # transport shall receive data + data = "Please don't fail" + sock._recv = lambda buffer: data + sock.onreceive(self.expect_receive(data)) + sock._do_receive() + self.assertTrue(self.have_received_expected(), msg='Did not receive data') + self.assert_(self.client.socket.state == 'CONNECTED') + + # transport shall do nothing as an non-fatal SSL is simulated + sock._recv = lambda buffer: None + sock.onreceive(self.assertFalse) # we did not receive anything... + sock._do_receive() + self.assert_(self.client.socket.state == 'CONNECTED') + + # transport shall disconnect as remote side closed the connection + sock._recv = lambda buffer: '' + sock.onreceive(self.assertFalse) # we did not receive anything... + sock._do_receive() + self.assert_(self.client.socket.state == 'DISCONNECTED') + + def test_do_send(self): + ''' Test _do_send method by overwriting socket.send ''' + self.client.do_connect() + sock = self.client.socket + + outgoing = [] # what we have actually send to our socket.socket + data_part1 = "Please don't " + data_part2 = "fail!" + data_complete = data_part1 + data_part2 + + # Simulate everything could be send in one go + def _send_all(data): + outgoing.append(data) + return len(data) + sock._send = _send_all + sock.send(data_part1) + sock.send(data_part2) + sock._do_send() + sock._do_send() + self.assertTrue(self.client.socket.state == 'CONNECTED') + self.assertTrue(data_part1 in outgoing and data_part2 in outgoing) + self.assertFalse(sock.sendqueue and sock.sendbuff, + msg='There is still unsend data in buffers') + + # Simulate data could only be sent in chunks + self.chunk_count = 0 + outgoing = [] + def _send_chunks(data): + if self.chunk_count == 0: + outgoing.append(data_part1) + self.chunk_count += 1 + return len(data_part1) + else: + outgoing.append(data_part2) + return len(data_part2) + sock._send = _send_chunks + sock.send(data_complete) + sock._do_send() # process first chunk + sock._do_send() # process the second one + self.assertTrue(self.client.socket.state == 'CONNECTED') + self.assertTrue(data_part1 in outgoing and data_part2 in outgoing) + self.assertFalse(sock.sendqueue and sock.sendbuff, + msg='There is still unsend data in buffers') + + +class TestNonBlockingHTTP(AbstractTransportTest): + ''' Test class for NonBlockingHTTP transport''' + + bosh_http_dict = { + 'http_uri': 'http://gajim.org:5280/http-bind', + 'http_version': 'HTTP/1.1', + 'http_persistent': True, + 'add_proxy_headers': False + } + + def _get_transport(self, http_dict, proxy_dict=None): + return transports_nb.NonBlockingHTTP( + raise_event=None, + on_disconnect=None, + idlequeue=self.idlequeue_thread.iq, + estabilish_tls=False, + certs=None, + on_http_request_possible=lambda: None, + on_persistent_fallback=None, + http_dict=http_dict, + proxy_dict=proxy_dict, + ) + + def test_parse_own_http_message(self): + ''' Build a HTTP message and try to parse it afterwards ''' + transport = self._get_transport(self.bosh_http_dict) + + data = "Please don't fail!" + http_message = transport.build_http_message(data) + statusline, headers, http_body = transport.parse_http_message( + http_message) + + self.assertTrue(statusline and isinstance(statusline, list)) + self.assertTrue(headers and isinstance(headers, dict)) + self.assertEqual(data, http_body, msg='Input and output are different') + + def test_receive_http_message(self): + ''' Let _on_receive handle some http messages ''' + transport = self._get_transport(self.bosh_http_dict) + + header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" + + "Content-Length: 88\r\n\r\n") + payload = "Please don't fail!" + body = "%s" \ + % payload + message = "%s%s" % (header, body) + + # try to receive in one go + transport.onreceive(self.expect_receive(body, msg='Failed: In one go')) + transport._on_receive(message) + self.assertTrue(self.have_received_expected(), msg='Failed: In one go') + +# FIXME: Not yet implemented. +# def test_receive_http_message_in_chunks(self): +# ''' Let _on_receive handle some chunked http messages ''' +# transport = self._get_transport(self.bosh_http_dict) +# +# header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" + +# "Content-Length: 88\r\n\r\n") +# payload = "Please don't fail!" +# body = "%s" \ +# % payload +# message = "%s%s" % (header, body) +# +# chunk1, chunk2, chunk3 = message[:20], message[20:73], message[73:] +# nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x" +# chunks = (chunk1, chunk2, chunk3, nextmessage_chunk) +# +# transport.onreceive(self.expect_receive(body, msg='Failed: In chunks')) +# for chunk in chunks: +# transport._on_receive(chunk) +# self.assertTrue(self.have_received_expected(), msg='Failed: In chunks') + +if __name__ == '__main__': + unittest.main() + +# vim: se ts=3: -- cgit v1.2.3