diff options
author | Éric Araujo <merwok@netwok.org> | 2010-02-08 17:08:40 +0300 |
---|---|---|
committer | Éric Araujo <merwok@netwok.org> | 2010-02-08 17:08:40 +0300 |
commit | fedd7dc8e22950f0dbe297443860662368c249d1 (patch) | |
tree | a6426601d50b3020348fd6574107c9fd6f9dfb0e /test/integration | |
parent | 1a69ea93f1316e9f98d50ac8a212498b42d3cf60 (diff) |
convert tabs to spaces in source code thanks to reindent.py
holy diff batman!
Diffstat (limited to 'test/integration')
-rw-r--r-- | test/integration/__init__.py | 2 | ||||
-rw-r--r-- | test/integration/test_gui_event_integration.py | 234 | ||||
-rw-r--r-- | test/integration/test_resolver.py | 158 | ||||
-rw-r--r-- | test/integration/test_roster.py | 336 | ||||
-rw-r--r-- | test/integration/test_xmpp_client_nb.py | 282 | ||||
-rw-r--r-- | test/integration/test_xmpp_transports_nb.py | 506 |
6 files changed, 754 insertions, 764 deletions
diff --git a/test/integration/__init__.py b/test/integration/__init__.py index 0adf844f1..aaeacfbef 100644 --- a/test/integration/__init__.py +++ b/test/integration/__init__.py @@ -3,4 +3,4 @@ This package contains integration tests. Integration tests are tests which require or include UI, network or both. -'''
\ No newline at end of file +''' diff --git a/test/integration/test_gui_event_integration.py b/test/integration/test_gui_event_integration.py index a1eadea60..c5999389f 100644 --- a/test/integration/test_gui_event_integration.py +++ b/test/integration/test_gui_event_integration.py @@ -23,171 +23,169 @@ import roster_window import notify class TestStatusChange(unittest.TestCase): - '''tests gajim.py's incredibly complex handle_event_notify''' + '''tests gajim.py's incredibly complex handle_event_notify''' - def setUp(self): - - gajim.connections = {} - gajim.contacts = contacts_module.LegacyContactsAPI() - gajim.interface.roster = roster_window.RosterWindow() + def setUp(self): - for acc in contacts: - gajim.connections[acc] = MockConnection(acc) + gajim.connections = {} + gajim.contacts = contacts_module.LegacyContactsAPI() + gajim.interface.roster = roster_window.RosterWindow() - gajim.interface.roster.fill_contacts_and_groups_dicts(contacts[acc], - acc) - gajim.interface.roster.add_account(acc) - gajim.interface.roster.add_account_contacts(acc) + for acc in contacts: + gajim.connections[acc] = MockConnection(acc) - self.assertEqual(0, len(notify.notifications)) + gajim.interface.roster.fill_contacts_and_groups_dicts(contacts[acc], + acc) + gajim.interface.roster.add_account(acc) + gajim.interface.roster.add_account_contacts(acc) - def tearDown(self): - notify.notifications = [] + self.assertEqual(0, len(notify.notifications)) - def contact_comes_online(self, account, jid, resource, prio): - '''a remote contact comes online''' - gajim.interface.handle_event_notify(account, (jid, 'online', "I'm back!", - resource, prio, None, time.time(), None)) + def tearDown(self): + notify.notifications = [] - contact = None - for c in gajim.contacts.get_contacts(account, jid): - if c.resource == resource: - contact = c - break + def contact_comes_online(self, account, jid, resource, prio): + '''a remote contact comes online''' + gajim.interface.handle_event_notify(account, (jid, 'online', "I'm back!", + resource, prio, None, time.time(), None)) - self.assertEqual('online', contact.show) - self.assertEqual("I'm back!", contact.status) - self.assertEqual(prio, contact.priority) + contact = None + for c in gajim.contacts.get_contacts(account, jid): + if c.resource == resource: + contact = c + break - # the most recent notification is that the contact connected - self.assertEqual('contact_connected', notify.notifications[-1][0]) + self.assertEqual('online', contact.show) + self.assertEqual("I'm back!", contact.status) + self.assertEqual(prio, contact.priority) - def contact_goes_offline(self, account, jid, resource, prio, - still_exists = True): - '''a remote contact goes offline.''' - gajim.interface.handle_event_notify(account, (jid, 'offline', 'Goodbye!', - resource, prio, None, time.time(), None)) + # the most recent notification is that the contact connected + self.assertEqual('contact_connected', notify.notifications[-1][0]) - contact = None - for c in gajim.contacts.get_contacts(account, jid): - if c.resource == resource: - contact = c - break + def contact_goes_offline(self, account, jid, resource, prio, + still_exists = True): + '''a remote contact goes offline.''' + gajim.interface.handle_event_notify(account, (jid, 'offline', 'Goodbye!', + resource, prio, None, time.time(), None)) - if not still_exists: - self.assert_(contact is None) - return + contact = None + for c in gajim.contacts.get_contacts(account, jid): + if c.resource == resource: + contact = c + break - self.assertEqual('offline', contact.show) - self.assertEqual('Goodbye!', contact.status) - self.assertEqual(prio, contact.priority) + if not still_exists: + self.assert_(contact is None) + return - self.assertEqual('contact_disconnected', notify.notifications[-1][0]) + self.assertEqual('offline', contact.show) + self.assertEqual('Goodbye!', contact.status) + self.assertEqual(prio, contact.priority) - def user_starts_chatting(self, jid, account, resource=None): - '''the user opens a chat window and starts talking''' - ctrl = MockChatControl(jid, account) - win = MockWindow() - win.new_tab(ctrl) - gajim.interface.msg_win_mgr._windows['test'] = win + self.assertEqual('contact_disconnected', notify.notifications[-1][0]) - if resource: - jid = jid + '/' + resource + def user_starts_chatting(self, jid, account, resource=None): + '''the user opens a chat window and starts talking''' + ctrl = MockChatControl(jid, account) + win = MockWindow() + win.new_tab(ctrl) + gajim.interface.msg_win_mgr._windows['test'] = win - # a basic session is started - session = gajim.connections[account1].make_new_session(jid, - '01234567890abcdef', cls=MockSession) - ctrl.set_session(session) + if resource: + jid = jid + '/' + resource - return ctrl + # a basic session is started + session = gajim.connections[account1].make_new_session(jid, + '01234567890abcdef', cls=MockSession) + ctrl.set_session(session) - def user_starts_esession(self, jid, resource, account): - '''the user opens a chat window and starts an encrypted session''' - ctrl = self.user_starts_chatting(jid, account, resource) - ctrl.session.status = 'active' - ctrl.session.enable_encryption = True + return ctrl - return ctrl + def user_starts_esession(self, jid, resource, account): + '''the user opens a chat window and starts an encrypted session''' + ctrl = self.user_starts_chatting(jid, account, resource) + ctrl.session.status = 'active' + ctrl.session.enable_encryption = True - def test_contact_comes_online(self): - jid = 'default1@gajim.org' + return ctrl - # contact is offline initially - contacts = gajim.contacts.get_contacts(account1, jid) - self.assertEqual(1, len(contacts)) - self.assertEqual('offline', contacts[0].show) - self.assertEqual('', contacts[0].status) + def test_contact_comes_online(self): + jid = 'default1@gajim.org' - self.contact_comes_online(account1, jid, 'lowprio', 1) + # contact is offline initially + contacts = gajim.contacts.get_contacts(account1, jid) + self.assertEqual(1, len(contacts)) + self.assertEqual('offline', contacts[0].show) + self.assertEqual('', contacts[0].status) - def test_contact_goes_offline(self): - jid = 'default1@gajim.org' + self.contact_comes_online(account1, jid, 'lowprio', 1) - self.contact_comes_online(account1, jid, 'lowprio', 1) + def test_contact_goes_offline(self): + jid = 'default1@gajim.org' - ctrl = self.user_starts_chatting(jid, account1) - orig_sess = ctrl.session + self.contact_comes_online(account1, jid, 'lowprio', 1) - self.contact_goes_offline(account1, jid, 'lowprio', 1) + ctrl = self.user_starts_chatting(jid, account1) + orig_sess = ctrl.session - # session hasn't changed since we were talking to the bare jid - self.assertEqual(orig_sess, ctrl.session) + self.contact_goes_offline(account1, jid, 'lowprio', 1) - def test_two_resources_higher_comes_online(self): - jid = 'default1@gajim.org' + # session hasn't changed since we were talking to the bare jid + self.assertEqual(orig_sess, ctrl.session) - self.contact_comes_online(account1, jid, 'lowprio', 1) + def test_two_resources_higher_comes_online(self): + jid = 'default1@gajim.org' - ctrl = self.user_starts_chatting(jid, account1) + self.contact_comes_online(account1, jid, 'lowprio', 1) - self.contact_comes_online(account1, jid, 'highprio', 50) + ctrl = self.user_starts_chatting(jid, account1) - # old session was dropped - self.assertEqual(None, ctrl.session) + self.contact_comes_online(account1, jid, 'highprio', 50) - def test_two_resources_higher_goes_offline(self): - jid = 'default1@gajim.org' + # old session was dropped + self.assertEqual(None, ctrl.session) - self.contact_comes_online(account1, jid, 'lowprio', 1) - self.contact_comes_online(account1, jid, 'highprio', 50) + def test_two_resources_higher_goes_offline(self): + jid = 'default1@gajim.org' - ctrl = self.user_starts_chatting(jid, account1) + self.contact_comes_online(account1, jid, 'lowprio', 1) + self.contact_comes_online(account1, jid, 'highprio', 50) - self.contact_goes_offline(account1, jid, 'highprio', 50, - still_exists=False) + ctrl = self.user_starts_chatting(jid, account1) - # old session was dropped - self.assertEqual(None, ctrl.session) + self.contact_goes_offline(account1, jid, 'highprio', 50, + still_exists=False) - def test_two_resources_higher_comes_online_with_esession(self): - jid = 'default1@gajim.org' + # old session was dropped + self.assertEqual(None, ctrl.session) - self.contact_comes_online(account1, jid, 'lowprio', 1) + def test_two_resources_higher_comes_online_with_esession(self): + jid = 'default1@gajim.org' - ctrl = self.user_starts_esession(jid, 'lowprio', account1) + self.contact_comes_online(account1, jid, 'lowprio', 1) - self.contact_comes_online(account1, jid, 'highprio', 50) + ctrl = self.user_starts_esession(jid, 'lowprio', account1) - # session was associated with the low priority full jid, so it should - # have been removed from the control - self.assertEqual(None, ctrl.session) + self.contact_comes_online(account1, jid, 'highprio', 50) - def test_two_resources_higher_goes_offline_with_esession(self): - jid = 'default1@gajim.org' + # session was associated with the low priority full jid, so it should + # have been removed from the control + self.assertEqual(None, ctrl.session) - self.contact_comes_online(account1, jid, 'lowprio', 1) - self.contact_comes_online(account1, jid, 'highprio', 50) + def test_two_resources_higher_goes_offline_with_esession(self): + jid = 'default1@gajim.org' - ctrl = self.user_starts_esession(jid, 'highprio', account1) + self.contact_comes_online(account1, jid, 'lowprio', 1) + self.contact_comes_online(account1, jid, 'highprio', 50) - self.contact_goes_offline(account1, jid, 'highprio', 50, - still_exists=False) + ctrl = self.user_starts_esession(jid, 'highprio', account1) - # session was associated with the high priority full jid, so it should - # have been removed from the control - self.assertEqual(None, ctrl.session) + self.contact_goes_offline(account1, jid, 'highprio', 50, + still_exists=False) -if __name__ == '__main__': - unittest.main() + # session was associated with the high priority full jid, so it should + # have been removed from the control + self.assertEqual(None, ctrl.session) -# vim: se ts=3: +if __name__ == '__main__': + unittest.main() diff --git a/test/integration/test_resolver.py b/test/integration/test_resolver.py index 1836d3c7b..d80ffee87 100644 --- a/test/integration/test_resolver.py +++ b/test/integration/test_resolver.py @@ -16,87 +16,85 @@ NONSENSE_NAME = 'sfsdfsdfsdf.sdfs.fsd' JABBERCZ_TXT_NAME = '_xmppconnect.jabber.cz' JABBERCZ_SRV_NAME = '_xmpp-client._tcp.jabber.cz' -TEST_LIST = [(GMAIL_SRV_NAME, 'srv', True), - (NONSENSE_NAME, 'srv', False), - (JABBERCZ_SRV_NAME, 'srv', True)] +TEST_LIST = [(GMAIL_SRV_NAME, 'srv', True), + (NONSENSE_NAME, 'srv', False), + (JABBERCZ_SRV_NAME, 'srv', True)] class TestResolver(unittest.TestCase): - ''' - Test for LibAsyncNSResolver and NSLookupResolver. Requires working - network connection. - ''' - def setUp(self): - self.idlequeue_thread = IdleQueueThread() - self.idlequeue_thread.start() - - self.iq = self.idlequeue_thread.iq - self._reset() - self.resolver = None - - def tearDown(self): - self.idlequeue_thread.stop_thread() - self.idlequeue_thread.join() - - def _reset(self): - self.flag = False - self.expect_results = False - self.nslookup = False - self.resolver = None - - def testLibAsyncNSResolver(self): - self._reset() - if not resolver.USE_LIBASYNCNS: - print 'testLibAsyncResolver: libasyncns-python not installed' - return - self.resolver = resolver.LibAsyncNSResolver() - - for name, type, expect_results in TEST_LIST: - self.expect_results = expect_results - self._runLANSR(name, type) - self.flag = False - - def _runLANSR(self, name, type): - self.resolver.resolve( - host = name, - type = type, - on_ready = self._myonready) - while not self.flag: - time.sleep(1) - self.resolver.process() - - def _myonready(self, name, result_set): - if __name__ == '__main__': - from pprint import pprint - pprint('on_ready called ...') - pprint('hostname: %s' % name) - pprint('result set: %s' % result_set) - pprint('res.resolved_hosts: %s' % self.resolver.resolved_hosts) - pprint('') - if self.expect_results: - self.assert_(len(result_set) > 0) - else: - self.assert_(result_set == []) - self.flag = True - if self.nslookup: - self._testNSLR() - - def testNSLookupResolver(self): - self._reset() - self.nslookup = True - self.resolver = resolver.NSLookupResolver(self.iq) - self.test_list = TEST_LIST - self._testNSLR() - - def _testNSLR(self): - if self.test_list == []: - return - name, type, self.expect_results = self.test_list.pop() - self.resolver.resolve( - host = name, - type = type, - on_ready = self._myonready) + ''' + Test for LibAsyncNSResolver and NSLookupResolver. Requires working + network connection. + ''' + def setUp(self): + self.idlequeue_thread = IdleQueueThread() + self.idlequeue_thread.start() + + self.iq = self.idlequeue_thread.iq + self._reset() + self.resolver = None + + def tearDown(self): + self.idlequeue_thread.stop_thread() + self.idlequeue_thread.join() + + def _reset(self): + self.flag = False + self.expect_results = False + self.nslookup = False + self.resolver = None + + def testLibAsyncNSResolver(self): + self._reset() + if not resolver.USE_LIBASYNCNS: + print 'testLibAsyncResolver: libasyncns-python not installed' + return + self.resolver = resolver.LibAsyncNSResolver() + + for name, type, expect_results in TEST_LIST: + self.expect_results = expect_results + self._runLANSR(name, type) + self.flag = False + + def _runLANSR(self, name, type): + self.resolver.resolve( + host = name, + type = type, + on_ready = self._myonready) + while not self.flag: + time.sleep(1) + self.resolver.process() + + def _myonready(self, name, result_set): + if __name__ == '__main__': + from pprint import pprint + pprint('on_ready called ...') + pprint('hostname: %s' % name) + pprint('result set: %s' % result_set) + pprint('res.resolved_hosts: %s' % self.resolver.resolved_hosts) + pprint('') + if self.expect_results: + self.assert_(len(result_set) > 0) + else: + self.assert_(result_set == []) + self.flag = True + if self.nslookup: + self._testNSLR() + + def testNSLookupResolver(self): + self._reset() + self.nslookup = True + self.resolver = resolver.NSLookupResolver(self.iq) + self.test_list = TEST_LIST + self._testNSLR() + + def _testNSLR(self): + if self.test_list == []: + return + name, type, self.expect_results = self.test_list.pop() + self.resolver.resolve( + host = name, + type = type, + on_ready = self._myonready) if __name__ == '__main__': - unittest.main() - -# vim: se ts=3: + unittest.main() diff --git a/test/integration/test_roster.py b/test/integration/test_roster.py index 0be85aa11..8b71df122 100644 --- a/test/integration/test_roster.py +++ b/test/integration/test_roster.py @@ -17,189 +17,187 @@ gajim.get_jid_from_account = lambda acc: 'myjid@' + acc class TestRosterWindow(unittest.TestCase): - def setUp(self): - gajim.interface = MockInterface() - - self.C_NAME = roster_window.C_NAME - self.C_TYPE = roster_window.C_TYPE - self.C_JID = roster_window.C_JID - self.C_ACCOUNT = roster_window.C_ACCOUNT - - # Add after creating RosterWindow - # We want to test the filling explicitly - gajim.contacts = contacts_module.LegacyContactsAPI() - gajim.connections = {} - self.roster = roster_window.RosterWindow() - - for acc in contacts: - gajim.connections[acc] = MockConnection(acc) - gajim.contacts.add_account(acc) - - ### Custom assertions - def assert_all_contacts_are_in_roster(self, acc): - for jid in contacts[acc]: - self.assert_contact_is_in_roster(jid, acc) - - def assert_contact_is_in_roster(self, jid, account): - contacts = gajim.contacts.get_contacts(account, jid) - # check for all resources - for contact in contacts: - iters = self.roster._get_contact_iter(jid, account, - model=self.roster.model) - - if jid != gajim.get_jid_from_account(account): - # We don't care for groups of SelfContact - self.assertTrue(len(iters) == len(contact.get_shown_groups()), - msg='Contact is not in all his groups') - - # Are we big brother? - bb_jid = None - bb_account = None - family = gajim.contacts.get_metacontacts_family(account, jid) - if family: - nearby_family, bb_jid, bb_account = \ - self.roster._get_nearby_family_and_big_brother(family, account) - - is_in_nearby_family = (jid, account) in ( - (data['jid'], data['account']) for data in nearby_family) - self.assertTrue(is_in_nearby_family, - msg='Contact not in his own nearby family') - - is_big_brother = (bb_jid, bb_account) == (jid, account) - - # check for each group tag - for titerC in iters: - self.assertTrue(self.roster.model.iter_is_valid(titerC), - msg='Contact iter invalid') - - c_model = self.roster.model[titerC] - self.assertEquals(contact.get_shown_name(), c_model[self.C_NAME], - msg='Contact name missmatch') - self.assertEquals(contact.jid, c_model[self.C_JID], - msg='Jid missmatch') - - if not self.roster.regroup: - self.assertEquals(account, c_model[self.C_ACCOUNT], - msg='Account missmatch') - - # Check for correct nesting - parent_iter = self.roster.model.iter_parent(titerC) - p_model = self.roster.model[parent_iter] - if family: - if is_big_brother: - self.assertTrue(p_model[self.C_TYPE] == 'group', - msg='Big Brother is not on top') - else: - self.assertTrue(p_model[self.C_TYPE] == 'contact', - msg='Little Brother brother has no BigB') - else: - if jid == gajim.get_jid_from_account(account): - self.assertTrue(p_model[self.C_TYPE] == 'account', - msg='SelfContact is not on top') - else: - self.assertTrue(p_model[self.C_TYPE] == 'group', - msg='Contact not found in a group') - - def assert_group_is_in_roster(self, group, account): - #TODO - pass - - def assert_account_is_in_roster(self, acc): - titerA = self.roster._get_account_iter(acc, model=self.roster.model) - self.assertTrue(self.roster.model.iter_is_valid(titerA), - msg='Account iter is invalid') - - acc_model = self.roster.model[titerA] - self.assertEquals(acc_model[self.C_TYPE], 'account', - msg='No account found') - - if not self.roster.regroup: - self.assertEquals(acc_model[self.C_ACCOUNT], acc, - msg='Account not found') - - self_jid = gajim.get_jid_from_account(acc) - self.assertEquals(acc_model[self.C_JID], self_jid, - msg='Account JID not found in account row') - - def assert_model_is_in_sync(self): - #TODO: check that iter_n_children returns the correct numbers - pass - - # tests - def test_fill_contacts_and_groups_dicts(self): - for acc in contacts: - self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc) - - for jid in contacts[acc]: - instances = gajim.contacts.get_contacts(acc, jid) - - # Created a contact for each single jid? - self.assertTrue(len(instances) == 1) - - # Contacts kept their info - contact = instances[0] - self.assertEquals(contact.groups, contacts[acc][jid]['groups'], - msg='Group Missmatch') - - groups = contacts[acc][jid]['groups'] or ['General',] - - def test_fill_roster_model(self): - for acc in contacts: - self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc) - - self.roster.add_account(acc) - self.assert_account_is_in_roster(acc) - - self.roster.add_account_contacts(acc) - self.assert_all_contacts_are_in_roster(acc) - - self.assert_model_is_in_sync() + def setUp(self): + gajim.interface = MockInterface() + + self.C_NAME = roster_window.C_NAME + self.C_TYPE = roster_window.C_TYPE + self.C_JID = roster_window.C_JID + self.C_ACCOUNT = roster_window.C_ACCOUNT + + # Add after creating RosterWindow + # We want to test the filling explicitly + gajim.contacts = contacts_module.LegacyContactsAPI() + gajim.connections = {} + self.roster = roster_window.RosterWindow() + + for acc in contacts: + gajim.connections[acc] = MockConnection(acc) + gajim.contacts.add_account(acc) + + ### Custom assertions + def assert_all_contacts_are_in_roster(self, acc): + for jid in contacts[acc]: + self.assert_contact_is_in_roster(jid, acc) + + def assert_contact_is_in_roster(self, jid, account): + contacts = gajim.contacts.get_contacts(account, jid) + # check for all resources + for contact in contacts: + iters = self.roster._get_contact_iter(jid, account, + model=self.roster.model) + + if jid != gajim.get_jid_from_account(account): + # We don't care for groups of SelfContact + self.assertTrue(len(iters) == len(contact.get_shown_groups()), + msg='Contact is not in all his groups') + + # Are we big brother? + bb_jid = None + bb_account = None + family = gajim.contacts.get_metacontacts_family(account, jid) + if family: + nearby_family, bb_jid, bb_account = \ + self.roster._get_nearby_family_and_big_brother(family, account) + + is_in_nearby_family = (jid, account) in ( + (data['jid'], data['account']) for data in nearby_family) + self.assertTrue(is_in_nearby_family, + msg='Contact not in his own nearby family') + + is_big_brother = (bb_jid, bb_account) == (jid, account) + + # check for each group tag + for titerC in iters: + self.assertTrue(self.roster.model.iter_is_valid(titerC), + msg='Contact iter invalid') + + c_model = self.roster.model[titerC] + self.assertEquals(contact.get_shown_name(), c_model[self.C_NAME], + msg='Contact name missmatch') + self.assertEquals(contact.jid, c_model[self.C_JID], + msg='Jid missmatch') + + if not self.roster.regroup: + self.assertEquals(account, c_model[self.C_ACCOUNT], + msg='Account missmatch') + + # Check for correct nesting + parent_iter = self.roster.model.iter_parent(titerC) + p_model = self.roster.model[parent_iter] + if family: + if is_big_brother: + self.assertTrue(p_model[self.C_TYPE] == 'group', + msg='Big Brother is not on top') + else: + self.assertTrue(p_model[self.C_TYPE] == 'contact', + msg='Little Brother brother has no BigB') + else: + if jid == gajim.get_jid_from_account(account): + self.assertTrue(p_model[self.C_TYPE] == 'account', + msg='SelfContact is not on top') + else: + self.assertTrue(p_model[self.C_TYPE] == 'group', + msg='Contact not found in a group') + + def assert_group_is_in_roster(self, group, account): + #TODO + pass + + def assert_account_is_in_roster(self, acc): + titerA = self.roster._get_account_iter(acc, model=self.roster.model) + self.assertTrue(self.roster.model.iter_is_valid(titerA), + msg='Account iter is invalid') + + acc_model = self.roster.model[titerA] + self.assertEquals(acc_model[self.C_TYPE], 'account', + msg='No account found') + + if not self.roster.regroup: + self.assertEquals(acc_model[self.C_ACCOUNT], acc, + msg='Account not found') + + self_jid = gajim.get_jid_from_account(acc) + self.assertEquals(acc_model[self.C_JID], self_jid, + msg='Account JID not found in account row') + + def assert_model_is_in_sync(self): + #TODO: check that iter_n_children returns the correct numbers + pass + + # tests + def test_fill_contacts_and_groups_dicts(self): + for acc in contacts: + self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc) + + for jid in contacts[acc]: + instances = gajim.contacts.get_contacts(acc, jid) + + # Created a contact for each single jid? + self.assertTrue(len(instances) == 1) + + # Contacts kept their info + contact = instances[0] + self.assertEquals(contact.groups, contacts[acc][jid]['groups'], + msg='Group Missmatch') + + groups = contacts[acc][jid]['groups'] or ['General',] + + def test_fill_roster_model(self): + for acc in contacts: + self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc) + + self.roster.add_account(acc) + self.assert_account_is_in_roster(acc) + + self.roster.add_account_contacts(acc) + self.assert_all_contacts_are_in_roster(acc) + + self.assert_model_is_in_sync() class TestRosterWindowRegrouped(TestRosterWindow): - def setUp(self): - gajim.config.set('mergeaccounts', True) - TestRosterWindow.setUp(self) + def setUp(self): + gajim.config.set('mergeaccounts', True) + TestRosterWindow.setUp(self) - def test_toggle_regroup(self): - self.roster.regroup = not self.roster.regroup - self.roster.setup_and_draw_roster() - self.roster.regroup = not self.roster.regroup - self.roster.setup_and_draw_roster() + def test_toggle_regroup(self): + self.roster.regroup = not self.roster.regroup + self.roster.setup_and_draw_roster() + self.roster.regroup = not self.roster.regroup + self.roster.setup_and_draw_roster() class TestRosterWindowMetaContacts(TestRosterWindowRegrouped): - def test_receive_metacontact_data(self): - for complete_data in metacontact_data: - t_acc = complete_data[0]['account'] - t_jid = complete_data[0]['jid'] - data = complete_data[1:] - for brother in data: - acc = brother['account'] - jid = brother['jid'] - gajim.contacts.add_metacontact(t_acc, t_jid, acc, jid) - self.roster.setup_and_draw_roster() + def test_receive_metacontact_data(self): + for complete_data in metacontact_data: + t_acc = complete_data[0]['account'] + t_jid = complete_data[0]['jid'] + data = complete_data[1:] + for brother in data: + acc = brother['account'] + jid = brother['jid'] + gajim.contacts.add_metacontact(t_acc, t_jid, acc, jid) + self.roster.setup_and_draw_roster() - def test_connect_new_metacontact(self): - self.test_fill_roster_model() + def test_connect_new_metacontact(self): + self.test_fill_roster_model() - jid = u'coolstuff@gajim.org' - contact = gajim.contacts.create_contact(jid, account1) - gajim.contacts.add_contact(account1, contact) - self.roster.add_contact(jid, account1) - self.roster.chg_contact_status(contact, 'offline', '', account1) + jid = u'coolstuff@gajim.org' + contact = gajim.contacts.create_contact(jid, account1) + gajim.contacts.add_contact(account1, contact) + self.roster.add_contact(jid, account1) + self.roster.chg_contact_status(contact, 'offline', '', account1) - gajim.contacts.add_metacontact(account1, u'samejid@gajim.org', - account1, jid) - self.roster.chg_contact_status(contact, 'online', '', account1) + gajim.contacts.add_metacontact(account1, u'samejid@gajim.org', + account1, jid) + self.roster.chg_contact_status(contact, 'online', '', account1) - self.assert_model_is_in_sync() + self.assert_model_is_in_sync() if __name__ == '__main__': - unittest.main() - -# vim: se ts=3: + unittest.main() diff --git a/test/integration/test_xmpp_client_nb.py b/test/integration/test_xmpp_client_nb.py index 24a54a3ca..2efdcfe91 100644 --- a/test/integration/test_xmpp_client_nb.py +++ b/test/integration/test_xmpp_client_nb.py @@ -24,148 +24,146 @@ xmpp_server_port = ('gajim.org', 5222) credentials = ['unittest', 'testtest', 'res'] class TestNonBlockingClient(unittest.TestCase): - ''' - Test Cases class for NonBlockingClient. - ''' - def setUp(self): - ''' IdleQueue thread is run and dummy connection is created. ''' - self.idlequeue_thread = IdleQueueThread() - self.connection = MockConnection() # for dummy callbacks - self.idlequeue_thread.start() - - def tearDown(self): - ''' IdleQueue thread is stopped. ''' - self.idlequeue_thread.stop_thread() - self.idlequeue_thread.join() - - self.client = None - - def open_stream(self, server_port, wrong_pass=False): - ''' - Method opening the XMPP connection. It returns when <stream:features> - is received from server. - - :param server_port: tuple of (hostname, port) for where the client should - connect. - ''' - - class TempConnection(): - def get_password(self, cb): - if wrong_pass: - cb('wrong pass') - else: - cb(credentials[1]) - def on_connect_failure(self): - pass - - self.client = client_nb.NonBlockingClient( - domain=server_port[0], - idlequeue=self.idlequeue_thread.iq, - caller=Mock(realClass=TempConnection)) - - self.client.connect( - hostname=server_port[0], - port=server_port[1], - on_connect=lambda *args: self.connection.on_connect(True, *args), - on_connect_failure=lambda *args: self.connection.on_connect( - False, *args)) - - self.assert_(self.connection.wait(), - msg='waiting for callback from client constructor') - - # if on_connect was called, client has to be connected and vice versa - if self.connection.connect_succeeded: - self.assert_(self.client.get_connect_type()) - else: - self.assert_(not self.client.get_connect_type()) - - def client_auth(self, username, password, resource, sasl): - ''' - Method authenticating connected client with supplied credentials. Returns - when authentication is over. - - :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication - :todo: to check and be more specific about when it returns - (bind, session..) - ''' - self.client.auth(username, password, resource, sasl, - on_auth=self.connection.on_auth) - - self.assert_(self.connection.wait(), msg='waiting for authentication') - - def do_disconnect(self): - ''' - Does disconnecting of connected client. Returns when TCP connection is - closed. - ''' - self.client.RegisterDisconnectHandler(self.connection.set_event) - self.client.disconnect() - - self.assertTrue(self.connection.wait(), msg='waiting for disconnecting') - - def test_proper_connect_sasl(self): - ''' - The ideal testcase - client is connected, authenticated with SASL and - then disconnected. - ''' - self.open_stream(xmpp_server_port) - - # if client is not connected, lets raise the AssertionError - self.assert_(self.client.get_connect_type()) - # client.disconnect() is already called from NBClient via - # _on_connected_failure, no need to call it here - - self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1) - self.assert_(self.connection.con) - self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL') - - self.do_disconnect() - - def test_proper_connect_oldauth(self): - ''' - The ideal testcase - client is connected, authenticated with old auth and - then disconnected. - ''' - self.open_stream(xmpp_server_port) - self.assert_(self.client.get_connect_type()) - self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0) - self.assert_(self.connection.con) - features = self.client.Dispatcher.Stream.features - if not features.getTag('auth'): - print "Server doesn't support old authentication type, ignoring test" - else: - self.assert_(self.connection.auth=='old_auth', - msg='Unable to auth via old_auth') - self.do_disconnect() - - def test_connect_to_nonexisting_host(self): - ''' - Connect to nonexisting host. DNS request for A records should return - nothing. - ''' - self.open_stream(('fdsfsdf.fdsf.fss', 5222)) - self.assert_(not self.client.get_connect_type()) - - def test_connect_to_wrong_port(self): - ''' - Connect to nonexisting server. DNS request for A records should return an - IP but there shouldn't be XMPP server running on specified port. - ''' - self.open_stream((xmpp_server_port[0], 31337)) - self.assert_(not self.client.get_connect_type()) - - def test_connect_with_wrong_creds(self): - ''' - Connecting with invalid password. - ''' - self.open_stream(xmpp_server_port, wrong_pass=True) - self.assert_(self.client.get_connect_type()) - self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1) - self.assert_(self.connection.auth is None) - self.do_disconnect() + ''' + Test Cases class for NonBlockingClient. + ''' + def setUp(self): + ''' IdleQueue thread is run and dummy connection is created. ''' + self.idlequeue_thread = IdleQueueThread() + self.connection = MockConnection() # for dummy callbacks + self.idlequeue_thread.start() + + def tearDown(self): + ''' IdleQueue thread is stopped. ''' + self.idlequeue_thread.stop_thread() + self.idlequeue_thread.join() + + self.client = None + + def open_stream(self, server_port, wrong_pass=False): + ''' + Method opening the XMPP connection. It returns when <stream:features> + is received from server. + + :param server_port: tuple of (hostname, port) for where the client should + connect. + ''' + + class TempConnection(): + def get_password(self, cb): + if wrong_pass: + cb('wrong pass') + else: + cb(credentials[1]) + def on_connect_failure(self): + pass + + self.client = client_nb.NonBlockingClient( + domain=server_port[0], + idlequeue=self.idlequeue_thread.iq, + caller=Mock(realClass=TempConnection)) + + self.client.connect( + hostname=server_port[0], + port=server_port[1], + on_connect=lambda *args: self.connection.on_connect(True, *args), + on_connect_failure=lambda *args: self.connection.on_connect( + False, *args)) + + self.assert_(self.connection.wait(), + msg='waiting for callback from client constructor') + + # if on_connect was called, client has to be connected and vice versa + if self.connection.connect_succeeded: + self.assert_(self.client.get_connect_type()) + else: + self.assert_(not self.client.get_connect_type()) + + def client_auth(self, username, password, resource, sasl): + ''' + Method authenticating connected client with supplied credentials. Returns + when authentication is over. + + :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication + :todo: to check and be more specific about when it returns + (bind, session..) + ''' + self.client.auth(username, password, resource, sasl, + on_auth=self.connection.on_auth) + + self.assert_(self.connection.wait(), msg='waiting for authentication') + + def do_disconnect(self): + ''' + Does disconnecting of connected client. Returns when TCP connection is + closed. + ''' + self.client.RegisterDisconnectHandler(self.connection.set_event) + self.client.disconnect() + + self.assertTrue(self.connection.wait(), msg='waiting for disconnecting') + + def test_proper_connect_sasl(self): + ''' + The ideal testcase - client is connected, authenticated with SASL and + then disconnected. + ''' + self.open_stream(xmpp_server_port) + + # if client is not connected, lets raise the AssertionError + self.assert_(self.client.get_connect_type()) + # client.disconnect() is already called from NBClient via + # _on_connected_failure, no need to call it here + + self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1) + self.assert_(self.connection.con) + self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL') + + self.do_disconnect() + + def test_proper_connect_oldauth(self): + ''' + The ideal testcase - client is connected, authenticated with old auth and + then disconnected. + ''' + self.open_stream(xmpp_server_port) + self.assert_(self.client.get_connect_type()) + self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0) + self.assert_(self.connection.con) + features = self.client.Dispatcher.Stream.features + if not features.getTag('auth'): + print "Server doesn't support old authentication type, ignoring test" + else: + self.assert_(self.connection.auth=='old_auth', + msg='Unable to auth via old_auth') + self.do_disconnect() + + def test_connect_to_nonexisting_host(self): + ''' + Connect to nonexisting host. DNS request for A records should return + nothing. + ''' + self.open_stream(('fdsfsdf.fdsf.fss', 5222)) + self.assert_(not self.client.get_connect_type()) + + def test_connect_to_wrong_port(self): + ''' + Connect to nonexisting server. DNS request for A records should return an + IP but there shouldn't be XMPP server running on specified port. + ''' + self.open_stream((xmpp_server_port[0], 31337)) + self.assert_(not self.client.get_connect_type()) + + def test_connect_with_wrong_creds(self): + ''' + Connecting with invalid password. + ''' + self.open_stream(xmpp_server_port, wrong_pass=True) + self.assert_(self.client.get_connect_type()) + self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1) + self.assert_(self.connection.auth is None) + self.do_disconnect() if __name__ == '__main__': - unittest.main() - -# vim: se ts=3: + unittest.main() diff --git a/test/integration/test_xmpp_transports_nb.py b/test/integration/test_xmpp_transports_nb.py index ef9908903..f7de8acf9 100644 --- a/test/integration/test_xmpp_transports_nb.py +++ b/test/integration/test_xmpp_transports_nb.py @@ -14,265 +14,263 @@ from common.xmpp import transports_nb class AbstractTransportTest(unittest.TestCase): - ''' Encapsulates Idlequeue instantiation for transports and more...''' - - def setUp(self): - ''' IdleQueue thread is run and dummy connection is created. ''' - self.idlequeue_thread = IdleQueueThread() - self.idlequeue_thread.start() - self._setup_hook() - - def tearDown(self): - ''' IdleQueue thread is stopped. ''' - self._teardown_hook() - self.idlequeue_thread.stop_thread() - self.idlequeue_thread.join() - - def _setup_hook(self): - pass - - def _teardown_hook(self): - pass - - def expect_receive(self, expected, count=1, msg=None): - ''' - Returns a callback function that will assert whether the data passed to - it equals the one specified when calling this function. - - Can be used to make sure transport dispatch correct data. - ''' - def receive(data, *args, **kwargs): - self.assertEqual(data, expected, msg=msg) - self._expected_count -= 1 - self._expected_count = count - return receive - - def have_received_expected(self): - ''' - Plays together with expect_receive(). Will return true if expected_rcv - callback was called as often as specified - ''' - return self._expected_count == 0 + ''' Encapsulates Idlequeue instantiation for transports and more...''' + + def setUp(self): + ''' IdleQueue thread is run and dummy connection is created. ''' + self.idlequeue_thread = IdleQueueThread() + self.idlequeue_thread.start() + self._setup_hook() + + def tearDown(self): + ''' IdleQueue thread is stopped. ''' + self._teardown_hook() + self.idlequeue_thread.stop_thread() + self.idlequeue_thread.join() + + def _setup_hook(self): + pass + + def _teardown_hook(self): + pass + + def expect_receive(self, expected, count=1, msg=None): + ''' + Returns a callback function that will assert whether the data passed to + it equals the one specified when calling this function. + + Can be used to make sure transport dispatch correct data. + ''' + def receive(data, *args, **kwargs): + self.assertEqual(data, expected, msg=msg) + self._expected_count -= 1 + self._expected_count = count + return receive + + def have_received_expected(self): + ''' + Plays together with expect_receive(). Will return true if expected_rcv + callback was called as often as specified + ''' + return self._expected_count == 0 class TestNonBlockingTCP(AbstractTransportTest): - ''' - Test class for NonBlockingTCP. Will actually try to connect to an existing - XMPP server. - ''' - class MockClient(IdleMock): - ''' Simple client to test transport functionality ''' - def __init__(self, idlequeue, testcase): - self.idlequeue = idlequeue - self.testcase = testcase - IdleMock.__init__(self) - - def do_connect(self, establish_tls=False, proxy_dict=None): - try: - ips = socket.getaddrinfo('gajim.org', 5222, - socket.AF_UNSPEC,socket.SOCK_STREAM) - ip = ips[0] - except socket.error, e: - self.testcase.fail(msg=str(e)) - - self.socket = transports_nb.NonBlockingTCP( - raise_event=lambda event_type, data: self.testcase.assertTrue( - event_type and data), - on_disconnect=lambda: self.on_success(mode='SocketDisconnect'), - idlequeue=self.idlequeue, - estabilish_tls=establish_tls, - certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'), - proxy_dict=proxy_dict) - - self.socket.PlugIn(self) - - self.socket.connect(conn_5tuple=ip, - on_connect=lambda: self.on_success(mode='TCPconnect'), - on_connect_failure=self.on_failure) - self.testcase.assertTrue(self.wait(), msg='Connection timed out') - - def do_disconnect(self): - self.socket.disconnect() - self.testcase.assertTrue(self.wait(), msg='Disconnect timed out') - - def on_failure(self, err_message): - self.set_event() - self.testcase.fail(msg=err_message) - - def on_success(self, mode, data=None): - if mode == "TCPconnect": - pass - if mode == "SocketDisconnect": - pass - self.set_event() - - def _setup_hook(self): - self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq, - testcase=self) - - def _teardown_hook(self): - if self.client.socket.state == 'CONNECTED': - self.client.do_disconnect() - - def test_connect_disconnect_plain(self): - ''' Establish plain connection ''' - self.client.do_connect(establish_tls=False) - self.assertEquals(self.client.socket.state, 'CONNECTED') - self.client.do_disconnect() - self.assertEquals(self.client.socket.state, 'DISCONNECTED') - -# def test_connect_disconnect_ssl(self): -# ''' Establish SSL (not TLS) connection ''' -# self.client.do_connect(establish_tls=True) -# self.assertEquals(self.client.socket.state, 'CONNECTED') -# self.client.do_disconnect() -# self.assertEquals(self.client.socket.state, 'DISCONNECTED') - - def test_do_receive(self): - ''' Test _do_receive method by overwriting socket.recv ''' - self.client.do_connect() - sock = self.client.socket - - # transport shall receive data - data = "Please don't fail" - sock._recv = lambda buffer: data - sock.onreceive(self.expect_receive(data)) - sock._do_receive() - self.assertTrue(self.have_received_expected(), msg='Did not receive data') - self.assert_(self.client.socket.state == 'CONNECTED') - - # transport shall do nothing as an non-fatal SSL is simulated - sock._recv = lambda buffer: None - sock.onreceive(self.assertFalse) # we did not receive anything... - sock._do_receive() - self.assert_(self.client.socket.state == 'CONNECTED') - - # transport shall disconnect as remote side closed the connection - sock._recv = lambda buffer: '' - sock.onreceive(self.assertFalse) # we did not receive anything... - sock._do_receive() - self.assert_(self.client.socket.state == 'DISCONNECTED') - - def test_do_send(self): - ''' Test _do_send method by overwriting socket.send ''' - self.client.do_connect() - sock = self.client.socket - - outgoing = [] # what we have actually send to our socket.socket - data_part1 = "Please don't " - data_part2 = "fail!" - data_complete = data_part1 + data_part2 - - # Simulate everything could be send in one go - def _send_all(data): - outgoing.append(data) - return len(data) - sock._send = _send_all - sock.send(data_part1) - sock.send(data_part2) - sock._do_send() - sock._do_send() - self.assertTrue(self.client.socket.state == 'CONNECTED') - self.assertTrue(data_part1 in outgoing and data_part2 in outgoing) - self.assertFalse(sock.sendqueue and sock.sendbuff, - msg='There is still unsend data in buffers') - - # Simulate data could only be sent in chunks - self.chunk_count = 0 - outgoing = [] - def _send_chunks(data): - if self.chunk_count == 0: - outgoing.append(data_part1) - self.chunk_count += 1 - return len(data_part1) - else: - outgoing.append(data_part2) - return len(data_part2) - sock._send = _send_chunks - sock.send(data_complete) - sock._do_send() # process first chunk - sock._do_send() # process the second one - self.assertTrue(self.client.socket.state == 'CONNECTED') - self.assertTrue(data_part1 in outgoing and data_part2 in outgoing) - self.assertFalse(sock.sendqueue and sock.sendbuff, - msg='There is still unsend data in buffers') + ''' + Test class for NonBlockingTCP. Will actually try to connect to an existing + XMPP server. + ''' + class MockClient(IdleMock): + ''' Simple client to test transport functionality ''' + def __init__(self, idlequeue, testcase): + self.idlequeue = idlequeue + self.testcase = testcase + IdleMock.__init__(self) + + def do_connect(self, establish_tls=False, proxy_dict=None): + try: + ips = socket.getaddrinfo('gajim.org', 5222, + socket.AF_UNSPEC,socket.SOCK_STREAM) + ip = ips[0] + except socket.error, e: + self.testcase.fail(msg=str(e)) + + self.socket = transports_nb.NonBlockingTCP( + raise_event=lambda event_type, data: self.testcase.assertTrue( + event_type and data), + on_disconnect=lambda: self.on_success(mode='SocketDisconnect'), + idlequeue=self.idlequeue, + estabilish_tls=establish_tls, + certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'), + proxy_dict=proxy_dict) + + self.socket.PlugIn(self) + + self.socket.connect(conn_5tuple=ip, + on_connect=lambda: self.on_success(mode='TCPconnect'), + on_connect_failure=self.on_failure) + self.testcase.assertTrue(self.wait(), msg='Connection timed out') + + def do_disconnect(self): + self.socket.disconnect() + self.testcase.assertTrue(self.wait(), msg='Disconnect timed out') + + def on_failure(self, err_message): + self.set_event() + self.testcase.fail(msg=err_message) + + def on_success(self, mode, data=None): + if mode == "TCPconnect": + pass + if mode == "SocketDisconnect": + pass + self.set_event() + + def _setup_hook(self): + self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq, + testcase=self) + + def _teardown_hook(self): + if self.client.socket.state == 'CONNECTED': + self.client.do_disconnect() + + def test_connect_disconnect_plain(self): + ''' Establish plain connection ''' + self.client.do_connect(establish_tls=False) + self.assertEquals(self.client.socket.state, 'CONNECTED') + self.client.do_disconnect() + self.assertEquals(self.client.socket.state, 'DISCONNECTED') + +# def test_connect_disconnect_ssl(self): +# ''' Establish SSL (not TLS) connection ''' +# self.client.do_connect(establish_tls=True) +# self.assertEquals(self.client.socket.state, 'CONNECTED') +# self.client.do_disconnect() +# self.assertEquals(self.client.socket.state, 'DISCONNECTED') + + def test_do_receive(self): + ''' Test _do_receive method by overwriting socket.recv ''' + self.client.do_connect() + sock = self.client.socket + + # transport shall receive data + data = "Please don't fail" + sock._recv = lambda buffer: data + sock.onreceive(self.expect_receive(data)) + sock._do_receive() + self.assertTrue(self.have_received_expected(), msg='Did not receive data') + self.assert_(self.client.socket.state == 'CONNECTED') + + # transport shall do nothing as an non-fatal SSL is simulated + sock._recv = lambda buffer: None + sock.onreceive(self.assertFalse) # we did not receive anything... + sock._do_receive() + self.assert_(self.client.socket.state == 'CONNECTED') + + # transport shall disconnect as remote side closed the connection + sock._recv = lambda buffer: '' + sock.onreceive(self.assertFalse) # we did not receive anything... + sock._do_receive() + self.assert_(self.client.socket.state == 'DISCONNECTED') + + def test_do_send(self): + ''' Test _do_send method by overwriting socket.send ''' + self.client.do_connect() + sock = self.client.socket + + outgoing = [] # what we have actually send to our socket.socket + data_part1 = "Please don't " + data_part2 = "fail!" + data_complete = data_part1 + data_part2 + + # Simulate everything could be send in one go + def _send_all(data): + outgoing.append(data) + return len(data) + sock._send = _send_all + sock.send(data_part1) + sock.send(data_part2) + sock._do_send() + sock._do_send() + self.assertTrue(self.client.socket.state == 'CONNECTED') + self.assertTrue(data_part1 in outgoing and data_part2 in outgoing) + self.assertFalse(sock.sendqueue and sock.sendbuff, + msg='There is still unsend data in buffers') + + # Simulate data could only be sent in chunks + self.chunk_count = 0 + outgoing = [] + def _send_chunks(data): + if self.chunk_count == 0: + outgoing.append(data_part1) + self.chunk_count += 1 + return len(data_part1) + else: + outgoing.append(data_part2) + return len(data_part2) + sock._send = _send_chunks + sock.send(data_complete) + sock._do_send() # process first chunk + sock._do_send() # process the second one + self.assertTrue(self.client.socket.state == 'CONNECTED') + self.assertTrue(data_part1 in outgoing and data_part2 in outgoing) + self.assertFalse(sock.sendqueue and sock.sendbuff, + msg='There is still unsend data in buffers') class TestNonBlockingHTTP(AbstractTransportTest): - ''' Test class for NonBlockingHTTP transport''' - - bosh_http_dict = { - 'http_uri': 'http://gajim.org:5280/http-bind', - 'http_version': 'HTTP/1.1', - 'http_persistent': True, - 'add_proxy_headers': False - } - - def _get_transport(self, http_dict, proxy_dict=None): - return transports_nb.NonBlockingHTTP( - raise_event=None, - on_disconnect=None, - idlequeue=self.idlequeue_thread.iq, - estabilish_tls=False, - certs=None, - on_http_request_possible=lambda: None, - on_persistent_fallback=None, - http_dict=http_dict, - proxy_dict=proxy_dict, - ) - - def test_parse_own_http_message(self): - ''' Build a HTTP message and try to parse it afterwards ''' - transport = self._get_transport(self.bosh_http_dict) - - data = "<test>Please don't fail!</test>" - http_message = transport.build_http_message(data) - statusline, headers, http_body, buffer_rest = transport.parse_http_message( - http_message) - - self.assertFalse(bool(buffer_rest)) - self.assertTrue(statusline and isinstance(statusline, list)) - self.assertTrue(headers and isinstance(headers, dict)) - self.assertEqual(data, http_body, msg='Input and output are different') - - def test_receive_http_message(self): - ''' Let _on_receive handle some http messages ''' - transport = self._get_transport(self.bosh_http_dict) - - header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" + - "Content-Length: 88\r\n\r\n") - payload = "<test>Please don't fail!</test>" - body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \ - % payload - message = "%s%s" % (header, body) - - # try to receive in one go - transport.onreceive(self.expect_receive(body, msg='Failed: In one go')) - transport._on_receive(message) - self.assertTrue(self.have_received_expected(), msg='Failed: In one go') - - def test_receive_http_message_in_chunks(self): - ''' Let _on_receive handle some chunked http messages ''' - transport = self._get_transport(self.bosh_http_dict) - - payload = "<test>Please don't fail!\n\n</test>" - body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \ - % payload - header = "HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +\ - "Content-Length: %i\r\n\r\n" % len(body) - message = "%s%s" % (header, body) - - chunk1, chunk2, chunk3, chunk4 = message[:20], message[20:73], \ - message[73:85], message[85:] - nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x" - chunks = (chunk1, chunk2, chunk3, chunk4, nextmessage_chunk) - - transport.onreceive(self.expect_receive(body, msg='Failed: In chunks')) - for chunk in chunks: - transport._on_receive(chunk) - self.assertTrue(self.have_received_expected(), msg='Failed: In chunks') + ''' Test class for NonBlockingHTTP transport''' + + bosh_http_dict = { + 'http_uri': 'http://gajim.org:5280/http-bind', + 'http_version': 'HTTP/1.1', + 'http_persistent': True, + 'add_proxy_headers': False + } + + def _get_transport(self, http_dict, proxy_dict=None): + return transports_nb.NonBlockingHTTP( + raise_event=None, + on_disconnect=None, + idlequeue=self.idlequeue_thread.iq, + estabilish_tls=False, + certs=None, + on_http_request_possible=lambda: None, + on_persistent_fallback=None, + http_dict=http_dict, + proxy_dict=proxy_dict, + ) + + def test_parse_own_http_message(self): + ''' Build a HTTP message and try to parse it afterwards ''' + transport = self._get_transport(self.bosh_http_dict) + + data = "<test>Please don't fail!</test>" + http_message = transport.build_http_message(data) + statusline, headers, http_body, buffer_rest = transport.parse_http_message( + http_message) + + self.assertFalse(bool(buffer_rest)) + self.assertTrue(statusline and isinstance(statusline, list)) + self.assertTrue(headers and isinstance(headers, dict)) + self.assertEqual(data, http_body, msg='Input and output are different') + + def test_receive_http_message(self): + ''' Let _on_receive handle some http messages ''' + transport = self._get_transport(self.bosh_http_dict) + + header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" + + "Content-Length: 88\r\n\r\n") + payload = "<test>Please don't fail!</test>" + body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \ + % payload + message = "%s%s" % (header, body) + + # try to receive in one go + transport.onreceive(self.expect_receive(body, msg='Failed: In one go')) + transport._on_receive(message) + self.assertTrue(self.have_received_expected(), msg='Failed: In one go') + + def test_receive_http_message_in_chunks(self): + ''' Let _on_receive handle some chunked http messages ''' + transport = self._get_transport(self.bosh_http_dict) + + payload = "<test>Please don't fail!\n\n</test>" + body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \ + % payload + header = "HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +\ + "Content-Length: %i\r\n\r\n" % len(body) + message = "%s%s" % (header, body) + + chunk1, chunk2, chunk3, chunk4 = message[:20], message[20:73], \ + message[73:85], message[85:] + nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x" + chunks = (chunk1, chunk2, chunk3, chunk4, nextmessage_chunk) + + transport.onreceive(self.expect_receive(body, msg='Failed: In chunks')) + for chunk in chunks: + transport._on_receive(chunk) + self.assertTrue(self.have_received_expected(), msg='Failed: In chunks') if __name__ == '__main__': - unittest.main() - -# vim: se ts=3: + unittest.main() |