Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/gajim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'test/integration/test_xmpp_client_nb.py')
-rw-r--r--test/integration/test_xmpp_client_nb.py282
1 files changed, 140 insertions, 142 deletions
diff --git a/test/integration/test_xmpp_client_nb.py b/test/integration/test_xmpp_client_nb.py
index 24a54a3ca..2efdcfe91 100644
--- a/test/integration/test_xmpp_client_nb.py
+++ b/test/integration/test_xmpp_client_nb.py
@@ -24,148 +24,146 @@ xmpp_server_port = ('gajim.org', 5222)
credentials = ['unittest', 'testtest', 'res']
class TestNonBlockingClient(unittest.TestCase):
- '''
- Test Cases class for NonBlockingClient.
- '''
- def setUp(self):
- ''' IdleQueue thread is run and dummy connection is created. '''
- self.idlequeue_thread = IdleQueueThread()
- self.connection = MockConnection() # for dummy callbacks
- self.idlequeue_thread.start()
-
- def tearDown(self):
- ''' IdleQueue thread is stopped. '''
- self.idlequeue_thread.stop_thread()
- self.idlequeue_thread.join()
-
- self.client = None
-
- def open_stream(self, server_port, wrong_pass=False):
- '''
- Method opening the XMPP connection. It returns when <stream:features>
- is received from server.
-
- :param server_port: tuple of (hostname, port) for where the client should
- connect.
- '''
-
- class TempConnection():
- def get_password(self, cb):
- if wrong_pass:
- cb('wrong pass')
- else:
- cb(credentials[1])
- def on_connect_failure(self):
- pass
-
- self.client = client_nb.NonBlockingClient(
- domain=server_port[0],
- idlequeue=self.idlequeue_thread.iq,
- caller=Mock(realClass=TempConnection))
-
- self.client.connect(
- hostname=server_port[0],
- port=server_port[1],
- on_connect=lambda *args: self.connection.on_connect(True, *args),
- on_connect_failure=lambda *args: self.connection.on_connect(
- False, *args))
-
- self.assert_(self.connection.wait(),
- msg='waiting for callback from client constructor')
-
- # if on_connect was called, client has to be connected and vice versa
- if self.connection.connect_succeeded:
- self.assert_(self.client.get_connect_type())
- else:
- self.assert_(not self.client.get_connect_type())
-
- def client_auth(self, username, password, resource, sasl):
- '''
- Method authenticating connected client with supplied credentials. Returns
- when authentication is over.
-
- :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
- :todo: to check and be more specific about when it returns
- (bind, session..)
- '''
- self.client.auth(username, password, resource, sasl,
- on_auth=self.connection.on_auth)
-
- self.assert_(self.connection.wait(), msg='waiting for authentication')
-
- def do_disconnect(self):
- '''
- Does disconnecting of connected client. Returns when TCP connection is
- closed.
- '''
- self.client.RegisterDisconnectHandler(self.connection.set_event)
- self.client.disconnect()
-
- self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
-
- def test_proper_connect_sasl(self):
- '''
- The ideal testcase - client is connected, authenticated with SASL and
- then disconnected.
- '''
- self.open_stream(xmpp_server_port)
-
- # if client is not connected, lets raise the AssertionError
- self.assert_(self.client.get_connect_type())
- # client.disconnect() is already called from NBClient via
- # _on_connected_failure, no need to call it here
-
- self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
- self.assert_(self.connection.con)
- self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL')
-
- self.do_disconnect()
-
- def test_proper_connect_oldauth(self):
- '''
- The ideal testcase - client is connected, authenticated with old auth and
- then disconnected.
- '''
- self.open_stream(xmpp_server_port)
- self.assert_(self.client.get_connect_type())
- self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
- self.assert_(self.connection.con)
- features = self.client.Dispatcher.Stream.features
- if not features.getTag('auth'):
- print "Server doesn't support old authentication type, ignoring test"
- else:
- self.assert_(self.connection.auth=='old_auth',
- msg='Unable to auth via old_auth')
- self.do_disconnect()
-
- def test_connect_to_nonexisting_host(self):
- '''
- Connect to nonexisting host. DNS request for A records should return
- nothing.
- '''
- self.open_stream(('fdsfsdf.fdsf.fss', 5222))
- self.assert_(not self.client.get_connect_type())
-
- def test_connect_to_wrong_port(self):
- '''
- Connect to nonexisting server. DNS request for A records should return an
- IP but there shouldn't be XMPP server running on specified port.
- '''
- self.open_stream((xmpp_server_port[0], 31337))
- self.assert_(not self.client.get_connect_type())
-
- def test_connect_with_wrong_creds(self):
- '''
- Connecting with invalid password.
- '''
- self.open_stream(xmpp_server_port, wrong_pass=True)
- self.assert_(self.client.get_connect_type())
- self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
- self.assert_(self.connection.auth is None)
- self.do_disconnect()
+ '''
+ Test Cases class for NonBlockingClient.
+ '''
+ def setUp(self):
+ ''' IdleQueue thread is run and dummy connection is created. '''
+ self.idlequeue_thread = IdleQueueThread()
+ self.connection = MockConnection() # for dummy callbacks
+ self.idlequeue_thread.start()
+
+ def tearDown(self):
+ ''' IdleQueue thread is stopped. '''
+ self.idlequeue_thread.stop_thread()
+ self.idlequeue_thread.join()
+
+ self.client = None
+
+ def open_stream(self, server_port, wrong_pass=False):
+ '''
+ Method opening the XMPP connection. It returns when <stream:features>
+ is received from server.
+
+ :param server_port: tuple of (hostname, port) for where the client should
+ connect.
+ '''
+
+ class TempConnection():
+ def get_password(self, cb):
+ if wrong_pass:
+ cb('wrong pass')
+ else:
+ cb(credentials[1])
+ def on_connect_failure(self):
+ pass
+
+ self.client = client_nb.NonBlockingClient(
+ domain=server_port[0],
+ idlequeue=self.idlequeue_thread.iq,
+ caller=Mock(realClass=TempConnection))
+
+ self.client.connect(
+ hostname=server_port[0],
+ port=server_port[1],
+ on_connect=lambda *args: self.connection.on_connect(True, *args),
+ on_connect_failure=lambda *args: self.connection.on_connect(
+ False, *args))
+
+ self.assert_(self.connection.wait(),
+ msg='waiting for callback from client constructor')
+
+ # if on_connect was called, client has to be connected and vice versa
+ if self.connection.connect_succeeded:
+ self.assert_(self.client.get_connect_type())
+ else:
+ self.assert_(not self.client.get_connect_type())
+
+ def client_auth(self, username, password, resource, sasl):
+ '''
+ Method authenticating connected client with supplied credentials. Returns
+ when authentication is over.
+
+ :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
+ :todo: to check and be more specific about when it returns
+ (bind, session..)
+ '''
+ self.client.auth(username, password, resource, sasl,
+ on_auth=self.connection.on_auth)
+
+ self.assert_(self.connection.wait(), msg='waiting for authentication')
+
+ def do_disconnect(self):
+ '''
+ Does disconnecting of connected client. Returns when TCP connection is
+ closed.
+ '''
+ self.client.RegisterDisconnectHandler(self.connection.set_event)
+ self.client.disconnect()
+
+ self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
+
+ def test_proper_connect_sasl(self):
+ '''
+ The ideal testcase - client is connected, authenticated with SASL and
+ then disconnected.
+ '''
+ self.open_stream(xmpp_server_port)
+
+ # if client is not connected, lets raise the AssertionError
+ self.assert_(self.client.get_connect_type())
+ # client.disconnect() is already called from NBClient via
+ # _on_connected_failure, no need to call it here
+
+ self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
+ self.assert_(self.connection.con)
+ self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL')
+
+ self.do_disconnect()
+
+ def test_proper_connect_oldauth(self):
+ '''
+ The ideal testcase - client is connected, authenticated with old auth and
+ then disconnected.
+ '''
+ self.open_stream(xmpp_server_port)
+ self.assert_(self.client.get_connect_type())
+ self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
+ self.assert_(self.connection.con)
+ features = self.client.Dispatcher.Stream.features
+ if not features.getTag('auth'):
+ print "Server doesn't support old authentication type, ignoring test"
+ else:
+ self.assert_(self.connection.auth=='old_auth',
+ msg='Unable to auth via old_auth')
+ self.do_disconnect()
+
+ def test_connect_to_nonexisting_host(self):
+ '''
+ Connect to nonexisting host. DNS request for A records should return
+ nothing.
+ '''
+ self.open_stream(('fdsfsdf.fdsf.fss', 5222))
+ self.assert_(not self.client.get_connect_type())
+
+ def test_connect_to_wrong_port(self):
+ '''
+ Connect to nonexisting server. DNS request for A records should return an
+ IP but there shouldn't be XMPP server running on specified port.
+ '''
+ self.open_stream((xmpp_server_port[0], 31337))
+ self.assert_(not self.client.get_connect_type())
+
+ def test_connect_with_wrong_creds(self):
+ '''
+ Connecting with invalid password.
+ '''
+ self.open_stream(xmpp_server_port, wrong_pass=True)
+ self.assert_(self.client.get_connect_type())
+ self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
+ self.assert_(self.connection.auth is None)
+ self.do_disconnect()
if __name__ == '__main__':
- unittest.main()
-
-# vim: se ts=3:
+ unittest.main()