Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/gajim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYann Leboulanger <asterix@lagaule.org>2013-04-08 01:41:15 +0400
committerYann Leboulanger <asterix@lagaule.org>2013-04-08 01:41:15 +0400
commitd2992815ee801d064567af1bca7a39a768094a30 (patch)
treeb27cffb558986a53acc4e1887e605fc2f42867c5 /test/integration
parentaef4452ff56f663f0fc198a92df5ce969e5f1c52 (diff)
re-enable test suite
Diffstat (limited to 'test/integration')
-rw-r--r--test/integration/test_gui_event_integration.py14
-rw-r--r--test/integration/test_xmpp_client_nb.py169
-rw-r--r--test/integration/test_xmpp_transports_nb.py276
3 files changed, 12 insertions, 447 deletions
diff --git a/test/integration/test_gui_event_integration.py b/test/integration/test_gui_event_integration.py
index c5999389f..248562939 100644
--- a/test/integration/test_gui_event_integration.py
+++ b/test/integration/test_gui_event_integration.py
@@ -6,6 +6,8 @@ import unittest
import lib
lib.setup_env()
+import nbxmpp
+
from common import gajim
from common import contacts as contacts_module
from gajim import Interface
@@ -46,8 +48,16 @@ class TestStatusChange(unittest.TestCase):
def contact_comes_online(self, account, jid, resource, prio):
'''a remote contact comes online'''
- gajim.interface.handle_event_notify(account, (jid, 'online', "I'm back!",
- resource, prio, None, time.time(), None))
+ xml = """<presence from='%s/%s' id='123'>
+ <c node='http://gajim.org' ver='pRCD6cgQ4SDqNMCjdhRV6TECx5o='
+ hash='sha-1' xmlns='http://jabber.org/protocol/caps'/>
+ <status>I'm back!</status>
+ </presence>
+ """ % (jid, resource)
+ msg = nbxmpp.protocol.Presence(node=nbxmpp.simplexml.XML2Node(xml))
+ gajim.connections[account]._presenceCB(None, msg)
+# gajim.interface.handle_event_notify(account, (jid, 'online', "I'm back!",
+# resource, prio, None, time.time(), None))
contact = None
for c in gajim.contacts.get_contacts(account, jid):
diff --git a/test/integration/test_xmpp_client_nb.py b/test/integration/test_xmpp_client_nb.py
deleted file mode 100644
index 3159552ba..000000000
--- a/test/integration/test_xmpp_client_nb.py
+++ /dev/null
@@ -1,169 +0,0 @@
-'''
-Testing script for NonBlockingClient class (src/common/xmpp/client_nb.py)
-
-It actually connects to a xmpp server.
-'''
-
-import unittest
-
-import lib
-lib.setup_env()
-
-from xmpp_mocks import MockConnection, IdleQueueThread
-from mock import Mock
-from common.xmpp import client_nb
-
-#import logging
-#log = logging.getLogger('gajim')
-#log.setLevel(logging.DEBUG)
-
-# (XMPP server hostname, c2s port). Script will connect to the machine.
-xmpp_server_port = ('gajim.org', 5222)
-
-# [username, password, passphrase]. Script will authenticate to server above
-credentials = ['unittest', 'testtest', 'res']
-
-class TestNonBlockingClient(unittest.TestCase):
- '''
- Test Cases class for NonBlockingClient.
- '''
- def setUp(self):
- ''' IdleQueue thread is run and dummy connection is created. '''
- self.idlequeue_thread = IdleQueueThread()
- self.connection = MockConnection() # for dummy callbacks
- self.idlequeue_thread.start()
-
- def tearDown(self):
- ''' IdleQueue thread is stopped. '''
- self.idlequeue_thread.stop_thread()
- self.idlequeue_thread.join()
-
- self.client = None
-
- def open_stream(self, server_port, wrong_pass=False):
- '''
- Method opening the XMPP connection. It returns when <stream:features>
- is received from server.
-
- :param server_port: tuple of (hostname, port) for where the client should
- connect.
- '''
-
- class TempConnection():
- def get_password(self, cb, mechanism):
- if wrong_pass:
- cb('wrong pass')
- else:
- cb(credentials[1])
- def on_connect_failure(self):
- pass
-
- self.client = client_nb.NonBlockingClient(
- domain=server_port[0],
- idlequeue=self.idlequeue_thread.iq,
- caller=Mock(realClass=TempConnection))
-
- self.client.connect(
- hostname=server_port[0],
- port=server_port[1],
- on_connect=lambda *args: self.connection.on_connect(True, *args),
- on_connect_failure=lambda *args: self.connection.on_connect(
- False, *args))
-
- self.assert_(self.connection.wait(),
- msg='waiting for callback from client constructor')
-
- # if on_connect was called, client has to be connected and vice versa
- if self.connection.connect_succeeded:
- self.assert_(self.client.get_connect_type())
- else:
- self.assert_(not self.client.get_connect_type())
-
- def client_auth(self, username, password, resource, sasl):
- '''
- Method authenticating connected client with supplied credentials. Returns
- when authentication is over.
-
- :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
- :todo: to check and be more specific about when it returns
- (bind, session..)
- '''
- self.client.auth(username, password, resource, sasl,
- on_auth=self.connection.on_auth)
-
- self.assert_(self.connection.wait(), msg='waiting for authentication')
-
- def do_disconnect(self):
- '''
- Does disconnecting of connected client. Returns when TCP connection is
- closed.
- '''
- self.client.RegisterDisconnectHandler(self.connection.set_event)
- self.client.disconnect()
-
- self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
-
- def test_proper_connect_sasl(self):
- '''
- The ideal testcase - client is connected, authenticated with SASL and
- then disconnected.
- '''
- self.open_stream(xmpp_server_port)
-
- # if client is not connected, lets raise the AssertionError
- self.assert_(self.client.get_connect_type())
- # client.disconnect() is already called from NBClient via
- # _on_connected_failure, no need to call it here
-
- self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
- self.assert_(self.connection.con)
- self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL')
-
- self.do_disconnect()
-
- def test_proper_connect_oldauth(self):
- '''
- The ideal testcase - client is connected, authenticated with old auth and
- then disconnected.
- '''
- self.open_stream(xmpp_server_port)
- self.assert_(self.client.get_connect_type())
- self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
- self.assert_(self.connection.con)
- features = self.client.Dispatcher.Stream.features
- if not features.getTag('auth'):
- print("Server doesn't support old authentication type, ignoring test")
- else:
- self.assert_(self.connection.auth=='old_auth',
- msg='Unable to auth via old_auth')
- self.do_disconnect()
-
- def test_connect_to_nonexisting_host(self):
- '''
- Connect to nonexisting host. DNS request for A records should return
- nothing.
- '''
- self.open_stream(('fdsfsdf.fdsf.fss', 5222))
- self.assert_(not self.client.get_connect_type())
-
- def test_connect_to_wrong_port(self):
- '''
- Connect to nonexisting server. DNS request for A records should return an
- IP but there shouldn't be XMPP server running on specified port.
- '''
- self.open_stream((xmpp_server_port[0], 31337))
- self.assert_(not self.client.get_connect_type())
-
- def test_connect_with_wrong_creds(self):
- '''
- Connecting with invalid password.
- '''
- self.open_stream(xmpp_server_port, wrong_pass=True)
- self.assert_(self.client.get_connect_type())
- self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
- self.assert_(self.connection.auth is None)
- self.do_disconnect()
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/test/integration/test_xmpp_transports_nb.py b/test/integration/test_xmpp_transports_nb.py
deleted file mode 100644
index 9f7fb0d5d..000000000
--- a/test/integration/test_xmpp_transports_nb.py
+++ /dev/null
@@ -1,276 +0,0 @@
-'''
-Integration test for tranports classes. See unit for the ordinary
-unit tests of this module.
-'''
-
-import unittest
-import socket
-
-import lib
-lib.setup_env()
-
-from xmpp_mocks import IdleQueueThread, IdleMock
-from common.xmpp import transports_nb
-
-
-class AbstractTransportTest(unittest.TestCase):
- ''' Encapsulates Idlequeue instantiation for transports and more...'''
-
- def setUp(self):
- ''' IdleQueue thread is run and dummy connection is created. '''
- self.idlequeue_thread = IdleQueueThread()
- self.idlequeue_thread.start()
- self._setup_hook()
-
- def tearDown(self):
- ''' IdleQueue thread is stopped. '''
- self._teardown_hook()
- self.idlequeue_thread.stop_thread()
- self.idlequeue_thread.join()
-
- def _setup_hook(self):
- pass
-
- def _teardown_hook(self):
- pass
-
- def expect_receive(self, expected, count=1, msg=None):
- '''
- Returns a callback function that will assert whether the data passed to
- it equals the one specified when calling this function.
-
- Can be used to make sure transport dispatch correct data.
- '''
- def receive(data, *args, **kwargs):
- self.assertEqual(data, expected, msg=msg)
- self._expected_count -= 1
- self._expected_count = count
- return receive
-
- def have_received_expected(self):
- '''
- Plays together with expect_receive(). Will return true if expected_rcv
- callback was called as often as specified
- '''
- return self._expected_count == 0
-
-
-class TestNonBlockingTCP(AbstractTransportTest):
- '''
- Test class for NonBlockingTCP. Will actually try to connect to an existing
- XMPP server.
- '''
- class MockClient(IdleMock):
- ''' Simple client to test transport functionality '''
- def __init__(self, idlequeue, testcase):
- self.idlequeue = idlequeue
- self.testcase = testcase
- IdleMock.__init__(self)
-
- def do_connect(self, establish_tls=False, proxy_dict=None):
- try:
- ips = socket.getaddrinfo('gajim.org', 5222,
- socket.AF_UNSPEC, socket.SOCK_STREAM)
- ip = ips[0]
- except socket.error as e:
- self.testcase.fail(msg=str(e))
-
- self.socket = transports_nb.NonBlockingTCP(
- raise_event=lambda event_type, data: self.testcase.assertTrue(
- event_type and data),
- on_disconnect=lambda: self.on_success(mode='SocketDisconnect'),
- idlequeue=self.idlequeue,
- estabilish_tls=establish_tls,
- certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'),
- proxy_dict=proxy_dict)
-
- self.socket.PlugIn(self)
-
- self.socket.connect(conn_5tuple=ip,
- on_connect=lambda: self.on_success(mode='TCPconnect'),
- on_connect_failure=self.on_failure)
- self.testcase.assertTrue(self.wait(), msg='Connection timed out')
-
- def do_disconnect(self):
- self.socket.disconnect()
- self.testcase.assertTrue(self.wait(), msg='Disconnect timed out')
-
- def on_failure(self, err_message):
- self.set_event()
- self.testcase.fail(msg=err_message)
-
- def on_success(self, mode, data=None):
- if mode == "TCPconnect":
- pass
- if mode == "SocketDisconnect":
- pass
- self.set_event()
-
- def _setup_hook(self):
- self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq,
- testcase=self)
-
- def _teardown_hook(self):
- if self.client.socket.state == 'CONNECTED':
- self.client.do_disconnect()
-
- def test_connect_disconnect_plain(self):
- ''' Establish plain connection '''
- self.client.do_connect(establish_tls=False)
- self.assertEquals(self.client.socket.state, 'CONNECTED')
- self.client.do_disconnect()
- self.assertEquals(self.client.socket.state, 'DISCONNECTED')
-
-# def test_connect_disconnect_ssl(self):
-# ''' Establish SSL (not TLS) connection '''
-# self.client.do_connect(establish_tls=True)
-# self.assertEquals(self.client.socket.state, 'CONNECTED')
-# self.client.do_disconnect()
-# self.assertEquals(self.client.socket.state, 'DISCONNECTED')
-
- def test_do_receive(self):
- ''' Test _do_receive method by overwriting socket.recv '''
- self.client.do_connect()
- sock = self.client.socket
-
- # transport shall receive data
- data = "Please don't fail"
- sock._recv = lambda buffer: data
- sock.onreceive(self.expect_receive(data))
- sock._do_receive()
- self.assertTrue(self.have_received_expected(), msg='Did not receive data')
- self.assert_(self.client.socket.state == 'CONNECTED')
-
- # transport shall do nothing as an non-fatal SSL is simulated
- sock._recv = lambda buffer: None
- sock.onreceive(self.assertFalse) # we did not receive anything...
- sock._do_receive()
- self.assert_(self.client.socket.state == 'CONNECTED')
-
- # transport shall disconnect as remote side closed the connection
- sock._recv = lambda buffer: ''
- sock.onreceive(self.assertFalse) # we did not receive anything...
- sock._do_receive()
- self.assert_(self.client.socket.state == 'DISCONNECTED')
-
- def test_do_send(self):
- ''' Test _do_send method by overwriting socket.send '''
- self.client.do_connect()
- sock = self.client.socket
-
- outgoing = [] # what we have actually send to our socket.socket
- data_part1 = "Please don't "
- data_part2 = "fail!"
- data_complete = data_part1 + data_part2
-
- # Simulate everything could be send in one go
- def _send_all(data):
- outgoing.append(data)
- return len(data)
- sock._send = _send_all
- sock.send(data_part1)
- sock.send(data_part2)
- sock._do_send()
- sock._do_send()
- self.assertTrue(self.client.socket.state == 'CONNECTED')
- self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
- self.assertFalse(sock.sendqueue and sock.sendbuff,
- msg='There is still unsend data in buffers')
-
- # Simulate data could only be sent in chunks
- self.chunk_count = 0
- outgoing = []
- def _send_chunks(data):
- if self.chunk_count == 0:
- outgoing.append(data_part1)
- self.chunk_count += 1
- return len(data_part1)
- else:
- outgoing.append(data_part2)
- return len(data_part2)
- sock._send = _send_chunks
- sock.send(data_complete)
- sock._do_send() # process first chunk
- sock._do_send() # process the second one
- self.assertTrue(self.client.socket.state == 'CONNECTED')
- self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
- self.assertFalse(sock.sendqueue and sock.sendbuff,
- msg='There is still unsend data in buffers')
-
-
-class TestNonBlockingHTTP(AbstractTransportTest):
- ''' Test class for NonBlockingHTTP transport'''
-
- bosh_http_dict = {
- 'http_uri': 'http://gajim.org:5280/http-bind',
- 'http_version': 'HTTP/1.1',
- 'http_persistent': True,
- 'add_proxy_headers': False
- }
-
- def _get_transport(self, http_dict, proxy_dict=None):
- return transports_nb.NonBlockingHTTP(
- raise_event=None,
- on_disconnect=None,
- idlequeue=self.idlequeue_thread.iq,
- estabilish_tls=False,
- certs=None,
- on_http_request_possible=lambda: None,
- on_persistent_fallback=None,
- http_dict=http_dict,
- proxy_dict=proxy_dict,
- )
-
- def test_parse_own_http_message(self):
- ''' Build a HTTP message and try to parse it afterwards '''
- transport = self._get_transport(self.bosh_http_dict)
-
- data = "<test>Please don't fail!</test>"
- http_message = transport.build_http_message(data)
- statusline, headers, http_body, buffer_rest = transport.parse_http_message(
- http_message)
-
- self.assertFalse(bool(buffer_rest))
- self.assertTrue(statusline and isinstance(statusline, list))
- self.assertTrue(headers and isinstance(headers, dict))
- self.assertEqual(data, http_body, msg='Input and output are different')
-
- def test_receive_http_message(self):
- ''' Let _on_receive handle some http messages '''
- transport = self._get_transport(self.bosh_http_dict)
-
- header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
- "Content-Length: 88\r\n\r\n")
- payload = "<test>Please don't fail!</test>"
- body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
- % payload
- message = "%s%s" % (header, body)
-
- # try to receive in one go
- transport.onreceive(self.expect_receive(body, msg='Failed: In one go'))
- transport._on_receive(message)
- self.assertTrue(self.have_received_expected(), msg='Failed: In one go')
-
- def test_receive_http_message_in_chunks(self):
- ''' Let _on_receive handle some chunked http messages '''
- transport = self._get_transport(self.bosh_http_dict)
-
- payload = "<test>Please don't fail!\n\n</test>"
- body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
- % payload
- header = "HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +\
- "Content-Length: %i\r\n\r\n" % len(body)
- message = "%s%s" % (header, body)
-
- chunk1, chunk2, chunk3, chunk4 = message[:20], message[20:73], \
- message[73:85], message[85:]
- nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x"
- chunks = (chunk1, chunk2, chunk3, chunk4, nextmessage_chunk)
-
- transport.onreceive(self.expect_receive(body, msg='Failed: In chunks'))
- for chunk in chunks:
- transport._on_receive(chunk)
- self.assertTrue(self.have_received_expected(), msg='Failed: In chunks')
-
-if __name__ == '__main__':
- unittest.main()