Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/python-nbxmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorlovetox <philipp@hoerist.com>2020-02-23 18:32:41 +0300
committerlovetox <philipp@hoerist.com>2020-03-07 22:21:53 +0300
commit4f5d18b4d6b18359abd532022414e6bdbd8735f5 (patch)
tree3bb3993733e86ad42e34c57f588280a7ffd85f96 /test
parent78a8dd4c8abc61521ec1d8791ea071e0059f6cfe (diff)
Fix tests
Diffstat (limited to 'test')
-rw-r--r--test/__init__.py5
-rw-r--r--test/broken/test_xmpp_smacks.py127
-rw-r--r--test/lib/util.py17
-rw-r--r--test/unit/test_activity.py4
-rw-r--r--test/unit/test_avatar.py4
-rw-r--r--test/unit/test_bookmarks.py8
-rw-r--r--test/unit/test_datetime_parsing.py4
-rw-r--r--test/unit/test_delay_parsing.py4
-rw-r--r--test/unit/test_location.py4
-rw-r--r--test/unit/test_mood.py4
-rw-r--r--test/unit/test_pubsub.py12
-rw-r--r--test/unit/test_sasl_scram.py26
-rw-r--r--test/unit/test_tune.py4
-rw-r--r--test/unit/test_xml_vulnerability.py39
-rw-r--r--test/unit/test_xmpp_client.py162
-rw-r--r--test/unit/test_xmpp_dispatcher.py98
-rw-r--r--test/unit/test_xmpp_transports.py75
-rw-r--r--test/unit/test_xmpp_transports2.py279
18 files changed, 75 insertions, 801 deletions
diff --git a/test/__init__.py b/test/__init__.py
index e69de29..84a160e 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -0,0 +1,5 @@
+import logging
+
+# Prevents logging output in tests
+log = logging.getLogger('nbxmpp')
+log.setLevel(logging.CRITICAL)
diff --git a/test/broken/test_xmpp_smacks.py b/test/broken/test_xmpp_smacks.py
deleted file mode 100644
index 4ccd704..0000000
--- a/test/broken/test_xmpp_smacks.py
+++ /dev/null
@@ -1,127 +0,0 @@
-'''
-Tests for smacks.py Stream Management
-'''
-import unittest
-from unittest.mock import Mock
-
-from nbxmpp import dispatcher
-from nbxmpp import protocol
-from nbxmpp import smacks
-
-class TestDispatcherNB(unittest.TestCase):
- '''
- Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
- into a mock client
- '''
- def setUp(self):
- self.dispatcher = dispatcher.XMPPDispatcher()
-
- # Setup mock client
- self.client = Mock()
- self.client.defaultNamespace = protocol.NS_CLIENT
- self.client.Connection = Mock() # mock transport
- self.con = self.client.Connection
- self.con.sm = smacks.Smacks()
-
- def tearDown(self):
- # Unplug if needed
- if hasattr(self.dispatcher, '_owner'):
- self.dispatcher.PlugOut()
-
- def _simulate_connect(self):
- self.dispatcher.PlugIn(self.client) # client is owner
- self.con.sm.PlugIn(self.client)
-
- # Simulate that we have established a connection
- self.dispatcher.StreamInit()
- self.dispatcher.ProcessNonBlocking("<stream:stream "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "xmlns='jabber:client'>")
- self.dispatcher.ProcessNonBlocking("<stream:features> "
- "<sm xmlns='urn:xmpp:sm:3'> <optional/> </sm> </stream:features>")
- self.con.sm.negociate()
- self.dispatcher.ProcessNonBlocking("<enabled xmlns='urn:xmpp:sm:3' "
- "id='some-long-sm-id' resume='true'/>")
- assert(self.con.sm.enabled)
-
- def _simulate_resume(self):
- self.con.sm.resume_request()
- # Resuming acknowledging 5 stanzas
- self.dispatcher.ProcessNonBlocking("<resumed xmlns='urn:xmpp:sm:3' "
- "id='some-long-sm-id' h='5'/>")
- assert(self.con.sm.resuming)
-
- def _send(self, send, r, stanza):
- for i in range(r):
- send(stanza)
- def test_messages(self):
- message = '<message><body>Helloo </body></message>'
- iq = '''<iq from='proxy.jabber.ru' to='j.xxxxxxxx.org/Gajim' type='error' id='18'>
- <query xmlns='http://jabber.org/protocol/bytestreams'/>
- <error code='403' type='auth'>
- <forbidden xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
- </error>
- </iq>'''
- presence = '''<presence from='xxxxxxxxx.com/Talk.v1044194B1E2' to='j.xxxxxxxx.org'>
- <priority>24</priority>
- <c node="http://www.google.com/xmpp/client/caps" ver="1.0.0.104" ext="share-v1 voice-v1" xmlns="http://jabber.org/protocol/caps"/>
- <x stamp="20110614T23:17:51" xmlns="jabber:x:delay"/>
- <status>In love Kakashi Sensei :P</status>
- <x xmlns="vcard-temp:x:update">
- <photo>db4b7c52e39ba28562c74542d5988d47f09108a3</photo>
- </x>
- </presence> '''
-
- self._simulate_connect()
- uqueue = self.con.sm.uqueue
- self.assertEqual(self.con.sm.out_h, 0)
- self.assertEqual(self.con.sm.in_h, 0)
-
- # The server sends 10 stanzas
- self._send(self.dispatcher.ProcessNonBlocking, 5, message)
- self._send(self.dispatcher.ProcessNonBlocking, 4, iq)
- self._send(self.dispatcher.ProcessNonBlocking, 1, presence)
-
- # The client has recieved 10 stanzas and sent none
- self.assertEqual(self.con.sm.in_h, 10)
- self.assertEqual(self.con.sm.out_h, 0)
-
- m = protocol.Message()
-
- # The client sends 10 stanzas
- for i in range(10):
- m = protocol.Message(body=str(i))
- self.dispatcher.send(m)
-
- # Client sends 10 stanzas and put them in the queue
- self.assertEqual(self.con.sm.out_h, 10)
- self.assertEqual(len(uqueue), 10)
-
- # The server acknowledges that it recieved 5 stanzas
- self.dispatcher.ProcessNonBlocking("<a xmlns='urn:xmpp:sm:3' h='5'/>")
- # 5 stanzas are removed from the queue, only 5 stanzas are left
-
- self.assertEqual(len(uqueue), 5)
-
- # Check for the right order of stanzas in the queue
- l = ['5', '6', '7', '8', '9']
- for i in uqueue:
- self.assertEqual(i.getBody(), l[0])
- l.pop(0)
-
- def test_resumption(self):
- self._simulate_connect()
-
- m = protocol.Message()
-
- # The client sends 5 stanzas
- for i in range(5):
- m = protocol.Message(body=str(i))
- self.dispatcher.send(m)
-
- self._simulate_resume()
- # No stanzas left
- self.assertEqual(len(self.con.sm.uqueue), 0)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/test/lib/util.py b/test/lib/util.py
index 834d1b2..9caeddf 100644
--- a/test/lib/util.py
+++ b/test/lib/util.py
@@ -3,24 +3,19 @@ from unittest.mock import Mock
from test.lib.const import STREAM_START
-from nbxmpp import dispatcher
+from nbxmpp.dispatcher import StanzaDispatcher
from nbxmpp.protocol import NS_CLIENT
from nbxmpp.protocol import JID
class StanzaHandlerTest(unittest.TestCase):
def setUp(self):
- self.dispatcher = dispatcher.XMPPDispatcher()
-
# Setup mock client
self.client = Mock()
- self.client.get_bound_jid.return_value = JID('test@test.test')
- self.client.defaultNamespace = NS_CLIENT
- self.client.Connection = Mock() # mock transport
- self.con = self.client.Connection
+ self.client.is_websocket = False
+ self.dispatcher = StanzaDispatcher(self.client)
- self.dispatcher.PlugIn(self.client)
+ self.client.get_bound_jid.return_value = JID('test@test.test')
- # Simulate that we have established a connection
- self.dispatcher.StreamInit()
- self.dispatcher.ProcessNonBlocking(STREAM_START)
+ self.dispatcher.reset_parser()
+ self.dispatcher.process_data(STREAM_START)
diff --git a/test/unit/test_activity.py b/test/unit/test_activity.py
index 403cf7e..3cf0af3 100644
--- a/test/unit/test_activity.py
+++ b/test/unit/test_activity.py
@@ -45,9 +45,9 @@ class ActivityTest(StanzaHandlerTest):
</message>
'''
- self.dispatcher.RegisterHandler(
+ self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
- self.dispatcher.ProcessNonBlocking(event)
+ self.dispatcher.process_data(event)
diff --git a/test/unit/test_avatar.py b/test/unit/test_avatar.py
index db201c6..70de7f3 100644
--- a/test/unit/test_avatar.py
+++ b/test/unit/test_avatar.py
@@ -50,9 +50,9 @@ class AvatarTest(StanzaHandlerTest):
</message>
'''
- self.dispatcher.RegisterHandler(
+ self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
- self.dispatcher.ProcessNonBlocking(event)
+ self.dispatcher.process_data(event)
diff --git a/test/unit/test_bookmarks.py b/test/unit/test_bookmarks.py
index f8ded93..d581dc1 100644
--- a/test/unit/test_bookmarks.py
+++ b/test/unit/test_bookmarks.py
@@ -61,12 +61,12 @@ class BookmarkTest(StanzaHandlerTest):
</message>
'''
- self.dispatcher.RegisterHandler(
+ self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
- self.dispatcher.ProcessNonBlocking(event)
+ self.dispatcher.process_data(event)
def test_bookmark_2_parsing(self):
def _on_message(_con, _stanza, properties):
@@ -106,9 +106,9 @@ class BookmarkTest(StanzaHandlerTest):
</message>
'''
- self.dispatcher.RegisterHandler(
+ self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
- self.dispatcher.ProcessNonBlocking(event)
+ self.dispatcher.process_data(event)
diff --git a/test/unit/test_datetime_parsing.py b/test/unit/test_datetime_parsing.py
index 028a3c1..8398d4b 100644
--- a/test/unit/test_datetime_parsing.py
+++ b/test/unit/test_datetime_parsing.py
@@ -97,3 +97,7 @@ class TestDateTime(unittest.TestCase):
result = parse_datetime(
time_string, check_utc=True, epoch=True)
self.assertEqual(result, expected_value)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/unit/test_delay_parsing.py b/test/unit/test_delay_parsing.py
index 4530100..9277cf1 100644
--- a/test/unit/test_delay_parsing.py
+++ b/test/unit/test_delay_parsing.py
@@ -27,3 +27,7 @@ class TestHelpers(unittest.TestCase):
timestamp = parse_delay(message, not_from=['romeo.com'])
self.assertEqual(timestamp, 1031699305.0)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/unit/test_location.py b/test/unit/test_location.py
index 3a6a3fb..fe1cd5d 100644
--- a/test/unit/test_location.py
+++ b/test/unit/test_location.py
@@ -86,9 +86,9 @@ class LocationTest(StanzaHandlerTest):
</message>
'''
- self.dispatcher.RegisterHandler(
+ self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
- self.dispatcher.ProcessNonBlocking(event)
+ self.dispatcher.process_data(event)
diff --git a/test/unit/test_mood.py b/test/unit/test_mood.py
index 0603f45..af184a8 100644
--- a/test/unit/test_mood.py
+++ b/test/unit/test_mood.py
@@ -41,9 +41,9 @@ class MoodTest(StanzaHandlerTest):
</message>
'''
- self.dispatcher.RegisterHandler(
+ self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
- self.dispatcher.ProcessNonBlocking(event)
+ self.dispatcher.process_data(event)
diff --git a/test/unit/test_pubsub.py b/test/unit/test_pubsub.py
index baca203..281383d 100644
--- a/test/unit/test_pubsub.py
+++ b/test/unit/test_pubsub.py
@@ -29,13 +29,13 @@ class PubsubTest(StanzaHandlerTest):
</message>
'''
- self.dispatcher.RegisterHandler(
+ self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT,
priority=16))
- self.dispatcher.ProcessNonBlocking(event)
+ self.dispatcher.process_data(event)
def test_delete_event(self):
def _on_message(_con, _stanza, properties):
@@ -59,13 +59,13 @@ class PubsubTest(StanzaHandlerTest):
</message>
'''
- self.dispatcher.RegisterHandler(
+ self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT,
priority=16))
- self.dispatcher.ProcessNonBlocking(event)
+ self.dispatcher.process_data(event)
def test_retracted_event(self):
@@ -90,10 +90,10 @@ class PubsubTest(StanzaHandlerTest):
</message>
'''
- self.dispatcher.RegisterHandler(
+ self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT,
priority=16))
- self.dispatcher.ProcessNonBlocking(event)
+ self.dispatcher.process_data(event)
diff --git a/test/unit/test_sasl_scram.py b/test/unit/test_sasl_scram.py
index 78124eb..d41e48e 100644
--- a/test/unit/test_sasl_scram.py
+++ b/test/unit/test_sasl_scram.py
@@ -2,27 +2,33 @@ import unittest
from unittest.mock import Mock
from nbxmpp.auth import SCRAM_SHA_1
+from nbxmpp.util import b64encode
+# Test vector from https://wiki.xmpp.org/web/SASL_and_SCRAM-SHA-1
class SCRAM(unittest.TestCase):
def setUp(self):
self.con = Mock()
self._method = SCRAM_SHA_1(self.con, None)
- self._method._client_nonce = '4691d8f313ddb02d2eed511d5617a5c6f72efa671613c598'
+ self._method._client_nonce = 'fyko+d2lbbFgONRv9qkxdawL'
+ self.maxDiff = None
+ self._username = 'user'
+ self._password = 'pencil'
- self._username = 'philw'
- self._password = 'testtest123'
-
- self.auth = '<auth xmlns="urn:ietf:params:xml:ns:xmpp-sasl" mechanism="SCRAM-SHA-1">eSwsbj1waGlsdyxyPTQ2OTFkOGYzMTNkZGIwMmQyZWVkNTExZDU2MTdhNWM2ZjcyZWZhNjcxNjEzYzU5OA==</auth>'
- self.challenge = 'cj00NjkxZDhmMzEzZGRiMDJkMmVlZDUxMWQ1NjE3YTVjNmY3MmVmYTY3MTYxM2M1OThDaEJpZGEyb0NJeks5S25QdGsxSUZnPT0scz1iZFkrbkRjdUhuVGFtNzgyaG9neHNnPT0saT00MDk2'
- self.response = '<response xmlns="urn:ietf:params:xml:ns:xmpp-sasl">Yz1lU3dzLHI9NDY5MWQ4ZjMxM2RkYjAyZDJlZWQ1MTFkNTYxN2E1YzZmNzJlZmE2NzE2MTNjNTk4Q2hCaWRhMm9DSXpLOUtuUHRrMUlGZz09LHA9NUd5a09hWCtSWlllR3E2L2U3YTE2UDVBeFVrPQ==</response>'
- self.success = 'dj1qMGtuNlVvT1FjTmJ0MGFlYnEwV1QzYWNkSW89'
+ self.auth = '<auth xmlns="urn:ietf:params:xml:ns:xmpp-sasl" mechanism="SCRAM-SHA-1">%s</auth>' % b64encode('n,,n=user,r=fyko+d2lbbFgONRv9qkxdawL')
+ self.challenge = b64encode('r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92,i=4096')
+ self.response = '<response xmlns="urn:ietf:params:xml:ns:xmpp-sasl">%s</response>' % b64encode('c=biws,r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,p=v0X8v3Bz2T0CJGbJQyF0X+HI4Ts=')
+ self.success = b64encode('v=rmF9pqV8S7suAoZWja4dJRkFsKQ=')
def test_auth(self):
self._method.initiate(self._username, self._password)
- self.assertEqual(self.auth, str(self.con.send.call_args[0][0]))
+ self.assertEqual(self.auth, str(self.con.send_nonza.call_args[0][0]))
self._method.response(self.challenge)
- self.assertEqual(self.response, str(self.con.send.call_args[0][0]))
+ self.assertEqual(self.response, str(self.con.send_nonza.call_args[0][0]))
self._method.success(self.success)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/unit/test_tune.py b/test/unit/test_tune.py
index b6e03fd..fcb0f45 100644
--- a/test/unit/test_tune.py
+++ b/test/unit/test_tune.py
@@ -52,9 +52,9 @@ class TuneTest(StanzaHandlerTest):
</message>
'''
- self.dispatcher.RegisterHandler(
+ self.dispatcher.register_handler(
*StanzaHandler(name='message',
callback=_on_message,
ns=NS_PUBSUB_EVENT))
- self.dispatcher.ProcessNonBlocking(event)
+ self.dispatcher.process_data(event)
diff --git a/test/unit/test_xml_vulnerability.py b/test/unit/test_xml_vulnerability.py
index dcb0a7d..0db099e 100644
--- a/test/unit/test_xml_vulnerability.py
+++ b/test/unit/test_xml_vulnerability.py
@@ -7,15 +7,12 @@ from nbxmpp import dispatcher
class XMLVulnerability(unittest.TestCase):
def setUp(self):
- self.client = Mock()
- self.client.Connection = Mock()
- self.dispatcher = dispatcher.XMPPDispatcher()
- self.dispatcher.PlugIn(self.client)
- self.dispatcher.StreamInit()
-
- def tearDown(self):
- self.dispatcher = None
- self.client = None
+ self.stream = Mock()
+ self.stream.is_websocket = False
+ self.dispatcher = dispatcher.StanzaDispatcher(self.stream)
+ self._error_handler = Mock()
+ self.dispatcher.subscribe('parsing-error', self._error_handler)
+ self.dispatcher.reset_parser()
def test_exponential_entity_expansion(self):
bomb = """<?xml version="1.0" encoding="utf-8"?>
@@ -26,8 +23,8 @@ class XMLVulnerability(unittest.TestCase):
]>
<bomb>&c;</bomb>"""
- self.dispatcher.ProcessNonBlocking(bomb)
- self.client.Connection.disconnect.assert_called()
+ self.dispatcher.process_data(bomb)
+ self._error_handler.assert_called()
def test_quadratic_blowup(self):
bomb = """<?xml version="1.0" encoding="utf-8"?>
@@ -36,8 +33,8 @@ class XMLVulnerability(unittest.TestCase):
]>
<bomb>&a;&a;&a;... repeat</bomb>"""
- self.dispatcher.ProcessNonBlocking(bomb)
- self.client.Connection.disconnect.assert_called()
+ self.dispatcher.process_data(bomb)
+ self._error_handler.assert_called()
def test_external_entity_expansion(self):
bomb = """<?xml version="1.0" encoding="utf-8"?>
@@ -46,8 +43,8 @@ class XMLVulnerability(unittest.TestCase):
]>
<root>&ee;</root>"""
- self.dispatcher.ProcessNonBlocking(bomb)
- self.client.Connection.disconnect.assert_called()
+ self.dispatcher.process_data(bomb)
+ self._error_handler.assert_called()
def test_external_local_entity_expansion(self):
bomb = """<?xml version="1.0" encoding="utf-8"?>
@@ -57,8 +54,8 @@ class XMLVulnerability(unittest.TestCase):
]>
<root>&ee;</root>"""
- self.dispatcher.ProcessNonBlocking(bomb)
- self.client.Connection.disconnect.assert_called()
+ self.dispatcher.process_data(bomb)
+ self._error_handler.assert_called()
def test_dtd_retrival(self):
bomb = """<?xml version="1.0" encoding="utf-8"?>
@@ -69,5 +66,9 @@ class XMLVulnerability(unittest.TestCase):
<body>text</body>
</html>"""
- self.dispatcher.ProcessNonBlocking(bomb)
- self.client.Connection.disconnect.assert_called()
+ self.dispatcher.process_data(bomb)
+ self._error_handler.assert_called()
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/unit/test_xmpp_client.py b/test/unit/test_xmpp_client.py
deleted file mode 100644
index f2f0c5f..0000000
--- a/test/unit/test_xmpp_client.py
+++ /dev/null
@@ -1,162 +0,0 @@
-'''
-Testing script for NonBlockingClient class (nbxmpp/client.py)
-
-It actually connects to a xmpp server.
-'''
-
-import unittest
-from unittest.mock import Mock
-
-from test.lib.xmpp_mocks import MockConnection, IdleQueueThread
-
-from nbxmpp import client
-
-# (XMPP server hostname, c2s port). Script will connect to the machine.
-xmpp_server_port = ('gajim.org', 5222)
-
-# [username, password, resource]. Script will authenticate to server above
-credentials = ['unittest', 'testtest', 'res']
-
-@unittest.skip("gajim.org only supports TLS/SSL connections")
-class TestNonBlockingClient(unittest.TestCase):
- '''
- Test Cases class for NonBlockingClient.
- '''
- def setUp(self):
- ''' IdleQueue thread is run and dummy connection is created. '''
- self.idlequeue_thread = IdleQueueThread()
- self.connection = MockConnection() # for dummy callbacks
- self.idlequeue_thread.start()
-
- def tearDown(self):
- ''' IdleQueue thread is stopped. '''
- self.idlequeue_thread.stop_thread()
- self.idlequeue_thread.join()
- del self.connection
-
- self.client = None
-
- def open_stream(self, server_port, wrong_pass=False):
- '''
- Method opening the XMPP connection. It returns when <stream:features>
- is received from server.
-
- :param server_port: tuple of (hostname, port) for where the client should
- connect.
- '''
-
- class TempConnection():
- def get_password(self, cb, mechanism):
- if wrong_pass:
- cb('wrong pass')
- else:
- cb(credentials[1])
- def on_connect_failure(self):
- pass
-
- self.client = client.NonBlockingClient(
- domain=server_port[0],
- idlequeue=self.idlequeue_thread.iq,
- caller=Mock(spec=TempConnection))
-
- self.client.connect(
- hostname=server_port[0],
- port=server_port[1],
- on_connect=lambda *args: self.connection.on_connect(True, *args),
- on_connect_failure=lambda *args: self.connection.on_connect(
- False, *args))
-
- self.assertTrue(self.connection.wait(),
- msg='waiting for callback from client constructor')
-
- # if on_connect was called, client has to be connected and vice versa
- if self.connection.connect_succeeded:
- self.assertTrue(self.client.get_connect_type())
- else:
- self.assertTrue(not self.client.get_connect_type())
-
- def client_auth(self, username, password, resource, sasl):
- '''
- Method authenticating connected client with supplied credentials. Returns
- when authentication is over.
-
- :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
- :todo: to check and be more specific about when it returns
- (bind, session..)
- '''
- self.client.auth(username, password, resource, sasl,
- on_auth=self.connection.on_auth)
-
- self.assertTrue(self.connection.wait(), msg='waiting for authentication')
-
- def do_disconnect(self):
- '''
- Does disconnecting of connected client. Returns when TCP connection is
- closed.
- '''
- self.client.RegisterDisconnectHandler(self.connection.set_event)
- self.client.disconnect()
-
- self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
-
- def test_proper_connect_sasl(self):
- '''
- The ideal testcase - client is connected, authenticated with SASL and
- then disconnected.
- '''
- self.open_stream(xmpp_server_port)
-
- # if client is not connected, lets raise the AssertionError
- self.assertTrue(self.client.get_connect_type())
- # client.disconnect() is already called from NBClient via
- # _on_connected_failure, no need to call it here
-
- self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
- self.assertTrue(self.connection.con)
- self.assertTrue(self.connection.auth=='sasl', msg='Unable to auth via SASL')
-
- self.do_disconnect()
-
- def test_proper_connect_oldauth(self):
- '''
- The ideal testcase - client is connected, authenticated with old auth and
- then disconnected.
- '''
- self.open_stream(xmpp_server_port)
- self.assertTrue(self.client.get_connect_type())
- self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
- self.assertTrue(self.connection.con)
- features = self.client.Dispatcher.Stream.features
- if not features.getTag('auth'):
- print("Server doesn't support old authentication type, ignoring test")
- else:
- self.assertTrue(self.connection.auth=='old_auth',
- msg='Unable to auth via old_auth')
- self.do_disconnect()
-
- def test_connect_to_nonexisting_host(self):
- '''
- Connect to nonexisting host. DNS request for A records should return
- nothing.
- '''
- self.open_stream(('fdsfsdf.fdsf.fss', 5222))
- self.assertTrue(not self.client.get_connect_type())
-
- def test_connect_to_wrong_port(self):
- '''
- Connect to nonexisting server. DNS request for A records should return an
- IP but there shouldn't be XMPP server running on specified port.
- '''
- self.open_stream((xmpp_server_port[0], 31337))
- self.assertTrue(not self.client.get_connect_type())
-
- def test_connect_with_wrong_creds(self):
- '''
- Connecting with invalid password.
- '''
- self.open_stream(xmpp_server_port, wrong_pass=True)
- self.assertTrue(self.client.get_connect_type())
- self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
- self.assertTrue(self.connection.auth is None)
- self.do_disconnect()
-
diff --git a/test/unit/test_xmpp_dispatcher.py b/test/unit/test_xmpp_dispatcher.py
deleted file mode 100644
index bf66db9..0000000
--- a/test/unit/test_xmpp_dispatcher.py
+++ /dev/null
@@ -1,98 +0,0 @@
-'''
-Tests for dispatcher.py
-'''
-import unittest
-from unittest.mock import Mock
-
-from test import lib
-
-from nbxmpp import dispatcher
-from nbxmpp import protocol
-from nbxmpp.protocol import JID
-
-
-class TestDispatcherNB(unittest.TestCase):
- '''
- Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
- into a mock client
- '''
- def setUp(self):
- self.dispatcher = dispatcher.XMPPDispatcher()
-
- # Setup mock client
- self.client = Mock()
- self.client.get_bound_jid.return_value = JID('test@test.test')
- self.client.defaultNamespace = protocol.NS_CLIENT
- self.client.Connection = Mock() # mock transport
- self.con = self.client.Connection
-
- def tearDown(self):
- # Unplug if needed
- if hasattr(self.dispatcher, '_owner'):
- self.dispatcher.PlugOut()
-
- def _simulate_connect(self):
- self.dispatcher.PlugIn(self.client) # client is owner
- # Simulate that we have established a connection
- self.dispatcher.StreamInit()
- self.dispatcher.ProcessNonBlocking("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>")
-
- def test_unbound_namespace_prefix(self):
- '''tests our handling of a message with an unbound namespace prefix'''
- self._simulate_connect()
-
- msgs = []
- def _got_message(conn, msg):
- msgs.append(msg)
- self.dispatcher.RegisterHandler('message', _got_message)
-
- # should be able to parse a normal message
- self.dispatcher.ProcessNonBlocking('<message from="test@test.at"><body>hello</body></message>')
- self.assertEqual(1, len(msgs))
-
- self.dispatcher.ProcessNonBlocking('<message from="test@test.at"><x:y/></message>')
- self.assertEqual(2, len(msgs))
- # we should not have been disconnected after that message
- self.con.pollend.assert_not_called()
- self.con.disconnect.assert_not_called()
-
- # we should be able to keep parsing
- self.dispatcher.ProcessNonBlocking('<message from="test@test.at"><body>still here?</body></message>')
- self.assertEqual(3, len(msgs))
-
- def test_process_non_blocking(self):
- ''' Check for ProcessNonBlocking return types '''
- self._simulate_connect()
- process = self.dispatcher.ProcessNonBlocking
-
- # length of data expected
- data = "Please don't fail"
- result = process(data)
- self.assertEqual(result, len(data))
-
- # no data processed, link shall still be active
- result = process('')
- self.assertEqual(result, '0')
-
- self.con.pollend.assert_not_called()
- self.con.disconnect.assert_not_called()
-
- # simulate disconnect
- result = process('</stream:stream>')
- self.client.disconnect.assert_called_once()
-
- def test_return_stanza_handler(self):
- ''' Test sasl_error_conditions transformation in protocol.py '''
- # quick'n dirty...I wasn't aware of it existance and thought it would
- # always fail :-)
- self._simulate_connect()
- stanza = "<iq type='get' />"
- def send(data):
- print(lib.xml2str_sorted(data))
- self.assertEqual(lib.xml2str_sorted(data), '<iq xmlns="jabber:client" from="test@test.test" to="test@test.test" type="error"><error code="501" type="cancel"><feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" /><text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">The feature requested is not implemented by the recipient or server and therefore cannot be processed.</text></error></iq>')
- self.client.send = send
- self.dispatcher.ProcessNonBlocking(stanza)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/test/unit/test_xmpp_transports.py b/test/unit/test_xmpp_transports.py
deleted file mode 100644
index e79854b..0000000
--- a/test/unit/test_xmpp_transports.py
+++ /dev/null
@@ -1,75 +0,0 @@
-'''
-Unit test for tranports classes.
-'''
-
-import unittest
-
-from nbxmpp import transports
-
-
-class TestModuleLevelFunctions(unittest.TestCase):
- '''
- Test class for functions defined at module level
- '''
- def test_urisplit(self):
- def check_uri(uri, proto, host, port, path):
- _proto, _host, _port, _path = transports.urisplit(uri)
- self.assertEqual(proto, _proto)
- self.assertEqual(host, _host)
- self.assertEqual(path, _path)
- self.assertEqual(port, _port)
-
- check_uri('http://httpcm.jabber.org:5280/webclient', proto='http',
- host='httpcm.jabber.org', port=5280, path='/webclient')
-
- check_uri('http://httpcm.jabber.org/webclient', proto='http',
- host='httpcm.jabber.org', port=80, path='/webclient')
-
- check_uri('https://httpcm.jabber.org/webclient', proto='https',
- host='httpcm.jabber.org', port=443, path='/webclient')
-
- def test_get_proxy_data_from_dict(self):
- def check_dict(proxy_dict, host, port, user, passwd):
- _host, _port, _user, _passwd = transports.get_proxy_data_from_dict(
- proxy_dict)
- self.assertEqual(_host, host)
- self.assertEqual(_port, port)
- self.assertEqual(_user, user)
- self.assertEqual(_passwd, passwd)
-
- bosh_dict = {'bosh_content': u'text/xml; charset=utf-8',
- 'bosh_hold': 2,
- 'bosh_http_pipelining': False,
- 'bosh_uri': u'http://gajim.org:5280/http-bind',
- 'bosh_useproxy': False,
- 'bosh_wait': 30,
- 'bosh_wait_for_restart_response': False,
- 'host': u'172.16.99.11',
- 'pass': u'pass',
- 'port': 3128,
- 'type': u'bosh',
- 'useauth': True,
- 'user': u'user'}
- check_dict(bosh_dict, host=u'gajim.org', port=5280, user=u'user',
- passwd=u'pass')
-
- proxy_dict = {'bosh_content': u'text/xml; charset=utf-8',
- 'bosh_hold': 2,
- 'bosh_http_pipelining': False,
- 'bosh_port': 5280,
- 'bosh_uri': u'',
- 'bosh_useproxy': True,
- 'bosh_wait': 30,
- 'bosh_wait_for_restart_response': False,
- 'host': u'172.16.99.11',
- 'pass': u'pass',
- 'port': 3128,
- 'type': 'socks5',
- 'useauth': True,
- 'user': u'user'}
- check_dict(proxy_dict, host=u'172.16.99.11', port=3128, user=u'user',
- passwd=u'pass')
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/test/unit/test_xmpp_transports2.py b/test/unit/test_xmpp_transports2.py
deleted file mode 100644
index 0bdd721..0000000
--- a/test/unit/test_xmpp_transports2.py
+++ /dev/null
@@ -1,279 +0,0 @@
-'''
-Integration test for tranports classes. See unit for the ordinary
-unit tests of this module.
-'''
-
-import unittest
-import socket
-
-from test.lib.xmpp_mocks import IdleQueueThread, IdleMock
-from nbxmpp import transports
-
-
-class AbstractTransportTest(unittest.TestCase):
- ''' Encapsulates Idlequeue instantiation for transports and more...'''
-
- def setUp(self):
- ''' IdleQueue thread is run and dummy connection is created. '''
- self.idlequeue_thread = IdleQueueThread()
- self.idlequeue_thread.start()
- self._setup_hook()
-
- def tearDown(self):
- ''' IdleQueue thread is stopped. '''
- self._teardown_hook()
- self.idlequeue_thread.stop_thread()
- self.idlequeue_thread.join()
-
- def _setup_hook(self):
- pass
-
- def _teardown_hook(self):
- pass
-
- def expect_receive(self, expected, count=1, msg=None):
- '''
- Returns a callback function that will assert whether the data passed to
- it equals the one specified when calling this function.
-
- Can be used to make sure transport dispatch correct data.
- '''
- def receive(data, *args, **kwargs):
- self.assertEqual(data, expected, msg=msg)
- self._expected_count -= 1
- self._expected_count = count
- return receive
-
- def have_received_expected(self):
- '''
- Plays together with expect_receive(). Will return true if expected_rcv
- callback was called as often as specified
- '''
- return self._expected_count == 0
-
-
-class TestNonBlockingTCP(AbstractTransportTest):
- '''
- Test class for NonBlockingTCP. Will actually try to connect to an existing
- XMPP server.
- '''
- class MockClient(IdleMock):
- ''' Simple client to test transport functionality '''
- def __init__(self, idlequeue, testcase):
- self.idlequeue = idlequeue
- self.testcase = testcase
- IdleMock.__init__(self)
-
- def do_connect(self, establish_tls=False, proxy_dict=None):
- try:
- ips = socket.getaddrinfo('gajim.org', 5222,
- socket.AF_UNSPEC, socket.SOCK_STREAM)
- ip = ips[0]
- except socket.error as e:
- self.testcase.fail(msg=str(e))
-
- self.socket = transports.NonBlockingTCP(
- raise_event=lambda event_type, data: self.testcase.assertTrue(
- event_type and data),
- on_disconnect=lambda: self.on_success(mode='SocketDisconnect'),
- idlequeue=self.idlequeue,
- estabilish_tls=establish_tls,
- certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'),
- tls_version=None,
- cipher_list=None,
- alpn=True,
- proxy_dict=proxy_dict)
-
- self.socket.PlugIn(self)
-
- self.socket.connect(conn_5tuple=ip,
- on_connect=lambda: self.on_success(mode='TCPconnect'),
- on_connect_failure=self.on_failure)
- self.testcase.assertTrue(self.wait(), msg='Connection timed out')
-
- def do_disconnect(self):
- self.socket.disconnect()
- self.testcase.assertTrue(self.wait(), msg='Disconnect timed out')
-
- def on_failure(self, err_message):
- self.set_event()
- self.testcase.fail(msg=err_message)
-
- def on_success(self, mode, data=None):
- if mode == "TCPconnect":
- pass
- if mode == "SocketDisconnect":
- pass
- self.set_event()
-
- def _setup_hook(self):
- self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq,
- testcase=self)
-
- def _teardown_hook(self):
- if self.client.socket.state == 'CONNECTED':
- self.client.do_disconnect()
-
- def test_connect_disconnect_plain(self):
- ''' Establish plain connection '''
- self.client.do_connect(establish_tls=False)
- self.assertEqual(self.client.socket.state, 'CONNECTED')
- self.client.do_disconnect()
- self.assertEqual(self.client.socket.state, 'DISCONNECTED')
-
-# def test_connect_disconnect_ssl(self):
-# ''' Establish SSL (not TLS) connection '''
-# self.client.do_connect(establish_tls=True)
-# self.assertEquals(self.client.socket.state, 'CONNECTED')
-# self.client.do_disconnect()
-# self.assertEquals(self.client.socket.state, 'DISCONNECTED')
-
- def test_do_receive(self):
- ''' Test _do_receive method by overwriting socket.recv '''
- self.client.do_connect()
- sock = self.client.socket
-
- # transport shall receive data
- data = "Please don't fail"
- sock._recv = lambda buffer: data
- sock.onreceive(self.expect_receive(data))
- sock._do_receive()
- self.assertTrue(self.have_received_expected(), msg='Did not receive data')
- self.assertEqual(self.client.socket.state, 'CONNECTED')
-
- # transport shall do nothing as an non-fatal SSL is simulated
- sock._recv = lambda buffer: None
- sock.onreceive(self.assertFalse) # we did not receive anything...
- sock._do_receive()
- self.assertEqual(self.client.socket.state, 'CONNECTED')
-
- # transport shall disconnect as remote side closed the connection
- sock._recv = lambda buffer: ''
- sock.onreceive(self.assertFalse) # we did not receive anything...
- sock._do_receive()
- self.assertEqual(self.client.socket.state, 'DISCONNECTED')
-
- def test_do_send(self):
- ''' Test _do_send method by overwriting socket.send '''
- self.client.do_connect()
- sock = self.client.socket
-
- outgoing = [] # what we have actually send to our socket.socket
- data_part1 = "Please don't "
- data_part2 = "fail!"
- data_complete = data_part1 + data_part2
-
- # Simulate everything could be send in one go
- def _send_all(data):
- outgoing.append(data)
- return len(data)
- sock._send = _send_all
- sock.send(data_part1)
- sock.send(data_part2)
- sock._do_send()
- sock._do_send()
- self.assertTrue(self.client.socket.state == 'CONNECTED')
- self.assertTrue(data_part1.encode('utf8') in outgoing)
- self.assertTrue(data_part2.encode('utf8') in outgoing)
- self.assertFalse(sock.sendqueue and sock.sendbuff,
- msg='There is still unsend data in buffers')
-
- # Simulate data could only be sent in chunks
- self.chunk_count = 0
- outgoing = []
- def _send_chunks(data):
- if self.chunk_count == 0:
- outgoing.append(data_part1)
- self.chunk_count += 1
- return len(data_part1)
- else:
- outgoing.append(data_part2)
- return len(data_part2)
- sock._send = _send_chunks
- sock.send(data_complete)
- sock._do_send() # process first chunk
- sock._do_send() # process the second one
- self.assertTrue(self.client.socket.state == 'CONNECTED')
- self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
- self.assertFalse(sock.sendqueue and sock.sendbuff,
- msg='There is still unsend data in buffers')
-
-
-class TestNonBlockingHTTP(AbstractTransportTest):
- ''' Test class for NonBlockingHTTP transport'''
-
- bosh_http_dict = {
- 'http_uri': 'http://gajim.org:5280/http-bind',
- 'http_version': 'HTTP/1.1',
- 'http_persistent': True,
- 'add_proxy_headers': False
- }
-
- def _get_transport(self, http_dict, proxy_dict=None):
- return transports.NonBlockingHTTP(
- raise_event=None,
- on_disconnect=None,
- idlequeue=self.idlequeue_thread.iq,
- estabilish_tls=False,
- certs=None,
- tls_version=None,
- cipher_list=None,
- on_http_request_possible=lambda: None,
- on_persistent_fallback=None,
- http_dict=http_dict,
- proxy_dict=proxy_dict,
- )
-
- def test_parse_own_http_message(self):
- ''' Build a HTTP message and try to parse it afterwards '''
- transport = self._get_transport(self.bosh_http_dict)
-
- data = "<test>Please don't fail!</test>"
- http_message = transport.build_http_message(data.encode('utf-8'))
- statusline, headers, http_body, buffer_rest = \
- transport.parse_http_message(http_message.decode('utf-8'))
-
- self.assertFalse(bool(buffer_rest))
- self.assertTrue(statusline and isinstance(statusline, list))
- self.assertTrue(headers and isinstance(headers, dict))
- self.assertEqual(data, http_body, msg='Input and output are different')
-
- def test_receive_http_message(self):
- ''' Let _on_receive handle some http messages '''
- transport = self._get_transport(self.bosh_http_dict)
-
- header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
- "Content-Length: 88\r\n\r\n")
- payload = "<test>Please don't fail!</test>"
- body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
- % payload
- message = "%s%s" % (header, body)
-
- # try to receive in one go
- transport.onreceive(self.expect_receive(body, msg='Failed: In one go'))
- transport._on_receive(message)
- self.assertTrue(self.have_received_expected(), msg='Failed: In one go')
-
- def test_receive_http_message_in_chunks(self):
- ''' Let _on_receive handle some chunked http messages '''
- transport = self._get_transport(self.bosh_http_dict)
-
- payload = "<test>Please don't fail!\n\n</test>"
- body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
- % payload
- header = "HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +\
- "Content-Length: %i\r\n\r\n" % len(body)
- message = "%s%s" % (header, body)
-
- chunk1, chunk2, chunk3, chunk4 = message[:20], message[20:73], \
- message[73:85], message[85:]
- nextmessage_chunk = "HTTP/1.1 200 OK\r\nContent-Type: text/x"
- chunks = (chunk1, chunk2, chunk3, chunk4, nextmessage_chunk)
-
- transport.onreceive(self.expect_receive(body, msg='Failed: In chunks'))
- for chunk in chunks:
- transport._on_receive(chunk)
- self.assertTrue(self.have_received_expected(), msg='Failed: In chunks')
-
-if __name__ == '__main__':
- unittest.main()