Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/gajim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorÉric Araujo <merwok@netwok.org>2010-02-08 17:08:40 +0300
committerÉric Araujo <merwok@netwok.org>2010-02-08 17:08:40 +0300
commitfedd7dc8e22950f0dbe297443860662368c249d1 (patch)
treea6426601d50b3020348fd6574107c9fd6f9dfb0e /test/integration
parent1a69ea93f1316e9f98d50ac8a212498b42d3cf60 (diff)
convert tabs to spaces in source code thanks to reindent.py
holy diff batman!
Diffstat (limited to 'test/integration')
-rw-r--r--test/integration/__init__.py2
-rw-r--r--test/integration/test_gui_event_integration.py234
-rw-r--r--test/integration/test_resolver.py158
-rw-r--r--test/integration/test_roster.py336
-rw-r--r--test/integration/test_xmpp_client_nb.py282
-rw-r--r--test/integration/test_xmpp_transports_nb.py506
6 files changed, 754 insertions, 764 deletions
diff --git a/test/integration/__init__.py b/test/integration/__init__.py
index 0adf844f1..aaeacfbef 100644
--- a/test/integration/__init__.py
+++ b/test/integration/__init__.py
@@ -3,4 +3,4 @@
This package contains integration tests. Integration tests are tests
which require or include UI, network or both.
-''' \ No newline at end of file
+'''
diff --git a/test/integration/test_gui_event_integration.py b/test/integration/test_gui_event_integration.py
index a1eadea60..c5999389f 100644
--- a/test/integration/test_gui_event_integration.py
+++ b/test/integration/test_gui_event_integration.py
@@ -23,171 +23,169 @@ import roster_window
import notify
class TestStatusChange(unittest.TestCase):
- '''tests gajim.py's incredibly complex handle_event_notify'''
+ '''tests gajim.py's incredibly complex handle_event_notify'''
- def setUp(self):
-
- gajim.connections = {}
- gajim.contacts = contacts_module.LegacyContactsAPI()
- gajim.interface.roster = roster_window.RosterWindow()
+ def setUp(self):
- for acc in contacts:
- gajim.connections[acc] = MockConnection(acc)
+ gajim.connections = {}
+ gajim.contacts = contacts_module.LegacyContactsAPI()
+ gajim.interface.roster = roster_window.RosterWindow()
- gajim.interface.roster.fill_contacts_and_groups_dicts(contacts[acc],
- acc)
- gajim.interface.roster.add_account(acc)
- gajim.interface.roster.add_account_contacts(acc)
+ for acc in contacts:
+ gajim.connections[acc] = MockConnection(acc)
- self.assertEqual(0, len(notify.notifications))
+ gajim.interface.roster.fill_contacts_and_groups_dicts(contacts[acc],
+ acc)
+ gajim.interface.roster.add_account(acc)
+ gajim.interface.roster.add_account_contacts(acc)
- def tearDown(self):
- notify.notifications = []
+ self.assertEqual(0, len(notify.notifications))
- def contact_comes_online(self, account, jid, resource, prio):
- '''a remote contact comes online'''
- gajim.interface.handle_event_notify(account, (jid, 'online', "I'm back!",
- resource, prio, None, time.time(), None))
+ def tearDown(self):
+ notify.notifications = []
- contact = None
- for c in gajim.contacts.get_contacts(account, jid):
- if c.resource == resource:
- contact = c
- break
+ def contact_comes_online(self, account, jid, resource, prio):
+ '''a remote contact comes online'''
+ gajim.interface.handle_event_notify(account, (jid, 'online', "I'm back!",
+ resource, prio, None, time.time(), None))
- self.assertEqual('online', contact.show)
- self.assertEqual("I'm back!", contact.status)
- self.assertEqual(prio, contact.priority)
+ contact = None
+ for c in gajim.contacts.get_contacts(account, jid):
+ if c.resource == resource:
+ contact = c
+ break
- # the most recent notification is that the contact connected
- self.assertEqual('contact_connected', notify.notifications[-1][0])
+ self.assertEqual('online', contact.show)
+ self.assertEqual("I'm back!", contact.status)
+ self.assertEqual(prio, contact.priority)
- def contact_goes_offline(self, account, jid, resource, prio,
- still_exists = True):
- '''a remote contact goes offline.'''
- gajim.interface.handle_event_notify(account, (jid, 'offline', 'Goodbye!',
- resource, prio, None, time.time(), None))
+ # the most recent notification is that the contact connected
+ self.assertEqual('contact_connected', notify.notifications[-1][0])
- contact = None
- for c in gajim.contacts.get_contacts(account, jid):
- if c.resource == resource:
- contact = c
- break
+ def contact_goes_offline(self, account, jid, resource, prio,
+ still_exists = True):
+ '''a remote contact goes offline.'''
+ gajim.interface.handle_event_notify(account, (jid, 'offline', 'Goodbye!',
+ resource, prio, None, time.time(), None))
- if not still_exists:
- self.assert_(contact is None)
- return
+ contact = None
+ for c in gajim.contacts.get_contacts(account, jid):
+ if c.resource == resource:
+ contact = c
+ break
- self.assertEqual('offline', contact.show)
- self.assertEqual('Goodbye!', contact.status)
- self.assertEqual(prio, contact.priority)
+ if not still_exists:
+ self.assert_(contact is None)
+ return
- self.assertEqual('contact_disconnected', notify.notifications[-1][0])
+ self.assertEqual('offline', contact.show)
+ self.assertEqual('Goodbye!', contact.status)
+ self.assertEqual(prio, contact.priority)
- def user_starts_chatting(self, jid, account, resource=None):
- '''the user opens a chat window and starts talking'''
- ctrl = MockChatControl(jid, account)
- win = MockWindow()
- win.new_tab(ctrl)
- gajim.interface.msg_win_mgr._windows['test'] = win
+ self.assertEqual('contact_disconnected', notify.notifications[-1][0])
- if resource:
- jid = jid + '/' + resource
+ def user_starts_chatting(self, jid, account, resource=None):
+ '''the user opens a chat window and starts talking'''
+ ctrl = MockChatControl(jid, account)
+ win = MockWindow()
+ win.new_tab(ctrl)
+ gajim.interface.msg_win_mgr._windows['test'] = win
- # a basic session is started
- session = gajim.connections[account1].make_new_session(jid,
- '01234567890abcdef', cls=MockSession)
- ctrl.set_session(session)
+ if resource:
+ jid = jid + '/' + resource
- return ctrl
+ # a basic session is started
+ session = gajim.connections[account1].make_new_session(jid,
+ '01234567890abcdef', cls=MockSession)
+ ctrl.set_session(session)
- def user_starts_esession(self, jid, resource, account):
- '''the user opens a chat window and starts an encrypted session'''
- ctrl = self.user_starts_chatting(jid, account, resource)
- ctrl.session.status = 'active'
- ctrl.session.enable_encryption = True
+ return ctrl
- return ctrl
+ def user_starts_esession(self, jid, resource, account):
+ '''the user opens a chat window and starts an encrypted session'''
+ ctrl = self.user_starts_chatting(jid, account, resource)
+ ctrl.session.status = 'active'
+ ctrl.session.enable_encryption = True
- def test_contact_comes_online(self):
- jid = 'default1@gajim.org'
+ return ctrl
- # contact is offline initially
- contacts = gajim.contacts.get_contacts(account1, jid)
- self.assertEqual(1, len(contacts))
- self.assertEqual('offline', contacts[0].show)
- self.assertEqual('', contacts[0].status)
+ def test_contact_comes_online(self):
+ jid = 'default1@gajim.org'
- self.contact_comes_online(account1, jid, 'lowprio', 1)
+ # contact is offline initially
+ contacts = gajim.contacts.get_contacts(account1, jid)
+ self.assertEqual(1, len(contacts))
+ self.assertEqual('offline', contacts[0].show)
+ self.assertEqual('', contacts[0].status)
- def test_contact_goes_offline(self):
- jid = 'default1@gajim.org'
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
- self.contact_comes_online(account1, jid, 'lowprio', 1)
+ def test_contact_goes_offline(self):
+ jid = 'default1@gajim.org'
- ctrl = self.user_starts_chatting(jid, account1)
- orig_sess = ctrl.session
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
- self.contact_goes_offline(account1, jid, 'lowprio', 1)
+ ctrl = self.user_starts_chatting(jid, account1)
+ orig_sess = ctrl.session
- # session hasn't changed since we were talking to the bare jid
- self.assertEqual(orig_sess, ctrl.session)
+ self.contact_goes_offline(account1, jid, 'lowprio', 1)
- def test_two_resources_higher_comes_online(self):
- jid = 'default1@gajim.org'
+ # session hasn't changed since we were talking to the bare jid
+ self.assertEqual(orig_sess, ctrl.session)
- self.contact_comes_online(account1, jid, 'lowprio', 1)
+ def test_two_resources_higher_comes_online(self):
+ jid = 'default1@gajim.org'
- ctrl = self.user_starts_chatting(jid, account1)
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
- self.contact_comes_online(account1, jid, 'highprio', 50)
+ ctrl = self.user_starts_chatting(jid, account1)
- # old session was dropped
- self.assertEqual(None, ctrl.session)
+ self.contact_comes_online(account1, jid, 'highprio', 50)
- def test_two_resources_higher_goes_offline(self):
- jid = 'default1@gajim.org'
+ # old session was dropped
+ self.assertEqual(None, ctrl.session)
- self.contact_comes_online(account1, jid, 'lowprio', 1)
- self.contact_comes_online(account1, jid, 'highprio', 50)
+ def test_two_resources_higher_goes_offline(self):
+ jid = 'default1@gajim.org'
- ctrl = self.user_starts_chatting(jid, account1)
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
+ self.contact_comes_online(account1, jid, 'highprio', 50)
- self.contact_goes_offline(account1, jid, 'highprio', 50,
- still_exists=False)
+ ctrl = self.user_starts_chatting(jid, account1)
- # old session was dropped
- self.assertEqual(None, ctrl.session)
+ self.contact_goes_offline(account1, jid, 'highprio', 50,
+ still_exists=False)
- def test_two_resources_higher_comes_online_with_esession(self):
- jid = 'default1@gajim.org'
+ # old session was dropped
+ self.assertEqual(None, ctrl.session)
- self.contact_comes_online(account1, jid, 'lowprio', 1)
+ def test_two_resources_higher_comes_online_with_esession(self):
+ jid = 'default1@gajim.org'
- ctrl = self.user_starts_esession(jid, 'lowprio', account1)
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
- self.contact_comes_online(account1, jid, 'highprio', 50)
+ ctrl = self.user_starts_esession(jid, 'lowprio', account1)
- # session was associated with the low priority full jid, so it should
- # have been removed from the control
- self.assertEqual(None, ctrl.session)
+ self.contact_comes_online(account1, jid, 'highprio', 50)
- def test_two_resources_higher_goes_offline_with_esession(self):
- jid = 'default1@gajim.org'
+ # session was associated with the low priority full jid, so it should
+ # have been removed from the control
+ self.assertEqual(None, ctrl.session)
- self.contact_comes_online(account1, jid, 'lowprio', 1)
- self.contact_comes_online(account1, jid, 'highprio', 50)
+ def test_two_resources_higher_goes_offline_with_esession(self):
+ jid = 'default1@gajim.org'
- ctrl = self.user_starts_esession(jid, 'highprio', account1)
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
+ self.contact_comes_online(account1, jid, 'highprio', 50)
- self.contact_goes_offline(account1, jid, 'highprio', 50,
- still_exists=False)
+ ctrl = self.user_starts_esession(jid, 'highprio', account1)
- # session was associated with the high priority full jid, so it should
- # have been removed from the control
- self.assertEqual(None, ctrl.session)
+ self.contact_goes_offline(account1, jid, 'highprio', 50,
+ still_exists=False)
-if __name__ == '__main__':
- unittest.main()
+ # session was associated with the high priority full jid, so it should
+ # have been removed from the control
+ self.assertEqual(None, ctrl.session)
-# vim: se ts=3:
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/integration/test_resolver.py b/test/integration/test_resolver.py
index 1836d3c7b..d80ffee87 100644
--- a/test/integration/test_resolver.py
+++ b/test/integration/test_resolver.py
@@ -16,87 +16,85 @@ NONSENSE_NAME = 'sfsdfsdfsdf.sdfs.fsd'
JABBERCZ_TXT_NAME = '_xmppconnect.jabber.cz'
JABBERCZ_SRV_NAME = '_xmpp-client._tcp.jabber.cz'
-TEST_LIST = [(GMAIL_SRV_NAME, 'srv', True),
- (NONSENSE_NAME, 'srv', False),
- (JABBERCZ_SRV_NAME, 'srv', True)]
+TEST_LIST = [(GMAIL_SRV_NAME, 'srv', True),
+ (NONSENSE_NAME, 'srv', False),
+ (JABBERCZ_SRV_NAME, 'srv', True)]
class TestResolver(unittest.TestCase):
- '''
- Test for LibAsyncNSResolver and NSLookupResolver. Requires working
- network connection.
- '''
- def setUp(self):
- self.idlequeue_thread = IdleQueueThread()
- self.idlequeue_thread.start()
-
- self.iq = self.idlequeue_thread.iq
- self._reset()
- self.resolver = None
-
- def tearDown(self):
- self.idlequeue_thread.stop_thread()
- self.idlequeue_thread.join()
-
- def _reset(self):
- self.flag = False
- self.expect_results = False
- self.nslookup = False
- self.resolver = None
-
- def testLibAsyncNSResolver(self):
- self._reset()
- if not resolver.USE_LIBASYNCNS:
- print 'testLibAsyncResolver: libasyncns-python not installed'
- return
- self.resolver = resolver.LibAsyncNSResolver()
-
- for name, type, expect_results in TEST_LIST:
- self.expect_results = expect_results
- self._runLANSR(name, type)
- self.flag = False
-
- def _runLANSR(self, name, type):
- self.resolver.resolve(
- host = name,
- type = type,
- on_ready = self._myonready)
- while not self.flag:
- time.sleep(1)
- self.resolver.process()
-
- def _myonready(self, name, result_set):
- if __name__ == '__main__':
- from pprint import pprint
- pprint('on_ready called ...')
- pprint('hostname: %s' % name)
- pprint('result set: %s' % result_set)
- pprint('res.resolved_hosts: %s' % self.resolver.resolved_hosts)
- pprint('')
- if self.expect_results:
- self.assert_(len(result_set) > 0)
- else:
- self.assert_(result_set == [])
- self.flag = True
- if self.nslookup:
- self._testNSLR()
-
- def testNSLookupResolver(self):
- self._reset()
- self.nslookup = True
- self.resolver = resolver.NSLookupResolver(self.iq)
- self.test_list = TEST_LIST
- self._testNSLR()
-
- def _testNSLR(self):
- if self.test_list == []:
- return
- name, type, self.expect_results = self.test_list.pop()
- self.resolver.resolve(
- host = name,
- type = type,
- on_ready = self._myonready)
+ '''
+ Test for LibAsyncNSResolver and NSLookupResolver. Requires working
+ network connection.
+ '''
+ def setUp(self):
+ self.idlequeue_thread = IdleQueueThread()
+ self.idlequeue_thread.start()
+
+ self.iq = self.idlequeue_thread.iq
+ self._reset()
+ self.resolver = None
+
+ def tearDown(self):
+ self.idlequeue_thread.stop_thread()
+ self.idlequeue_thread.join()
+
+ def _reset(self):
+ self.flag = False
+ self.expect_results = False
+ self.nslookup = False
+ self.resolver = None
+
+ def testLibAsyncNSResolver(self):
+ self._reset()
+ if not resolver.USE_LIBASYNCNS:
+ print 'testLibAsyncResolver: libasyncns-python not installed'
+ return
+ self.resolver = resolver.LibAsyncNSResolver()
+
+ for name, type, expect_results in TEST_LIST:
+ self.expect_results = expect_results
+ self._runLANSR(name, type)
+ self.flag = False
+
+ def _runLANSR(self, name, type):
+ self.resolver.resolve(
+ host = name,
+ type = type,
+ on_ready = self._myonready)
+ while not self.flag:
+ time.sleep(1)
+ self.resolver.process()
+
+ def _myonready(self, name, result_set):
+ if __name__ == '__main__':
+ from pprint import pprint
+ pprint('on_ready called ...')
+ pprint('hostname: %s' % name)
+ pprint('result set: %s' % result_set)
+ pprint('res.resolved_hosts: %s' % self.resolver.resolved_hosts)
+ pprint('')
+ if self.expect_results:
+ self.assert_(len(result_set) > 0)
+ else:
+ self.assert_(result_set == [])
+ self.flag = True
+ if self.nslookup:
+ self._testNSLR()
+
+ def testNSLookupResolver(self):
+ self._reset()
+ self.nslookup = True
+ self.resolver = resolver.NSLookupResolver(self.iq)
+ self.test_list = TEST_LIST
+ self._testNSLR()
+
+ def _testNSLR(self):
+ if self.test_list == []:
+ return
+ name, type, self.expect_results = self.test_list.pop()
+ self.resolver.resolve(
+ host = name,
+ type = type,
+ on_ready = self._myonready)
if __name__ == '__main__':
- unittest.main()
-
-# vim: se ts=3:
+ unittest.main()
diff --git a/test/integration/test_roster.py b/test/integration/test_roster.py
index 0be85aa11..8b71df122 100644
--- a/test/integration/test_roster.py
+++ b/test/integration/test_roster.py
@@ -17,189 +17,187 @@ gajim.get_jid_from_account = lambda acc: 'myjid@' + acc
class TestRosterWindow(unittest.TestCase):
- def setUp(self):
- gajim.interface = MockInterface()
-
- self.C_NAME = roster_window.C_NAME
- self.C_TYPE = roster_window.C_TYPE
- self.C_JID = roster_window.C_JID
- self.C_ACCOUNT = roster_window.C_ACCOUNT
-
- # Add after creating RosterWindow
- # We want to test the filling explicitly
- gajim.contacts = contacts_module.LegacyContactsAPI()
- gajim.connections = {}
- self.roster = roster_window.RosterWindow()
-
- for acc in contacts:
- gajim.connections[acc] = MockConnection(acc)
- gajim.contacts.add_account(acc)
-
- ### Custom assertions
- def assert_all_contacts_are_in_roster(self, acc):
- for jid in contacts[acc]:
- self.assert_contact_is_in_roster(jid, acc)
-
- def assert_contact_is_in_roster(self, jid, account):
- contacts = gajim.contacts.get_contacts(account, jid)
- # check for all resources
- for contact in contacts:
- iters = self.roster._get_contact_iter(jid, account,
- model=self.roster.model)
-
- if jid != gajim.get_jid_from_account(account):
- # We don't care for groups of SelfContact
- self.assertTrue(len(iters) == len(contact.get_shown_groups()),
- msg='Contact is not in all his groups')
-
- # Are we big brother?
- bb_jid = None
- bb_account = None
- family = gajim.contacts.get_metacontacts_family(account, jid)
- if family:
- nearby_family, bb_jid, bb_account = \
- self.roster._get_nearby_family_and_big_brother(family, account)
-
- is_in_nearby_family = (jid, account) in (
- (data['jid'], data['account']) for data in nearby_family)
- self.assertTrue(is_in_nearby_family,
- msg='Contact not in his own nearby family')
-
- is_big_brother = (bb_jid, bb_account) == (jid, account)
-
- # check for each group tag
- for titerC in iters:
- self.assertTrue(self.roster.model.iter_is_valid(titerC),
- msg='Contact iter invalid')
-
- c_model = self.roster.model[titerC]
- self.assertEquals(contact.get_shown_name(), c_model[self.C_NAME],
- msg='Contact name missmatch')
- self.assertEquals(contact.jid, c_model[self.C_JID],
- msg='Jid missmatch')
-
- if not self.roster.regroup:
- self.assertEquals(account, c_model[self.C_ACCOUNT],
- msg='Account missmatch')
-
- # Check for correct nesting
- parent_iter = self.roster.model.iter_parent(titerC)
- p_model = self.roster.model[parent_iter]
- if family:
- if is_big_brother:
- self.assertTrue(p_model[self.C_TYPE] == 'group',
- msg='Big Brother is not on top')
- else:
- self.assertTrue(p_model[self.C_TYPE] == 'contact',
- msg='Little Brother brother has no BigB')
- else:
- if jid == gajim.get_jid_from_account(account):
- self.assertTrue(p_model[self.C_TYPE] == 'account',
- msg='SelfContact is not on top')
- else:
- self.assertTrue(p_model[self.C_TYPE] == 'group',
- msg='Contact not found in a group')
-
- def assert_group_is_in_roster(self, group, account):
- #TODO
- pass
-
- def assert_account_is_in_roster(self, acc):
- titerA = self.roster._get_account_iter(acc, model=self.roster.model)
- self.assertTrue(self.roster.model.iter_is_valid(titerA),
- msg='Account iter is invalid')
-
- acc_model = self.roster.model[titerA]
- self.assertEquals(acc_model[self.C_TYPE], 'account',
- msg='No account found')
-
- if not self.roster.regroup:
- self.assertEquals(acc_model[self.C_ACCOUNT], acc,
- msg='Account not found')
-
- self_jid = gajim.get_jid_from_account(acc)
- self.assertEquals(acc_model[self.C_JID], self_jid,
- msg='Account JID not found in account row')
-
- def assert_model_is_in_sync(self):
- #TODO: check that iter_n_children returns the correct numbers
- pass
-
- # tests
- def test_fill_contacts_and_groups_dicts(self):
- for acc in contacts:
- self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc)
-
- for jid in contacts[acc]:
- instances = gajim.contacts.get_contacts(acc, jid)
-
- # Created a contact for each single jid?
- self.assertTrue(len(instances) == 1)
-
- # Contacts kept their info
- contact = instances[0]
- self.assertEquals(contact.groups, contacts[acc][jid]['groups'],
- msg='Group Missmatch')
-
- groups = contacts[acc][jid]['groups'] or ['General',]
-
- def test_fill_roster_model(self):
- for acc in contacts:
- self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc)
-
- self.roster.add_account(acc)
- self.assert_account_is_in_roster(acc)
-
- self.roster.add_account_contacts(acc)
- self.assert_all_contacts_are_in_roster(acc)
-
- self.assert_model_is_in_sync()
+ def setUp(self):
+ gajim.interface = MockInterface()
+
+ self.C_NAME = roster_window.C_NAME
+ self.C_TYPE = roster_window.C_TYPE
+ self.C_JID = roster_window.C_JID
+ self.C_ACCOUNT = roster_window.C_ACCOUNT
+
+ # Add after creating RosterWindow
+ # We want to test the filling explicitly
+ gajim.contacts = contacts_module.LegacyContactsAPI()
+ gajim.connections = {}
+ self.roster = roster_window.RosterWindow()
+
+ for acc in contacts:
+ gajim.connections[acc] = MockConnection(acc)
+ gajim.contacts.add_account(acc)
+
+ ### Custom assertions
+ def assert_all_contacts_are_in_roster(self, acc):
+ for jid in contacts[acc]:
+ self.assert_contact_is_in_roster(jid, acc)
+
+ def assert_contact_is_in_roster(self, jid, account):
+ contacts = gajim.contacts.get_contacts(account, jid)
+ # check for all resources
+ for contact in contacts:
+ iters = self.roster._get_contact_iter(jid, account,
+ model=self.roster.model)
+
+ if jid != gajim.get_jid_from_account(account):
+ # We don't care for groups of SelfContact
+ self.assertTrue(len(iters) == len(contact.get_shown_groups()),
+ msg='Contact is not in all his groups')
+
+ # Are we big brother?
+ bb_jid = None
+ bb_account = None
+ family = gajim.contacts.get_metacontacts_family(account, jid)
+ if family:
+ nearby_family, bb_jid, bb_account = \
+ self.roster._get_nearby_family_and_big_brother(family, account)
+
+ is_in_nearby_family = (jid, account) in (
+ (data['jid'], data['account']) for data in nearby_family)
+ self.assertTrue(is_in_nearby_family,
+ msg='Contact not in his own nearby family')
+
+ is_big_brother = (bb_jid, bb_account) == (jid, account)
+
+ # check for each group tag
+ for titerC in iters:
+ self.assertTrue(self.roster.model.iter_is_valid(titerC),
+ msg='Contact iter invalid')
+
+ c_model = self.roster.model[titerC]
+ self.assertEquals(contact.get_shown_name(), c_model[self.C_NAME],
+ msg='Contact name missmatch')
+ self.assertEquals(contact.jid, c_model[self.C_JID],
+ msg='Jid missmatch')
+
+ if not self.roster.regroup:
+ self.assertEquals(account, c_model[self.C_ACCOUNT],
+ msg='Account missmatch')
+
+ # Check for correct nesting
+ parent_iter = self.roster.model.iter_parent(titerC)
+ p_model = self.roster.model[parent_iter]
+ if family:
+ if is_big_brother:
+ self.assertTrue(p_model[self.C_TYPE] == 'group',
+ msg='Big Brother is not on top')
+ else:
+ self.assertTrue(p_model[self.C_TYPE] == 'contact',
+ msg='Little Brother brother has no BigB')
+ else:
+ if jid == gajim.get_jid_from_account(account):
+ self.assertTrue(p_model[self.C_TYPE] == 'account',
+ msg='SelfContact is not on top')
+ else:
+ self.assertTrue(p_model[self.C_TYPE] == 'group',
+ msg='Contact not found in a group')
+
+ def assert_group_is_in_roster(self, group, account):
+ #TODO
+ pass
+
+ def assert_account_is_in_roster(self, acc):
+ titerA = self.roster._get_account_iter(acc, model=self.roster.model)
+ self.assertTrue(self.roster.model.iter_is_valid(titerA),
+ msg='Account iter is invalid')
+
+ acc_model = self.roster.model[titerA]
+ self.assertEquals(acc_model[self.C_TYPE], 'account',
+ msg='No account found')
+
+ if not self.roster.regroup:
+ self.assertEquals(acc_model[self.C_ACCOUNT], acc,
+ msg='Account not found')
+
+ self_jid = gajim.get_jid_from_account(acc)
+ self.assertEquals(acc_model[self.C_JID], self_jid,
+ msg='Account JID not found in account row')
+
+ def assert_model_is_in_sync(self):
+ #TODO: check that iter_n_children returns the correct numbers
+ pass
+
+ # tests
+ def test_fill_contacts_and_groups_dicts(self):
+ for acc in contacts:
+ self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc)
+
+ for jid in contacts[acc]:
+ instances = gajim.contacts.get_contacts(acc, jid)
+
+ # Created a contact for each single jid?
+ self.assertTrue(len(instances) == 1)
+
+ # Contacts kept their info
+ contact = instances[0]
+ self.assertEquals(contact.groups, contacts[acc][jid]['groups'],
+ msg='Group Missmatch')
+
+ groups = contacts[acc][jid]['groups'] or ['General',]
+
+ def test_fill_roster_model(self):
+ for acc in contacts:
+ self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc)
+
+ self.roster.add_account(acc)
+ self.assert_account_is_in_roster(acc)
+
+ self.roster.add_account_contacts(acc)
+ self.assert_all_contacts_are_in_roster(acc)
+
+ self.assert_model_is_in_sync()
class TestRosterWindowRegrouped(TestRosterWindow):
- def setUp(self):
- gajim.config.set('mergeaccounts', True)
- TestRosterWindow.setUp(self)
+ def setUp(self):
+ gajim.config.set('mergeaccounts', True)
+ TestRosterWindow.setUp(self)
- def test_toggle_regroup(self):
- self.roster.regroup = not self.roster.regroup
- self.roster.setup_and_draw_roster()
- self.roster.regroup = not self.roster.regroup
- self.roster.setup_and_draw_roster()
+ def test_toggle_regroup(self):
+ self.roster.regroup = not self.roster.regroup
+ self.roster.setup_and_draw_roster()
+ self.roster.regroup = not self.roster.regroup
+ self.roster.setup_and_draw_roster()
class TestRosterWindowMetaContacts(TestRosterWindowRegrouped):
- def test_receive_metacontact_data(self):
- for complete_data in metacontact_data:
- t_acc = complete_data[0]['account']
- t_jid = complete_data[0]['jid']
- data = complete_data[1:]
- for brother in data:
- acc = brother['account']
- jid = brother['jid']
- gajim.contacts.add_metacontact(t_acc, t_jid, acc, jid)
- self.roster.setup_and_draw_roster()
+ def test_receive_metacontact_data(self):
+ for complete_data in metacontact_data:
+ t_acc = complete_data[0]['account']
+ t_jid = complete_data[0]['jid']
+ data = complete_data[1:]
+ for brother in data:
+ acc = brother['account']
+ jid = brother['jid']
+ gajim.contacts.add_metacontact(t_acc, t_jid, acc, jid)
+ self.roster.setup_and_draw_roster()
- def test_connect_new_metacontact(self):
- self.test_fill_roster_model()
+ def test_connect_new_metacontact(self):
+ self.test_fill_roster_model()
- jid = u'coolstuff@gajim.org'
- contact = gajim.contacts.create_contact(jid, account1)
- gajim.contacts.add_contact(account1, contact)
- self.roster.add_contact(jid, account1)
- self.roster.chg_contact_status(contact, 'offline', '', account1)
+ jid = u'coolstuff@gajim.org'
+ contact = gajim.contacts.create_contact(jid, account1)
+ gajim.contacts.add_contact(account1, contact)
+ self.roster.add_contact(jid, account1)
+ self.roster.chg_contact_status(contact, 'offline', '', account1)
- gajim.contacts.add_metacontact(account1, u'samejid@gajim.org',
- account1, jid)
- self.roster.chg_contact_status(contact, 'online', '', account1)
+ gajim.contacts.add_metacontact(account1, u'samejid@gajim.org',
+ account1, jid)
+ self.roster.chg_contact_status(contact, 'online', '', account1)
- self.assert_model_is_in_sync()
+ self.assert_model_is_in_sync()
if __name__ == '__main__':
- unittest.main()
-
-# vim: se ts=3:
+ unittest.main()
diff --git a/test/integration/test_xmpp_client_nb.py b/test/integration/test_xmpp_client_nb.py
index 24a54a3ca..2efdcfe91 100644
--- a/test/integration/test_xmpp_client_nb.py
+++ b/test/integration/test_xmpp_client_nb.py
@@ -24,148 +24,146 @@ xmpp_server_port = ('gajim.org', 5222)
credentials = ['unittest', 'testtest', 'res']
class TestNonBlockingClient(unittest.TestCase):
- '''
- Test Cases class for NonBlockingClient.
- '''
- def setUp(self):
- ''' IdleQueue thread is run and dummy connection is created. '''
- self.idlequeue_thread = IdleQueueThread()
- self.connection = MockConnection() # for dummy callbacks
- self.idlequeue_thread.start()
-
- def tearDown(self):
- ''' IdleQueue thread is stopped. '''
- self.idlequeue_thread.stop_thread()
- self.idlequeue_thread.join()
-
- self.client = None
-
- def open_stream(self, server_port, wrong_pass=False):
- '''
- Method opening the XMPP connection. It returns when <stream:features>
- is received from server.
-
- :param server_port: tuple of (hostname, port) for where the client should
- connect.
- '''
-
- class TempConnection():
- def get_password(self, cb):
- if wrong_pass:
- cb('wrong pass')
- else:
- cb(credentials[1])
- def on_connect_failure(self):
- pass
-
- self.client = client_nb.NonBlockingClient(
- domain=server_port[0],
- idlequeue=self.idlequeue_thread.iq,
- caller=Mock(realClass=TempConnection))
-
- self.client.connect(
- hostname=server_port[0],
- port=server_port[1],
- on_connect=lambda *args: self.connection.on_connect(True, *args),
- on_connect_failure=lambda *args: self.connection.on_connect(
- False, *args))
-
- self.assert_(self.connection.wait(),
- msg='waiting for callback from client constructor')
-
- # if on_connect was called, client has to be connected and vice versa
- if self.connection.connect_succeeded:
- self.assert_(self.client.get_connect_type())
- else:
- self.assert_(not self.client.get_connect_type())
-
- def client_auth(self, username, password, resource, sasl):
- '''
- Method authenticating connected client with supplied credentials. Returns
- when authentication is over.
-
- :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
- :todo: to check and be more specific about when it returns
- (bind, session..)
- '''
- self.client.auth(username, password, resource, sasl,
- on_auth=self.connection.on_auth)
-
- self.assert_(self.connection.wait(), msg='waiting for authentication')
-
- def do_disconnect(self):
- '''
- Does disconnecting of connected client. Returns when TCP connection is
- closed.
- '''
- self.client.RegisterDisconnectHandler(self.connection.set_event)
- self.client.disconnect()
-
- self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
-
- def test_proper_connect_sasl(self):
- '''
- The ideal testcase - client is connected, authenticated with SASL and
- then disconnected.
- '''
- self.open_stream(xmpp_server_port)
-
- # if client is not connected, lets raise the AssertionError
- self.assert_(self.client.get_connect_type())
- # client.disconnect() is already called from NBClient via
- # _on_connected_failure, no need to call it here
-
- self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
- self.assert_(self.connection.con)
- self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL')
-
- self.do_disconnect()
-
- def test_proper_connect_oldauth(self):
- '''
- The ideal testcase - client is connected, authenticated with old auth and
- then disconnected.
- '''
- self.open_stream(xmpp_server_port)
- self.assert_(self.client.get_connect_type())
- self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
- self.assert_(self.connection.con)
- features = self.client.Dispatcher.Stream.features
- if not features.getTag('auth'):
- print "Server doesn't support old authentication type, ignoring test"
- else:
- self.assert_(self.connection.auth=='old_auth',
- msg='Unable to auth via old_auth')
- self.do_disconnect()
-
- def test_connect_to_nonexisting_host(self):
- '''
- Connect to nonexisting host. DNS request for A records should return
- nothing.
- '''
- self.open_stream(('fdsfsdf.fdsf.fss', 5222))
- self.assert_(not self.client.get_connect_type())
-
- def test_connect_to_wrong_port(self):
- '''
- Connect to nonexisting server. DNS request for A records should return an
- IP but there shouldn't be XMPP server running on specified port.
- '''
- self.open_stream((xmpp_server_port[0], 31337))
- self.assert_(not self.client.get_connect_type())
-
- def test_connect_with_wrong_creds(self):
- '''
- Connecting with invalid password.
- '''
- self.open_stream(xmpp_server_port, wrong_pass=True)
- self.assert_(self.client.get_connect_type())
- self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
- self.assert_(self.connection.auth is None)
- self.do_disconnect()
+ '''
+ Test Cases class for NonBlockingClient.
+ '''
+ def setUp(self):
+ ''' IdleQueue thread is run and dummy connection is created. '''
+ self.idlequeue_thread = IdleQueueThread()
+ self.connection = MockConnection() # for dummy callbacks
+ self.idlequeue_thread.start()
+
+ def tearDown(self):
+ ''' IdleQueue thread is stopped. '''
+ self.idlequeue_thread.stop_thread()
+ self.idlequeue_thread.join()
+
+ self.client = None
+
+ def open_stream(self, server_port, wrong_pass=False):
+ '''
+ Method opening the XMPP connection. It returns when <stream:features>
+ is received from server.
+
+ :param server_port: tuple of (hostname, port) for where the client should
+ connect.
+ '''
+
+ class TempConnection():
+ def get_password(self, cb):
+ if wrong_pass:
+ cb('wrong pass')
+ else:
+ cb(credentials[1])
+ def on_connect_failure(self):
+ pass
+
+ self.client = client_nb.NonBlockingClient(
+ domain=server_port[0],
+ idlequeue=self.idlequeue_thread.iq,
+ caller=Mock(realClass=TempConnection))
+
+ self.client.connect(
+ hostname=server_port[0],
+ port=server_port[1],
+ on_connect=lambda *args: self.connection.on_connect(True, *args),
+ on_connect_failure=lambda *args: self.connection.on_connect(
+ False, *args))
+
+ self.assert_(self.connection.wait(),
+ msg='waiting for callback from client constructor')
+
+ # if on_connect was called, client has to be connected and vice versa
+ if self.connection.connect_succeeded:
+ self.assert_(self.client.get_connect_type())
+ else:
+ self.assert_(not self.client.get_connect_type())
+
+ def client_auth(self, username, password, resource, sasl):
+ '''
+ Method authenticating connected client with supplied credentials. Returns
+ when authentication is over.
+
+ :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
+ :todo: to check and be more specific about when it returns
+ (bind, session..)
+ '''
+ self.client.auth(username, password, resource, sasl,
+ on_auth=self.connection.on_auth)
+
+ self.assert_(self.connection.wait(), msg='waiting for authentication')
+
+ def do_disconnect(self):
+ '''
+ Does disconnecting of connected client. Returns when TCP connection is
+ closed.
+ '''
+ self.client.RegisterDisconnectHandler(self.connection.set_event)
+ self.client.disconnect()
+
+ self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
+
+ def test_proper_connect_sasl(self):
+ '''
+ The ideal testcase - client is connected, authenticated with SASL and
+ then disconnected.
+ '''
+ self.open_stream(xmpp_server_port)
+
+ # if client is not connected, lets raise the AssertionError
+ self.assert_(self.client.get_connect_type())
+ # client.disconnect() is already called from NBClient via
+ # _on_connected_failure, no need to call it here
+
+ self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
+ self.assert_(self.connection.con)
+ self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL')
+
+ self.do_disconnect()
+
+ def test_proper_connect_oldauth(self):
+ '''
+ The ideal testcase - client is connected, authenticated with old auth and
+ then disconnected.
+ '''
+ self.open_stream(xmpp_server_port)
+ self.assert_(self.client.get_connect_type())
+ self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
+ self.assert_(self.connection.con)
+ features = self.client.Dispatcher.Stream.features
+ if not features.getTag('auth'):
+ print "Server doesn't support old authentication type, ignoring test"
+ else:
+ self.assert_(self.connection.auth=='old_auth',
+ msg='Unable to auth via old_auth')
+ self.do_disconnect()
+
+ def test_connect_to_nonexisting_host(self):
+ '''
+ Connect to nonexisting host. DNS request for A records should return
+ nothing.
+ '''
+ self.open_stream(('fdsfsdf.fdsf.fss', 5222))
+ self.assert_(not self.client.get_connect_type())
+
+ def test_connect_to_wrong_port(self):
+ '''
+ Connect to nonexisting server. DNS request for A records should return an
+ IP but there shouldn't be XMPP server running on specified port.
+ '''
+ self.open_stream((xmpp_server_port[0], 31337))
+ self.assert_(not self.client.get_connect_type())
+
+ def test_connect_with_wrong_creds(self):
+ '''
+ Connecting with invalid password.
+ '''
+ self.open_stream(xmpp_server_port, wrong_pass=True)
+ self.assert_(self.client.get_connect_type())
+ self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
+ self.assert_(self.connection.auth is None)
+ self.do_disconnect()
if __name__ == '__main__':
- unittest.main()
-
-# vim: se ts=3:
+ unittest.main()
diff --git a/test/integration/test_xmpp_transports_nb.py b/test/integration/test_xmpp_transports_nb.py
index ef9908903..f7de8acf9 100644
--- a/test/integration/test_xmpp_transports_nb.py
+++ b/test/integration/test_xmpp_transports_nb.py
@@ -14,265 +14,263 @@ from common.xmpp import transports_nb
class AbstractTransportTest(unittest.TestCase):
- ''' Encapsulates Idlequeue instantiation for transports and more...'''
-
- def setUp(self):
- ''' IdleQueue thread is run and dummy connection is created. '''
- self.idlequeue_thread = IdleQueueThread()
- self.idlequeue_thread.start()
- self._setup_hook()
-
- def tearDown(self):
- ''' IdleQueue thread is stopped. '''
- self._teardown_hook()
- self.idlequeue_thread.stop_thread()
- self.idlequeue_thread.join()
-
- def _setup_hook(self):
- pass
-
- def _teardown_hook(self):
- pass
-
- def expect_receive(self, expected, count=1, msg=None):
- '''
- Returns a callback function that will assert whether the data passed to
- it equals the one specified when calling this function.
-
- Can be used to make sure transport dispatch correct data.
- '''
- def receive(data, *args, **kwargs):
- self.assertEqual(data, expected, msg=msg)
- self._expected_count -= 1
- self._expected_count = count
- return receive
-
- def have_received_expected(self):
- '''
- Plays together with expect_receive(). Will return true if expected_rcv
- callback was called as often as specified
- '''
- return self._expected_count == 0
+ ''' Encapsulates Idlequeue instantiation for transports and more...'''
+
+ def setUp(self):
+ ''' IdleQueue thread is run and dummy connection is created. '''
+ self.idlequeue_thread = IdleQueueThread()
+ self.idlequeue_thread.start()
+ self._setup_hook()
+
+ def tearDown(self):
+ ''' IdleQueue thread is stopped. '''
+ self._teardown_hook()
+ self.idlequeue_thread.stop_thread()
+ self.idlequeue_thread.join()
+
+ def _setup_hook(self):
+ pass
+
+ def _teardown_hook(self):
+ pass
+
+ def expect_receive(self, expected, count=1, msg=None):
+ '''
+ Returns a callback function that will assert whether the data passed to
+ it equals the one specified when calling this function.
+
+ Can be used to make sure transport dispatch correct data.
+ '''
+ def receive(data, *args, **kwargs):
+ self.assertEqual(data, expected, msg=msg)
+ self._expected_count -= 1
+ self._expected_count = count
+ return receive
+
+ def have_received_expected(self):
+ '''
+ Plays together with expect_receive(). Will return true if expected_rcv
+ callback was called as often as specified
+ '''
+ return self._expected_count == 0
class TestNonBlockingTCP(AbstractTransportTest):
- '''
- Test class for NonBlockingTCP. Will actually try to connect to an existing
- XMPP server.
- '''
- class MockClient(IdleMock):
- ''' Simple client to test transport functionality '''
- def __init__(self, idlequeue, testcase):
- self.idlequeue = idlequeue
- self.testcase = testcase
- IdleMock.__init__(self)
-
- def do_connect(self, establish_tls=False, proxy_dict=None):
- try:
- ips = socket.getaddrinfo('gajim.org', 5222,
- socket.AF_UNSPEC,socket.SOCK_STREAM)
- ip = ips[0]
- except socket.error, e:
- self.testcase.fail(msg=str(e))
-
- self.socket = transports_nb.NonBlockingTCP(
- raise_event=lambda event_type, data: self.testcase.assertTrue(
- event_type and data),
- on_disconnect=lambda: self.on_success(mode='SocketDisconnect'),
- idlequeue=self.idlequeue,
- estabilish_tls=establish_tls,
- certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'),
- proxy_dict=proxy_dict)
-
- self.socket.PlugIn(self)
-
- self.socket.connect(conn_5tuple=ip,
- on_connect=lambda: self.on_success(mode='TCPconnect'),
- on_connect_failure=self.on_failure)
- self.testcase.assertTrue(self.wait(), msg='Connection timed out')
-
- def do_disconnect(self):
- self.socket.disconnect()
- self.testcase.assertTrue(self.wait(), msg='Disconnect timed out')
-
- def on_failure(self, err_message):
- self.set_event()
- self.testcase.fail(msg=err_message)
-
- def on_success(self, mode, data=None):
- if mode == "TCPconnect":
- pass
- if mode == "SocketDisconnect":
- pass
- self.set_event()
-
- def _setup_hook(self):
- self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq,
- testcase=self)
-
- def _teardown_hook(self):
- if self.client.socket.state == 'CONNECTED':
- self.client.do_disconnect()
-
- def test_connect_disconnect_plain(self):
- ''' Establish plain connection '''
- self.client.do_connect(establish_tls=False)
- self.assertEquals(self.client.socket.state, 'CONNECTED')
- self.client.do_disconnect()
- self.assertEquals(self.client.socket.state, 'DISCONNECTED')
-
-# def test_connect_disconnect_ssl(self):
-# ''' Establish SSL (not TLS) connection '''
-# self.client.do_connect(establish_tls=True)
-# self.assertEquals(self.client.socket.state, 'CONNECTED')
-# self.client.do_disconnect()
-# self.assertEquals(self.client.socket.state, 'DISCONNECTED')
-
- def test_do_receive(self):
- ''' Test _do_receive method by overwriting socket.recv '''
- self.client.do_connect()
- sock = self.client.socket
-
- # transport shall receive data
- data = "Please don't fail"
- sock._recv = lambda buffer: data
- sock.onreceive(self.expect_receive(data))
- sock._do_receive()
- self.assertTrue(self.have_received_expected(), msg='Did not receive data')
- self.assert_(self.client.socket.state == 'CONNECTED')
-
- # transport shall do nothing as an non-fatal SSL is simulated
- sock._recv = lambda buffer: None
- sock.onreceive(self.assertFalse) # we did not receive anything...
- sock._do_receive()
- self.assert_(self.client.socket.state == 'CONNECTED')
-
- # transport shall disconnect as remote side closed the connection
- sock._recv = lambda buffer: ''
- sock.onreceive(self.assertFalse) # we did not receive anything...
- sock._do_receive()
- self.assert_(self.client.socket.state == 'DISCONNECTED')
-
- def test_do_send(self):
- ''' Test _do_send method by overwriting socket.send '''
- self.client.do_connect()
- sock = self.client.socket
-
- outgoing = [] # what we have actually send to our socket.socket
- data_part1 = "Please don't "
- data_part2 = "fail!"
- data_complete = data_part1 + data_part2
-
- # Simulate everything could be send in one go
- def _send_all(data):
- outgoing.append(data)
- return len(data)
- sock._send = _send_all
- sock.send(data_part1)
- sock.send(data_part2)
- sock._do_send()
- sock._do_send()
- self.assertTrue(self.client.socket.state == 'CONNECTED')
- self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
- self.assertFalse(sock.sendqueue and sock.sendbuff,
- msg='There is still unsend data in buffers')
-
- # Simulate data could only be sent in chunks
- self.chunk_count = 0
- outgoing = []
- def _send_chunks(data):
- if self.chunk_count == 0:
- outgoing.append(data_part1)
- self.chunk_count += 1
- return len(data_part1)
- else:
- outgoing.append(data_part2)
- return len(data_part2)
- sock._send = _send_chunks
- sock.send(data_complete)
- sock._do_send() # process first chunk
- sock._do_send() # process the second one
- self.assertTrue(self.client.socket.state == 'CONNECTED')
- self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
- self.assertFalse(sock.sendqueue and sock.sendbuff,
- msg='There is still unsend data in buffers')
+ '''
+ Test class for NonBlockingTCP. Will actually try to connect to an existing
+ XMPP server.
+ '''
+ class MockClient(IdleMock):
+ ''' Simple client to test transport functionality '''
+ def __init__(self, idlequeue, testcase):
+ self.idlequeue = idlequeue
+ self.testcase = testcase
+ IdleMock.__init__(self)
+
+ def do_connect(self, establish_tls=False, proxy_dict=None):
+ try:
+ ips = socket.getaddrinfo('gajim.org', 5222,
+ socket.AF_UNSPEC,socket.SOCK_STREAM)
+ ip = ips[0]
+ except socket.error, e:
+ self.testcase.fail(msg=str(e))
+
+ self.socket = transports_nb.NonBlockingTCP(
+ raise_event=lambda event_type, data: self.testcase.assertTrue(
+ event_type and data),
+ on_disconnect=lambda: self.on_success(mode='SocketDisconnect'),
+ idlequeue=self.idlequeue,
+ estabilish_tls=establish_tls,
+ certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'),
+ proxy_dict=proxy_dict)
+
+ self.socket.PlugIn(self)
+
+ self.socket.connect(conn_5tuple=ip,
+ on_connect=lambda: self.on_success(mode='TCPconnect'),
+ on_connect_failure=self.on_failure)
+ self.testcase.assertTrue(self.wait(), msg='Connection timed out')
+
+ def do_disconnect(self):
+ self.socket.disconnect()
+ self.testcase.assertTrue(self.wait(), msg='Disconnect timed out')
+
+ def on_failure(self, err_message):
+ self.set_event()
+ self.testcase.fail(msg=err_message)
+
+ def on_success(self, mode, data=None):
+ if mode == "TCPconnect":
+ pass
+ if mode == "SocketDisconnect":
+ pass
+ self.set_event()
+
+ def _setup_hook(self):
+ self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq,
+ testcase=self)
+
+ def _teardown_hook(self):
+ if self.client.socket.state == 'CONNECTED':
+ self.client.do_disconnect()
+
+ def test_connect_disconnect_plain(self):
+ ''' Establish plain connection '''
+ self.client.do_connect(establish_tls=False)
+ self.assertEquals(self.client.socket.state, 'CONNECTED')
+ self.client.do_disconnect()
+ self.assertEquals(self.client.socket.state, 'DISCONNECTED')
+
+# def test_connect_disconnect_ssl(self):
+# ''' Establish SSL (not TLS) connection '''
+# self.client.do_connect(establish_tls=True)
+# self.assertEquals(self.client.socket.state, 'CONNECTED')
+# self.client.do_disconnect()
+# self.assertEquals(self.client.socket.state, 'DISCONNECTED')
+
+ def test_do_receive(self):
+ ''' Test _do_receive method by overwriting socket.recv '''
+ self.client.do_connect()
+ sock = self.client.socket
+
+ # transport shall receive data
+ data = "Please don't fail"
+ sock._recv = lambda buffer: data
+ sock.onreceive(self.expect_receive(data))
+ sock._do_receive()
+ self.assertTrue(self.have_received_expected(), msg='Did not receive data')
+ self.assert_(self.client.socket.state == 'CONNECTED')
+
+ # transport shall do nothing as an non-fatal SSL is simulated
+ sock._recv = lambda buffer: None
+ sock.onreceive(self.assertFalse) # we did not receive anything...
+ sock._do_receive()
+ self.assert_(self.client.socket.state == 'CONNECTED')
+
+ # transport shall disconnect as remote side closed the connection
+ sock._recv = lambda buffer: ''
+ sock.onreceive(self.assertFalse) # we did not receive anything...
+ sock._do_receive()
+ self.assert_(self.client.socket.state == 'DISCONNECTED')
+
+ def test_do_send(self):
+ ''' Test _do_send method by overwriting socket.send '''
+ self.client.do_connect()
+ sock = self.client.socket
+
+ outgoing = [] # what we have actually send to our socket.socket
+ data_part1 = "Please don't "
+ data_part2 = "fail!"
+ data_complete = data_part1 + data_part2
+
+ # Simulate everything could be send in one go
+ def _send_all(data):
+ outgoing.append(data)
+ return len(data)
+ sock._send = _send_all
+ sock.send(data_part1)
+ sock.send(data_part2)
+ sock._do_send()
+ sock._do_send()
+ self.assertTrue(self.client.socket.state == 'CONNECTED')
+ self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
+ self.assertFalse(sock.sendqueue and sock.sendbuff,
+ msg='There is still unsend data in buffers')
+
+ # Simulate data could only be sent in chunks
+ self.chunk_count = 0
+ outgoing = []
+ def _send_chunks(data):
+ if self.chunk_count == 0:
+ outgoing.append(data_part1)
+ self.chunk_count += 1
+ return len(data_part1)
+ else:
+ outgoing.append(data_part2)
+ return len(data_part2)
+ sock._send = _send_chunks
+ sock.send(data_complete)
+ sock._do_send() # process first chunk
+ sock._do_send() # process the second one
+ self.assertTrue(self.client.socket.state == 'CONNECTED')
+ self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
+ self.assertFalse(sock.sendqueue and sock.sendbuff,
+ msg='There is still unsend data in buffers')
class TestNonBlockingHTTP(AbstractTransportTest):
- ''' Test class for NonBlockingHTTP transport'''
-
- bosh_http_dict = {
- 'http_uri': 'http://gajim.org:5280/http-bind',
- 'http_version': 'HTTP/1.1',
- 'http_persistent': True,
- 'add_proxy_headers': False
- }
-
- def _get_transport(self, http_dict, proxy_dict=None):
- return transports_nb.NonBlockingHTTP(
- raise_event=None,
- on_disconnect=None,
- idlequeue=self.idlequeue_thread.iq,
- estabilish_tls=False,
- certs=None,
- on_http_request_possible=lambda: None,
- on_persistent_fallback=None,
- http_dict=http_dict,
- proxy_dict=proxy_dict,
- )
-
- def test_parse_own_http_message(self):
- ''' Build a HTTP message and try to parse it afterwards '''
- transport = self._get_transport(self.bosh_http_dict)
-
- data = "<test>Please don't fail!</test>"
- http_message = transport.build_http_message(data)
- statusline, headers, http_body, buffer_rest = transport.parse_http_message(
- http_message)
-
- self.assertFalse(bool(buffer_rest))
- self.assertTrue(statusline and isinstance(statusline, list))
- self.assertTrue(headers and isinstance(headers, dict))
- self.assertEqual(data, http_body, msg='Input and output are different')
-
- def test_receive_http_message(self):
- ''' Let _on_receive handle some http messages '''
- transport = self._get_transport(self.bosh_http_dict)
-
- header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
- "Content-Length: 88\r\n\r\n")
- payload = "<test>Please don't fail!</test>"
- body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
- % payload
- message = "%s%s" % (header, body)
-
- # try to receive in one go
- transport.onreceive(self.expect_receive(body, msg='Failed: In one go'))
- transport._on_receive(message)
- self.assertTrue(self.have_received_expected(), msg='Failed: In one go')
-
- def test_receive_http_message_in_chunks(self):
- ''' Let _on_receive handle some chunked http messages '''
- transport = self._get_transport(self.bosh_http_dict)
-
- payload = "<test>Please don't fail!\n\n</test>"
- body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
- % payload
- header = "HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +\
- "Content-Length: %i\r\n\r\n" % len(body)
- message = "%s%s" % (header, body)
-
- chunk1, chunk2, chunk3, chunk4 = message[:20], message[20:73], \
- message[73:85], message[85:]
- nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x"
- chunks = (chunk1, chunk2, chunk3, chunk4, nextmessage_chunk)
-
- transport.onreceive(self.expect_receive(body, msg='Failed: In chunks'))
- for chunk in chunks:
- transport._on_receive(chunk)
- self.assertTrue(self.have_received_expected(), msg='Failed: In chunks')
+ ''' Test class for NonBlockingHTTP transport'''
+
+ bosh_http_dict = {
+ 'http_uri': 'http://gajim.org:5280/http-bind',
+ 'http_version': 'HTTP/1.1',
+ 'http_persistent': True,
+ 'add_proxy_headers': False
+ }
+
+ def _get_transport(self, http_dict, proxy_dict=None):
+ return transports_nb.NonBlockingHTTP(
+ raise_event=None,
+ on_disconnect=None,
+ idlequeue=self.idlequeue_thread.iq,
+ estabilish_tls=False,
+ certs=None,
+ on_http_request_possible=lambda: None,
+ on_persistent_fallback=None,
+ http_dict=http_dict,
+ proxy_dict=proxy_dict,
+ )
+
+ def test_parse_own_http_message(self):
+ ''' Build a HTTP message and try to parse it afterwards '''
+ transport = self._get_transport(self.bosh_http_dict)
+
+ data = "<test>Please don't fail!</test>"
+ http_message = transport.build_http_message(data)
+ statusline, headers, http_body, buffer_rest = transport.parse_http_message(
+ http_message)
+
+ self.assertFalse(bool(buffer_rest))
+ self.assertTrue(statusline and isinstance(statusline, list))
+ self.assertTrue(headers and isinstance(headers, dict))
+ self.assertEqual(data, http_body, msg='Input and output are different')
+
+ def test_receive_http_message(self):
+ ''' Let _on_receive handle some http messages '''
+ transport = self._get_transport(self.bosh_http_dict)
+
+ header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
+ "Content-Length: 88\r\n\r\n")
+ payload = "<test>Please don't fail!</test>"
+ body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
+ % payload
+ message = "%s%s" % (header, body)
+
+ # try to receive in one go
+ transport.onreceive(self.expect_receive(body, msg='Failed: In one go'))
+ transport._on_receive(message)
+ self.assertTrue(self.have_received_expected(), msg='Failed: In one go')
+
+ def test_receive_http_message_in_chunks(self):
+ ''' Let _on_receive handle some chunked http messages '''
+ transport = self._get_transport(self.bosh_http_dict)
+
+ payload = "<test>Please don't fail!\n\n</test>"
+ body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
+ % payload
+ header = "HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +\
+ "Content-Length: %i\r\n\r\n" % len(body)
+ message = "%s%s" % (header, body)
+
+ chunk1, chunk2, chunk3, chunk4 = message[:20], message[20:73], \
+ message[73:85], message[85:]
+ nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x"
+ chunks = (chunk1, chunk2, chunk3, chunk4, nextmessage_chunk)
+
+ transport.onreceive(self.expect_receive(body, msg='Failed: In chunks'))
+ for chunk in chunks:
+ transport._on_receive(chunk)
+ self.assertTrue(self.have_received_expected(), msg='Failed: In chunks')
if __name__ == '__main__':
- unittest.main()
-
-# vim: se ts=3:
+ unittest.main()