Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/gajim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephan Erb <steve-e@h3c.de>2009-11-05 11:06:46 +0300
committerStephan Erb <steve-e@h3c.de>2009-11-05 11:06:46 +0300
commit2e5bf4d0d2b205cc76221cbe253176bdbc56d3a3 (patch)
treeb6648da219b0dcac6e9cc92598efe8dab5e181c3 /test/integration
parent3157cf0b1bcc361677e23c3014e6ee470881602c (diff)
Organize tests into unit and integration tests.
Integration tests can depend on UI, network or both. Unittests use neither.
Diffstat (limited to 'test/integration')
-rw-r--r--test/integration/__init__.py6
-rw-r--r--test/integration/test_gui_event_integration.py196
-rw-r--r--test/integration/test_resolver.py102
-rw-r--r--test/integration/test_roster.py209
-rw-r--r--test/integration/test_xmpp_client_nb.py171
-rw-r--r--test/integration/test_xmpp_transports_nb.py277
6 files changed, 961 insertions, 0 deletions
diff --git a/test/integration/__init__.py b/test/integration/__init__.py
new file mode 100644
index 000000000..0adf844f1
--- /dev/null
+++ b/test/integration/__init__.py
@@ -0,0 +1,6 @@
+'''
+
+This package contains integration tests. Integration tests are tests
+which require or include UI, network or both.
+
+''' \ No newline at end of file
diff --git a/test/integration/test_gui_event_integration.py b/test/integration/test_gui_event_integration.py
new file mode 100644
index 000000000..00ed5c696
--- /dev/null
+++ b/test/integration/test_gui_event_integration.py
@@ -0,0 +1,196 @@
+'''
+Tests for the miscellaneous functions scattered throughout src/gajim.py
+'''
+import unittest
+
+import lib
+lib.setup_env()
+
+from common import gajim
+from gajim import Interface
+
+from gajim_mocks import *
+gajim.logger = MockLogger()
+
+Interface()
+
+import time
+from data import *
+
+import roster_window
+
+import notify
+
+class TestStatusChange(unittest.TestCase):
+ '''tests gajim.py's incredibly complex handle_event_notify'''
+
+ def setUp(self):
+ gajim.interface.roster = roster_window.RosterWindow()
+
+ for acc in contacts:
+ gajim.connections[acc] = MockConnection(acc)
+
+ gajim.interface.roster.fill_contacts_and_groups_dicts(contacts[acc],
+ acc)
+ gajim.interface.roster.add_account(acc)
+ gajim.interface.roster.add_account_contacts(acc)
+
+ self.assertEqual(0, len(notify.notifications))
+
+ def tearDown(self):
+ gajim.interface.roster.model.clear()
+
+ for acc in contacts:
+ gajim.contacts.clear_contacts(acc)
+
+ del gajim.interface.roster
+
+ notify.notifications = []
+
+ def contact_comes_online(self, account, jid, resource, prio):
+ '''a remote contact comes online'''
+ gajim.interface.handle_event_notify(account, (jid, 'online', "I'm back!",
+ resource, prio, None, time.time(), None))
+
+ contact = None
+ for c in gajim.contacts.get_contacts(account, jid):
+ if c.resource == resource:
+ contact = c
+ break
+
+ self.assertEqual('online', contact.show)
+ self.assertEqual("I'm back!", contact.status)
+ self.assertEqual(prio, contact.priority)
+
+ # the most recent notification is that the contact connected
+ self.assertEqual('contact_connected', notify.notifications[-1][0])
+
+ def contact_goes_offline(self, account, jid, resource, prio,
+ still_exists = True):
+ '''a remote contact goes offline.'''
+ gajim.interface.handle_event_notify(account, (jid, 'offline', 'Goodbye!',
+ resource, prio, None, time.time(), None))
+
+ contact = None
+ for c in gajim.contacts.get_contacts(account, jid):
+ if c.resource == resource:
+ contact = c
+ break
+
+ if not still_exists:
+ self.assert_(contact is None)
+ return
+
+ self.assertEqual('offline', contact.show)
+ self.assertEqual('Goodbye!', contact.status)
+ self.assertEqual(prio, contact.priority)
+
+ self.assertEqual('contact_disconnected', notify.notifications[-1][0])
+
+ def user_starts_chatting(self, jid, account, resource=None):
+ '''the user opens a chat window and starts talking'''
+ ctrl = MockChatControl(jid, account)
+ win = MockWindow()
+ win.new_tab(ctrl)
+ gajim.interface.msg_win_mgr._windows['test'] = win
+
+ if resource:
+ jid = jid + '/' + resource
+
+ # a basic session is started
+ session = gajim.connections[account1].make_new_session(jid,
+ '01234567890abcdef', cls=MockSession)
+ ctrl.set_session(session)
+
+ return ctrl
+
+ def user_starts_esession(self, jid, resource, account):
+ '''the user opens a chat window and starts an encrypted session'''
+ ctrl = self.user_starts_chatting(jid, account, resource)
+ ctrl.session.status = 'active'
+ ctrl.session.enable_encryption = True
+
+ return ctrl
+
+ def test_contact_comes_online(self):
+ jid = 'default1@gajim.org'
+
+ # contact is offline initially
+ contacts = gajim.contacts.get_contacts(account1, jid)
+ self.assertEqual(1, len(contacts))
+ self.assertEqual('offline', contacts[0].show)
+ self.assertEqual('', contacts[0].status)
+
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
+
+ def test_contact_goes_offline(self):
+ jid = 'default1@gajim.org'
+
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
+
+ ctrl = self.user_starts_chatting(jid, account1)
+ orig_sess = ctrl.session
+
+ self.contact_goes_offline(account1, jid, 'lowprio', 1)
+
+ # session hasn't changed since we were talking to the bare jid
+ self.assertEqual(orig_sess, ctrl.session)
+
+ def test_two_resources_higher_comes_online(self):
+ jid = 'default1@gajim.org'
+
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
+
+ ctrl = self.user_starts_chatting(jid, account1)
+
+ self.contact_comes_online(account1, jid, 'highprio', 50)
+
+ # old session was dropped
+ self.assertEqual(None, ctrl.session)
+
+ def test_two_resources_higher_goes_offline(self):
+ jid = 'default1@gajim.org'
+
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
+ self.contact_comes_online(account1, jid, 'highprio', 50)
+
+ ctrl = self.user_starts_chatting(jid, account1)
+
+ self.contact_goes_offline(account1, jid, 'highprio', 50,
+ still_exists=False)
+
+ # old session was dropped
+ self.assertEqual(None, ctrl.session)
+
+ def test_two_resources_higher_comes_online_with_esession(self):
+ jid = 'default1@gajim.org'
+
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
+
+ ctrl = self.user_starts_esession(jid, 'lowprio', account1)
+
+ self.contact_comes_online(account1, jid, 'highprio', 50)
+
+ # session was associated with the low priority full jid, so it should
+ # have been removed from the control
+ self.assertEqual(None, ctrl.session)
+
+ def test_two_resources_higher_goes_offline_with_esession(self):
+ jid = 'default1@gajim.org'
+
+ self.contact_comes_online(account1, jid, 'lowprio', 1)
+ self.contact_comes_online(account1, jid, 'highprio', 50)
+
+ ctrl = self.user_starts_esession(jid, 'highprio', account1)
+
+ self.contact_goes_offline(account1, jid, 'highprio', 50,
+ still_exists=False)
+
+ # session was associated with the high priority full jid, so it should
+ # have been removed from the control
+ self.assertEqual(None, ctrl.session)
+
+if __name__ == '__main__':
+ unittest.main()
+
+# vim: se ts=3:
diff --git a/test/integration/test_resolver.py b/test/integration/test_resolver.py
new file mode 100644
index 000000000..1836d3c7b
--- /dev/null
+++ b/test/integration/test_resolver.py
@@ -0,0 +1,102 @@
+import unittest
+
+import time
+
+import lib
+lib.setup_env()
+
+from common import resolver
+
+from mock import Mock, expectParams
+from gajim_mocks import *
+from xmpp_mocks import IdleQueueThread
+
+GMAIL_SRV_NAME = '_xmpp-client._tcp.gmail.com'
+NONSENSE_NAME = 'sfsdfsdfsdf.sdfs.fsd'
+JABBERCZ_TXT_NAME = '_xmppconnect.jabber.cz'
+JABBERCZ_SRV_NAME = '_xmpp-client._tcp.jabber.cz'
+
+TEST_LIST = [(GMAIL_SRV_NAME, 'srv', True),
+ (NONSENSE_NAME, 'srv', False),
+ (JABBERCZ_SRV_NAME, 'srv', True)]
+
+class TestResolver(unittest.TestCase):
+ '''
+ Test for LibAsyncNSResolver and NSLookupResolver. Requires working
+ network connection.
+ '''
+ def setUp(self):
+ self.idlequeue_thread = IdleQueueThread()
+ self.idlequeue_thread.start()
+
+ self.iq = self.idlequeue_thread.iq
+ self._reset()
+ self.resolver = None
+
+ def tearDown(self):
+ self.idlequeue_thread.stop_thread()
+ self.idlequeue_thread.join()
+
+ def _reset(self):
+ self.flag = False
+ self.expect_results = False
+ self.nslookup = False
+ self.resolver = None
+
+ def testLibAsyncNSResolver(self):
+ self._reset()
+ if not resolver.USE_LIBASYNCNS:
+ print 'testLibAsyncResolver: libasyncns-python not installed'
+ return
+ self.resolver = resolver.LibAsyncNSResolver()
+
+ for name, type, expect_results in TEST_LIST:
+ self.expect_results = expect_results
+ self._runLANSR(name, type)
+ self.flag = False
+
+ def _runLANSR(self, name, type):
+ self.resolver.resolve(
+ host = name,
+ type = type,
+ on_ready = self._myonready)
+ while not self.flag:
+ time.sleep(1)
+ self.resolver.process()
+
+ def _myonready(self, name, result_set):
+ if __name__ == '__main__':
+ from pprint import pprint
+ pprint('on_ready called ...')
+ pprint('hostname: %s' % name)
+ pprint('result set: %s' % result_set)
+ pprint('res.resolved_hosts: %s' % self.resolver.resolved_hosts)
+ pprint('')
+ if self.expect_results:
+ self.assert_(len(result_set) > 0)
+ else:
+ self.assert_(result_set == [])
+ self.flag = True
+ if self.nslookup:
+ self._testNSLR()
+
+ def testNSLookupResolver(self):
+ self._reset()
+ self.nslookup = True
+ self.resolver = resolver.NSLookupResolver(self.iq)
+ self.test_list = TEST_LIST
+ self._testNSLR()
+
+ def _testNSLR(self):
+ if self.test_list == []:
+ return
+ name, type, self.expect_results = self.test_list.pop()
+ self.resolver.resolve(
+ host = name,
+ type = type,
+ on_ready = self._myonready)
+
+if __name__ == '__main__':
+ unittest.main()
+
+# vim: se ts=3:
diff --git a/test/integration/test_roster.py b/test/integration/test_roster.py
new file mode 100644
index 000000000..10d1ab488
--- /dev/null
+++ b/test/integration/test_roster.py
@@ -0,0 +1,209 @@
+import unittest
+
+import lib
+lib.setup_env()
+
+from data import *
+
+from mock import Mock, expectParams
+from gajim_mocks import *
+
+from common import gajim
+import roster_window
+
+gajim.get_jid_from_account = lambda acc: 'myjid@' + acc
+
+class TestRosterWindow(unittest.TestCase):
+
+ def setUp(self):
+ gajim.interface = MockInterface()
+ self.roster = roster_window.RosterWindow()
+
+ self.C_NAME = roster_window.C_NAME
+ self.C_TYPE = roster_window.C_TYPE
+ self.C_JID = roster_window.C_JID
+ self.C_ACCOUNT = roster_window.C_ACCOUNT
+
+ # Add after creating RosterWindow
+ # We want to test the filling explicitly
+ for acc in contacts:
+ gajim.connections[acc] = MockConnection(acc)
+
+ def tearDown(self):
+ self.roster.model.clear()
+ for acc in gajim.contacts.get_accounts():
+ gajim.contacts.clear_contacts(acc)
+
+ ### Custom assertions
+ def assert_all_contacts_are_in_roster(self, acc):
+ for jid in contacts[acc]:
+ self.assert_contact_is_in_roster(jid, acc)
+
+ def assert_contact_is_in_roster(self, jid, account):
+ contacts = gajim.contacts.get_contacts(account, jid)
+ # check for all resources
+ for contact in contacts:
+ iters = self.roster._get_contact_iter(jid, account,
+ model=self.roster.model)
+
+ if jid != gajim.get_jid_from_account(account):
+ # We don't care for groups of SelfContact
+ self.assertTrue(len(iters) == len(contact.get_shown_groups()),
+ msg='Contact is not in all his groups')
+
+ # Are we big brother?
+ bb_jid = None
+ bb_account = None
+ family = gajim.contacts.get_metacontacts_family(account, jid)
+ if family:
+ nearby_family, bb_jid, bb_account = \
+ self.roster._get_nearby_family_and_big_brother(family, account)
+
+ is_in_nearby_family = (jid, account) in (
+ (data['jid'], data['account']) for data in nearby_family)
+ self.assertTrue(is_in_nearby_family,
+ msg='Contact not in his own nearby family')
+
+ is_big_brother = (bb_jid, bb_account) == (jid, account)
+
+ # check for each group tag
+ for titerC in iters:
+ self.assertTrue(self.roster.model.iter_is_valid(titerC),
+ msg='Contact iter invalid')
+
+ c_model = self.roster.model[titerC]
+ self.assertEquals(contact.get_shown_name(), c_model[self.C_NAME],
+ msg='Contact name missmatch')
+ self.assertEquals(contact.jid, c_model[self.C_JID],
+ msg='Jid missmatch')
+
+ if not self.roster.regroup:
+ self.assertEquals(account, c_model[self.C_ACCOUNT],
+ msg='Account missmatch')
+
+ # Check for correct nesting
+ parent_iter = self.roster.model.iter_parent(titerC)
+ p_model = self.roster.model[parent_iter]
+ if family:
+ if is_big_brother:
+ self.assertTrue(p_model[self.C_TYPE] == 'group',
+ msg='Big Brother is not on top')
+ else:
+ self.assertTrue(p_model[self.C_TYPE] == 'contact',
+ msg='Little Brother brother has no BigB')
+ else:
+ if jid == gajim.get_jid_from_account(account):
+ self.assertTrue(p_model[self.C_TYPE] == 'account',
+ msg='SelfContact is not on top')
+ else:
+ self.assertTrue(p_model[self.C_TYPE] == 'group',
+ msg='Contact not found in a group')
+
+ def assert_group_is_in_roster(self, group, account):
+ #TODO
+ pass
+
+ def assert_account_is_in_roster(self, acc):
+ titerA = self.roster._get_account_iter(acc, model=self.roster.model)
+ self.assertTrue(self.roster.model.iter_is_valid(titerA),
+ msg='Account iter is invalid')
+
+ acc_model = self.roster.model[titerA]
+ self.assertEquals(acc_model[self.C_TYPE], 'account',
+ msg='No account found')
+
+ if not self.roster.regroup:
+ self.assertEquals(acc_model[self.C_ACCOUNT], acc,
+ msg='Account not found')
+
+ self_jid = gajim.get_jid_from_account(acc)
+ self.assertEquals(acc_model[self.C_JID], self_jid,
+ msg='Account JID not found in account row')
+
+ def assert_model_is_in_sync(self):
+ #TODO: check that iter_n_children returns the correct numbers
+ pass
+
+ # tests
+ def test_fill_contacts_and_groups_dicts(self):
+ for acc in contacts:
+ self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc)
+
+ for jid in contacts[acc]:
+ instances = gajim.contacts.get_contacts(acc, jid)
+
+ # Created a contact for each single jid?
+ self.assertTrue(len(instances) == 1)
+
+ # Contacts kept their info
+ contact = instances[0]
+ self.assertEquals(contact.groups, contacts[acc][jid]['groups'],
+ msg='Group Missmatch')
+
+ groups = contacts[acc][jid]['groups'] or ['General',]
+
+ # cleanup
+ self.roster.model.clear()
+ for acc in contacts:
+ gajim.contacts.clear_contacts(acc)
+
+ def test_fill_roster_model(self):
+ for acc in contacts:
+ self.roster.fill_contacts_and_groups_dicts(contacts[acc], acc)
+
+ self.roster.add_account(acc)
+ self.assert_account_is_in_roster(acc)
+
+ self.roster.add_account_contacts(acc)
+ self.assert_all_contacts_are_in_roster(acc)
+
+ self.assert_model_is_in_sync()
+
+
+class TestRosterWindowRegrouped(TestRosterWindow):
+
+ def setUp(self):
+ gajim.config.set('mergeaccounts', True)
+ TestRosterWindow.setUp(self)
+
+ def test_toggle_regroup(self):
+ self.roster.regroup = not self.roster.regroup
+ self.roster.setup_and_draw_roster()
+ self.roster.regroup = not self.roster.regroup
+ self.roster.setup_and_draw_roster()
+
+
+class TestRosterWindowMetaContacts(TestRosterWindowRegrouped):
+
+ def test_receive_metacontact_data(self):
+ for complete_data in metacontact_data:
+ t_acc = complete_data[0]['account']
+ t_jid = complete_data[0]['jid']
+ data = complete_data[1:]
+ for brother in data:
+ acc = brother['account']
+ jid = brother['jid']
+ gajim.contacts.add_metacontact(t_acc, t_jid, acc, jid)
+ self.roster.setup_and_draw_roster()
+
+ def test_connect_new_metacontact(self):
+ self.test_fill_roster_model()
+
+ jid = u'coolstuff@gajim.org'
+ contact = gajim.contacts.create_contact(jid)
+ gajim.contacts.add_contact(account1, contact)
+ self.roster.add_contact(jid, account1)
+ self.roster.chg_contact_status(contact, 'offline', '', account1)
+
+ gajim.contacts.add_metacontact(account1, u'samejid@gajim.org',
+ account1, jid)
+ self.roster.chg_contact_status(contact, 'online', '', account1)
+
+ self.assert_model_is_in_sync()
+
+
+
+if __name__ == '__main__':
+ unittest.main()
+
+# vim: se ts=3:
diff --git a/test/integration/test_xmpp_client_nb.py b/test/integration/test_xmpp_client_nb.py
new file mode 100644
index 000000000..24a54a3ca
--- /dev/null
+++ b/test/integration/test_xmpp_client_nb.py
@@ -0,0 +1,171 @@
+'''
+Testing script for NonBlockingClient class (src/common/xmpp/client_nb.py)
+
+It actually connects to a xmpp server.
+'''
+
+import unittest
+
+import lib
+lib.setup_env()
+
+from xmpp_mocks import MockConnection, IdleQueueThread
+from mock import Mock
+from common.xmpp import client_nb
+
+#import logging
+#log = logging.getLogger('gajim')
+#log.setLevel(logging.DEBUG)
+
+# (XMPP server hostname, c2s port). Script will connect to the machine.
+xmpp_server_port = ('gajim.org', 5222)
+
+# [username, password, passphrase]. Script will authenticate to server above
+credentials = ['unittest', 'testtest', 'res']
+
+class TestNonBlockingClient(unittest.TestCase):
+ '''
+ Test Cases class for NonBlockingClient.
+ '''
+ def setUp(self):
+ ''' IdleQueue thread is run and dummy connection is created. '''
+ self.idlequeue_thread = IdleQueueThread()
+ self.connection = MockConnection() # for dummy callbacks
+ self.idlequeue_thread.start()
+
+ def tearDown(self):
+ ''' IdleQueue thread is stopped. '''
+ self.idlequeue_thread.stop_thread()
+ self.idlequeue_thread.join()
+
+ self.client = None
+
+ def open_stream(self, server_port, wrong_pass=False):
+ '''
+ Method opening the XMPP connection. It returns when <stream:features>
+ is received from server.
+
+ :param server_port: tuple of (hostname, port) for where the client should
+ connect.
+ '''
+
+ class TempConnection():
+ def get_password(self, cb):
+ if wrong_pass:
+ cb('wrong pass')
+ else:
+ cb(credentials[1])
+ def on_connect_failure(self):
+ pass
+
+ self.client = client_nb.NonBlockingClient(
+ domain=server_port[0],
+ idlequeue=self.idlequeue_thread.iq,
+ caller=Mock(realClass=TempConnection))
+
+ self.client.connect(
+ hostname=server_port[0],
+ port=server_port[1],
+ on_connect=lambda *args: self.connection.on_connect(True, *args),
+ on_connect_failure=lambda *args: self.connection.on_connect(
+ False, *args))
+
+ self.assert_(self.connection.wait(),
+ msg='waiting for callback from client constructor')
+
+ # if on_connect was called, client has to be connected and vice versa
+ if self.connection.connect_succeeded:
+ self.assert_(self.client.get_connect_type())
+ else:
+ self.assert_(not self.client.get_connect_type())
+
+ def client_auth(self, username, password, resource, sasl):
+ '''
+ Method authenticating connected client with supplied credentials. Returns
+ when authentication is over.
+
+ :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
+ :todo: to check and be more specific about when it returns
+ (bind, session..)
+ '''
+ self.client.auth(username, password, resource, sasl,
+ on_auth=self.connection.on_auth)
+
+ self.assert_(self.connection.wait(), msg='waiting for authentication')
+
+ def do_disconnect(self):
+ '''
+ Does disconnecting of connected client. Returns when TCP connection is
+ closed.
+ '''
+ self.client.RegisterDisconnectHandler(self.connection.set_event)
+ self.client.disconnect()
+
+ self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
+
+ def test_proper_connect_sasl(self):
+ '''
+ The ideal testcase - client is connected, authenticated with SASL and
+ then disconnected.
+ '''
+ self.open_stream(xmpp_server_port)
+
+ # if client is not connected, lets raise the AssertionError
+ self.assert_(self.client.get_connect_type())
+ # client.disconnect() is already called from NBClient via
+ # _on_connected_failure, no need to call it here
+
+ self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
+ self.assert_(self.connection.con)
+ self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL')
+
+ self.do_disconnect()
+
+ def test_proper_connect_oldauth(self):
+ '''
+ The ideal testcase - client is connected, authenticated with old auth and
+ then disconnected.
+ '''
+ self.open_stream(xmpp_server_port)
+ self.assert_(self.client.get_connect_type())
+ self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
+ self.assert_(self.connection.con)
+ features = self.client.Dispatcher.Stream.features
+ if not features.getTag('auth'):
+ print "Server doesn't support old authentication type, ignoring test"
+ else:
+ self.assert_(self.connection.auth=='old_auth',
+ msg='Unable to auth via old_auth')
+ self.do_disconnect()
+
+ def test_connect_to_nonexisting_host(self):
+ '''
+ Connect to nonexisting host. DNS request for A records should return
+ nothing.
+ '''
+ self.open_stream(('fdsfsdf.fdsf.fss', 5222))
+ self.assert_(not self.client.get_connect_type())
+
+ def test_connect_to_wrong_port(self):
+ '''
+ Connect to nonexisting server. DNS request for A records should return an
+ IP but there shouldn't be XMPP server running on specified port.
+ '''
+ self.open_stream((xmpp_server_port[0], 31337))
+ self.assert_(not self.client.get_connect_type())
+
+ def test_connect_with_wrong_creds(self):
+ '''
+ Connecting with invalid password.
+ '''
+ self.open_stream(xmpp_server_port, wrong_pass=True)
+ self.assert_(self.client.get_connect_type())
+ self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
+ self.assert_(self.connection.auth is None)
+ self.do_disconnect()
+
+
+if __name__ == '__main__':
+ unittest.main()
+
+# vim: se ts=3:
diff --git a/test/integration/test_xmpp_transports_nb.py b/test/integration/test_xmpp_transports_nb.py
new file mode 100644
index 000000000..f6cbab510
--- /dev/null
+++ b/test/integration/test_xmpp_transports_nb.py
@@ -0,0 +1,277 @@
+'''
+Integration test for tranports classes. See unit for the ordinary
+unit tests of this module.
+'''
+
+import unittest
+import socket
+
+import lib
+lib.setup_env()
+
+from xmpp_mocks import IdleQueueThread, IdleMock
+from common.xmpp import transports_nb
+
+
+class AbstractTransportTest(unittest.TestCase):
+ ''' Encapsulates Idlequeue instantiation for transports and more...'''
+
+ def setUp(self):
+ ''' IdleQueue thread is run and dummy connection is created. '''
+ self.idlequeue_thread = IdleQueueThread()
+ self.idlequeue_thread.start()
+ self._setup_hook()
+
+ def tearDown(self):
+ ''' IdleQueue thread is stopped. '''
+ self._teardown_hook()
+ self.idlequeue_thread.stop_thread()
+ self.idlequeue_thread.join()
+
+ def _setup_hook(self):
+ pass
+
+ def _teardown_hook(self):
+ pass
+
+ def expect_receive(self, expected, count=1, msg=None):
+ '''
+ Returns a callback function that will assert whether the data passed to
+ it equals the one specified when calling this function.
+
+ Can be used to make sure transport dispatch correct data.
+ '''
+ def receive(data, *args, **kwargs):
+ self.assertEqual(data, expected, msg=msg)
+ self._expected_count -= 1
+ self._expected_count = count
+ return receive
+
+ def have_received_expected(self):
+ '''
+ Plays together with expect_receive(). Will return true if expected_rcv
+ callback was called as often as specified
+ '''
+ return self._expected_count == 0
+
+
+class TestNonBlockingTCP(AbstractTransportTest):
+ '''
+ Test class for NonBlockingTCP. Will actually try to connect to an existing
+ XMPP server.
+ '''
+ class MockClient(IdleMock):
+ ''' Simple client to test transport functionality '''
+ def __init__(self, idlequeue, testcase):
+ self.idlequeue = idlequeue
+ self.testcase = testcase
+ IdleMock.__init__(self)
+
+ def do_connect(self, establish_tls=False, proxy_dict=None):
+ try:
+ ips = socket.getaddrinfo('gajim.org', 5222,
+ socket.AF_UNSPEC,socket.SOCK_STREAM)
+ ip = ips[0]
+ except socket.error, e:
+ self.testcase.fail(msg=str(e))
+
+ self.socket = transports_nb.NonBlockingTCP(
+ raise_event=lambda event_type, data: self.testcase.assertTrue(
+ event_type and data),
+ on_disconnect=lambda: self.on_success(mode='SocketDisconnect'),
+ idlequeue=self.idlequeue,
+ estabilish_tls=establish_tls,
+ certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'),
+ proxy_dict=proxy_dict)
+
+ self.socket.PlugIn(self)
+
+ self.socket.connect(conn_5tuple=ip,
+ on_connect=lambda: self.on_success(mode='TCPconnect'),
+ on_connect_failure=self.on_failure)
+ self.testcase.assertTrue(self.wait(), msg='Connection timed out')
+
+ def do_disconnect(self):
+ self.socket.disconnect()
+ self.testcase.assertTrue(self.wait(), msg='Disconnect timed out')
+
+ def on_failure(self, err_message):
+ self.set_event()
+ self.testcase.fail(msg=err_message)
+
+ def on_success(self, mode, data=None):
+ if mode == "TCPconnect":
+ pass
+ if mode == "SocketDisconnect":
+ pass
+ self.set_event()
+
+ def _setup_hook(self):
+ self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq,
+ testcase=self)
+
+ def _teardown_hook(self):
+ if self.client.socket.state == 'CONNECTED':
+ self.client.do_disconnect()
+
+ def test_connect_disconnect_plain(self):
+ ''' Establish plain connection '''
+ self.client.do_connect(establish_tls=False)
+ self.assertEquals(self.client.socket.state, 'CONNECTED')
+ self.client.do_disconnect()
+ self.assertEquals(self.client.socket.state, 'DISCONNECTED')
+
+# def test_connect_disconnect_ssl(self):
+# ''' Establish SSL (not TLS) connection '''
+# self.client.do_connect(establish_tls=True)
+# self.assertEquals(self.client.socket.state, 'CONNECTED')
+# self.client.do_disconnect()
+# self.assertEquals(self.client.socket.state, 'DISCONNECTED')
+
+ def test_do_receive(self):
+ ''' Test _do_receive method by overwriting socket.recv '''
+ self.client.do_connect()
+ sock = self.client.socket
+
+ # transport shall receive data
+ data = "Please don't fail"
+ sock._recv = lambda buffer: data
+ sock.onreceive(self.expect_receive(data))
+ sock._do_receive()
+ self.assertTrue(self.have_received_expected(), msg='Did not receive data')
+ self.assert_(self.client.socket.state == 'CONNECTED')
+
+ # transport shall do nothing as an non-fatal SSL is simulated
+ sock._recv = lambda buffer: None
+ sock.onreceive(self.assertFalse) # we did not receive anything...
+ sock._do_receive()
+ self.assert_(self.client.socket.state == 'CONNECTED')
+
+ # transport shall disconnect as remote side closed the connection
+ sock._recv = lambda buffer: ''
+ sock.onreceive(self.assertFalse) # we did not receive anything...
+ sock._do_receive()
+ self.assert_(self.client.socket.state == 'DISCONNECTED')
+
+ def test_do_send(self):
+ ''' Test _do_send method by overwriting socket.send '''
+ self.client.do_connect()
+ sock = self.client.socket
+
+ outgoing = [] # what we have actually send to our socket.socket
+ data_part1 = "Please don't "
+ data_part2 = "fail!"
+ data_complete = data_part1 + data_part2
+
+ # Simulate everything could be send in one go
+ def _send_all(data):
+ outgoing.append(data)
+ return len(data)
+ sock._send = _send_all
+ sock.send(data_part1)
+ sock.send(data_part2)
+ sock._do_send()
+ sock._do_send()
+ self.assertTrue(self.client.socket.state == 'CONNECTED')
+ self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
+ self.assertFalse(sock.sendqueue and sock.sendbuff,
+ msg='There is still unsend data in buffers')
+
+ # Simulate data could only be sent in chunks
+ self.chunk_count = 0
+ outgoing = []
+ def _send_chunks(data):
+ if self.chunk_count == 0:
+ outgoing.append(data_part1)
+ self.chunk_count += 1
+ return len(data_part1)
+ else:
+ outgoing.append(data_part2)
+ return len(data_part2)
+ sock._send = _send_chunks
+ sock.send(data_complete)
+ sock._do_send() # process first chunk
+ sock._do_send() # process the second one
+ self.assertTrue(self.client.socket.state == 'CONNECTED')
+ self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
+ self.assertFalse(sock.sendqueue and sock.sendbuff,
+ msg='There is still unsend data in buffers')
+
+
+class TestNonBlockingHTTP(AbstractTransportTest):
+ ''' Test class for NonBlockingHTTP transport'''
+
+ bosh_http_dict = {
+ 'http_uri': 'http://gajim.org:5280/http-bind',
+ 'http_version': 'HTTP/1.1',
+ 'http_persistent': True,
+ 'add_proxy_headers': False
+ }
+
+ def _get_transport(self, http_dict, proxy_dict=None):
+ return transports_nb.NonBlockingHTTP(
+ raise_event=None,
+ on_disconnect=None,
+ idlequeue=self.idlequeue_thread.iq,
+ estabilish_tls=False,
+ certs=None,
+ on_http_request_possible=lambda: None,
+ on_persistent_fallback=None,
+ http_dict=http_dict,
+ proxy_dict=proxy_dict,
+ )
+
+ def test_parse_own_http_message(self):
+ ''' Build a HTTP message and try to parse it afterwards '''
+ transport = self._get_transport(self.bosh_http_dict)
+
+ data = "<test>Please don't fail!</test>"
+ http_message = transport.build_http_message(data)
+ statusline, headers, http_body = transport.parse_http_message(
+ http_message)
+
+ self.assertTrue(statusline and isinstance(statusline, list))
+ self.assertTrue(headers and isinstance(headers, dict))
+ self.assertEqual(data, http_body, msg='Input and output are different')
+
+ def test_receive_http_message(self):
+ ''' Let _on_receive handle some http messages '''
+ transport = self._get_transport(self.bosh_http_dict)
+
+ header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
+ "Content-Length: 88\r\n\r\n")
+ payload = "<test>Please don't fail!</test>"
+ body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
+ % payload
+ message = "%s%s" % (header, body)
+
+ # try to receive in one go
+ transport.onreceive(self.expect_receive(body, msg='Failed: In one go'))
+ transport._on_receive(message)
+ self.assertTrue(self.have_received_expected(), msg='Failed: In one go')
+
+# FIXME: Not yet implemented.
+# def test_receive_http_message_in_chunks(self):
+# ''' Let _on_receive handle some chunked http messages '''
+# transport = self._get_transport(self.bosh_http_dict)
+#
+# header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
+# "Content-Length: 88\r\n\r\n")
+# payload = "<test>Please don't fail!</test>"
+# body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
+# % payload
+# message = "%s%s" % (header, body)
+#
+# chunk1, chunk2, chunk3 = message[:20], message[20:73], message[73:]
+# nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x"
+# chunks = (chunk1, chunk2, chunk3, nextmessage_chunk)
+#
+# transport.onreceive(self.expect_receive(body, msg='Failed: In chunks'))
+# for chunk in chunks:
+# transport._on_receive(chunk)
+# self.assertTrue(self.have_received_expected(), msg='Failed: In chunks')
+
+if __name__ == '__main__':
+ unittest.main()
+
+# vim: se ts=3: