Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/gajim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorÉric Araujo <merwok@netwok.org>2010-02-08 17:08:40 +0300
committerÉric Araujo <merwok@netwok.org>2010-02-08 17:08:40 +0300
commitfedd7dc8e22950f0dbe297443860662368c249d1 (patch)
treea6426601d50b3020348fd6574107c9fd6f9dfb0e /test
parent1a69ea93f1316e9f98d50ac8a212498b42d3cf60 (diff)
convert tabs to spaces in source code thanks to reindent.py
holy diff batman!
Diffstat (limited to 'test')
-rw-r--r--test/integration/__init__.py2
-rw-r--r--test/integration/test_gui_event_integration.py234
-rw-r--r--test/integration/test_resolver.py158
-rw-r--r--test/integration/test_roster.py336
-rw-r--r--test/integration/test_xmpp_client_nb.py282
-rw-r--r--test/integration/test_xmpp_transports_nb.py506
-rw-r--r--test/lib/__init__.py40
-rwxr-xr-xtest/lib/data.py124
-rw-r--r--test/lib/gajim_mocks.py200
-rw-r--r--test/lib/mock.py2
-rw-r--r--test/lib/notify.py10
-rw-r--r--test/lib/xmpp_mocks.py156
-rwxr-xr-xtest/runtests.py72
-rw-r--r--test/unit/__init__.py2
-rw-r--r--test/unit/test_account.py14
-rw-r--r--test/unit/test_caps_cache.py178
-rw-r--r--test/unit/test_contacts.py218
-rw-r--r--test/unit/test_gui_interface.py186
-rw-r--r--test/unit/test_protocol_caps.py94
-rw-r--r--test/unit/test_sessions.py272
-rw-r--r--test/unit/test_xmpp_dispatcher_nb.py162
-rw-r--r--test/unit/test_xmpp_transports_nb.py124
22 files changed, 1669 insertions, 1703 deletions
diff --git a/test/integration/__init__.py b/test/integration/__init__.py
index 0adf844f1..aaeacfbef 100644
--- a/test/integration/__init__.py
+++ b/test/integration/__init__.py
@@ -3,4 +3,4 @@
This package contains integration tests. Integration tests are tests
which require or include UI, network or both.
-''' \ No newline at end of file
+'''
diff --git a/test/integration/test_gui_event_integration.py b/test/integration/test_gui_event_integration.py
index a1eadea60..c5999389f 100644
--- a/test/integration/test_gui_event_integration.py
+++ b/test/integration/test_gui_event_integration.py
@@ -23,171 +23,169 @@ import roster_window
import notify
class TestStatusChange(unittest.TestCase):
- '''tests gajim.py's incredibly complex handle_event_notify'''
+ '''tests gajim.py's incredibly complex handle_event_notify'''
- def setUp(self):
-
- gajim.connections = {}
- gajim.contacts = contacts_module.LegacyContactsAPI()
- gajim.interface.roster = roster_window.RosterWindow()
+ def setUp(self):
- for acc in contacts:
- gajim.connections[acc] = MockConnection(acc)
+ gajim.connections = {}
+ gajim.contacts = contacts_module.LegacyContactsAPI()
+ gajim.interface.roster = roster_window.RosterWindow()
- gajim.interface.roster.fill_contacts_and_groups_dicts(contacts[acc],
- acc)
- gajim.interface.roster.add_account(acc)
- gajim.interface.roster.add_account_contacts(acc)
+ for acc in contacts:
+ gajim.connections[acc] = MockConnection(acc)
- self.assertEqual(0, len(notify.notifications))
+ gajim.interface.roster.fill_contacts_and_groups_dicts(contacts[acc],
+ acc)
+ gajim.interface.roster.add_account(acc)
+ gajim.interface.roster.add_account_contacts(acc)
- def tearDown(self):
- notify.notifications = []
+ self.assertEqual(0, len(notify.notifications))
- def contact_comes_online(self, account, jid, resource, prio):
- '''a remote contact comes online'''
- gajim.interface.handle_event_notify(account, (jid, 'online', "I'm back!",
- resource, prio, None, time.time(), None))
+ def tearDown(self):
+ notify.notifications = []
- contact = None
- for c in gajim.contacts.get_contacts(account, jid):
- if c.resource == resource:
- contact = c
- break
+ def contact_comes_online(self, account, jid, resource, prio):
+ '''a remote contact comes online'''
+ gajim.interface.handle_event_notify(account, (jid, 'online', "I'm back!",
+ resource, prio, None, time.time(), None))
- self.assertEqual('online', contact.show)
- self.assertEqual("I'm back!", contact.status)
- self.assertEqual(prio, contact.priority)
+ contact = None
+ for c in gajim.contacts.get_contacts(account, jid):
+ if c.resource == resource:
+ contact = c
+ break
- # the most recent notification is that the contact connected
- self.assertEqual('contact_connected', notify.notifications[-1][0])
+ self.assertEqual('online', contact.show)
+ self.assertEqual("I'm back!", contact.status)
+ self.assertEqual(prio, contact.priority)
- def contact_goes_offline(self, account, jid, resource, prio,
- still_exists = True):
- '''a remote contact goes offline.'''
- gajim.interface.handle_event_notify(account, (jid, 'offline', 'Goodbye!',
- resource, prio, None, time.time(), None))
+ # the most recent notification is that the contact connected
+ self.assertEqual('contact_connected', notify.notifications[-1][0])
- contact = None
- for c in gajim.contacts.get_contacts(account, jid):
- if c.resource == resource:
- contact = c
- break
+ def contact_goes_offline(self, account, jid, resource, prio,
+ still_exists = True):
+ '''a remote contact goes offline.'''
+ gajim.interface.handle_event_notify(account, (jid, 'offline', 'Goodbye!',
+ resource, prio, None, time.time(), None))
- if not still_exists:
- self.assert_(contact is None)
- return
+ contact = None
+ for c in gajim.contacts.get_contacts(account, jid):
+ if c.resource == resource:
+ contact = c
+ break
- self.assertEqual('offline', contact.show)
- self.assertEqual('Goodbye!', contact.status)
- self.assertEqual(prio, contact.priority)
+ if not still_exists:
+ self.assert_(contact is None)
+ return
- self.assertEqual('contact_disconnected', notify.notifications[-1][0])
+ self.assertEqual('offline', contact.show)
+ self.assertEqual('Goodbye!', contact.status)
+ self.assertEqual(prio, contact.priority)
- def user_starts_chatting(self, jid, account, resource=None):
- '''the user opens a chat window and starts talking'''
- ctrl = MockChatControl(jid, account)
- win = MockWindow()
- win.new_tab(ctrl)
- gajim.interface.msg_win_mgr._windows['test'] = win
+ self.assertEqual('contact_disconnected', notify.notifications[-1][0])
- if resource:
- jid = jid + '/' + resource
+ def user_starts_chatting(self, jid, account, resource=None):
+ '''the user opens a chat window and starts talking'''
+ ctrl = MockChatControl(jid, account)
+ win = MockWindow()
+ win.new_tab(ctrl)
+ gajim.interface.msg_win_mgr._windows['test'] = win
- # a basic session is started
- session = gajim.connections[account1].make_new_session(jid,
- '01234567890abcdef', cls=MockSession)
- ctrl.set_session(session)
+ if resource:
+ jid = jid + '/' + resource
- return ctrl
+ # a basic session is started
+ session = gajim.connections[account1].make_new_session(jid,
+ '01234567890abcdef', cls=MockSession)
+ ctrl.set_session(session)
- def user_starts_esession(self, jid, resource, account):
- '''the user opens a chat window and starts an encrypted session'''
- ctrl = self.user_starts_chatting(jid, account, resource)
- ctrl.session.status = 'active'
- ctrl.session.enable_encryption = True
+ return ctrl
- return ctrl
+ def user_starts_esession(self, jid, resource, account):
+ '''the user opens a chat window and starts an encrypted session'''
+ ctrl = self.user_starts_chatting(jid, account, resource)
+ ctrl.session.status = 'active'
+ ctrl.session.enable_encryption = True
- def test_contact_comes_online(self):
- jid = 'default1@gajim.org'
+ return ctrl
- # contact is offline initially
- contacts = gajim.contacts.get_contacts(account1, jid)
- self.assertEqual(1, len(contacts))
- self.assertEqual('offline', contacts[0].show)
- self.assertEqual('', contacts[0].status)
+ def test_contact_comes_online(self):
+ jid = 'default1@gajim.org'
- self.contact_comes_online(account1, jid, 'lowprio', 1)
+ # contact is offline initially
+ contacts = gajim.contacts.get_contacts(account1, jid)
+ self.assertEqual(1, len(contacts))
+ self.assertEqual('offline', contacts[0].show)
+ self.assertEqual('', contacts[0].status)
- def test_contact_goes_offline(self):
- jid = 'default1@gajim.org'
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
- self.contact_comes_online(account1, jid, 'lowprio', 1)
+ def test_contact_goes_offline(self):
+ jid = 'default1@gajim.org'
- ctrl = self.user_starts_chatting(jid, account1)
- orig_sess = ctrl.session
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
- self.contact_goes_offline(account1, jid, 'lowprio', 1)
+ ctrl = self.user_starts_chatting(jid, account1)
+ orig_sess = ctrl.session
- # session hasn't changed since we were talking to the bare jid
- self.assertEqual(orig_sess, ctrl.session)
+ self.contact_goes_offline(account1, jid, 'lowprio', 1)
- def test_two_resources_higher_comes_online(self):
- jid = 'default1@gajim.org'
+ # session hasn't changed since we were talking to the bare jid
+ self.assertEqual(orig_sess, ctrl.session)
- self.contact_comes_online(account1, jid, 'lowprio', 1)
+ def test_two_resources_higher_comes_online(self):
+ jid = 'default1@gajim.org'
- ctrl = self.user_starts_chatting(jid, account1)
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
- self.contact_comes_online(account1, jid, 'highprio', 50)
+ ctrl = self.user_starts_chatting(jid, account1)
- # old session was dropped
- self.assertEqual(None, ctrl.session)
+ self.contact_comes_online(account1, jid, 'highprio', 50)
- def test_two_resources_higher_goes_offline(self):
- jid = 'default1@gajim.org'
+ # old session was dropped
+ self.assertEqual(None, ctrl.session)
- self.contact_comes_online(account1, jid, 'lowprio', 1)
- self.contact_comes_online(account1, jid, 'highprio', 50)
+ def test_two_resources_higher_goes_offline(self):
+ jid = 'default1@gajim.org'
- ctrl = self.user_starts_chatting(jid, account1)
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
+ self.contact_comes_online(account1, jid, 'highprio', 50)
- self.contact_goes_offline(account1, jid, 'highprio', 50,
- still_exists=False)
+ ctrl = self.user_starts_chatting(jid, account1)
- # old session was dropped
- self.assertEqual(None, ctrl.session)
+ self.contact_goes_offline(account1, jid, 'highprio', 50,
+ still_exists=False)
- def test_two_resources_higher_comes_online_with_esession(self):
- jid = 'default1@gajim.org'
+ # old session was dropped
+ self.assertEqual(None, ctrl.session)
- self.contact_comes_online(account1, jid, 'lowprio', 1)
+ def test_two_resources_higher_comes_online_with_esession(self):
+ jid = 'default1@gajim.org'
- ctrl = self.user_starts_esession(jid, 'lowprio', account1)
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
- self.contact_comes_online(account1, jid, 'highprio', 50)
+ ctrl = self.user_starts_esession(jid, 'lowprio', account1)
- # session was associated with the low priority full jid, so it should
- # have been removed from the control
- self.assertEqual(None, ctrl.session)
+ self.contact_comes_online(account1, jid, 'highprio', 50)
- def test_two_resources_higher_goes_offline_with_esession(self):
- jid = 'default1@gajim.org'
+ # session was associated with the low priority full jid, so it should
+ # have been removed from the control
+ self.assertEqual(None, ctrl.session)
- self.contact_comes_online(account1, jid, 'lowprio', 1)
- self.contact_comes_online(account1, jid, 'highprio', 50)
+ def test_two_resources_higher_goes_offline_with_esession(self):
+ jid = 'default1@gajim.org'
- ctrl = self.user_starts_esession(jid, 'highprio', account1)
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
+ self.contact_comes_online(account1, jid, 'highprio', 50)
- self.contact_goes_offline(account1, jid, 'highprio', 50,
- still_exists=False)
+ ctrl = self.user_starts_esession(jid, 'highprio', account1)
- # session was associated with the high priority full jid, so it should
- # have been removed from the control
- self.assertEqual(None, ctrl.session)
+ self.contact_goes_offline(account1, jid, 'highprio', 50,
+ still_exists=False)
-if __name__ == '__main__':
- unittest.main()
+ # session was associated with the high priority full jid, so it should
+ # have been removed from the control
+ self.assertEqual(None, ctrl.session)
-# vim: se ts=3:
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/integration/test_resolver.py b/test/integration/test_resolver.py
index 1836d3c7b..d80ffee87 100644
--- a/test/integration/test_resolver.py
+++ b/test/integration/test_resolver.py
@@ -16,87 +16,85 @@ NONSENSE_NAME = 'sfsdfsdfsdf.sdfs.fsd'
JABBERCZ_TXT_NAME = '_xmppconnect.jabber.cz'
JABBERCZ_SRV_NAME = '_xmpp-client._tcp.jabber.cz'
-TEST_LIST = [(GMAIL_SRV_NAME, 'srv', True),
- (NONSENSE_NAME, 'srv', False),
- (JABBERCZ_SRV_NAME, 'srv', True)]
+TEST_LIST = [(GMAIL_SRV_NAME, 'srv', True),
+ (NONSENSE_NAME, 'srv', False),
+ (JABBERCZ_SRV_NAME, 'srv', True)]
class TestResolver(unittest.TestCase):
- '''
- Test for LibAsyncNSResolver and NSLookupResolver. Requires working
- network connection.
- '''
- def setUp(self):
- self.idlequeue_thread = IdleQueueThread()
- self.idlequeue_thread.start()
-
- self.iq = self.idlequeue_thread.iq
- self._reset()
- self.resolver = None
-
- def tearDown(self):
- self.idlequeue_thread.stop_thread()
- self.idlequeue_thread.join()
-
- def _reset(self):
- self.flag = False
- self.expect_results = False
- self.nslookup = False
- self.resolver = None
-
- def testLibAsyncNSResolver(self):
- self._reset()
- if not resolver.USE_LIBASYNCNS:
- print 'testLibAsyncResolver: libasyncns-python not installed'
- return
- self.resolver = resolver.LibAsyncNSResolver()
-
- for name, type, expect_results in TEST_LIST:
- self.expect_results = expect_results
- self._runLANSR(name, type)
- self.flag = False
-
- def _runLANSR(self, name, type):
- self.resolver.resolve(
- host = name,
- type = type,
- on_ready = self._myonready)
- while not self.flag:
- time.sleep(1)
- self.resolver.process()
-
- def _myonready(self, name, result_set):
- if __name__ == '__main__':
- from pprint import pprint
- pprint('on_ready called ...')
- pprint('hostname: %s' % name)
- pprint('result set: %s' % result_set)
- pprint('res.resolved_hosts: %s' % self.resolver.resolved_hosts)
- pprint('')
- if self.expect_results:
- self.assert_(len(result_set) > 0)
- else:
- self.assert_(result_set == [])
- self.flag = True
- if self.nslookup:
- self._testNSLR()
-
- def testNSLookupResolver(self):
- self._reset()
- self.nslookup = True
- self.resolver = resolver.NSLookupResolver(self.iq)
- self.test_list = TEST_LIST
- self._testNSLR()
-
- def _testNSLR(self):
- if self.test_list == []:
- return
- name, type, self.expect_results = self.test_list.pop()
- self.resolver.resolve(
- host = name,
- type = type,
- on_ready = self._myonready)
+ '''
+ Test for LibAsyncNSResolver and NSLookupResolver. Requires working
+ network connection.
+ '''
+ def setUp(self):
+ self.idlequeue_thread = IdleQueueThread()
+ self.idlequeue_thread.start()
+
+ self.iq = self.idlequeue_thread.iq
+ self._reset()
+ self.resolver = None
+
+ def tearDown(self):
+ self.idlequeue_thread.stop_thread()
+ self.idlequeue_thread.join()
+
+ def _reset(self):
+ self.flag = False
+ self.expect_results = False
+ self.nslookup = False
+ self.resolver = None
+
+ def testLibAsyncNSResolver(self):
+ self._reset()
+ if not resolver.USE_LIBASYNCNS:
+ print 'testLibAsyncResolver: libasyncns-python not installed'
+ return
+ self.resolver = resolver.LibAsyncNSResolver()
+
+ for name, type, expect_results in TEST_LIST:
+ self.expect_results = expect_results
+ self._runLANSR(name, type)
+ self.flag = False
+
+ def _runLANSR(self, name, type):
+ self.resolver.resolve(
+ host = name,
+ type = type,
+ on_ready = self._myonready)
+ while not self.flag:
+ time.sleep(1)
+ self.resolver.process()
+
+ def _myonready(self, name, result_set):
+ if __name__ == '__main__':
+ from pprint import pprint
+ pprint('on_ready called ...')
+ pprint('hostname: %s' % name)
+ pprint('result set: %s' % result_set)
+ pprint('res.resolved_hosts: %s' % self.resolver.resolved_hosts)
+ pprint('')
+ if self.expect_results:
+ self.assert_(len(result_set) > 0)
+ else:
+ self.assert_(result_set == [])
+ self.flag = True
+ if self.nslookup:
+ self._testNSLR()
+
+ def testNSLookupResolver(self):
+ self._reset()
+ self.nslookup = True
+ self.resolver = resolver.NSLookupResolver(self.iq)
+ self.test_list = TEST_LIST
+ self._testNSLR()
+
+ def _testNSLR(self):
+ if self.test_list == []:
+ return
+ name, type, self.expect_results = self.test_list.pop()
+ self.resolver.resolve(
+ host = name,
+ type = type,
+ on_ready = self._myonready)
if __name__ == '__main__':
- unittest.main()
-
-# vim: se ts=3:
+ unittest.main()
diff --git a/test/integration/test_roster.py b/test/integration/test_roster.py
index 0be85aa11..8b71df122 100644
--- a/test/integration/test_roster.py
+++ b/test/integration/test_roster.py
@@ -17,189 +17,187 @@ gajim.get_jid_from_account = lambda acc: 'myjid@' + acc
class TestRosterWindow(unittest.TestCase):
- def setUp(self):
- gajim.interface = MockInterface()
-
- self.C_NAME = roster_window.C_NAME
- self.C_TYPE = roster_window.C_TYPE
- self.C_JID = roster_window.C_JID
- self.C_ACCOUNT = roster_window.C_ACCOUNT
-
- # Add after creating RosterWindow
- # We want to test the filling explicitly
- gajim.contacts = contacts_module.LegacyContactsAPI()
- gajim.connections = {}
- self.roster = roster_window.RosterWindow()
-
- for acc in contacts:
- gajim.connections[acc] = MockConnection(acc)
- gajim.contacts.add_account(acc)
-
- ### Custom assertions
- def assert_all_contacts_are_in_roster(self, acc):
- for jid in contacts[acc]:
- self.assert_contact_is_in_roster(jid, acc)
-
- def assert_contact_is_in_roster(self, jid, account):
- contacts = gajim.contacts.get_contacts(account, jid)
- # check for all resources
- for contact in contacts:
- iters = self.roster._get_contact_iter(jid, account,
- model=self.roster.model)
-
- if jid != gajim.get_jid_from_account(account):
- # We don't care for groups of SelfContact
- self.assertTrue(len(iters) == len(contact.get_shown_groups()),
- msg='Contact is not in all his groups')
-
- # Are we big brother?
- bb_jid = None
- bb_account = None
- family = gajim.contacts.get_metacontacts_family(account, jid)
- if family:
- nearby_family, bb_jid, bb_account = \
- self.roster._get_nearby_family_and_big_brother(family, account)
-
- is_in_nearby_family = (jid, account) in (
- (data['jid'], data['account']) for data in nearby_family)
- self.assertTrue(is_in_nearby_family,
- msg='Contact not in his own nearby family')
-
- is_big_brother = (bb_jid, bb_account) == (jid, account)
-
- # check for each group tag
- for titerC in iters:
- self.assertTrue(self.roster.model.iter_is_valid(titerC),
- msg='Contact iter invalid')
-
- c_model = self.roster.model[titerC]
- self.assertEquals(contact.get_shown_name(), c_model[self.C_NAME],
- msg='Contact name missmatch')
- self.assertEquals(contact.jid, c_model[self.C_JID],
- msg='Jid missmatch')
-
- if not self.roster.regroup:
- self.assertEquals(account, c_model[self.C_ACCOUNT],
- msg='Account missmatch')
-
- # Check for correct nesting
- parent_iter = self.roster.model.iter_parent(titerC)
- p_model = self.roster.model[parent_iter]
- if family:
- if is_big_brother:
- self.assertTrue(p_model[self.C_TYPE] == 'group',
- msg='Big Brother is not on top')
- else:
- self.assertTrue(p_model[self.C_TYPE] == 'contact',
- msg='Little Brother brother has no BigB')
- else:
- if jid == gajim.get_jid_from_account(account):
- self.assertTrue(p_model[self.C_TYPE] == 'account',
- msg='SelfContact is not on top')
- else:
- self.assertTrue(p_model[self.C_TYPE] == 'group',
- msg='Contact not found in a group')
-
- def assert_group_is_in_roster(self, group, account):
- #TODO
- pass
-
- def assert_account_is_in_roster(self, acc):
- titerA = self.roster._get_account_iter(acc, model=self.roster.model)
- self.assertTrue(self.roster.model.iter_is_valid(titerA),
- msg='Account iter is invalid')
-
- acc_model = self.roster.model[titerA]
- self.assertEquals(acc_model[self.C_TYPE], 'account',
- msg='No account found')
-
- if not self.roster.regroup:
- self.assertEquals(acc_model[self.C_ACCOUNT], acc,
- msg='Account not found')
-
- self_jid = gajim.get_jid_from_account(acc)
- self.assertEquals(acc_model[self.C_JID], self_jid,
- msg='Account JID not found in account row')
-
- def assert_model_is_in_sync(self):
- #TODO: check that iter_n_children returns the correct numbers
- pass
-
- # tests
- def test_fill_contacts_and_groups_dicts(self):
- for acc in contacts:
- self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc)
-
- for jid in contacts[acc]:
- instances = gajim.contacts.get_contacts(acc, jid)
-
- # Created a contact for each single jid?
- self.assertTrue(len(instances) == 1)
-
- # Contacts kept their info
- contact = instances[0]
- self.assertEquals(contact.groups, contacts[acc][jid]['groups'],
- msg='Group Missmatch')
-
- groups = contacts[acc][jid]['groups'] or ['General',]
-
- def test_fill_roster_model(self):
- for acc in contacts:
- self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc)
-
- self.roster.add_account(acc)
- self.assert_account_is_in_roster(acc)
-
- self.roster.add_account_contacts(acc)
- self.assert_all_contacts_are_in_roster(acc)
-
- self.assert_model_is_in_sync()
+ def setUp(self):
+ gajim.interface = MockInterface()
+
+ self.C_NAME = roster_window.C_NAME
+ self.C_TYPE = roster_window.C_TYPE
+ self.C_JID = roster_window.C_JID
+ self.C_ACCOUNT = roster_window.C_ACCOUNT
+
+ # Add after creating RosterWindow
+ # We want to test the filling explicitly
+ gajim.contacts = contacts_module.LegacyContactsAPI()
+ gajim.connections = {}
+ self.roster = roster_window.RosterWindow()
+
+ for acc in contacts:
+ gajim.connections[acc] = MockConnection(acc)
+ gajim.contacts.add_account(acc)
+
+ ### Custom assertions
+ def assert_all_contacts_are_in_roster(self, acc):
+ for jid in contacts[acc]:
+ self.assert_contact_is_in_roster(jid, acc)
+
+ def assert_contact_is_in_roster(self, jid, account):
+ contacts = gajim.contacts.get_contacts(account, jid)
+ # check for all resources
+ for contact in contacts:
+ iters = self.roster._get_contact_iter(jid, account,
+ model=self.roster.model)
+
+ if jid != gajim.get_jid_from_account(account):
+ # We don't care for groups of SelfContact
+ self.assertTrue(len(iters) == len(contact.get_shown_groups()),
+ msg='Contact is not in all his groups')
+
+ # Are we big brother?
+ bb_jid = None
+ bb_account = None
+ family = gajim.contacts.get_metacontacts_family(account, jid)
+ if family:
+ nearby_family, bb_jid, bb_account = \
+ self.roster._get_nearby_family_and_big_brother(family, account)
+
+ is_in_nearby_family = (jid, account) in (
+ (data['jid'], data['account']) for data in nearby_family)
+ self.assertTrue(is_in_nearby_family,
+ msg='Contact not in his own nearby family')
+
+ is_big_brother = (bb_jid, bb_account) == (jid, account)
+
+ # check for each group tag
+ for titerC in iters:
+ self.assertTrue(self.roster.model.iter_is_valid(titerC),
+ msg='Contact iter invalid')
+
+ c_model = self.roster.model[titerC]
+ self.assertEquals(contact.get_shown_name(), c_model[self.C_NAME],
+ msg='Contact name missmatch')
+ self.assertEquals(contact.jid, c_model[self.C_JID],
+ msg='Jid missmatch')
+
+ if not self.roster.regroup:
+ self.assertEquals(account, c_model[self.C_ACCOUNT],
+ msg='Account missmatch')
+
+ # Check for correct nesting
+ parent_iter = self.roster.model.iter_parent(titerC)
+ p_model = self.roster.model[parent_iter]
+ if family:
+ if is_big_brother:
+ self.assertTrue(p_model[self.C_TYPE] == 'group',
+ msg='Big Brother is not on top')
+ else:
+ self.assertTrue(p_model[self.C_TYPE] == 'contact',
+ msg='Little Brother brother has no BigB')
+ else:
+ if jid == gajim.get_jid_from_account(account):
+ self.assertTrue(p_model[self.C_TYPE] == 'account',
+ msg='SelfContact is not on top')
+ else:
+ self.assertTrue(p_model[self.C_TYPE] == 'group',
+ msg='Contact not found in a group')
+
+ def assert_group_is_in_roster(self, group, account):
+ #TODO
+ pass
+
+ def assert_account_is_in_roster(self, acc):
+ titerA = self.roster._get_account_iter(acc, model=self.roster.model)
+ self.assertTrue(self.roster.model.iter_is_valid(titerA),
+ msg='Account iter is invalid')
+
+ acc_model = self.roster.model[titerA]
+ self.assertEquals(acc_model[self.C_TYPE], 'account',
+ msg='No account found')
+
+ if not self.roster.regroup:
+ self.assertEquals(acc_model[self.C_ACCOUNT], acc,
+ msg='Account not found')
+
+ self_jid = gajim.get_jid_from_account(acc)
+ self.assertEquals(acc_model[self.C_JID], self_jid,
+ msg='Account JID not found in account row')
+
+ def assert_model_is_in_sync(self):
+ #TODO: check that iter_n_children returns the correct numbers
+ pass
+
+ # tests
+ def test_fill_contacts_and_groups_dicts(self):
+ for acc in contacts:
+ self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc)
+
+ for jid in contacts[acc]:
+ instances = gajim.contacts.get_contacts(acc, jid)
+
+ # Created a contact for each single jid?
+ self.assertTrue(len(instances) == 1)
+
+ # Contacts kept their info
+ contact = instances[0]
+ self.assertEquals(contact.groups, contacts[acc][jid]['groups'],
+ msg='Group Missmatch')
+
+ groups = contacts[acc][jid]['groups'] or ['General',]
+
+ def test_fill_roster_model(self):
+ for acc in contacts:
+ self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc)
+
+ self.roster.add_account(acc)
+ self.assert_account_is_in_roster(acc)
+
+ self.roster.add_account_contacts(acc)
+ self.assert_all_contacts_are_in_roster(acc)
+
+ self.assert_model_is_in_sync()
class TestRosterWindowRegrouped(TestRosterWindow):
- def setUp(self):
- gajim.config.set('mergeaccounts', True)
- TestRosterWindow.setUp(self)
+ def setUp(self):
+ gajim.config.set('mergeaccounts', True)
+ TestRosterWindow.setUp(self)
- def test_toggle_regroup(self):
- self.roster.regroup = not self.roster.regroup
- self.roster.setup_and_draw_roster()
- self.roster.regroup = not self.roster.regroup
- self.roster.setup_and_draw_roster()
+ def test_toggle_regroup(self):
+ self.roster.regroup = not self.roster.regroup
+ self.roster.setup_and_draw_roster()
+ self.roster.regroup = not self.roster.regroup
+ self.roster.setup_and_draw_roster()
class TestRosterWindowMetaContacts(TestRosterWindowRegrouped):
- def test_receive_metacontact_data(self):
- for complete_data in metacontact_data:
- t_acc = complete_data[0]['account']
- t_jid = complete_data[0]['jid']
- data = complete_data[1:]
- for brother in data:
- acc = brother['account']
- jid = brother['jid']
- gajim.contacts.add_metacontact(t_acc, t_jid, acc, jid)
- self.roster.setup_and_draw_roster()
+ def test_receive_metacontact_data(self):
+ for complete_data in metacontact_data:
+ t_acc = complete_data[0]['account']
+ t_jid = complete_data[0]['jid']
+ data = complete_data[1:]
+ for brother in data:
+ acc = brother['account']
+ jid = brother['jid']
+ gajim.contacts.add_metacontact(t_acc, t_jid, acc, jid)
+ self.roster.setup_and_draw_roster()
- def test_connect_new_metacontact(self):
- self.test_fill_roster_model()
+ def test_connect_new_metacontact(self):
+ self.test_fill_roster_model()
- jid = u'coolstuff@gajim.org'
- contact = gajim.contacts.create_contact(jid, account1)
- gajim.contacts.add_contact(account1, contact)
- self.roster.add_contact(jid, account1)
- self.roster.chg_contact_status(contact, 'offline', '', account1)
+ jid = u'coolstuff@gajim.org'
+ contact = gajim.contacts.create_contact(jid, account1)
+ gajim.contacts.add_contact(account1, contact)
+ self.roster.add_contact(jid, account1)
+ self.roster.chg_contact_status(contact, 'offline', '', account1)
- gajim.contacts.add_metacontact(account1, u'samejid@gajim.org',
- account1, jid)
- self.roster.chg_contact_status(contact, 'online', '', account1)
+ gajim.contacts.add_metacontact(account1, u'samejid@gajim.org',
+ account1, jid)
+ self.roster.chg_contact_status(contact, 'online', '', account1)
- self.assert_model_is_in_sync()
+ self.assert_model_is_in_sync()
if __name__ == '__main__':
- unittest.main()
-
-# vim: se ts=3:
+ unittest.main()
diff --git a/test/integration/test_xmpp_client_nb.py b/test/integration/test_xmpp_client_nb.py
index 24a54a3ca..2efdcfe91 100644
--- a/test/integration/test_xmpp_client_nb.py
+++ b/test/integration/test_xmpp_client_nb.py
@@ -24,148 +24,146 @@ xmpp_server_port = ('gajim.org', 5222)
credentials = ['unittest', 'testtest', 'res']
class TestNonBlockingClient(unittest.TestCase):
- '''
- Test Cases class for NonBlockingClient.
- '''
- def setUp(self):
- ''' IdleQueue thread is run and dummy connection is created. '''
- self.idlequeue_thread = IdleQueueThread()
- self.connection = MockConnection() # for dummy callbacks
- self.idlequeue_thread.start()
-
- def tearDown(self):
- ''' IdleQueue thread is stopped. '''
- self.idlequeue_thread.stop_thread()
- self.idlequeue_thread.join()
-
- self.client = None
-
- def open_stream(self, server_port, wrong_pass=False):
- '''
- Method opening the XMPP connection. It returns when <stream:features>
- is received from server.
-
- :param server_port: tuple of (hostname, port) for where the client should
- connect.
- '''
-
- class TempConnection():
- def get_password(self, cb):
- if wrong_pass:
- cb('wrong pass')
- else:
- cb(credentials[1])
- def on_connect_failure(self):
- pass
-
- self.client = client_nb.NonBlockingClient(
- domain=server_port[0],
- idlequeue=self.idlequeue_thread.iq,
- caller=Mock(realClass=TempConnection))
-
- self.client.connect(
- hostname=server_port[0],
- port=server_port[1],
- on_connect=lambda *args: self.connection.on_connect(True, *args),
- on_connect_failure=lambda *args: self.connection.on_connect(
- False, *args))
-
- self.assert_(self.connection.wait(),
- msg='waiting for callback from client constructor')
-
- # if on_connect was called, client has to be connected and vice versa
- if self.connection.connect_succeeded:
- self.assert_(self.client.get_connect_type())
- else:
- self.assert_(not self.client.get_connect_type())
-
- def client_auth(self, username, password, resource, sasl):
- '''
- Method authenticating connected client with supplied credentials. Returns
- when authentication is over.
-
- :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
- :todo: to check and be more specific about when it returns
- (bind, session..)
- '''
- self.client.auth(username, password, resource, sasl,
- on_auth=self.connection.on_auth)
-
- self.assert_(self.connection.wait(), msg='waiting for authentication')
-
- def do_disconnect(self):
- '''
- Does disconnecting of connected client. Returns when TCP connection is
- closed.
- '''
- self.client.RegisterDisconnectHandler(self.connection.set_event)
- self.client.disconnect()
-
- self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
-
- def test_proper_connect_sasl(self):
- '''
- The ideal testcase - client is connected, authenticated with SASL and
- then disconnected.
- '''
- self.open_stream(xmpp_server_port)
-
- # if client is not connected, lets raise the AssertionError
- self.assert_(self.client.get_connect_type())
- # client.disconnect() is already called from NBClient via
- # _on_connected_failure, no need to call it here
-
- self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
- self.assert_(self.connection.con)
- self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL')
-
- self.do_disconnect()
-
- def test_proper_connect_oldauth(self):
- '''
- The ideal testcase - client is connected, authenticated with old auth and
- then disconnected.
- '''
- self.open_stream(xmpp_server_port)
- self.assert_(self.client.get_connect_type())
- self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
- self.assert_(self.connection.con)
- features = self.client.Dispatcher.Stream.features
- if not features.getTag('auth'):
- print "Server doesn't support old authentication type, ignoring test"
- else:
- self.assert_(self.connection.auth=='old_auth',
- msg='Unable to auth via old_auth')
- self.do_disconnect()
-
- def test_connect_to_nonexisting_host(self):
- '''
- Connect to nonexisting host. DNS request for A records should return
- nothing.
- '''
- self.open_stream(('fdsfsdf.fdsf.fss', 5222))
- self.assert_(not self.client.get_connect_type())
-
- def test_connect_to_wrong_port(self):
- '''
- Connect to nonexisting server. DNS request for A records should return an
- IP but there shouldn't be XMPP server running on specified port.
- '''
- self.open_stream((xmpp_server_port[0], 31337))
- self.assert_(not self.client.get_connect_type())
-
- def test_connect_with_wrong_creds(self):
- '''
- Connecting with invalid password.
- '''
- self.open_stream(xmpp_server_port, wrong_pass=True)
- self.assert_(self.client.get_connect_type())
- self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
- self.assert_(self.connection.auth is None)
- self.do_disconnect()
+ '''
+ Test Cases class for NonBlockingClient.
+ '''
+ def setUp(self):
+ ''' IdleQueue thread is run and dummy connection is created. '''
+ self.idlequeue_thread = IdleQueueThread()
+ self.connection = MockConnection() # for dummy callbacks
+ self.idlequeue_thread.start()
+
+ def tearDown(self):
+ ''' IdleQueue thread is stopped. '''
+ self.idlequeue_thread.stop_thread()
+ self.idlequeue_thread.join()
+
+ self.client = None
+
+ def open_stream(self, server_port, wrong_pass=False):
+ '''
+ Method opening the XMPP connection. It returns when <stream:features>
+ is received from server.
+
+ :param server_port: tuple of (hostname, port) for where the client should
+ connect.
+ '''
+
+ class TempConnection():
+ def get_password(self, cb):
+ if wrong_pass:
+ cb('wrong pass')
+ else:
+ cb(credentials[1])
+ def on_connect_failure(self):
+ pass
+
+ self.client = client_nb.NonBlockingClient(
+ domain=server_port[0],
+ idlequeue=self.idlequeue_thread.iq,
+ caller=Mock(realClass=TempConnection))
+
+ self.client.connect(
+ hostname=server_port[0],
+ port=server_port[1],
+ on_connect=lambda *args: self.connection.on_connect(True, *args),
+ on_connect_failure=lambda *args: self.connection.on_connect(
+ False, *args))
+
+ self.assert_(self.connection.wait(),
+ msg='waiting for callback from client constructor')
+
+ # if on_connect was called, client has to be connected and vice versa
+ if self.connection.connect_succeeded:
+ self.assert_(self.client.get_connect_type())
+ else:
+ self.assert_(not self.client.get_connect_type())
+
+ def client_auth(self, username, password, resource, sasl):
+ '''
+ Method authenticating connected client with supplied credentials. Returns
+ when authentication is over.
+
+ :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
+ :todo: to check and be more specific about when it returns
+ (bind, session..)
+ '''
+ self.client.auth(username, password, resource, sasl,
+ on_auth=self.connection.on_auth)
+
+ self.assert_(self.connection.wait(), msg='waiting for authentication')
+
+ def do_disconnect(self):
+ '''
+ Does disconnecting of connected client. Returns when TCP connection is
+ closed.
+ '''
+ self.client.RegisterDisconnectHandler(self.connection.set_event)
+ self.client.disconnect()
+
+ self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
+
+ def test_proper_connect_sasl(self):
+ '''
+ The ideal testcase - client is connected, authenticated with SASL and
+ then disconnected.
+ '''
+ self.open_stream(xmpp_server_port)
+
+ # if client is not connected, lets raise the AssertionError
+ self.assert_(self.client.get_connect_type())
+ # client.disconnect() is already called from NBClient via
+ # _on_connected_failure, no need to call it here
+
+ self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
+ self.assert_(self.connection.con)
+ self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL')
+
+ self.do_disconnect()
+
+ def test_proper_connect_oldauth(self):
+ '''
+ The ideal testcase - client is connected, authenticated with old auth and
+ then disconnected.
+ '''
+ self.open_stream(xmpp_server_port)
+ self.assert_(self.client.get_connect_type())
+ self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
+ self.assert_(self.connection.con)
+ features = self.client.Dispatcher.Stream.features
+ if not features.getTag('auth'):
+ print "Server doesn't support old authentication type, ignoring test"
+ else:
+ self.assert_(self.connection.auth=='old_auth',
+ msg='Unable to auth via old_auth')
+ self.do_disconnect()
+
+ def test_connect_to_nonexisting_host(self):
+ '''
+ Connect to nonexisting host. DNS request for A records should return
+ nothing.
+ '''
+ self.open_stream(('fdsfsdf.fdsf.fss', 5222))
+ self.assert_(not self.client.get_connect_type())
+
+ def test_connect_to_wrong_port(self):
+ '''
+ Connect to nonexisting server. DNS request for A records should return an
+ IP but there shouldn't be XMPP server running on specified port.
+ '''
+ self.open_stream((xmpp_server_port[0], 31337))
+ self.assert_(not self.client.get_connect_type())
+
+ def test_connect_with_wrong_creds(self):
+ '''
+ Connecting with invalid password.
+ '''
+ self.open_stream(xmpp_server_port, wrong_pass=True)
+ self.assert_(self.client.get_connect_type())
+ self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
+ self.assert_(self.connection.auth is None)
+ self.do_disconnect()
if __name__ == '__main__':
- unittest.main()
-
-# vim: se ts=3:
+ unittest.main()
diff --git a/test/integration/test_xmpp_transports_nb.py b/test/integration/test_xmpp_transports_nb.py
index ef9908903..f7de8acf9 100644
--- a/test/integration/test_xmpp_transports_nb.py
+++ b/test/integration/test_xmpp_transports_nb.py
@@ -14,265 +14,263 @@ from common.xmpp import transports_nb
class AbstractTransportTest(unittest.TestCase):
- ''' Encapsulates Idlequeue instantiation for transports and more...'''
-
- def setUp(self):
- ''' IdleQueue thread is run and dummy connection is created. '''
- self.idlequeue_thread = IdleQueueThread()
- self.idlequeue_thread.start()
- self._setup_hook()
-
- def tearDown(self):
- ''' IdleQueue thread is stopped. '''
- self._teardown_hook()
- self.idlequeue_thread.stop_thread()
- self.idlequeue_thread.join()
-
- def _setup_hook(self):
- pass
-
- def _teardown_hook(self):
- pass
-
- def expect_receive(self, expected, count=1, msg=None):
- '''
- Returns a callback function that will assert whether the data passed to
- it equals the one specified when calling this function.
-
- Can be used to make sure transport dispatch correct data.
- '''
- def receive(data, *args, **kwargs):
- self.assertEqual(data, expected, msg=msg)
- self._expected_count -= 1
- self._expected_count = count
- return receive
-
- def have_received_expected(self):
- '''
- Plays together with expect_receive(). Will return true if expected_rcv
- callback was called as often as specified
- '''
- return self._expected_count == 0
+ ''' Encapsulates Idlequeue instantiation for transports and more...'''
+
+ def setUp(self):
+ ''' IdleQueue thread is run and dummy connection is created. '''
+ self.idlequeue_thread = IdleQueueThread()
+ self.idlequeue_thread.start()
+ self._setup_hook()
+
+ def tearDown(self):
+ ''' IdleQueue thread is stopped. '''
+ self._teardown_hook()
+ self.idlequeue_thread.stop_thread()
+ self.idlequeue_thread.join()
+
+ def _setup_hook(self):
+ pass
+
+ def _teardown_hook(self):
+ pass
+
+ def expect_receive(self, expected, count=1, msg=None):
+ '''
+ Returns a callback function that will assert whether the data passed to
+ it equals the one specified when calling this function.
+
+ Can be used to make sure transport dispatch correct data.
+ '''
+ def receive(data, *args, **kwargs):
+ self.assertEqual(data, expected, msg=msg)
+ self._expected_count -= 1
+ self._expected_count = count
+ return receive
+
+ def have_received_expected(self):
+ '''
+ Plays together with expect_receive(). Will return true if expected_rcv
+ callback was called as often as specified
+ '''
+ return self._expected_count == 0
class TestNonBlockingTCP(AbstractTransportTest):
- '''
- Test class for NonBlockingTCP. Will actually try to connect to an existing
- XMPP server.
- '''
- class MockClient(IdleMock):
- ''' Simple client to test transport functionality '''
- def __init__(self, idlequeue, testcase):
- self.idlequeue = idlequeue
- self.testcase = testcase
- IdleMock.__init__(self)
-
- def do_connect(self, establish_tls=False, proxy_dict=None):
- try:
- ips = socket.getaddrinfo('gajim.org', 5222,
- socket.AF_UNSPEC,socket.SOCK_STREAM)
- ip = ips[0]
- except socket.error, e:
- self.testcase.fail(msg=str(e))
-
- self.socket = transports_nb.NonBlockingTCP(
- raise_event=lambda event_type, data: self.testcase.assertTrue(
- event_type and data),
- on_disconnect=lambda: self.on_success(mode='SocketDisconnect'),
- idlequeue=self.idlequeue,
- estabilish_tls=establish_tls,
- certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'),
- proxy_dict=proxy_dict)
-
- self.socket.PlugIn(self)
-
- self.socket.connect(conn_5tuple=ip,
- on_connect=lambda: self.on_success(mode='TCPconnect'),
- on_connect_failure=self.on_failure)
- self.testcase.assertTrue(self.wait(), msg='Connection timed out')
-
- def do_disconnect(self):
- self.socket.disconnect()
- self.testcase.assertTrue(self.wait(), msg='Disconnect timed out')
-
- def on_failure(self, err_message):
- self.set_event()
- self.testcase.fail(msg=err_message)
-
- def on_success(self, mode, data=None):
- if mode == "TCPconnect":
- pass
- if mode == "SocketDisconnect":
- pass
- self.set_event()
-
- def _setup_hook(self):
- self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq,
- testcase=self)
-
- def _teardown_hook(self):
- if self.client.socket.state == 'CONNECTED':
- self.client.do_disconnect()
-
- def test_connect_disconnect_plain(self):
- ''' Establish plain connection '''
- self.client.do_connect(establish_tls=False)
- self.assertEquals(self.client.socket.state, 'CONNECTED')
- self.client.do_disconnect()
- self.assertEquals(self.client.socket.state, 'DISCONNECTED')
-
-# def test_connect_disconnect_ssl(self):
-# ''' Establish SSL (not TLS) connection '''
-# self.client.do_connect(establish_tls=True)
-# self.assertEquals(self.client.socket.state, 'CONNECTED')
-# self.client.do_disconnect()
-# self.assertEquals(self.client.socket.state, 'DISCONNECTED')
-
- def test_do_receive(self):
- ''' Test _do_receive method by overwriting socket.recv '''
- self.client.do_connect()
- sock = self.client.socket
-
- # transport shall receive data
- data = "Please don't fail"
- sock._recv = lambda buffer: data
- sock.onreceive(self.expect_receive(data))
- sock._do_receive()
- self.assertTrue(self.have_received_expected(), msg='Did not receive data')
- self.assert_(self.client.socket.state == 'CONNECTED')
-
- # transport shall do nothing as an non-fatal SSL is simulated
- sock._recv = lambda buffer: None
- sock.onreceive(self.assertFalse) # we did not receive anything...
- sock._do_receive()
- self.assert_(self.client.socket.state == 'CONNECTED')
-
- # transport shall disconnect as remote side closed the connection
- sock._recv = lambda buffer: ''
- sock.onreceive(self.assertFalse) # we did not receive anything...
- sock._do_receive()
- self.assert_(self.client.socket.state == 'DISCONNECTED')
-
- def test_do_send(self):
- ''' Test _do_send method by overwriting socket.send '''
- self.client.do_connect()
- sock = self.client.socket
-
- outgoing = [] # what we have actually send to our socket.socket
- data_part1 = "Please don't "
- data_part2 = "fail!"
- data_complete = data_part1 + data_part2
-
- # Simulate everything could be send in one go
- def _send_all(data):
- outgoing.append(data)
- return len(data)
- sock._send = _send_all
- sock.send(data_part1)
- sock.send(data_part2)
- sock._do_send()
- sock._do_send()
- self.assertTrue(self.client.socket.state == 'CONNECTED')
- self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
- self.assertFalse(sock.sendqueue and sock.sendbuff,
- msg='There is still unsend data in buffers')
-
- # Simulate data could only be sent in chunks
- self.chunk_count = 0
- outgoing = []
- def _send_chunks(data):
- if self.chunk_count == 0:
- outgoing.append(data_part1)
- self.chunk_count += 1
- return len(data_part1)
- else:
- outgoing.append(data_part2)
- return len(data_part2)
- sock._send = _send_chunks
- sock.send(data_complete)
- sock._do_send() # process first chunk
- sock._do_send() # process the second one
- self.assertTrue(self.client.socket.state == 'CONNECTED')
- self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
- self.assertFalse(sock.sendqueue and sock.sendbuff,
- msg='There is still unsend data in buffers')
+ '''
+ Test class for NonBlockingTCP. Will actually try to connect to an existing
+ XMPP server.
+ '''
+ class MockClient(IdleMock):
+ ''' Simple client to test transport functionality '''
+ def __init__(self, idlequeue, testcase):
+ self.idlequeue = idlequeue
+ self.testcase = testcase
+ IdleMock.__init__(self)
+
+ def do_connect(self, establish_tls=False, proxy_dict=None):
+ try:
+ ips = socket.getaddrinfo('gajim.org', 5222,
+ socket.AF_UNSPEC,socket.SOCK_STREAM)
+ ip = ips[0]
+ except socket.error, e:
+ self.testcase.fail(msg=str(e))
+
+ self.socket = transports_nb.NonBlockingTCP(
+ raise_event=lambda event_type, data: self.testcase.assertTrue(
+ event_type and data),
+ on_disconnect=lambda: self.on_success(mode='SocketDisconnect'),
+ idlequeue=self.idlequeue,
+ estabilish_tls=establish_tls,
+ certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'),
+ proxy_dict=proxy_dict)
+
+ self.socket.PlugIn(self)
+
+ self.socket.connect(conn_5tuple=ip,
+ on_connect=lambda: self.on_success(mode='TCPconnect'),
+ on_connect_failure=self.on_failure)
+ self.testcase.assertTrue(self.wait(), msg='Connection timed out')
+
+ def do_disconnect(self):
+ self.socket.disconnect()
+ self.testcase.assertTrue(self.wait(), msg='Disconnect timed out')
+
+ def on_failure(self, err_message):
+ self.set_event()
+ self.testcase.fail(msg=err_message)
+
+ def on_success(self, mode, data=None):
+ if mode == "TCPconnect":
+ pass
+ if mode == "SocketDisconnect":
+ pass
+ self.set_event()
+
+ def _setup_hook(self):
+ self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq,
+ testcase=self)
+
+ def _teardown_hook(self):
+ if self.client.socket.state == 'CONNECTED':
+ self.client.do_disconnect()
+
+ def test_connect_disconnect_plain(self):
+ ''' Establish plain connection '''
+ self.client.do_connect(establish_tls=False)
+ self.assertEquals(self.client.socket.state, 'CONNECTED')
+ self.client.do_disconnect()
+ self.assertEquals(self.client.socket.state, 'DISCONNECTED')
+
+# def test_connect_disconnect_ssl(self):
+# ''' Establish SSL (not TLS) connection '''
+# self.client.do_connect(establish_tls=True)
+# self.assertEquals(self.client.socket.state, 'CONNECTED')
+# self.client.do_disconnect()
+# self.assertEquals(self.client.socket.state, 'DISCONNECTED')
+
+ def test_do_receive(self):
+ ''' Test _do_receive method by overwriting socket.recv '''
+ self.client.do_connect()
+ sock = self.client.socket
+
+ # transport shall receive data
+ data = "Please don't fail"
+ sock._recv = lambda buffer: data
+ sock.onreceive(self.expect_receive(data))
+ sock._do_receive()
+ self.assertTrue(self.have_received_expected(), msg='Did not receive data')
+ self.assert_(self.client.socket.state == 'CONNECTED')
+
+ # transport shall do nothing as an non-fatal SSL is simulated
+ sock._recv = lambda buffer: None
+ sock.onreceive(self.assertFalse) # we did not receive anything...
+ sock._do_receive()
+ self.assert_(self.client.socket.state == 'CONNECTED')
+
+ # transport shall disconnect as remote side closed the connection
+ sock._recv = lambda buffer: ''
+ sock.onreceive(self.assertFalse) # we did not receive anything...
+ sock._do_receive()
+ self.assert_(self.client.socket.state == 'DISCONNECTED')
+
+ def test_do_send(self):
+ ''' Test _do_send method by overwriting socket.send '''
+ self.client.do_connect()
+ sock = self.client.socket
+
+ outgoing = [] # what we have actually send to our socket.socket
+ data_part1 = "Please don't "
+ data_part2 = "fail!"
+ data_complete = data_part1 + data_part2
+
+ # Simulate everything could be send in one go
+ def _send_all(data):
+ outgoing.append(data)
+ return len(data)
+ sock._send = _send_all
+ sock.send(data_part1)
+ sock.send(data_part2)
+ sock._do_send()
+ sock._do_send()
+ self.assertTrue(self.client.socket.state == 'CONNECTED')
+ self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
+ self.assertFalse(sock.sendqueue and sock.sendbuff,
+ msg='There is still unsend data in buffers')
+
+ # Simulate data could only be sent in chunks
+ self.chunk_count = 0
+ outgoing = []
+ def _send_chunks(data):
+ if self.chunk_count == 0:
+ outgoing.append(data_part1)
+ self.chunk_count += 1
+ return len(data_part1)
+ else:
+ outgoing.append(data_part2)
+ return len(data_part2)
+ sock._send = _send_chunks
+ sock.send(data_complete)
+ sock._do_send() # process first chunk
+ sock._do_send() # process the second one
+ self.assertTrue(self.client.socket.state == 'CONNECTED')
+ self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
+ self.assertFalse(sock.sendqueue and sock.sendbuff,
+ msg='There is still unsend data in buffers')
class TestNonBlockingHTTP(AbstractTransportTest):
- ''' Test class for NonBlockingHTTP transport'''
-
- bosh_http_dict = {
- 'http_uri': 'http://gajim.org:5280/http-bind',
- 'http_version': 'HTTP/1.1',
- 'http_persistent': True,
- 'add_proxy_headers': False
- }
-
- def _get_transport(self, http_dict, proxy_dict=None):
- return transports_nb.NonBlockingHTTP(
- raise_event=None,
- on_disconnect=None,
- idlequeue=self.idlequeue_thread.iq,
- estabilish_tls=False,
- certs=None,
- on_http_request_possible=lambda: None,
- on_persistent_fallback=None,
- http_dict=http_dict,
- proxy_dict=proxy_dict,
- )
-
- def test_parse_own_http_message(self):
- ''' Build a HTTP message and try to parse it afterwards '''
- transport = self._get_transport(self.bosh_http_dict)
-
- data = "<test>Please don't fail!</test>"
- http_message = transport.build_http_message(data)
- statusline, headers, http_body, buffer_rest = transport.parse_http_message(
- http_message)
-
- self.assertFalse(bool(buffer_rest))
- self.assertTrue(statusline and isinstance(statusline, list))
- self.assertTrue(headers and isinstance(headers, dict))
- self.assertEqual(data, http_body, msg='Input and output are different')
-
- def test_receive_http_message(self):
- ''' Let _on_receive handle some http messages '''
- transport = self._get_transport(self.bosh_http_dict)
-
- header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
- "Content-Length: 88\r\n\r\n")
- payload = "<test>Please don't fail!</test>"
- body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
- % payload
- message = "%s%s" % (header, body)
-
- # try to receive in one go
- transport.onreceive(self.expect_receive(body, msg='Failed: In one go'))
- transport._on_receive(message)
- self.assertTrue(self.have_received_expected(), msg='Failed: In one go')
-
- def test_receive_http_message_in_chunks(self):
- ''' Let _on_receive handle some chunked http messages '''
- transport = self._get_transport(self.bosh_http_dict)
-
- payload = "<test>Please don't fail!\n\n</test>"
- body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
- % payload
- header = "HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +\
- "Content-Length: %i\r\n\r\n" % len(body)
- message = "%s%s" % (header, body)
-
- chunk1, chunk2, chunk3, chunk4 = message[:20], message[20:73], \
- message[73:85], message[85:]
- nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x"
- chunks = (chunk1, chunk2, chunk3, chunk4, nextmessage_chunk)
-
- transport.onreceive(self.expect_receive(body, msg='Failed: In chunks'))
- for chunk in chunks:
- transport._on_receive(chunk)
- self.assertTrue(self.have_received_expected(), msg='Failed: In chunks')
+ ''' Test class for NonBlockingHTTP transport'''
+
+ bosh_http_dict = {
+ 'http_uri': 'http://gajim.org:5280/http-bind',
+ 'http_version': 'HTTP/1.1',
+ 'http_persistent': True,
+ 'add_proxy_headers': False
+ }
+
+ def _get_transport(self, http_dict, proxy_dict=None):
+ return transports_nb.NonBlockingHTTP(
+ raise_event=None,
+ on_disconnect=None,
+ idlequeue=self.idlequeue_thread.iq,
+ estabilish_tls=False,
+ certs=None,
+ on_http_request_possible=lambda: None,
+ on_persistent_fallback=None,
+ http_dict=http_dict,
+ proxy_dict=proxy_dict,
+ )
+
+ def test_parse_own_http_message(self):
+ ''' Build a HTTP message and try to parse it afterwards '''
+ transport = self._get_transport(self.bosh_http_dict)
+
+ data = "<test>Please don't fail!</test>"
+ http_message = transport.build_http_message(data)
+ statusline, headers, http_body, buffer_rest = transport.parse_http_message(
+ http_message)
+
+ self.assertFalse(bool(buffer_rest))
+ self.assertTrue(statusline and isinstance(statusline, list))
+ self.assertTrue(headers and isinstance(headers, dict))
+ self.assertEqual(data, http_body, msg='Input and output are different')
+
+ def test_receive_http_message(self):
+ ''' Let _on_receive handle some http messages '''
+ transport = self._get_transport(self.bosh_http_dict)
+
+ header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
+ "Content-Length: 88\r\n\r\n")
+ payload = "<test>Please don't fail!</test>"
+ body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
+ % payload
+ message = "%s%s" % (header, body)
+
+ # try to receive in one go
+ transport.onreceive(self.expect_receive(body, msg='Failed: In one go'))
+ transport._on_receive(message)
+ self.assertTrue(self.have_received_expected(), msg='Failed: In one go')
+
+ def test_receive_http_message_in_chunks(self):
+ ''' Let _on_receive handle some chunked http messages '''
+ transport = self._get_transport(self.bosh_http_dict)
+
+ payload = "<test>Please don't fail!\n\n</test>"
+ body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
+ % payload
+ header = "HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +\
+ "Content-Length: %i\r\n\r\n" % len(body)
+ message = "%s%s" % (header, body)
+
+ chunk1, chunk2, chunk3, chunk4 = message[:20], message[20:73], \
+ message[73:85], message[85:]
+ nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x"
+ chunks = (chunk1, chunk2, chunk3, chunk4, nextmessage_chunk)
+
+ transport.onreceive(self.expect_receive(body, msg='Failed: In chunks'))
+ for chunk in chunks:
+ transport._on_receive(chunk)
+ self.assertTrue(self.have_received_expected(), msg='Failed: In chunks')
if __name__ == '__main__':
- unittest.main()
-
-# vim: se ts=3:
+ unittest.main()
diff --git a/test/lib/__init__.py b/test/lib/__init__.py
index 7d0b1bd98..8d211a03e 100644
--- a/test/lib/__init__.py
+++ b/test/lib/__init__.py
@@ -7,8 +7,8 @@ shortargs = 'hnv:'
longargs = 'help no-x verbose='
opts, args = getopt.getopt(sys.argv[1:], shortargs, longargs.split())
for o, a in opts:
- if o in ('-n', '--no-x'):
- use_x = False
+ if o in ('-n', '--no-x'):
+ use_x = False
gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..')
@@ -25,28 +25,26 @@ import __builtin__
__builtin__._ = lambda x: x
def setup_env():
- # wipe config directory
- if os.path.isdir(configdir):
- import shutil
- shutil.rmtree(configdir)
+ # wipe config directory
+ if os.path.isdir(configdir):
+ import shutil
+ shutil.rmtree(configdir)
- os.mkdir(configdir)
+ os.mkdir(configdir)
- import common.configpaths
- common.configpaths.gajimpaths.init(configdir)
- common.configpaths.gajimpaths.init_profile()
+ import common.configpaths
+ common.configpaths.gajimpaths.init(configdir)
+ common.configpaths.gajimpaths.init_profile()
- # for some reason common.gajim needs to be imported before xmpppy?
- from common import gajim
+ # for some reason common.gajim needs to be imported before xmpppy?
+ from common import gajim
- import logging
- logging.basicConfig()
+ import logging
+ logging.basicConfig()
- gajim.DATA_DIR = gajim_root + '/data'
- gajim.use_x = use_x
+ gajim.DATA_DIR = gajim_root + '/data'
+ gajim.use_x = use_x
- if use_x:
- import gtkgui_helpers
- gtkgui_helpers.GUI_DIR = gajim_root + '/data/gui'
-
-# vim: se ts=3:
+ if use_x:
+ import gtkgui_helpers
+ gtkgui_helpers.GUI_DIR = gajim_root + '/data/gui'
diff --git a/test/lib/data.py b/test/lib/data.py
index c713de0ac..af2a87057 100755
--- a/test/lib/data.py
+++ b/test/lib/data.py
@@ -5,75 +5,73 @@ account3 = u'dingdong.org'
contacts = {}
contacts[account1] = {
- u'myjid@'+account1: {
- 'ask': None, 'groups': [], 'name': None, 'resources': {},
- 'subscription': u'both'},
- u'default1@gajim.org': {
- 'ask': None, 'groups': [], 'name': None, 'resources': {},
- 'subscription': u'both'},
- u'default2@gajim.org': {
- 'ask': None, 'groups': [u'GroupA',], 'name': None, 'resources': {},
- 'subscription': u'both'},
- u'Cool"chârßéµö@gajim.org': {
- 'ask': None, 'groups': [u'<Cool"chârßéµö', u'GroupB'],
- 'name': None, 'resources': {}, 'subscription': u'both'},
- u'samejid@gajim.org': {
- 'ask': None, 'groups': [u'GroupA',], 'name': None, 'resources': {},
- 'subscription': u'both'}
+ u'myjid@'+account1: {
+ 'ask': None, 'groups': [], 'name': None, 'resources': {},
+ 'subscription': u'both'},
+ u'default1@gajim.org': {
+ 'ask': None, 'groups': [], 'name': None, 'resources': {},
+ 'subscription': u'both'},
+ u'default2@gajim.org': {
+ 'ask': None, 'groups': [u'GroupA',], 'name': None, 'resources': {},
+ 'subscription': u'both'},
+ u'Cool"chârßéµö@gajim.org': {
+ 'ask': None, 'groups': [u'<Cool"chârßéµö', u'GroupB'],
+ 'name': None, 'resources': {}, 'subscription': u'both'},
+ u'samejid@gajim.org': {
+ 'ask': None, 'groups': [u'GroupA',], 'name': None, 'resources': {},
+ 'subscription': u'both'}
}
contacts[account2] = {
- u'myjid@'+account2: {
- 'ask': None, 'groups': [], 'name': None, 'resources': {},
- 'subscription': u'both'},
- u'default3@gajim.org': {
- 'ask': None, 'groups': [u'GroupC',], 'name': None, 'resources': {},
- 'subscription': u'both'},
- u'asksubfrom@gajim.org': {
- 'ask': u'subscribe', 'groups': [u'GroupA',], 'name': None,
- 'resources': {}, 'subscription': u'from'},
- u'subto@gajim.org': {
- 'ask': None, 'groups': [u'GroupB'], 'name': None, 'resources': {},
- 'subscription': u'to'},
- u'samejid@gajim.org': {
- 'ask': None, 'groups': [u'GroupA', u'GroupB'], 'name': None,
- 'resources': {}, 'subscription': u'both'}
+ u'myjid@'+account2: {
+ 'ask': None, 'groups': [], 'name': None, 'resources': {},
+ 'subscription': u'both'},
+ u'default3@gajim.org': {
+ 'ask': None, 'groups': [u'GroupC',], 'name': None, 'resources': {},
+ 'subscription': u'both'},
+ u'asksubfrom@gajim.org': {
+ 'ask': u'subscribe', 'groups': [u'GroupA',], 'name': None,
+ 'resources': {}, 'subscription': u'from'},
+ u'subto@gajim.org': {
+ 'ask': None, 'groups': [u'GroupB'], 'name': None, 'resources': {},
+ 'subscription': u'to'},
+ u'samejid@gajim.org': {
+ 'ask': None, 'groups': [u'GroupA', u'GroupB'], 'name': None,
+ 'resources': {}, 'subscription': u'both'}
}
contacts[account3] = {
- #u'guypsych0\\40h.com@msn.dingdong.org': {
- # 'ask': None, 'groups': [], 'name': None, 'resources': {},
- # 'subscription': u'both'},
- u'guypsych0%h.com@msn.delx.cjb.net': {
- 'ask': u'subscribe', 'groups': [], 'name': None,
- 'resources': {}, 'subscription': u'from'},
- #u'guypsych0%h.com@msn.jabber.wiretrip.org': {
- # 'ask': None, 'groups': [], 'name': None, 'resources': {},
- # 'subscription': u'to'},
- #u'guypsycho\\40g.com@gtalk.dingdong.org': {
- # 'ask': None, 'groups': [], 'name': None,
- # 'resources': {}, 'subscription': u'both'}
+ #u'guypsych0\\40h.com@msn.dingdong.org': {
+ # 'ask': None, 'groups': [], 'name': None, 'resources': {},
+ # 'subscription': u'both'},
+ u'guypsych0%h.com@msn.delx.cjb.net': {
+ 'ask': u'subscribe', 'groups': [], 'name': None,
+ 'resources': {}, 'subscription': u'from'},
+ #u'guypsych0%h.com@msn.jabber.wiretrip.org': {
+ # 'ask': None, 'groups': [], 'name': None, 'resources': {},
+ # 'subscription': u'to'},
+ #u'guypsycho\\40g.com@gtalk.dingdong.org': {
+ # 'ask': None, 'groups': [], 'name': None,
+ # 'resources': {}, 'subscription': u'both'}
}
# We have contacts that are not in roster but only specified in the metadata
metacontact_data = [
- [{'account': account3,
- 'jid': u'guypsych0\\40h.com@msn.dingdong.org',
- 'order': 0},
- {'account': account3,
- 'jid': u'guypsych0%h.com@msn.delx.cjb.net',
- 'order': 0},
- {'account': account3,
- 'jid': u'guypsych0%h.com@msn.jabber.wiretrip.org',
- 'order': 0},
- {'account': account3,
- 'jid': u'guypsycho\\40g.com@gtalk.dingdong.org',
- 'order': 0}],
+ [{'account': account3,
+ 'jid': u'guypsych0\\40h.com@msn.dingdong.org',
+ 'order': 0},
+ {'account': account3,
+ 'jid': u'guypsych0%h.com@msn.delx.cjb.net',
+ 'order': 0},
+ {'account': account3,
+ 'jid': u'guypsych0%h.com@msn.jabber.wiretrip.org',
+ 'order': 0},
+ {'account': account3,
+ 'jid': u'guypsycho\\40g.com@gtalk.dingdong.org',
+ 'order': 0}],
- [{'account': account1,
- 'jid': u'samejid@gajim.org',
- 'order': 0},
- {'account': account2,
- 'jid': u'samejid@gajim.org',
- 'order': 0}]
- ]
-
-# vim: se ts=3:
+ [{'account': account1,
+ 'jid': u'samejid@gajim.org',
+ 'order': 0},
+ {'account': account2,
+ 'jid': u'samejid@gajim.org',
+ 'order': 0}]
+ ]
diff --git a/test/lib/gajim_mocks.py b/test/lib/gajim_mocks.py
index 9607a77c9..2f0037a00 100644
--- a/test/lib/gajim_mocks.py
+++ b/test/lib/gajim_mocks.py
@@ -8,142 +8,140 @@ from common import gajim
from common.connection_handlers import ConnectionHandlersBase
class MockConnection(Mock, ConnectionHandlersBase):
- def __init__(self, account, *args):
- Mock.__init__(self, *args)
- ConnectionHandlersBase.__init__(self)
-
- self.name = account
- self.connected = 2
- self.pep = {}
- self.blocked_contacts = {}
- self.blocked_groups = {}
- self.sessions = {}
-
- gajim.interface.instances[account] = {'infos': {}, 'disco': {},
- 'gc_config': {}, 'search': {}}
- gajim.interface.minimized_controls[account] = {}
- gajim.contacts.add_account(account)
- gajim.groups[account] = {}
- gajim.gc_connected[account] = {}
- gajim.automatic_rooms[account] = {}
- gajim.newly_added[account] = []
- gajim.to_be_removed[account] = []
- gajim.nicks[account] = gajim.config.get_per('accounts', account, 'name')
- gajim.block_signed_in_notifications[account] = True
- gajim.sleeper_state[account] = 0
- gajim.encrypted_chats[account] = []
- gajim.last_message_time[account] = {}
- gajim.status_before_autoaway[account] = ''
- gajim.transport_avatar[account] = {}
- gajim.gajim_optional_features[account] = []
- gajim.caps_hash[account] = ''
-
- gajim.connections[account] = self
+ def __init__(self, account, *args):
+ Mock.__init__(self, *args)
+ ConnectionHandlersBase.__init__(self)
+
+ self.name = account
+ self.connected = 2
+ self.pep = {}
+ self.blocked_contacts = {}
+ self.blocked_groups = {}
+ self.sessions = {}
+
+ gajim.interface.instances[account] = {'infos': {}, 'disco': {},
+ 'gc_config': {}, 'search': {}}
+ gajim.interface.minimized_controls[account] = {}
+ gajim.contacts.add_account(account)
+ gajim.groups[account] = {}
+ gajim.gc_connected[account] = {}
+ gajim.automatic_rooms[account] = {}
+ gajim.newly_added[account] = []
+ gajim.to_be_removed[account] = []
+ gajim.nicks[account] = gajim.config.get_per('accounts', account, 'name')
+ gajim.block_signed_in_notifications[account] = True
+ gajim.sleeper_state[account] = 0
+ gajim.encrypted_chats[account] = []
+ gajim.last_message_time[account] = {}
+ gajim.status_before_autoaway[account] = ''
+ gajim.transport_avatar[account] = {}
+ gajim.gajim_optional_features[account] = []
+ gajim.caps_hash[account] = ''
+
+ gajim.connections[account] = self
class MockWindow(Mock):
- def __init__(self, *args):
- Mock.__init__(self, *args)
- self.window = Mock()
- self._controls = {}
+ def __init__(self, *args):
+ Mock.__init__(self, *args)
+ self.window = Mock()
+ self._controls = {}
- def get_control(self, jid, account):
- try:
- return self._controls[account][jid]
- except KeyError:
- return None
+ def get_control(self, jid, account):
+ try:
+ return self._controls[account][jid]
+ except KeyError:
+ return None
- def has_control(self, jid, acct):
- return self.get_control(jid, acct) is not None
+ def has_control(self, jid, acct):
+ return self.get_control(jid, acct) is not None
- def new_tab(self, ctrl):
- account = ctrl.account
- jid = ctrl.jid
+ def new_tab(self, ctrl):
+ account = ctrl.account
+ jid = ctrl.jid
- if account not in self._controls:
- self._controls[account] = {}
+ if account not in self._controls:
+ self._controls[account] = {}
- if jid not in self._controls[account]:
- self._controls[account][jid] = {}
+ if jid not in self._controls[account]:
+ self._controls[account][jid] = {}
- self._controls[account][jid] = ctrl
+ self._controls[account][jid] = ctrl
- def __nonzero__(self):
- return True
+ def __nonzero__(self):
+ return True
class MockChatControl(Mock):
- def __init__(self, jid, account, *args):
- Mock.__init__(self, *args)
+ def __init__(self, jid, account, *args):
+ Mock.__init__(self, *args)
- self.jid = jid
- self.account = account
+ self.jid = jid
+ self.account = account
- self.parent_win = MockWindow({'get_active_control': self})
- self.session = None
+ self.parent_win = MockWindow({'get_active_control': self})
+ self.session = None
- def set_session(self, sess):
- self.session = sess
+ def set_session(self, sess):
+ self.session = sess
- def __nonzero__(self):
- return True
+ def __nonzero__(self):
+ return True
- def __eq__(self, other):
- return self is other
+ def __eq__(self, other):
+ return self is other
class MockInterface(Mock):
- def __init__(self, *args):
- Mock.__init__(self, *args)
- gajim.interface = self
- self.msg_win_mgr = Mock()
- self.roster = Mock()
+ def __init__(self, *args):
+ Mock.__init__(self, *args)
+ gajim.interface = self
+ self.msg_win_mgr = Mock()
+ self.roster = Mock()
- self.remote_ctrl = None
- self.instances = {}
- self.minimized_controls = {}
- self.status_sent_to_users = Mock()
+ self.remote_ctrl = None
+ self.instances = {}
+ self.minimized_controls = {}
+ self.status_sent_to_users = Mock()
- if gajim.use_x:
- self.jabber_state_images = {'16': {}, '32': {}, 'opened': {},
- 'closed': {}}
+ if gajim.use_x:
+ self.jabber_state_images = {'16': {}, '32': {}, 'opened': {},
+ 'closed': {}}
- import gtkgui_helpers
- gtkgui_helpers.make_jabber_state_images()
- else:
- self.jabber_state_images = {'16': Mock(), '32': Mock(),
- 'opened': Mock(), 'closed': Mock()}
+ import gtkgui_helpers
+ gtkgui_helpers.make_jabber_state_images()
+ else:
+ self.jabber_state_images = {'16': Mock(), '32': Mock(),
+ 'opened': Mock(), 'closed': Mock()}
class MockLogger(Mock):
- def __init__(self):
- Mock.__init__(self, {'write': None, 'get_transports_type': {}})
+ def __init__(self):
+ Mock.__init__(self, {'write': None, 'get_transports_type': {}})
class MockContact(Mock):
- def __nonzero__(self):
- return True
+ def __nonzero__(self):
+ return True
import random
class MockSession(Mock):
- def __init__(self, conn, jid, thread_id, type_):
- Mock.__init__(self)
+ def __init__(self, conn, jid, thread_id, type_):
+ Mock.__init__(self)
- self.conn = conn
- self.jid = jid
- self.type = type_
- self.thread_id = thread_id
+ self.conn = conn
+ self.jid = jid
+ self.type = type_
+ self.thread_id = thread_id
- if not self.thread_id:
- self.thread_id = '%0x' % random.randint(0, 10000)
+ if not self.thread_id:
+ self.thread_id = '%0x' % random.randint(0, 10000)
- def __repr__(self):
- return '<MockSession %s>' % self.thread_id
+ def __repr__(self):
+ return '<MockSession %s>' % self.thread_id
- def __nonzero__(self):
- return True
+ def __nonzero__(self):
+ return True
- def __eq__(self, other):
- return self is other
-
-# vim: se ts=3:
+ def __eq__(self, other):
+ return self is other
diff --git a/test/lib/mock.py b/test/lib/mock.py
index 2c87d53f8..de078b52b 100644
--- a/test/lib/mock.py
+++ b/test/lib/mock.py
@@ -464,5 +464,3 @@ CALLABLE = callable
-
-# vim: se ts=3:
diff --git a/test/lib/notify.py b/test/lib/notify.py
index f14100af3..b13d03678 100644
--- a/test/lib/notify.py
+++ b/test/lib/notify.py
@@ -3,15 +3,13 @@
notifications = []
def notify(event, jid, account, parameters, advanced_notif_num = None):
- notifications.append((event, jid, account, parameters, advanced_notif_num))
+ notifications.append((event, jid, account, parameters, advanced_notif_num))
def get_advanced_notification(event, account, contact):
- return None
+ return None
def get_show_in_roster(event, account, contact, session = None):
- return True
+ return True
def get_show_in_systray(event, account, contact, type_ = None):
- return True
-
-# vim: se ts=3: \ No newline at end of file
+ return True
diff --git a/test/lib/xmpp_mocks.py b/test/lib/xmpp_mocks.py
index bf60ae6dc..0b4055949 100644
--- a/test/lib/xmpp_mocks.py
+++ b/test/lib/xmpp_mocks.py
@@ -12,86 +12,84 @@ IDLEQUEUE_INTERVAL = 0.2 # polling interval. 200ms is used in Gajim as default
IDLEMOCK_TIMEOUT = 30 # how long we wait for an event
class IdleQueueThread(threading.Thread):
- '''
- Thread for regular processing of idlequeue.
- '''
- def __init__(self):
- self.iq = idlequeue.IdleQueue()
- self.stop = threading.Event() # Event to stop the thread main loop.
- self.stop.clear()
- threading.Thread.__init__(self)
-
- def run(self):
- while not self.stop.isSet():
- self.iq.process()
- time.sleep(IDLEQUEUE_INTERVAL)
-
- def stop_thread(self):
- self.stop.set()
-
-
+ '''
+ Thread for regular processing of idlequeue.
+ '''
+ def __init__(self):
+ self.iq = idlequeue.IdleQueue()
+ self.stop = threading.Event() # Event to stop the thread main loop.
+ self.stop.clear()
+ threading.Thread.__init__(self)
+
+ def run(self):
+ while not self.stop.isSet():
+ self.iq.process()
+ time.sleep(IDLEQUEUE_INTERVAL)
+
+ def stop_thread(self):
+ self.stop.set()
+
+
class IdleMock:
- '''
- Serves as template for testing objects that are normally controlled by GUI.
- Allows to wait for asynchronous callbacks with wait() method.
- '''
- def __init__(self):
- self._event = threading.Event()
- self._event.clear()
-
- def wait(self):
- '''
- Block until some callback sets the event and clearing the event
- subsequently.
- Returns True if event was set, False on timeout
- '''
- self._event.wait(IDLEMOCK_TIMEOUT)
- if self._event.isSet():
- self._event.clear()
- return True
- else:
- return False
-
- def set_event(self):
- self._event.set()
+ '''
+ Serves as template for testing objects that are normally controlled by GUI.
+ Allows to wait for asynchronous callbacks with wait() method.
+ '''
+ def __init__(self):
+ self._event = threading.Event()
+ self._event.clear()
+
+ def wait(self):
+ '''
+ Block until some callback sets the event and clearing the event
+ subsequently.
+ Returns True if event was set, False on timeout
+ '''
+ self._event.wait(IDLEMOCK_TIMEOUT)
+ if self._event.isSet():
+ self._event.clear()
+ return True
+ else:
+ return False
+
+ def set_event(self):
+ self._event.set()
class MockConnection(IdleMock, Mock):
- '''
- Class simulating Connection class from src/common/connection.py
-
- It is derived from Mock in order to avoid defining all methods
- from real Connection that are called from NBClient or Dispatcher
- ( _event_dispatcher for example)
- '''
-
- def __init__(self, *args):
- self.connect_succeeded = True
- IdleMock.__init__(self)
- Mock.__init__(self, *args)
-
- def on_connect(self, success, *args):
- '''
- Method called after connecting - after receiving <stream:features>
- from server (NOT after TLS stream restart) or connect failure
- '''
- self.connect_succeeded = success
- self.set_event()
-
-
- def on_auth(self, con, auth):
- '''
- Method called after authentication, regardless of the result.
-
- :Parameters:
- con : NonBlockingClient
- reference to authenticated object
- auth : string
- type of authetication in case of success ('old_auth', 'sasl') or
- None in case of auth failure
- '''
- self.auth_connection = con
- self.auth = auth
- self.set_event()
-
-# vim: se ts=3:
+ '''
+ Class simulating Connection class from src/common/connection.py
+
+ It is derived from Mock in order to avoid defining all methods
+ from real Connection that are called from NBClient or Dispatcher
+ ( _event_dispatcher for example)
+ '''
+
+ def __init__(self, *args):
+ self.connect_succeeded = True
+ IdleMock.__init__(self)
+ Mock.__init__(self, *args)
+
+ def on_connect(self, success, *args):
+ '''
+ Method called after connecting - after receiving <stream:features>
+ from server (NOT after TLS stream restart) or connect failure
+ '''
+ self.connect_succeeded = success
+ self.set_event()
+
+
+ def on_auth(self, con, auth):
+ '''
+ Method called after authentication, regardless of the result.
+
+ :Parameters:
+ con : NonBlockingClient
+ reference to authenticated object
+ auth : string
+ type of authetication in case of success ('old_auth', 'sasl') or
+ None in case of auth failure
+ '''
+ self.auth_connection = con
+ self.auth = auth
+ self.set_event()
diff --git a/test/runtests.py b/test/runtests.py
index 6bca37228..c2b0cbaa5 100755
--- a/test/runtests.py
+++ b/test/runtests.py
@@ -14,55 +14,53 @@ use_x = True
verbose = 1
try:
- shortargs = 'hnv:'
- longargs = 'help no-x verbose='
- opts, args = getopt.getopt(sys.argv[1:], shortargs, longargs.split())
+ shortargs = 'hnv:'
+ longargs = 'help no-x verbose='
+ opts, args = getopt.getopt(sys.argv[1:], shortargs, longargs.split())
except getopt.error, msg:
- print msg
- print 'for help use --help'
- sys.exit(2)
+ print msg
+ print 'for help use --help'
+ sys.exit(2)
for o, a in opts:
- if o in ('-h', '--help'):
- print 'runtests [--help] [--no-x] [--verbose level]'
- sys.exit()
- elif o in ('-n', '--no-x'):
- use_x = False
- elif o in ('-v', '--verbose'):
- try:
- verbose = int(a)
- except Exception:
- print 'verbose must be a number >= 0'
- sys.exit(2)
+ if o in ('-h', '--help'):
+ print 'runtests [--help] [--no-x] [--verbose level]'
+ sys.exit()
+ elif o in ('-n', '--no-x'):
+ use_x = False
+ elif o in ('-v', '--verbose'):
+ try:
+ verbose = int(a)
+ except Exception:
+ print 'verbose must be a number >= 0'
+ sys.exit(2)
# new test modules need to be added manually
modules = ( 'unit.test_xmpp_dispatcher_nb',
- 'unit.test_xmpp_transports_nb',
- 'unit.test_protocol_caps',
- 'unit.test_caps_cache',
- 'unit.test_contacts',
- 'unit.test_sessions',
- 'unit.test_account',
- 'unit.test_gui_interface',
- )
+ 'unit.test_xmpp_transports_nb',
+ 'unit.test_protocol_caps',
+ 'unit.test_caps_cache',
+ 'unit.test_contacts',
+ 'unit.test_sessions',
+ 'unit.test_account',
+ 'unit.test_gui_interface',
+ )
#modules = ()
if use_x:
- modules += ('integration.test_gui_event_integration',
- 'integration.test_roster',
- 'integration.test_resolver',
- 'integration.test_xmpp_client_nb',
- 'integration.test_xmpp_transports_nb'
- )
+ modules += ('integration.test_gui_event_integration',
+ 'integration.test_roster',
+ 'integration.test_resolver',
+ 'integration.test_xmpp_client_nb',
+ 'integration.test_xmpp_transports_nb'
+ )
nb_errors = 0
nb_failures = 0
for mod in modules:
- suite = unittest.defaultTestLoader.loadTestsFromName(mod)
- result = unittest.TextTestRunner(verbosity=verbose).run(suite)
- nb_errors += len(result.errors)
- nb_failures += len(result.failures)
+ suite = unittest.defaultTestLoader.loadTestsFromName(mod)
+ result = unittest.TextTestRunner(verbosity=verbose).run(suite)
+ nb_errors += len(result.errors)
+ nb_failures += len(result.failures)
sys.exit(nb_errors + nb_failures)
-
-# vim: se ts=3:
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index 8e5f2ad7c..0252a7b2f 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -2,4 +2,4 @@
This package just contains plain unit tests
-''' \ No newline at end of file
+'''
diff --git a/test/unit/test_account.py b/test/unit/test_account.py
index d655aae2f..962403752 100644
--- a/test/unit/test_account.py
+++ b/test/unit/test_account.py
@@ -10,12 +10,12 @@ from common.account import Account
class Test(unittest.TestCase):
- def testInstantiate(self):
- account = Account(name='MyAcc', contacts=None, gc_contacts=None)
-
- self.assertEquals('MyAcc', account.name)
- self.assertTrue(account.gc_contacts is None)
- self.assertTrue(account.contacts is None)
+ def testInstantiate(self):
+ account = Account(name='MyAcc', contacts=None, gc_contacts=None)
+
+ self.assertEquals('MyAcc', account.name)
+ self.assertTrue(account.gc_contacts is None)
+ self.assertTrue(account.contacts is None)
if __name__ == "__main__":
- unittest.main() \ No newline at end of file
+ unittest.main()
diff --git a/test/unit/test_caps_cache.py b/test/unit/test_caps_cache.py
index bb6069bc0..468d8f1c0 100644
--- a/test/unit/test_caps_cache.py
+++ b/test/unit/test_caps_cache.py
@@ -15,137 +15,135 @@ from mock import Mock
class CommonCapsTest(unittest.TestCase):
- def setUp(self):
- self.caps_method = 'sha-1'
- self.caps_hash = 'm3P2WeXPMGVH2tZPe7yITnfY0Dw='
- self.client_caps = (self.caps_method, self.caps_hash)
+ def setUp(self):
+ self.caps_method = 'sha-1'
+ self.caps_hash = 'm3P2WeXPMGVH2tZPe7yITnfY0Dw='
+ self.client_caps = (self.caps_method, self.caps_hash)
- self.node = "http://gajim.org"
- self.identity = {'category': 'client', 'type': 'pc', 'name':'Gajim'}
+ self.node = "http://gajim.org"
+ self.identity = {'category': 'client', 'type': 'pc', 'name':'Gajim'}
- self.identities = [self.identity]
- self.features = [NS_MUC, NS_XHTML_IM] # NS_MUC not supported!
+ self.identities = [self.identity]
+ self.features = [NS_MUC, NS_XHTML_IM] # NS_MUC not supported!
- # Simulate a filled db
- db_caps_cache = [
- (self.caps_method, self.caps_hash, self.identities, self.features),
- ('old', self.node + '#' + self.caps_hash, self.identities, self.features)]
- self.logger = Mock(returnValues={"iter_caps_data":db_caps_cache})
+ # Simulate a filled db
+ db_caps_cache = [
+ (self.caps_method, self.caps_hash, self.identities, self.features),
+ ('old', self.node + '#' + self.caps_hash, self.identities, self.features)]
+ self.logger = Mock(returnValues={"iter_caps_data":db_caps_cache})
- self.cc = caps.CapsCache(self.logger)
- caps.capscache = self.cc
+ self.cc = caps.CapsCache(self.logger)
+ caps.capscache = self.cc
class TestCapsCache(CommonCapsTest):
- def test_set_retrieve(self):
- ''' Test basic set / retrieve cycle '''
+ def test_set_retrieve(self):
+ ''' Test basic set / retrieve cycle '''
- self.cc[self.client_caps].identities = self.identities
- self.cc[self.client_caps].features = self.features
+ self.cc[self.client_caps].identities = self.identities
+ self.cc[self.client_caps].features = self.features
- self.assert_(NS_MUC in self.cc[self.client_caps].features)
- self.assert_(NS_PING not in self.cc[self.client_caps].features)
+ self.assert_(NS_MUC in self.cc[self.client_caps].features)
+ self.assert_(NS_PING not in self.cc[self.client_caps].features)
- identities = self.cc[self.client_caps].identities
+ identities = self.cc[self.client_caps].identities
- self.assertEqual(1, len(identities))
+ self.assertEqual(1, len(identities))
- identity = identities[0]
- self.assertEqual('client', identity['category'])
- self.assertEqual('pc', identity['type'])
+ identity = identities[0]
+ self.assertEqual('client', identity['category'])
+ self.assertEqual('pc', identity['type'])
- def test_set_and_store(self):
- ''' Test client_caps update gets logged into db '''
+ def test_set_and_store(self):
+ ''' Test client_caps update gets logged into db '''
- item = self.cc[self.client_caps]
- item.set_and_store(self.identities, self.features)
+ item = self.cc[self.client_caps]
+ item.set_and_store(self.identities, self.features)
- self.logger.mockCheckCall(0, "add_caps_entry", self.caps_method,
- self.caps_hash, self.identities, self.features)
+ self.logger.mockCheckCall(0, "add_caps_entry", self.caps_method,
+ self.caps_hash, self.identities, self.features)
- def test_initialize_from_db(self):
- ''' Read cashed dummy data from db '''
- self.assertEqual(self.cc[self.client_caps].status, caps.NEW)
- self.cc.initialize_from_db()
- self.assertEqual(self.cc[self.client_caps].status, caps.CACHED)
+ def test_initialize_from_db(self):
+ ''' Read cashed dummy data from db '''
+ self.assertEqual(self.cc[self.client_caps].status, caps.NEW)
+ self.cc.initialize_from_db()
+ self.assertEqual(self.cc[self.client_caps].status, caps.CACHED)
- def test_preload_triggering_query(self):
- ''' Make sure that preload issues a disco '''
- connection = Mock()
- client_caps = caps.ClientCaps(self.caps_hash, self.node, self.caps_method)
+ def test_preload_triggering_query(self):
+ ''' Make sure that preload issues a disco '''
+ connection = Mock()
+ client_caps = caps.ClientCaps(self.caps_hash, self.node, self.caps_method)
- self.cc.query_client_of_jid_if_unknown(connection, "test@gajim.org",
- client_caps)
+ self.cc.query_client_of_jid_if_unknown(connection, "test@gajim.org",
+ client_caps)
- self.assertEqual(1, len(connection.mockGetAllCalls()))
+ self.assertEqual(1, len(connection.mockGetAllCalls()))
- def test_no_preload_query_if_cashed(self):
- ''' Preload must not send a query if the data is already cached '''
- connection = Mock()
- client_caps = caps.ClientCaps(self.caps_hash, self.node, self.caps_method)
+ def test_no_preload_query_if_cashed(self):
+ ''' Preload must not send a query if the data is already cached '''
+ connection = Mock()
+ client_caps = caps.ClientCaps(self.caps_hash, self.node, self.caps_method)
- self.cc.initialize_from_db()
- self.cc.query_client_of_jid_if_unknown(connection, "test@gajim.org",
- client_caps)
+ self.cc.initialize_from_db()
+ self.cc.query_client_of_jid_if_unknown(connection, "test@gajim.org",
+ client_caps)
- self.assertEqual(0, len(connection.mockGetAllCalls()))
+ self.assertEqual(0, len(connection.mockGetAllCalls()))
- def test_hash(self):
- '''tests the hash computation'''
- computed_hash = caps.compute_caps_hash(self.identities, self.features)
- self.assertEqual(self.caps_hash, computed_hash)
+ def test_hash(self):
+ '''tests the hash computation'''
+ computed_hash = caps.compute_caps_hash(self.identities, self.features)
+ self.assertEqual(self.caps_hash, computed_hash)
class TestClientCaps(CommonCapsTest):
- def setUp(self):
- CommonCapsTest.setUp(self)
- self.client_caps = caps.ClientCaps(self.caps_hash, self.node, self.caps_method)
+ def setUp(self):
+ CommonCapsTest.setUp(self)
+ self.client_caps = caps.ClientCaps(self.caps_hash, self.node, self.caps_method)
- def test_query_by_get_discover_strategy(self):
- ''' Client must be queried if the data is unkown '''
- connection = Mock()
- discover = self.client_caps.get_discover_strategy()
- discover(connection, "test@gajim.org")
+ def test_query_by_get_discover_strategy(self):
+ ''' Client must be queried if the data is unkown '''
+ connection = Mock()
+ discover = self.client_caps.get_discover_strategy()
+ discover(connection, "test@gajim.org")
- connection.mockCheckCall(0, "discoverInfo", "test@gajim.org",
- "http://gajim.org#m3P2WeXPMGVH2tZPe7yITnfY0Dw=")
+ connection.mockCheckCall(0, "discoverInfo", "test@gajim.org",
+ "http://gajim.org#m3P2WeXPMGVH2tZPe7yITnfY0Dw=")
- def test_client_supports(self):
- self.assertTrue(caps.client_supports(self.client_caps, NS_PING),
- msg="Assume supported, if we don't have caps")
+ def test_client_supports(self):
+ self.assertTrue(caps.client_supports(self.client_caps, NS_PING),
+ msg="Assume supported, if we don't have caps")
- self.assertFalse(caps.client_supports(self.client_caps, NS_XHTML_IM),
- msg="Must not assume blacklisted feature is supported on default")
+ self.assertFalse(caps.client_supports(self.client_caps, NS_XHTML_IM),
+ msg="Must not assume blacklisted feature is supported on default")
- self.cc.initialize_from_db()
+ self.cc.initialize_from_db()
- self.assertFalse(caps.client_supports(self.client_caps, NS_PING),
- msg="Must return false on unsupported feature")
+ self.assertFalse(caps.client_supports(self.client_caps, NS_PING),
+ msg="Must return false on unsupported feature")
- self.assertTrue(caps.client_supports(self.client_caps, NS_XHTML_IM),
- msg="Must return True on supported feature")
+ self.assertTrue(caps.client_supports(self.client_caps, NS_XHTML_IM),
+ msg="Must return True on supported feature")
- self.assertTrue(caps.client_supports(self.client_caps, NS_MUC),
- msg="Must return True on supported feature")
+ self.assertTrue(caps.client_supports(self.client_caps, NS_MUC),
+ msg="Must return True on supported feature")
class TestOldClientCaps(TestClientCaps):
- def setUp(self):
- TestClientCaps.setUp(self)
- self.client_caps = caps.OldClientCaps(self.caps_hash, self.node)
+ def setUp(self):
+ TestClientCaps.setUp(self)
+ self.client_caps = caps.OldClientCaps(self.caps_hash, self.node)
- def test_query_by_get_discover_strategy(self):
- ''' Client must be queried if the data is unknown '''
- connection = Mock()
- discover = self.client_caps.get_discover_strategy()
- discover(connection, "test@gajim.org")
+ def test_query_by_get_discover_strategy(self):
+ ''' Client must be queried if the data is unknown '''
+ connection = Mock()
+ discover = self.client_caps.get_discover_strategy()
+ discover(connection, "test@gajim.org")
- connection.mockCheckCall(0, "discoverInfo", "test@gajim.org")
+ connection.mockCheckCall(0, "discoverInfo", "test@gajim.org")
if __name__ == '__main__':
- unittest.main()
-
-# vim: se ts=3:
+ unittest.main()
diff --git a/test/unit/test_contacts.py b/test/unit/test_contacts.py
index ecfe793c8..957f54e42 100644
--- a/test/unit/test_contacts.py
+++ b/test/unit/test_contacts.py
@@ -12,118 +12,118 @@ from common.xmpp import NS_MUC
from common import caps_cache
class TestCommonContact(unittest.TestCase):
-
- def setUp(self):
- self.contact = CommonContact(jid='', account="", resource='', show='',
- status='', name='', our_chatstate=None, composing_xep=None,
- chatstate=None, client_caps=None)
-
- def test_default_client_supports(self):
- '''
- Test the caps support method of contacts.
- See test_caps for more enhanced tests.
- '''
- caps_cache.capscache = caps_cache.CapsCache()
- self.assertTrue(self.contact.supports(NS_MUC),
- msg="Must not backtrace on simple check for supported feature")
-
- self.contact.client_caps = caps_cache.NullClientCaps()
-
- self.assertTrue(self.contact.supports(NS_MUC),
- msg="Must not backtrace on simple check for supported feature")
-
-
+
+ def setUp(self):
+ self.contact = CommonContact(jid='', account="", resource='', show='',
+ status='', name='', our_chatstate=None, composing_xep=None,
+ chatstate=None, client_caps=None)
+
+ def test_default_client_supports(self):
+ '''
+ Test the caps support method of contacts.
+ See test_caps for more enhanced tests.
+ '''
+ caps_cache.capscache = caps_cache.CapsCache()
+ self.assertTrue(self.contact.supports(NS_MUC),
+ msg="Must not backtrace on simple check for supported feature")
+
+ self.contact.client_caps = caps_cache.NullClientCaps()
+
+ self.assertTrue(self.contact.supports(NS_MUC),
+ msg="Must not backtrace on simple check for supported feature")
+
+
class TestContact(TestCommonContact):
-
- def setUp(self):
- TestCommonContact.setUp(self)
- self.contact = Contact(jid="test@gajim.org", account="account")
-
- def test_attributes_available(self):
- '''This test supports the migration from the old to the new contact
- domain model by smoke testing that no attribute values are lost'''
-
- attributes = ["jid", "resource", "show", "status", "name", "our_chatstate",
- "composing_xep", "chatstate", "client_caps", "priority", "sub"]
- for attr in attributes:
- self.assertTrue(hasattr(self.contact, attr), msg="expected: " + attr)
-
+
+ def setUp(self):
+ TestCommonContact.setUp(self)
+ self.contact = Contact(jid="test@gajim.org", account="account")
+
+ def test_attributes_available(self):
+ '''This test supports the migration from the old to the new contact
+ domain model by smoke testing that no attribute values are lost'''
+
+ attributes = ["jid", "resource", "show", "status", "name", "our_chatstate",
+ "composing_xep", "chatstate", "client_caps", "priority", "sub"]
+ for attr in attributes:
+ self.assertTrue(hasattr(self.contact, attr), msg="expected: " + attr)
+
class TestGC_Contact(TestCommonContact):
- def setUp(self):
- TestCommonContact.setUp(self)
- self.contact = GC_Contact(room_jid="confernce@gajim.org", account="account")
-
- def test_attributes_available(self):
- '''This test supports the migration from the old to the new contact
- domain model by asserting no attributes have been lost'''
-
- attributes = ["jid", "resource", "show", "status", "name", "our_chatstate",
- "composing_xep", "chatstate", "client_caps", "role", "room_jid"]
- for attr in attributes:
- self.assertTrue(hasattr(self.contact, attr), msg="expected: " + attr)
-
-
+ def setUp(self):
+ TestCommonContact.setUp(self)
+ self.contact = GC_Contact(room_jid="confernce@gajim.org", account="account")
+
+ def test_attributes_available(self):
+ '''This test supports the migration from the old to the new contact
+ domain model by asserting no attributes have been lost'''
+
+ attributes = ["jid", "resource", "show", "status", "name", "our_chatstate",
+ "composing_xep", "chatstate", "client_caps", "role", "room_jid"]
+ for attr in attributes:
+ self.assertTrue(hasattr(self.contact, attr), msg="expected: " + attr)
+
+
class TestContacts(unittest.TestCase):
-
- def setUp(self):
- self.contacts = LegacyContactsAPI()
-
- def test_create_add_get_contact(self):
- jid = 'test@gajim.org'
- account = "account"
-
- contact = self.contacts.create_contact(jid=jid, account=account)
- self.contacts.add_contact(account, contact)
-
- retrieved_contact = self.contacts.get_contact(account, jid)
- self.assertEqual(contact, retrieved_contact, "Contact must be known")
-
- self.contacts.remove_contact(account, contact)
-
- retrieved_contact = self.contacts.get_contact(account, jid)
- self.assertNotEqual(contact, retrieved_contact,
- msg="Contact must not be known any longer")
-
-
- def test_copy_contact(self):
- jid = 'test@gajim.org'
- account = "account"
-
- contact = self.contacts.create_contact(jid=jid, account=account)
- copy = self.contacts.copy_contact(contact)
- self.assertFalse(contact is copy, msg="Must not be the same")
-
- # Not yet implemented to remain backwart compatible
- # self.assertEqual(contact, copy, msg="Must be equal")
-
- def test_legacy_accounts_handling(self):
- self.contacts.add_account("one")
- self.contacts.add_account("two")
-
- self.contacts.change_account_name("two", "old")
- self.contacts.remove_account("one")
-
- self.assertEqual(["old"], self.contacts.get_accounts())
-
- def test_legacy_contacts_from_groups(self):
- jid1 = "test1@gajim.org"
- jid2 = "test2@gajim.org"
- account = "account"
- group = "GroupA"
-
- contact1 = self.contacts.create_contact(jid=jid1, account=account,
- groups=[group])
- self.contacts.add_contact(account, contact1)
-
- contact2 = self.contacts.create_contact(jid=jid2, account=account,
- groups=[group])
- self.contacts.add_contact(account, contact2)
-
- self.assertEqual(2, len(self.contacts.get_contacts_from_group(account, group)))
- self.assertEqual(0, len(self.contacts.get_contacts_from_group(account, '')))
-
-
+
+ def setUp(self):
+ self.contacts = LegacyContactsAPI()
+
+ def test_create_add_get_contact(self):
+ jid = 'test@gajim.org'
+ account = "account"
+
+ contact = self.contacts.create_contact(jid=jid, account=account)
+ self.contacts.add_contact(account, contact)
+
+ retrieved_contact = self.contacts.get_contact(account, jid)
+ self.assertEqual(contact, retrieved_contact, "Contact must be known")
+
+ self.contacts.remove_contact(account, contact)
+
+ retrieved_contact = self.contacts.get_contact(account, jid)
+ self.assertNotEqual(contact, retrieved_contact,
+ msg="Contact must not be known any longer")
+
+
+ def test_copy_contact(self):
+ jid = 'test@gajim.org'
+ account = "account"
+
+ contact = self.contacts.create_contact(jid=jid, account=account)
+ copy = self.contacts.copy_contact(contact)
+ self.assertFalse(contact is copy, msg="Must not be the same")
+
+ # Not yet implemented to remain backwart compatible
+ # self.assertEqual(contact, copy, msg="Must be equal")
+
+ def test_legacy_accounts_handling(self):
+ self.contacts.add_account("one")
+ self.contacts.add_account("two")
+
+ self.contacts.change_account_name("two", "old")
+ self.contacts.remove_account("one")
+
+ self.assertEqual(["old"], self.contacts.get_accounts())
+
+ def test_legacy_contacts_from_groups(self):
+ jid1 = "test1@gajim.org"
+ jid2 = "test2@gajim.org"
+ account = "account"
+ group = "GroupA"
+
+ contact1 = self.contacts.create_contact(jid=jid1, account=account,
+ groups=[group])
+ self.contacts.add_contact(account, contact1)
+
+ contact2 = self.contacts.create_contact(jid=jid2, account=account,
+ groups=[group])
+ self.contacts.add_contact(account, contact2)
+
+ self.assertEqual(2, len(self.contacts.get_contacts_from_group(account, group)))
+ self.assertEqual(0, len(self.contacts.get_contacts_from_group(account, '')))
+
+
if __name__ == "__main__":
- unittest.main() \ No newline at end of file
+ unittest.main()
diff --git a/test/unit/test_gui_interface.py b/test/unit/test_gui_interface.py
index d4f1ef4a4..b892f7bd0 100644
--- a/test/unit/test_gui_interface.py
+++ b/test/unit/test_gui_interface.py
@@ -9,7 +9,7 @@ lib.setup_env()
from common import logging_helpers
logging_helpers.set_quiet()
-from common import gajim
+from common import gajim
from gajim_mocks import MockLogger
gajim.logger = MockLogger()
@@ -18,98 +18,98 @@ from gui_interface import Interface
class TestInterface(unittest.TestCase):
- def test_instantiation(self):
- ''' Test that we can proper initialize and do not fail on globals '''
- interface = Interface()
- interface.run()
-
- def test_dispatch(self):
- ''' Test dispatcher forwarding network events to handler_* methods '''
- sut = Interface()
-
- success = sut.dispatch('No Such Event', None, None)
- self.assertFalse(success, msg="Unexisting event handled")
-
- success = sut.dispatch('STANZA_ARRIVED', None, None)
- self.assertTrue(success, msg="Existing event must be handled")
-
- def test_register_unregister_single_handler(self):
- ''' Register / Unregister a custom event handler '''
- sut = Interface()
- event = 'TESTS_ARE_COOL_EVENT'
-
- self.called = False
- def handler(account, data):
- self.assertEqual(account, 'account')
- self.assertEqual(data, 'data')
- self.called = True
-
- self.assertFalse(self.called)
- sut.register_handler('TESTS_ARE_COOL_EVENT', handler)
- sut.dispatch(event, 'account', 'data')
- self.assertTrue(self.called, msg="Handler should have been called")
-
- self.called = False
- sut.unregister_handler('TESTS_ARE_COOL_EVENT', handler)
- sut.dispatch(event, 'account', 'data')
- self.assertFalse(self.called, msg="Handler should no longer be called")
-
-
- def test_dispatch_to_multiple_handlers(self):
- ''' Register and dispatch a single event to multiple handlers '''
- sut = Interface()
- event = 'SINGLE_EVENT'
-
- self.called_a = False
- self.called_b = False
-
- def handler_a(account, data):
- self.assertFalse(self.called_a, msg="One must only be notified once")
- self.called_a = True
-
- def handler_b(account, data):
- self.assertFalse(self.called_b, msg="One must only be notified once")
- self.called_b = True
-
- sut.register_handler(event, handler_a)
- sut.register_handler(event, handler_b)
-
- # register again
- sut.register_handler('SOME_OTHER_EVENT', handler_b)
- sut.register_handler(event, handler_a)
-
- sut.dispatch(event, 'account', 'data')
- self.assertTrue(self.called_a and self.called_b,
- msg="Both handlers should have been called")
-
- def test_links_regexp_entire(self):
- sut = Interface()
- def assert_matches_all(str_):
- m = sut.basic_pattern_re.match(str_)
-
- # the match should equal the string
- str_span = (0, len(str_))
- self.assertEqual(m.span(), str_span)
-
- # these entire strings should be parsed as links
- assert_matches_all('http://google.com/')
- assert_matches_all('http://google.com')
- assert_matches_all('http://www.google.ca/search?q=xmpp')
-
- assert_matches_all('http://tools.ietf.org/html/draft-saintandre-rfc3920bis-05#section-12.3')
-
- assert_matches_all('http://en.wikipedia.org/wiki/Protocol_(computing)')
- assert_matches_all(
- 'http://en.wikipedia.org/wiki/Protocol_%28computing%29')
-
- assert_matches_all('mailto:test@example.org')
-
- assert_matches_all('xmpp:example-node@example.com')
- assert_matches_all('xmpp:example-node@example.com/some-resource')
- assert_matches_all('xmpp:example-node@example.com?message')
- assert_matches_all('xmpp://guest@example.com/support@example.com?message')
+ def test_instantiation(self):
+ ''' Test that we can proper initialize and do not fail on globals '''
+ interface = Interface()
+ interface.run()
+
+ def test_dispatch(self):
+ ''' Test dispatcher forwarding network events to handler_* methods '''
+ sut = Interface()
+
+ success = sut.dispatch('No Such Event', None, None)
+ self.assertFalse(success, msg="Unexisting event handled")
+
+ success = sut.dispatch('STANZA_ARRIVED', None, None)
+ self.assertTrue(success, msg="Existing event must be handled")
+
+ def test_register_unregister_single_handler(self):
+ ''' Register / Unregister a custom event handler '''
+ sut = Interface()
+ event = 'TESTS_ARE_COOL_EVENT'
+
+ self.called = False
+ def handler(account, data):
+ self.assertEqual(account, 'account')
+ self.assertEqual(data, 'data')
+ self.called = True
+
+ self.assertFalse(self.called)
+ sut.register_handler('TESTS_ARE_COOL_EVENT', handler)
+ sut.dispatch(event, 'account', 'data')
+ self.assertTrue(self.called, msg="Handler should have been called")
+
+ self.called = False
+ sut.unregister_handler('TESTS_ARE_COOL_EVENT', handler)
+ sut.dispatch(event, 'account', 'data')
+ self.assertFalse(self.called, msg="Handler should no longer be called")
+
+
+ def test_dispatch_to_multiple_handlers(self):
+ ''' Register and dispatch a single event to multiple handlers '''
+ sut = Interface()
+ event = 'SINGLE_EVENT'
+
+ self.called_a = False
+ self.called_b = False
+
+ def handler_a(account, data):
+ self.assertFalse(self.called_a, msg="One must only be notified once")
+ self.called_a = True
+
+ def handler_b(account, data):
+ self.assertFalse(self.called_b, msg="One must only be notified once")
+ self.called_b = True
+
+ sut.register_handler(event, handler_a)
+ sut.register_handler(event, handler_b)
+
+ # register again
+ sut.register_handler('SOME_OTHER_EVENT', handler_b)
+ sut.register_handler(event, handler_a)
+
+ sut.dispatch(event, 'account', 'data')
+ self.assertTrue(self.called_a and self.called_b,
+ msg="Both handlers should have been called")
+
+ def test_links_regexp_entire(self):
+ sut = Interface()
+ def assert_matches_all(str_):
+ m = sut.basic_pattern_re.match(str_)
+
+ # the match should equal the string
+ str_span = (0, len(str_))
+ self.assertEqual(m.span(), str_span)
+
+ # these entire strings should be parsed as links
+ assert_matches_all('http://google.com/')
+ assert_matches_all('http://google.com')
+ assert_matches_all('http://www.google.ca/search?q=xmpp')
+
+ assert_matches_all('http://tools.ietf.org/html/draft-saintandre-rfc3920bis-05#section-12.3')
+
+ assert_matches_all('http://en.wikipedia.org/wiki/Protocol_(computing)')
+ assert_matches_all(
+ 'http://en.wikipedia.org/wiki/Protocol_%28computing%29')
+
+ assert_matches_all('mailto:test@example.org')
+
+ assert_matches_all('xmpp:example-node@example.com')
+ assert_matches_all('xmpp:example-node@example.com/some-resource')
+ assert_matches_all('xmpp:example-node@example.com?message')
+ assert_matches_all('xmpp://guest@example.com/support@example.com?message')
if __name__ == "__main__":
- #import sys;sys.argv = ['', 'Test.test']
- unittest.main() \ No newline at end of file
+ #import sys;sys.argv = ['', 'Test.test']
+ unittest.main()
diff --git a/test/unit/test_protocol_caps.py b/test/unit/test_protocol_caps.py
index f2c7ae509..c527c148d 100644
--- a/test/unit/test_protocol_caps.py
+++ b/test/unit/test_protocol_caps.py
@@ -17,59 +17,57 @@ from common.xmpp import protocol
class TestableConnectionCaps(caps.ConnectionCaps):
- def __init__(self, *args, **kwargs):
- self._mocked_contacts = {}
- caps.ConnectionCaps.__init__(self, *args, **kwargs)
+ def __init__(self, *args, **kwargs):
+ self._mocked_contacts = {}
+ caps.ConnectionCaps.__init__(self, *args, **kwargs)
- def _get_contact_or_gc_contact_for_jid(self, jid):
- """
- Overwrite to decouple form contact handling
- """
- if jid not in self._mocked_contacts:
- self._mocked_contacts[jid] = Mock(realClass=Contact)
- self._mocked_contacts[jid].jid = jid
- return self._mocked_contacts[jid]
+ def _get_contact_or_gc_contact_for_jid(self, jid):
+ """
+ Overwrite to decouple form contact handling
+ """
+ if jid not in self._mocked_contacts:
+ self._mocked_contacts[jid] = Mock(realClass=Contact)
+ self._mocked_contacts[jid].jid = jid
+ return self._mocked_contacts[jid]
- def discoverInfo(self, *args, **kwargs):
- pass
+ def discoverInfo(self, *args, **kwargs):
+ pass
- def get_mocked_contact_for_jid(self, jid):
- return self._mocked_contacts[jid]
+ def get_mocked_contact_for_jid(self, jid):
+ return self._mocked_contacts[jid]
class TestConnectionCaps(unittest.TestCase):
- def test_capsPresenceCB(self):
- jid = "user@server.com/a"
- connection_caps = TestableConnectionCaps("account",
- self._build_assertering_dispatcher_function("CAPS_RECEIVED", jid),
- Mock(), caps_cache.create_suitable_client_caps)
-
- xml = """<presence from='user@server.com/a' to='%s' id='123'>
- <c node='http://gajim.org' ver='pRCD6cgQ4SDqNMCjdhRV6TECx5o='
- hash='sha-1' xmlns='http://jabber.org/protocol/caps'/>
- </presence>
- """ % (jid)
- iq = protocol.Iq(node=simplexml.XML2Node(xml))
- connection_caps._capsPresenceCB(None, iq)
-
- self.assertTrue(self._dispatcher_called, msg="Must have received caps")
-
- client_caps = connection_caps.get_mocked_contact_for_jid(jid).client_caps
- self.assertTrue(client_caps, msg="Client caps must be set")
- self.assertFalse(isinstance(client_caps, caps_cache.NullClientCaps),
- msg="On receive of proper caps, we must not use the fallback")
-
- def _build_assertering_dispatcher_function(self, expected_event, jid):
- self._dispatcher_called = False
- def dispatch(event, data):
- self.assertFalse(self._dispatcher_called, msg="Must only be called once")
- self._dispatcher_called = True
- self.assertEqual(expected_event, event)
- self.assertEqual(jid, data[0])
- return dispatch
-
-if __name__ == '__main__':
- unittest.main()
+ def test_capsPresenceCB(self):
+ jid = "user@server.com/a"
+ connection_caps = TestableConnectionCaps("account",
+ self._build_assertering_dispatcher_function("CAPS_RECEIVED", jid),
+ Mock(), caps_cache.create_suitable_client_caps)
+
+ xml = """<presence from='user@server.com/a' to='%s' id='123'>
+ <c node='http://gajim.org' ver='pRCD6cgQ4SDqNMCjdhRV6TECx5o='
+ hash='sha-1' xmlns='http://jabber.org/protocol/caps'/>
+ </presence>
+ """ % (jid)
+ iq = protocol.Iq(node=simplexml.XML2Node(xml))
+ connection_caps._capsPresenceCB(None, iq)
+
+ self.assertTrue(self._dispatcher_called, msg="Must have received caps")
+
+ client_caps = connection_caps.get_mocked_contact_for_jid(jid).client_caps
+ self.assertTrue(client_caps, msg="Client caps must be set")
+ self.assertFalse(isinstance(client_caps, caps_cache.NullClientCaps),
+ msg="On receive of proper caps, we must not use the fallback")
+
+ def _build_assertering_dispatcher_function(self, expected_event, jid):
+ self._dispatcher_called = False
+ def dispatch(event, data):
+ self.assertFalse(self._dispatcher_called, msg="Must only be called once")
+ self._dispatcher_called = True
+ self.assertEqual(expected_event, event)
+ self.assertEqual(jid, data[0])
+ return dispatch
-# vim: se ts=3:
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/unit/test_sessions.py b/test/unit/test_sessions.py
index 724bac56d..05a4351a7 100644
--- a/test/unit/test_sessions.py
+++ b/test/unit/test_sessions.py
@@ -24,191 +24,189 @@ gajim.interface = MockInterface()
account_name = 'test'
class TestStanzaSession(unittest.TestCase):
- ''' Testclass for common/stanzasession.py '''
+ ''' Testclass for common/stanzasession.py '''
- def setUp(self):
- self.jid = 'test@example.org/Gajim'
- self.conn = MockConnection(account_name, {'send_stanza': None})
- self.sess = StanzaSession(self.conn, self.jid, None, 'chat')
+ def setUp(self):
+ self.jid = 'test@example.org/Gajim'
+ self.conn = MockConnection(account_name, {'send_stanza': None})
+ self.sess = StanzaSession(self.conn, self.jid, None, 'chat')
- def test_generate_thread_id(self):
- # thread_id is a string
- self.assert_(isinstance(self.sess.thread_id, str))
+ def test_generate_thread_id(self):
+ # thread_id is a string
+ self.assert_(isinstance(self.sess.thread_id, str))
- # it should be somewhat long, to avoid clashes
- self.assert_(len(self.sess.thread_id) >= 32)
+ # it should be somewhat long, to avoid clashes
+ self.assert_(len(self.sess.thread_id) >= 32)
- def test_is_loggable(self):
- # by default a session should be loggable
- # (unless the no_log_for setting says otherwise)
- self.assert_(self.sess.is_loggable())
+ def test_is_loggable(self):
+ # by default a session should be loggable
+ # (unless the no_log_for setting says otherwise)
+ self.assert_(self.sess.is_loggable())
- def test_terminate(self):
- # termination is sent by default
- self.sess.last_send = time.time()
- self.sess.terminate()
+ def test_terminate(self):
+ # termination is sent by default
+ self.sess.last_send = time.time()
+ self.sess.terminate()
- self.assertEqual(None, self.sess.status)
+ self.assertEqual(None, self.sess.status)
- calls = self.conn.mockGetNamedCalls('send_stanza')
- msg = calls[0].getParam(0)
+ calls = self.conn.mockGetNamedCalls('send_stanza')
+ msg = calls[0].getParam(0)
- self.assertEqual(msg.getThread(), self.sess.thread_id)
+ self.assertEqual(msg.getThread(), self.sess.thread_id)
- def test_terminate_without_sending(self):
- # no termination is sent if no messages have been sent in the session
- self.sess.terminate()
+ def test_terminate_without_sending(self):
+ # no termination is sent if no messages have been sent in the session
+ self.sess.terminate()
- self.assertEqual(None, self.sess.status)
+ self.assertEqual(None, self.sess.status)
- calls = self.conn.mockGetNamedCalls('send_stanza')
- self.assertEqual(0, len(calls))
+ calls = self.conn.mockGetNamedCalls('send_stanza')
+ self.assertEqual(0, len(calls))
- def test_terminate_no_remote_xep_201(self):
- # no termination is sent if only messages without thread ids have been
- # received
- self.sess.last_send = time.time()
- self.sess.last_receive = time.time()
- self.sess.terminate()
+ def test_terminate_no_remote_xep_201(self):
+ # no termination is sent if only messages without thread ids have been
+ # received
+ self.sess.last_send = time.time()
+ self.sess.last_receive = time.time()
+ self.sess.terminate()
- self.assertEqual(None, self.sess.status)
+ self.assertEqual(None, self.sess.status)
- calls = self.conn.mockGetNamedCalls('send_stanza')
- self.assertEqual(0, len(calls))
+ calls = self.conn.mockGetNamedCalls('send_stanza')
+ self.assertEqual(0, len(calls))
class TestChatControlSession(unittest.TestCase):
- ''' Testclass for session.py '''
+ ''' Testclass for session.py '''
- def setUp(self):
- self.jid = 'test@example.org/Gajim'
- self.conn = MockConnection(account_name, {'send_stanza': None})
- self.sess = ChatControlSession(self.conn, self.jid, None)
- gajim.logger = MockLogger()
-
- # initially there are no events
- self.assertEqual(0, len(gajim.events.get_events(account_name)))
+ def setUp(self):
+ self.jid = 'test@example.org/Gajim'
+ self.conn = MockConnection(account_name, {'send_stanza': None})
+ self.sess = ChatControlSession(self.conn, self.jid, None)
+ gajim.logger = MockLogger()
+
+ # initially there are no events
+ self.assertEqual(0, len(gajim.events.get_events(account_name)))
- # no notifications have been sent
- self.assertEqual(0, len(notify.notifications))
+ # no notifications have been sent
+ self.assertEqual(0, len(notify.notifications))
- def tearDown(self):
- # remove all events and notifications that were added
- gajim.events._events = {}
- notify.notifications = []
+ def tearDown(self):
+ # remove all events and notifications that were added
+ gajim.events._events = {}
+ notify.notifications = []
- def receive_chat_msg(self, jid, msgtxt):
- '''simulate receiving a chat message from jid'''
- msg = xmpp.Message()
- msg.setBody(msgtxt)
- msg.setType('chat')
+ def receive_chat_msg(self, jid, msgtxt):
+ '''simulate receiving a chat message from jid'''
+ msg = xmpp.Message()
+ msg.setBody(msgtxt)
+ msg.setType('chat')
- tim = time.localtime()
- encrypted = False
- self.sess.received(jid, msgtxt, tim, encrypted, msg)
+ tim = time.localtime()
+ encrypted = False
+ self.sess.received(jid, msgtxt, tim, encrypted, msg)
- # ----- custom assertions -----
- def assert_new_message_notification(self):
- '''a new_message notification has been sent'''
- self.assertEqual(1, len(notify.notifications))
- notif = notify.notifications[0]
- self.assertEqual('new_message', notif[0])
+ # ----- custom assertions -----
+ def assert_new_message_notification(self):
+ '''a new_message notification has been sent'''
+ self.assertEqual(1, len(notify.notifications))
+ notif = notify.notifications[0]
+ self.assertEqual('new_message', notif[0])
- def assert_first_message_notification(self):
- '''this message was treated as a first message'''
- self.assert_new_message_notification()
- notif = notify.notifications[0]
- params = notif[3]
- first = params[1]
- self.assert_(first, 'message should have been treated as a first message')
+ def assert_first_message_notification(self):
+ '''this message was treated as a first message'''
+ self.assert_new_message_notification()
+ notif = notify.notifications[0]
+ params = notif[3]
+ first = params[1]
+ self.assert_(first, 'message should have been treated as a first message')
- def assert_not_first_message_notification(self):
- '''this message was not treated as a first message'''
- self.assert_new_message_notification()
- notif = notify.notifications[0]
- params = notif[3]
- first = params[1]
- self.assert_(not first,
- 'message was unexpectedly treated as a first message')
+ def assert_not_first_message_notification(self):
+ '''this message was not treated as a first message'''
+ self.assert_new_message_notification()
+ notif = notify.notifications[0]
+ params = notif[3]
+ first = params[1]
+ self.assert_(not first,
+ 'message was unexpectedly treated as a first message')
- # ----- tests -----
- def test_receive_nocontrol(self):
- '''test receiving a message in a blank state'''
- jid = 'bct@necronomicorp.com/Gajim'
- msgtxt = 'testing one two three'
+ # ----- tests -----
+ def test_receive_nocontrol(self):
+ '''test receiving a message in a blank state'''
+ jid = 'bct@necronomicorp.com/Gajim'
+ msgtxt = 'testing one two three'
- self.receive_chat_msg(jid, msgtxt)
+ self.receive_chat_msg(jid, msgtxt)
- # message was logged
- calls = gajim.logger.mockGetNamedCalls('write')
- self.assertEqual(1, len(calls))
+ # message was logged
+ calls = gajim.logger.mockGetNamedCalls('write')
+ self.assertEqual(1, len(calls))
- # no ChatControl was open and autopopup was off
- # so the message goes into the event queue
- self.assertEqual(1, len(gajim.events.get_events(account_name)))
+ # no ChatControl was open and autopopup was off
+ # so the message goes into the event queue
+ self.assertEqual(1, len(gajim.events.get_events(account_name)))
- self.assert_first_message_notification()
+ self.assert_first_message_notification()
- # no control is attached to the session
- self.assertEqual(None, self.sess.control)
+ # no control is attached to the session
+ self.assertEqual(None, self.sess.control)
- def test_receive_already_has_control(self):
- '''test receiving a message with a session already attached to a
- control'''
+ def test_receive_already_has_control(self):
+ '''test receiving a message with a session already attached to a
+ control'''
- jid = 'bct@necronomicorp.com/Gajim'
- msgtxt = 'testing one two three'
+ jid = 'bct@necronomicorp.com/Gajim'
+ msgtxt = 'testing one two three'
- self.sess.control = MockChatControl(jid, account_name)
+ self.sess.control = MockChatControl(jid, account_name)
- self.receive_chat_msg(jid, msgtxt)
+ self.receive_chat_msg(jid, msgtxt)
- # message was logged
- calls = gajim.logger.mockGetNamedCalls('write')
- self.assertEqual(1, len(calls))
+ # message was logged
+ calls = gajim.logger.mockGetNamedCalls('write')
+ self.assertEqual(1, len(calls))
- # the message does not go into the event queue
- self.assertEqual(0, len(gajim.events.get_events(account_name)))
+ # the message does not go into the event queue
+ self.assertEqual(0, len(gajim.events.get_events(account_name)))
- self.assert_not_first_message_notification()
+ self.assert_not_first_message_notification()
- # message was printed to the control
- calls = self.sess.control.mockGetNamedCalls('print_conversation')
- self.assertEqual(1, len(calls))
+ # message was printed to the control
+ calls = self.sess.control.mockGetNamedCalls('print_conversation')
+ self.assertEqual(1, len(calls))
- def test_received_orphaned_control(self):
- '''test receiving a message when a control that doesn't have a session
- attached exists'''
+ def test_received_orphaned_control(self):
+ '''test receiving a message when a control that doesn't have a session
+ attached exists'''
- jid = 'bct@necronomicorp.com'
- fjid = jid + '/Gajim'
- msgtxt = 'testing one two three'
+ jid = 'bct@necronomicorp.com'
+ fjid = jid + '/Gajim'
+ msgtxt = 'testing one two three'
- ctrl = MockChatControl(jid, account_name)
- gajim.interface.msg_win_mgr = Mock({'get_control': ctrl})
- gajim.interface.msg_win_mgr.mockSetExpectation('get_control',
- expectParams(jid, account_name))
+ ctrl = MockChatControl(jid, account_name)
+ gajim.interface.msg_win_mgr = Mock({'get_control': ctrl})
+ gajim.interface.msg_win_mgr.mockSetExpectation('get_control',
+ expectParams(jid, account_name))
- self.receive_chat_msg(fjid, msgtxt)
+ self.receive_chat_msg(fjid, msgtxt)
- # message was logged
- calls = gajim.logger.mockGetNamedCalls('write')
- self.assertEqual(1, len(calls))
+ # message was logged
+ calls = gajim.logger.mockGetNamedCalls('write')
+ self.assertEqual(1, len(calls))
- # the message does not go into the event queue
- self.assertEqual(0, len(gajim.events.get_events(account_name)))
+ # the message does not go into the event queue
+ self.assertEqual(0, len(gajim.events.get_events(account_name)))
- self.assert_not_first_message_notification()
+ self.assert_not_first_message_notification()
- # this session is now attached to that control
- self.assertEqual(self.sess, ctrl.session)
- self.assertEqual(ctrl, self.sess.control, 'foo')
+ # this session is now attached to that control
+ self.assertEqual(self.sess, ctrl.session)
+ self.assertEqual(ctrl, self.sess.control, 'foo')
- # message was printed to the control
- calls = ctrl.mockGetNamedCalls('print_conversation')
- self.assertEqual(1, len(calls))
+ # message was printed to the control
+ calls = ctrl.mockGetNamedCalls('print_conversation')
+ self.assertEqual(1, len(calls))
if __name__ == '__main__':
unittest.main()
-
-# vim: se ts=3:
diff --git a/test/unit/test_xmpp_dispatcher_nb.py b/test/unit/test_xmpp_dispatcher_nb.py
index 7d57cae6e..660b6d572 100644
--- a/test/unit/test_xmpp_dispatcher_nb.py
+++ b/test/unit/test_xmpp_dispatcher_nb.py
@@ -12,88 +12,86 @@ from common.xmpp import dispatcher_nb
from common.xmpp import protocol
class TestDispatcherNB(unittest.TestCase):
- '''
- Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
- into a mock client
- '''
- def setUp(self):
- self.dispatcher = dispatcher_nb.XMPPDispatcher()
-
- # Setup mock client
- self.client = Mock()
- self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
- self.client._caller = Mock()
- self.client.defaultNamespace = protocol.NS_CLIENT
- self.client.Connection = Mock() # mock transport
- self.con = self.client.Connection
-
- def tearDown(self):
- # Unplug if needed
- if hasattr(self.dispatcher, '_owner'):
- self.dispatcher.PlugOut()
-
- def _simulate_connect(self):
- self.dispatcher.PlugIn(self.client) # client is owner
- # Simulate that we have established a connection
- self.dispatcher.StreamInit()
- self.dispatcher.ProcessNonBlocking("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>")
-
- def test_unbound_namespace_prefix(self):
- '''tests our handling of a message with an unbound namespace prefix'''
- self._simulate_connect()
-
- msgs = []
- def _got_message(conn, msg):
- msgs.append(msg)
- self.dispatcher.RegisterHandler('message', _got_message)
-
- # should be able to parse a normal message
- self.dispatcher.ProcessNonBlocking('<message><body>hello</body></message>')
- self.assertEqual(1, len(msgs))
-
- self.dispatcher.ProcessNonBlocking('<message><x:y/></message>')
- self.assertEqual(2, len(msgs))
- # we should not have been disconnected after that message
- self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')))
- self.assertEqual(0, len(self.con.mockGetNamedCalls('disconnect')))
-
- # we should be able to keep parsing
- self.dispatcher.ProcessNonBlocking('<message><body>still here?</body></message>')
- self.assertEqual(3, len(msgs))
-
- def test_process_non_blocking(self):
- ''' Check for ProcessNonBlocking return types '''
- self._simulate_connect()
- process = self.dispatcher.ProcessNonBlocking
-
- # length of data expected
- data = "Please don't fail"
- result = process(data)
- self.assertEqual(result, len(data))
-
- # no data processed, link shall still be active
- result = process('')
- self.assertEqual(result, '0')
- self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')) +
- len(self.con.mockGetNamedCalls('disconnect')))
-
- # simulate disconnect
- result = process('</stream:stream>')
- self.assertEqual(1, len(self.client.mockGetNamedCalls('disconnect')))
-
- def test_return_stanza_handler(self):
- ''' Test sasl_error_conditions transformation in protocol.py '''
- # quick'n dirty...I wasn't aware of it existance and thought it would
- # always fail :-)
- self._simulate_connect()
- stanza = "<iq type='get' />"
- def send(data):
- self.assertEqual(str(data), '<iq xmlns="jabber:client" type="error"><error code="501" type="cancel"><feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" /><text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">The feature requested is not implemented by the recipient or server and therefore cannot be processed.</text></error></iq>')
- self.client.send = send
- self.dispatcher.ProcessNonBlocking(stanza)
+ '''
+ Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
+ into a mock client
+ '''
+ def setUp(self):
+ self.dispatcher = dispatcher_nb.XMPPDispatcher()
+
+ # Setup mock client
+ self.client = Mock()
+ self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
+ self.client._caller = Mock()
+ self.client.defaultNamespace = protocol.NS_CLIENT
+ self.client.Connection = Mock() # mock transport
+ self.con = self.client.Connection
+
+ def tearDown(self):
+ # Unplug if needed
+ if hasattr(self.dispatcher, '_owner'):
+ self.dispatcher.PlugOut()
+
+ def _simulate_connect(self):
+ self.dispatcher.PlugIn(self.client) # client is owner
+ # Simulate that we have established a connection
+ self.dispatcher.StreamInit()
+ self.dispatcher.ProcessNonBlocking("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>")
+
+ def test_unbound_namespace_prefix(self):
+ '''tests our handling of a message with an unbound namespace prefix'''
+ self._simulate_connect()
+
+ msgs = []
+ def _got_message(conn, msg):
+ msgs.append(msg)
+ self.dispatcher.RegisterHandler('message', _got_message)
+
+ # should be able to parse a normal message
+ self.dispatcher.ProcessNonBlocking('<message><body>hello</body></message>')
+ self.assertEqual(1, len(msgs))
+
+ self.dispatcher.ProcessNonBlocking('<message><x:y/></message>')
+ self.assertEqual(2, len(msgs))
+ # we should not have been disconnected after that message
+ self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')))
+ self.assertEqual(0, len(self.con.mockGetNamedCalls('disconnect')))
+
+ # we should be able to keep parsing
+ self.dispatcher.ProcessNonBlocking('<message><body>still here?</body></message>')
+ self.assertEqual(3, len(msgs))
+
+ def test_process_non_blocking(self):
+ ''' Check for ProcessNonBlocking return types '''
+ self._simulate_connect()
+ process = self.dispatcher.ProcessNonBlocking
+
+ # length of data expected
+ data = "Please don't fail"
+ result = process(data)
+ self.assertEqual(result, len(data))
+
+ # no data processed, link shall still be active
+ result = process('')
+ self.assertEqual(result, '0')
+ self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')) +
+ len(self.con.mockGetNamedCalls('disconnect')))
+
+ # simulate disconnect
+ result = process('</stream:stream>')
+ self.assertEqual(1, len(self.client.mockGetNamedCalls('disconnect')))
+
+ def test_return_stanza_handler(self):
+ ''' Test sasl_error_conditions transformation in protocol.py '''
+ # quick'n dirty...I wasn't aware of it existance and thought it would
+ # always fail :-)
+ self._simulate_connect()
+ stanza = "<iq type='get' />"
+ def send(data):
+ self.assertEqual(str(data), '<iq xmlns="jabber:client" type="error"><error code="501" type="cancel"><feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" /><text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">The feature requested is not implemented by the recipient or server and therefore cannot be processed.</text></error></iq>')
+ self.client.send = send
+ self.dispatcher.ProcessNonBlocking(stanza)
if __name__ == '__main__':
- unittest.main()
-
-# vim: se ts=3:
+ unittest.main()
diff --git a/test/unit/test_xmpp_transports_nb.py b/test/unit/test_xmpp_transports_nb.py
index b81f60d61..633549560 100644
--- a/test/unit/test_xmpp_transports_nb.py
+++ b/test/unit/test_xmpp_transports_nb.py
@@ -11,70 +11,68 @@ from common.xmpp import transports_nb
class TestModuleLevelFunctions(unittest.TestCase):
- '''
- Test class for functions defined at module level
- '''
- def test_urisplit(self):
- def check_uri(uri, proto, host, port, path):
- _proto, _host, _port, _path = transports_nb.urisplit(uri)
- self.assertEqual(proto, _proto)
- self.assertEqual(host, _host)
- self.assertEqual(path, _path)
- self.assertEqual(port, _port)
-
- check_uri('http://httpcm.jabber.org:5280/webclient', proto='http',
- host='httpcm.jabber.org', port=5280, path='/webclient')
-
- check_uri('http://httpcm.jabber.org/webclient', proto='http',
- host='httpcm.jabber.org', port=80, path='/webclient')
-
- check_uri('https://httpcm.jabber.org/webclient', proto='https',
- host='httpcm.jabber.org', port=443, path='/webclient')
+ '''
+ Test class for functions defined at module level
+ '''
+ def test_urisplit(self):
+ def check_uri(uri, proto, host, port, path):
+ _proto, _host, _port, _path = transports_nb.urisplit(uri)
+ self.assertEqual(proto, _proto)
+ self.assertEqual(host, _host)
+ self.assertEqual(path, _path)
+ self.assertEqual(port, _port)
- def test_get_proxy_data_from_dict(self):
- def check_dict(proxy_dict, host, port, user, passwd):
- _host, _port, _user, _passwd = transports_nb.get_proxy_data_from_dict(
- proxy_dict)
- self.assertEqual(_host, host)
- self.assertEqual(_port, port)
- self.assertEqual(_user, user)
- self.assertEqual(_passwd, passwd)
+ check_uri('http://httpcm.jabber.org:5280/webclient', proto='http',
+ host='httpcm.jabber.org', port=5280, path='/webclient')
- bosh_dict = {'bosh_content': u'text/xml; charset=utf-8',
- 'bosh_hold': 2,
- 'bosh_http_pipelining': False,
- 'bosh_uri': u'http://gajim.org:5280/http-bind',
- 'bosh_useproxy': False,
- 'bosh_wait': 30,
- 'bosh_wait_for_restart_response': False,
- 'host': u'172.16.99.11',
- 'pass': u'pass',
- 'port': 3128,
- 'type': u'bosh',
- 'useauth': True,
- 'user': u'user'}
- check_dict(bosh_dict, host=u'gajim.org', port=5280, user=u'user',
- passwd=u'pass')
+ check_uri('http://httpcm.jabber.org/webclient', proto='http',
+ host='httpcm.jabber.org', port=80, path='/webclient')
- proxy_dict = {'bosh_content': u'text/xml; charset=utf-8',
- 'bosh_hold': 2,
- 'bosh_http_pipelining': False,
- 'bosh_port': 5280,
- 'bosh_uri': u'',
- 'bosh_useproxy': True,
- 'bosh_wait': 30,
- 'bosh_wait_for_restart_response': False,
- 'host': u'172.16.99.11',
- 'pass': u'pass',
- 'port': 3128,
- 'type': 'socks5',
- 'useauth': True,
- 'user': u'user'}
- check_dict(proxy_dict, host=u'172.16.99.11', port=3128, user=u'user',
- passwd=u'pass')
-
-
-if __name__ == '__main__':
- unittest.main()
+ check_uri('https://httpcm.jabber.org/webclient', proto='https',
+ host='httpcm.jabber.org', port=443, path='/webclient')
+
+ def test_get_proxy_data_from_dict(self):
+ def check_dict(proxy_dict, host, port, user, passwd):
+ _host, _port, _user, _passwd = transports_nb.get_proxy_data_from_dict(
+ proxy_dict)
+ self.assertEqual(_host, host)
+ self.assertEqual(_port, port)
+ self.assertEqual(_user, user)
+ self.assertEqual(_passwd, passwd)
+
+ bosh_dict = {'bosh_content': u'text/xml; charset=utf-8',
+ 'bosh_hold': 2,
+ 'bosh_http_pipelining': False,
+ 'bosh_uri': u'http://gajim.org:5280/http-bind',
+ 'bosh_useproxy': False,
+ 'bosh_wait': 30,
+ 'bosh_wait_for_restart_response': False,
+ 'host': u'172.16.99.11',
+ 'pass': u'pass',
+ 'port': 3128,
+ 'type': u'bosh',
+ 'useauth': True,
+ 'user': u'user'}
+ check_dict(bosh_dict, host=u'gajim.org', port=5280, user=u'user',
+ passwd=u'pass')
-# vim: se ts=3:
+ proxy_dict = {'bosh_content': u'text/xml; charset=utf-8',
+ 'bosh_hold': 2,
+ 'bosh_http_pipelining': False,
+ 'bosh_port': 5280,
+ 'bosh_uri': u'',
+ 'bosh_useproxy': True,
+ 'bosh_wait': 30,
+ 'bosh_wait_for_restart_response': False,
+ 'host': u'172.16.99.11',
+ 'pass': u'pass',
+ 'port': 3128,
+ 'type': 'socks5',
+ 'useauth': True,
+ 'user': u'user'}
+ check_dict(proxy_dict, host=u'172.16.99.11', port=3128, user=u'user',
+ passwd=u'pass')
+
+
+if __name__ == '__main__':
+ unittest.main()