diff options
author | lovetox <philipp@hoerist.com> | 2020-02-23 18:32:41 +0300 |
---|---|---|
committer | lovetox <philipp@hoerist.com> | 2020-03-07 22:21:53 +0300 |
commit | 4f5d18b4d6b18359abd532022414e6bdbd8735f5 (patch) | |
tree | 3bb3993733e86ad42e34c57f588280a7ffd85f96 /test | |
parent | 78a8dd4c8abc61521ec1d8791ea071e0059f6cfe (diff) |
Fix tests
Diffstat (limited to 'test')
-rw-r--r-- | test/__init__.py | 5 | ||||
-rw-r--r-- | test/broken/test_xmpp_smacks.py | 127 | ||||
-rw-r--r-- | test/lib/util.py | 17 | ||||
-rw-r--r-- | test/unit/test_activity.py | 4 | ||||
-rw-r--r-- | test/unit/test_avatar.py | 4 | ||||
-rw-r--r-- | test/unit/test_bookmarks.py | 8 | ||||
-rw-r--r-- | test/unit/test_datetime_parsing.py | 4 | ||||
-rw-r--r-- | test/unit/test_delay_parsing.py | 4 | ||||
-rw-r--r-- | test/unit/test_location.py | 4 | ||||
-rw-r--r-- | test/unit/test_mood.py | 4 | ||||
-rw-r--r-- | test/unit/test_pubsub.py | 12 | ||||
-rw-r--r-- | test/unit/test_sasl_scram.py | 26 | ||||
-rw-r--r-- | test/unit/test_tune.py | 4 | ||||
-rw-r--r-- | test/unit/test_xml_vulnerability.py | 39 | ||||
-rw-r--r-- | test/unit/test_xmpp_client.py | 162 | ||||
-rw-r--r-- | test/unit/test_xmpp_dispatcher.py | 98 | ||||
-rw-r--r-- | test/unit/test_xmpp_transports.py | 75 | ||||
-rw-r--r-- | test/unit/test_xmpp_transports2.py | 279 |
18 files changed, 75 insertions, 801 deletions
diff --git a/test/__init__.py b/test/__init__.py index e69de29..84a160e 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -0,0 +1,5 @@ +import logging + +# Prevents logging output in tests +log = logging.getLogger('nbxmpp') +log.setLevel(logging.CRITICAL) diff --git a/test/broken/test_xmpp_smacks.py b/test/broken/test_xmpp_smacks.py deleted file mode 100644 index 4ccd704..0000000 --- a/test/broken/test_xmpp_smacks.py +++ /dev/null @@ -1,127 +0,0 @@ -''' -Tests for smacks.py Stream Management -''' -import unittest -from unittest.mock import Mock - -from nbxmpp import dispatcher -from nbxmpp import protocol -from nbxmpp import smacks - -class TestDispatcherNB(unittest.TestCase): - ''' - Test class for NonBlocking dispatcher. Tested dispatcher will be plugged - into a mock client - ''' - def setUp(self): - self.dispatcher = dispatcher.XMPPDispatcher() - - # Setup mock client - self.client = Mock() - self.client.defaultNamespace = protocol.NS_CLIENT - self.client.Connection = Mock() # mock transport - self.con = self.client.Connection - self.con.sm = smacks.Smacks() - - def tearDown(self): - # Unplug if needed - if hasattr(self.dispatcher, '_owner'): - self.dispatcher.PlugOut() - - def _simulate_connect(self): - self.dispatcher.PlugIn(self.client) # client is owner - self.con.sm.PlugIn(self.client) - - # Simulate that we have established a connection - self.dispatcher.StreamInit() - self.dispatcher.ProcessNonBlocking("<stream:stream " - "xmlns:stream='http://etherx.jabber.org/streams' " - "xmlns='jabber:client'>") - self.dispatcher.ProcessNonBlocking("<stream:features> " - "<sm xmlns='urn:xmpp:sm:3'> <optional/> </sm> </stream:features>") - self.con.sm.negociate() - self.dispatcher.ProcessNonBlocking("<enabled xmlns='urn:xmpp:sm:3' " - "id='some-long-sm-id' resume='true'/>") - assert(self.con.sm.enabled) - - def _simulate_resume(self): - self.con.sm.resume_request() - # Resuming acknowledging 5 stanzas - self.dispatcher.ProcessNonBlocking("<resumed xmlns='urn:xmpp:sm:3' " - "id='some-long-sm-id' h='5'/>") - assert(self.con.sm.resuming) - - def _send(self, send, r, stanza): - for i in range(r): - send(stanza) - def test_messages(self): - message = '<message><body>Helloo </body></message>' - iq = '''<iq from='proxy.jabber.ru' to='j.xxxxxxxx.org/Gajim' type='error' id='18'> - <query xmlns='http://jabber.org/protocol/bytestreams'/> - <error code='403' type='auth'> - <forbidden xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/> - </error> - </iq>''' - presence = '''<presence from='xxxxxxxxx.com/Talk.v1044194B1E2' to='j.xxxxxxxx.org'> - <priority>24</priority> - <c node="http://www.google.com/xmpp/client/caps" ver="1.0.0.104" ext="share-v1 voice-v1" xmlns="http://jabber.org/protocol/caps"/> - <x stamp="20110614T23:17:51" xmlns="jabber:x:delay"/> - <status>In love Kakashi Sensei :P</status> - <x xmlns="vcard-temp:x:update"> - <photo>db4b7c52e39ba28562c74542d5988d47f09108a3</photo> - </x> - </presence> ''' - - self._simulate_connect() - uqueue = self.con.sm.uqueue - self.assertEqual(self.con.sm.out_h, 0) - self.assertEqual(self.con.sm.in_h, 0) - - # The server sends 10 stanzas - self._send(self.dispatcher.ProcessNonBlocking, 5, message) - self._send(self.dispatcher.ProcessNonBlocking, 4, iq) - self._send(self.dispatcher.ProcessNonBlocking, 1, presence) - - # The client has recieved 10 stanzas and sent none - self.assertEqual(self.con.sm.in_h, 10) - self.assertEqual(self.con.sm.out_h, 0) - - m = protocol.Message() - - # The client sends 10 stanzas - for i in range(10): - m = protocol.Message(body=str(i)) - self.dispatcher.send(m) - - # Client sends 10 stanzas and put them in the queue - self.assertEqual(self.con.sm.out_h, 10) - self.assertEqual(len(uqueue), 10) - - # The server acknowledges that it recieved 5 stanzas - self.dispatcher.ProcessNonBlocking("<a xmlns='urn:xmpp:sm:3' h='5'/>") - # 5 stanzas are removed from the queue, only 5 stanzas are left - - self.assertEqual(len(uqueue), 5) - - # Check for the right order of stanzas in the queue - l = ['5', '6', '7', '8', '9'] - for i in uqueue: - self.assertEqual(i.getBody(), l[0]) - l.pop(0) - - def test_resumption(self): - self._simulate_connect() - - m = protocol.Message() - - # The client sends 5 stanzas - for i in range(5): - m = protocol.Message(body=str(i)) - self.dispatcher.send(m) - - self._simulate_resume() - # No stanzas left - self.assertEqual(len(self.con.sm.uqueue), 0) - -if __name__ == '__main__': - unittest.main() diff --git a/test/lib/util.py b/test/lib/util.py index 834d1b2..9caeddf 100644 --- a/test/lib/util.py +++ b/test/lib/util.py @@ -3,24 +3,19 @@ from unittest.mock import Mock from test.lib.const import STREAM_START -from nbxmpp import dispatcher +from nbxmpp.dispatcher import StanzaDispatcher from nbxmpp.protocol import NS_CLIENT from nbxmpp.protocol import JID class StanzaHandlerTest(unittest.TestCase): def setUp(self): - self.dispatcher = dispatcher.XMPPDispatcher() - # Setup mock client self.client = Mock() - self.client.get_bound_jid.return_value = JID('test@test.test') - self.client.defaultNamespace = NS_CLIENT - self.client.Connection = Mock() # mock transport - self.con = self.client.Connection + self.client.is_websocket = False + self.dispatcher = StanzaDispatcher(self.client) - self.dispatcher.PlugIn(self.client) + self.client.get_bound_jid.return_value = JID('test@test.test') - # Simulate that we have established a connection - self.dispatcher.StreamInit() - self.dispatcher.ProcessNonBlocking(STREAM_START) + self.dispatcher.reset_parser() + self.dispatcher.process_data(STREAM_START) diff --git a/test/unit/test_activity.py b/test/unit/test_activity.py index 403cf7e..3cf0af3 100644 --- a/test/unit/test_activity.py +++ b/test/unit/test_activity.py @@ -45,9 +45,9 @@ class ActivityTest(StanzaHandlerTest): </message> ''' - self.dispatcher.RegisterHandler( + self.dispatcher.register_handler( *StanzaHandler(name='message', callback=_on_message, ns=NS_PUBSUB_EVENT)) - self.dispatcher.ProcessNonBlocking(event) + self.dispatcher.process_data(event) diff --git a/test/unit/test_avatar.py b/test/unit/test_avatar.py index db201c6..70de7f3 100644 --- a/test/unit/test_avatar.py +++ b/test/unit/test_avatar.py @@ -50,9 +50,9 @@ class AvatarTest(StanzaHandlerTest): </message> ''' - self.dispatcher.RegisterHandler( + self.dispatcher.register_handler( *StanzaHandler(name='message', callback=_on_message, ns=NS_PUBSUB_EVENT)) - self.dispatcher.ProcessNonBlocking(event) + self.dispatcher.process_data(event) diff --git a/test/unit/test_bookmarks.py b/test/unit/test_bookmarks.py index f8ded93..d581dc1 100644 --- a/test/unit/test_bookmarks.py +++ b/test/unit/test_bookmarks.py @@ -61,12 +61,12 @@ class BookmarkTest(StanzaHandlerTest): </message> ''' - self.dispatcher.RegisterHandler( + self.dispatcher.register_handler( *StanzaHandler(name='message', callback=_on_message, ns=NS_PUBSUB_EVENT)) - self.dispatcher.ProcessNonBlocking(event) + self.dispatcher.process_data(event) def test_bookmark_2_parsing(self): def _on_message(_con, _stanza, properties): @@ -106,9 +106,9 @@ class BookmarkTest(StanzaHandlerTest): </message> ''' - self.dispatcher.RegisterHandler( + self.dispatcher.register_handler( *StanzaHandler(name='message', callback=_on_message, ns=NS_PUBSUB_EVENT)) - self.dispatcher.ProcessNonBlocking(event) + self.dispatcher.process_data(event) diff --git a/test/unit/test_datetime_parsing.py b/test/unit/test_datetime_parsing.py index 028a3c1..8398d4b 100644 --- a/test/unit/test_datetime_parsing.py +++ b/test/unit/test_datetime_parsing.py @@ -97,3 +97,7 @@ class TestDateTime(unittest.TestCase): result = parse_datetime( time_string, check_utc=True, epoch=True) self.assertEqual(result, expected_value) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/test_delay_parsing.py b/test/unit/test_delay_parsing.py index 4530100..9277cf1 100644 --- a/test/unit/test_delay_parsing.py +++ b/test/unit/test_delay_parsing.py @@ -27,3 +27,7 @@ class TestHelpers(unittest.TestCase): timestamp = parse_delay(message, not_from=['romeo.com']) self.assertEqual(timestamp, 1031699305.0) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/test_location.py b/test/unit/test_location.py index 3a6a3fb..fe1cd5d 100644 --- a/test/unit/test_location.py +++ b/test/unit/test_location.py @@ -86,9 +86,9 @@ class LocationTest(StanzaHandlerTest): </message> ''' - self.dispatcher.RegisterHandler( + self.dispatcher.register_handler( *StanzaHandler(name='message', callback=_on_message, ns=NS_PUBSUB_EVENT)) - self.dispatcher.ProcessNonBlocking(event) + self.dispatcher.process_data(event) diff --git a/test/unit/test_mood.py b/test/unit/test_mood.py index 0603f45..af184a8 100644 --- a/test/unit/test_mood.py +++ b/test/unit/test_mood.py @@ -41,9 +41,9 @@ class MoodTest(StanzaHandlerTest): </message> ''' - self.dispatcher.RegisterHandler( + self.dispatcher.register_handler( *StanzaHandler(name='message', callback=_on_message, ns=NS_PUBSUB_EVENT)) - self.dispatcher.ProcessNonBlocking(event) + self.dispatcher.process_data(event) diff --git a/test/unit/test_pubsub.py b/test/unit/test_pubsub.py index baca203..281383d 100644 --- a/test/unit/test_pubsub.py +++ b/test/unit/test_pubsub.py @@ -29,13 +29,13 @@ class PubsubTest(StanzaHandlerTest): </message> ''' - self.dispatcher.RegisterHandler( + self.dispatcher.register_handler( *StanzaHandler(name='message', callback=_on_message, ns=NS_PUBSUB_EVENT, priority=16)) - self.dispatcher.ProcessNonBlocking(event) + self.dispatcher.process_data(event) def test_delete_event(self): def _on_message(_con, _stanza, properties): @@ -59,13 +59,13 @@ class PubsubTest(StanzaHandlerTest): </message> ''' - self.dispatcher.RegisterHandler( + self.dispatcher.register_handler( *StanzaHandler(name='message', callback=_on_message, ns=NS_PUBSUB_EVENT, priority=16)) - self.dispatcher.ProcessNonBlocking(event) + self.dispatcher.process_data(event) def test_retracted_event(self): @@ -90,10 +90,10 @@ class PubsubTest(StanzaHandlerTest): </message> ''' - self.dispatcher.RegisterHandler( + self.dispatcher.register_handler( *StanzaHandler(name='message', callback=_on_message, ns=NS_PUBSUB_EVENT, priority=16)) - self.dispatcher.ProcessNonBlocking(event) + self.dispatcher.process_data(event) diff --git a/test/unit/test_sasl_scram.py b/test/unit/test_sasl_scram.py index 78124eb..d41e48e 100644 --- a/test/unit/test_sasl_scram.py +++ b/test/unit/test_sasl_scram.py @@ -2,27 +2,33 @@ import unittest from unittest.mock import Mock from nbxmpp.auth import SCRAM_SHA_1 +from nbxmpp.util import b64encode +# Test vector from https://wiki.xmpp.org/web/SASL_and_SCRAM-SHA-1 class SCRAM(unittest.TestCase): def setUp(self): self.con = Mock() self._method = SCRAM_SHA_1(self.con, None) - self._method._client_nonce = '4691d8f313ddb02d2eed511d5617a5c6f72efa671613c598' + self._method._client_nonce = 'fyko+d2lbbFgONRv9qkxdawL' + self.maxDiff = None + self._username = 'user' + self._password = 'pencil' - self._username = 'philw' - self._password = 'testtest123' - - self.auth = '<auth xmlns="urn:ietf:params:xml:ns:xmpp-sasl" mechanism="SCRAM-SHA-1">eSwsbj1waGlsdyxyPTQ2OTFkOGYzMTNkZGIwMmQyZWVkNTExZDU2MTdhNWM2ZjcyZWZhNjcxNjEzYzU5OA==</auth>' - self.challenge = 'cj00NjkxZDhmMzEzZGRiMDJkMmVlZDUxMWQ1NjE3YTVjNmY3MmVmYTY3MTYxM2M1OThDaEJpZGEyb0NJeks5S25QdGsxSUZnPT0scz1iZFkrbkRjdUhuVGFtNzgyaG9neHNnPT0saT00MDk2' - self.response = '<response xmlns="urn:ietf:params:xml:ns:xmpp-sasl">Yz1lU3dzLHI9NDY5MWQ4ZjMxM2RkYjAyZDJlZWQ1MTFkNTYxN2E1YzZmNzJlZmE2NzE2MTNjNTk4Q2hCaWRhMm9DSXpLOUtuUHRrMUlGZz09LHA9NUd5a09hWCtSWlllR3E2L2U3YTE2UDVBeFVrPQ==</response>' - self.success = 'dj1qMGtuNlVvT1FjTmJ0MGFlYnEwV1QzYWNkSW89' + self.auth = '<auth xmlns="urn:ietf:params:xml:ns:xmpp-sasl" mechanism="SCRAM-SHA-1">%s</auth>' % b64encode('n,,n=user,r=fyko+d2lbbFgONRv9qkxdawL') + self.challenge = b64encode('r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92,i=4096') + self.response = '<response xmlns="urn:ietf:params:xml:ns:xmpp-sasl">%s</response>' % b64encode('c=biws,r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,p=v0X8v3Bz2T0CJGbJQyF0X+HI4Ts=') + self.success = b64encode('v=rmF9pqV8S7suAoZWja4dJRkFsKQ=') def test_auth(self): self._method.initiate(self._username, self._password) - self.assertEqual(self.auth, str(self.con.send.call_args[0][0])) + self.assertEqual(self.auth, str(self.con.send_nonza.call_args[0][0])) self._method.response(self.challenge) - self.assertEqual(self.response, str(self.con.send.call_args[0][0])) + self.assertEqual(self.response, str(self.con.send_nonza.call_args[0][0])) self._method.success(self.success) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/test_tune.py b/test/unit/test_tune.py index b6e03fd..fcb0f45 100644 --- a/test/unit/test_tune.py +++ b/test/unit/test_tune.py @@ -52,9 +52,9 @@ class TuneTest(StanzaHandlerTest): </message> ''' - self.dispatcher.RegisterHandler( + self.dispatcher.register_handler( *StanzaHandler(name='message', callback=_on_message, ns=NS_PUBSUB_EVENT)) - self.dispatcher.ProcessNonBlocking(event) + self.dispatcher.process_data(event) diff --git a/test/unit/test_xml_vulnerability.py b/test/unit/test_xml_vulnerability.py index dcb0a7d..0db099e 100644 --- a/test/unit/test_xml_vulnerability.py +++ b/test/unit/test_xml_vulnerability.py @@ -7,15 +7,12 @@ from nbxmpp import dispatcher class XMLVulnerability(unittest.TestCase): def setUp(self): - self.client = Mock() - self.client.Connection = Mock() - self.dispatcher = dispatcher.XMPPDispatcher() - self.dispatcher.PlugIn(self.client) - self.dispatcher.StreamInit() - - def tearDown(self): - self.dispatcher = None - self.client = None + self.stream = Mock() + self.stream.is_websocket = False + self.dispatcher = dispatcher.StanzaDispatcher(self.stream) + self._error_handler = Mock() + self.dispatcher.subscribe('parsing-error', self._error_handler) + self.dispatcher.reset_parser() def test_exponential_entity_expansion(self): bomb = """<?xml version="1.0" encoding="utf-8"?> @@ -26,8 +23,8 @@ class XMLVulnerability(unittest.TestCase): ]> <bomb>&c;</bomb>""" - self.dispatcher.ProcessNonBlocking(bomb) - self.client.Connection.disconnect.assert_called() + self.dispatcher.process_data(bomb) + self._error_handler.assert_called() def test_quadratic_blowup(self): bomb = """<?xml version="1.0" encoding="utf-8"?> @@ -36,8 +33,8 @@ class XMLVulnerability(unittest.TestCase): ]> <bomb>&a;&a;&a;... repeat</bomb>""" - self.dispatcher.ProcessNonBlocking(bomb) - self.client.Connection.disconnect.assert_called() + self.dispatcher.process_data(bomb) + self._error_handler.assert_called() def test_external_entity_expansion(self): bomb = """<?xml version="1.0" encoding="utf-8"?> @@ -46,8 +43,8 @@ class XMLVulnerability(unittest.TestCase): ]> <root>ⅇ</root>""" - self.dispatcher.ProcessNonBlocking(bomb) - self.client.Connection.disconnect.assert_called() + self.dispatcher.process_data(bomb) + self._error_handler.assert_called() def test_external_local_entity_expansion(self): bomb = """<?xml version="1.0" encoding="utf-8"?> @@ -57,8 +54,8 @@ class XMLVulnerability(unittest.TestCase): ]> <root>ⅇ</root>""" - self.dispatcher.ProcessNonBlocking(bomb) - self.client.Connection.disconnect.assert_called() + self.dispatcher.process_data(bomb) + self._error_handler.assert_called() def test_dtd_retrival(self): bomb = """<?xml version="1.0" encoding="utf-8"?> @@ -69,5 +66,9 @@ class XMLVulnerability(unittest.TestCase): <body>text</body> </html>""" - self.dispatcher.ProcessNonBlocking(bomb) - self.client.Connection.disconnect.assert_called() + self.dispatcher.process_data(bomb) + self._error_handler.assert_called() + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/test_xmpp_client.py b/test/unit/test_xmpp_client.py deleted file mode 100644 index f2f0c5f..0000000 --- a/test/unit/test_xmpp_client.py +++ /dev/null @@ -1,162 +0,0 @@ -''' -Testing script for NonBlockingClient class (nbxmpp/client.py) - -It actually connects to a xmpp server. -''' - -import unittest -from unittest.mock import Mock - -from test.lib.xmpp_mocks import MockConnection, IdleQueueThread - -from nbxmpp import client - -# (XMPP server hostname, c2s port). Script will connect to the machine. -xmpp_server_port = ('gajim.org', 5222) - -# [username, password, resource]. Script will authenticate to server above -credentials = ['unittest', 'testtest', 'res'] - -@unittest.skip("gajim.org only supports TLS/SSL connections") -class TestNonBlockingClient(unittest.TestCase): - ''' - Test Cases class for NonBlockingClient. - ''' - def setUp(self): - ''' IdleQueue thread is run and dummy connection is created. ''' - self.idlequeue_thread = IdleQueueThread() - self.connection = MockConnection() # for dummy callbacks - self.idlequeue_thread.start() - - def tearDown(self): - ''' IdleQueue thread is stopped. ''' - self.idlequeue_thread.stop_thread() - self.idlequeue_thread.join() - del self.connection - - self.client = None - - def open_stream(self, server_port, wrong_pass=False): - ''' - Method opening the XMPP connection. It returns when <stream:features> - is received from server. - - :param server_port: tuple of (hostname, port) for where the client should - connect. - ''' - - class TempConnection(): - def get_password(self, cb, mechanism): - if wrong_pass: - cb('wrong pass') - else: - cb(credentials[1]) - def on_connect_failure(self): - pass - - self.client = client.NonBlockingClient( - domain=server_port[0], - idlequeue=self.idlequeue_thread.iq, - caller=Mock(spec=TempConnection)) - - self.client.connect( - hostname=server_port[0], - port=server_port[1], - on_connect=lambda *args: self.connection.on_connect(True, *args), - on_connect_failure=lambda *args: self.connection.on_connect( - False, *args)) - - self.assertTrue(self.connection.wait(), - msg='waiting for callback from client constructor') - - # if on_connect was called, client has to be connected and vice versa - if self.connection.connect_succeeded: - self.assertTrue(self.client.get_connect_type()) - else: - self.assertTrue(not self.client.get_connect_type()) - - def client_auth(self, username, password, resource, sasl): - ''' - Method authenticating connected client with supplied credentials. Returns - when authentication is over. - - :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication - :todo: to check and be more specific about when it returns - (bind, session..) - ''' - self.client.auth(username, password, resource, sasl, - on_auth=self.connection.on_auth) - - self.assertTrue(self.connection.wait(), msg='waiting for authentication') - - def do_disconnect(self): - ''' - Does disconnecting of connected client. Returns when TCP connection is - closed. - ''' - self.client.RegisterDisconnectHandler(self.connection.set_event) - self.client.disconnect() - - self.assertTrue(self.connection.wait(), msg='waiting for disconnecting') - - def test_proper_connect_sasl(self): - ''' - The ideal testcase - client is connected, authenticated with SASL and - then disconnected. - ''' - self.open_stream(xmpp_server_port) - - # if client is not connected, lets raise the AssertionError - self.assertTrue(self.client.get_connect_type()) - # client.disconnect() is already called from NBClient via - # _on_connected_failure, no need to call it here - - self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1) - self.assertTrue(self.connection.con) - self.assertTrue(self.connection.auth=='sasl', msg='Unable to auth via SASL') - - self.do_disconnect() - - def test_proper_connect_oldauth(self): - ''' - The ideal testcase - client is connected, authenticated with old auth and - then disconnected. - ''' - self.open_stream(xmpp_server_port) - self.assertTrue(self.client.get_connect_type()) - self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0) - self.assertTrue(self.connection.con) - features = self.client.Dispatcher.Stream.features - if not features.getTag('auth'): - print("Server doesn't support old authentication type, ignoring test") - else: - self.assertTrue(self.connection.auth=='old_auth', - msg='Unable to auth via old_auth') - self.do_disconnect() - - def test_connect_to_nonexisting_host(self): - ''' - Connect to nonexisting host. DNS request for A records should return - nothing. - ''' - self.open_stream(('fdsfsdf.fdsf.fss', 5222)) - self.assertTrue(not self.client.get_connect_type()) - - def test_connect_to_wrong_port(self): - ''' - Connect to nonexisting server. DNS request for A records should return an - IP but there shouldn't be XMPP server running on specified port. - ''' - self.open_stream((xmpp_server_port[0], 31337)) - self.assertTrue(not self.client.get_connect_type()) - - def test_connect_with_wrong_creds(self): - ''' - Connecting with invalid password. - ''' - self.open_stream(xmpp_server_port, wrong_pass=True) - self.assertTrue(self.client.get_connect_type()) - self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1) - self.assertTrue(self.connection.auth is None) - self.do_disconnect() - diff --git a/test/unit/test_xmpp_dispatcher.py b/test/unit/test_xmpp_dispatcher.py deleted file mode 100644 index bf66db9..0000000 --- a/test/unit/test_xmpp_dispatcher.py +++ /dev/null @@ -1,98 +0,0 @@ -''' -Tests for dispatcher.py -''' -import unittest -from unittest.mock import Mock - -from test import lib - -from nbxmpp import dispatcher -from nbxmpp import protocol -from nbxmpp.protocol import JID - - -class TestDispatcherNB(unittest.TestCase): - ''' - Test class for NonBlocking dispatcher. Tested dispatcher will be plugged - into a mock client - ''' - def setUp(self): - self.dispatcher = dispatcher.XMPPDispatcher() - - # Setup mock client - self.client = Mock() - self.client.get_bound_jid.return_value = JID('test@test.test') - self.client.defaultNamespace = protocol.NS_CLIENT - self.client.Connection = Mock() # mock transport - self.con = self.client.Connection - - def tearDown(self): - # Unplug if needed - if hasattr(self.dispatcher, '_owner'): - self.dispatcher.PlugOut() - - def _simulate_connect(self): - self.dispatcher.PlugIn(self.client) # client is owner - # Simulate that we have established a connection - self.dispatcher.StreamInit() - self.dispatcher.ProcessNonBlocking("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>") - - def test_unbound_namespace_prefix(self): - '''tests our handling of a message with an unbound namespace prefix''' - self._simulate_connect() - - msgs = [] - def _got_message(conn, msg): - msgs.append(msg) - self.dispatcher.RegisterHandler('message', _got_message) - - # should be able to parse a normal message - self.dispatcher.ProcessNonBlocking('<message from="test@test.at"><body>hello</body></message>') - self.assertEqual(1, len(msgs)) - - self.dispatcher.ProcessNonBlocking('<message from="test@test.at"><x:y/></message>') - self.assertEqual(2, len(msgs)) - # we should not have been disconnected after that message - self.con.pollend.assert_not_called() - self.con.disconnect.assert_not_called() - - # we should be able to keep parsing - self.dispatcher.ProcessNonBlocking('<message from="test@test.at"><body>still here?</body></message>') - self.assertEqual(3, len(msgs)) - - def test_process_non_blocking(self): - ''' Check for ProcessNonBlocking return types ''' - self._simulate_connect() - process = self.dispatcher.ProcessNonBlocking - - # length of data expected - data = "Please don't fail" - result = process(data) - self.assertEqual(result, len(data)) - - # no data processed, link shall still be active - result = process('') - self.assertEqual(result, '0') - - self.con.pollend.assert_not_called() - self.con.disconnect.assert_not_called() - - # simulate disconnect - result = process('</stream:stream>') - self.client.disconnect.assert_called_once() - - def test_return_stanza_handler(self): - ''' Test sasl_error_conditions transformation in protocol.py ''' - # quick'n dirty...I wasn't aware of it existance and thought it would - # always fail :-) - self._simulate_connect() - stanza = "<iq type='get' />" - def send(data): - print(lib.xml2str_sorted(data)) - self.assertEqual(lib.xml2str_sorted(data), '<iq xmlns="jabber:client" from="test@test.test" to="test@test.test" type="error"><error code="501" type="cancel"><feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" /><text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">The feature requested is not implemented by the recipient or server and therefore cannot be processed.</text></error></iq>') - self.client.send = send - self.dispatcher.ProcessNonBlocking(stanza) - - -if __name__ == '__main__': - unittest.main() diff --git a/test/unit/test_xmpp_transports.py b/test/unit/test_xmpp_transports.py deleted file mode 100644 index e79854b..0000000 --- a/test/unit/test_xmpp_transports.py +++ /dev/null @@ -1,75 +0,0 @@ -''' -Unit test for tranports classes. -''' - -import unittest - -from nbxmpp import transports - - -class TestModuleLevelFunctions(unittest.TestCase): - ''' - Test class for functions defined at module level - ''' - def test_urisplit(self): - def check_uri(uri, proto, host, port, path): - _proto, _host, _port, _path = transports.urisplit(uri) - self.assertEqual(proto, _proto) - self.assertEqual(host, _host) - self.assertEqual(path, _path) - self.assertEqual(port, _port) - - check_uri('http://httpcm.jabber.org:5280/webclient', proto='http', - host='httpcm.jabber.org', port=5280, path='/webclient') - - check_uri('http://httpcm.jabber.org/webclient', proto='http', - host='httpcm.jabber.org', port=80, path='/webclient') - - check_uri('https://httpcm.jabber.org/webclient', proto='https', - host='httpcm.jabber.org', port=443, path='/webclient') - - def test_get_proxy_data_from_dict(self): - def check_dict(proxy_dict, host, port, user, passwd): - _host, _port, _user, _passwd = transports.get_proxy_data_from_dict( - proxy_dict) - self.assertEqual(_host, host) - self.assertEqual(_port, port) - self.assertEqual(_user, user) - self.assertEqual(_passwd, passwd) - - bosh_dict = {'bosh_content': u'text/xml; charset=utf-8', - 'bosh_hold': 2, - 'bosh_http_pipelining': False, - 'bosh_uri': u'http://gajim.org:5280/http-bind', - 'bosh_useproxy': False, - 'bosh_wait': 30, - 'bosh_wait_for_restart_response': False, - 'host': u'172.16.99.11', - 'pass': u'pass', - 'port': 3128, - 'type': u'bosh', - 'useauth': True, - 'user': u'user'} - check_dict(bosh_dict, host=u'gajim.org', port=5280, user=u'user', - passwd=u'pass') - - proxy_dict = {'bosh_content': u'text/xml; charset=utf-8', - 'bosh_hold': 2, - 'bosh_http_pipelining': False, - 'bosh_port': 5280, - 'bosh_uri': u'', - 'bosh_useproxy': True, - 'bosh_wait': 30, - 'bosh_wait_for_restart_response': False, - 'host': u'172.16.99.11', - 'pass': u'pass', - 'port': 3128, - 'type': 'socks5', - 'useauth': True, - 'user': u'user'} - check_dict(proxy_dict, host=u'172.16.99.11', port=3128, user=u'user', - passwd=u'pass') - - -if __name__ == '__main__': - unittest.main() diff --git a/test/unit/test_xmpp_transports2.py b/test/unit/test_xmpp_transports2.py deleted file mode 100644 index 0bdd721..0000000 --- a/test/unit/test_xmpp_transports2.py +++ /dev/null @@ -1,279 +0,0 @@ -''' -Integration test for tranports classes. See unit for the ordinary -unit tests of this module. -''' - -import unittest -import socket - -from test.lib.xmpp_mocks import IdleQueueThread, IdleMock -from nbxmpp import transports - - -class AbstractTransportTest(unittest.TestCase): - ''' Encapsulates Idlequeue instantiation for transports and more...''' - - def setUp(self): - ''' IdleQueue thread is run and dummy connection is created. ''' - self.idlequeue_thread = IdleQueueThread() - self.idlequeue_thread.start() - self._setup_hook() - - def tearDown(self): - ''' IdleQueue thread is stopped. ''' - self._teardown_hook() - self.idlequeue_thread.stop_thread() - self.idlequeue_thread.join() - - def _setup_hook(self): - pass - - def _teardown_hook(self): - pass - - def expect_receive(self, expected, count=1, msg=None): - ''' - Returns a callback function that will assert whether the data passed to - it equals the one specified when calling this function. - - Can be used to make sure transport dispatch correct data. - ''' - def receive(data, *args, **kwargs): - self.assertEqual(data, expected, msg=msg) - self._expected_count -= 1 - self._expected_count = count - return receive - - def have_received_expected(self): - ''' - Plays together with expect_receive(). Will return true if expected_rcv - callback was called as often as specified - ''' - return self._expected_count == 0 - - -class TestNonBlockingTCP(AbstractTransportTest): - ''' - Test class for NonBlockingTCP. Will actually try to connect to an existing - XMPP server. - ''' - class MockClient(IdleMock): - ''' Simple client to test transport functionality ''' - def __init__(self, idlequeue, testcase): - self.idlequeue = idlequeue - self.testcase = testcase - IdleMock.__init__(self) - - def do_connect(self, establish_tls=False, proxy_dict=None): - try: - ips = socket.getaddrinfo('gajim.org', 5222, - socket.AF_UNSPEC, socket.SOCK_STREAM) - ip = ips[0] - except socket.error as e: - self.testcase.fail(msg=str(e)) - - self.socket = transports.NonBlockingTCP( - raise_event=lambda event_type, data: self.testcase.assertTrue( - event_type and data), - on_disconnect=lambda: self.on_success(mode='SocketDisconnect'), - idlequeue=self.idlequeue, - estabilish_tls=establish_tls, - certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'), - tls_version=None, - cipher_list=None, - alpn=True, - proxy_dict=proxy_dict) - - self.socket.PlugIn(self) - - self.socket.connect(conn_5tuple=ip, - on_connect=lambda: self.on_success(mode='TCPconnect'), - on_connect_failure=self.on_failure) - self.testcase.assertTrue(self.wait(), msg='Connection timed out') - - def do_disconnect(self): - self.socket.disconnect() - self.testcase.assertTrue(self.wait(), msg='Disconnect timed out') - - def on_failure(self, err_message): - self.set_event() - self.testcase.fail(msg=err_message) - - def on_success(self, mode, data=None): - if mode == "TCPconnect": - pass - if mode == "SocketDisconnect": - pass - self.set_event() - - def _setup_hook(self): - self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq, - testcase=self) - - def _teardown_hook(self): - if self.client.socket.state == 'CONNECTED': - self.client.do_disconnect() - - def test_connect_disconnect_plain(self): - ''' Establish plain connection ''' - self.client.do_connect(establish_tls=False) - self.assertEqual(self.client.socket.state, 'CONNECTED') - self.client.do_disconnect() - self.assertEqual(self.client.socket.state, 'DISCONNECTED') - -# def test_connect_disconnect_ssl(self): -# ''' Establish SSL (not TLS) connection ''' -# self.client.do_connect(establish_tls=True) -# self.assertEquals(self.client.socket.state, 'CONNECTED') -# self.client.do_disconnect() -# self.assertEquals(self.client.socket.state, 'DISCONNECTED') - - def test_do_receive(self): - ''' Test _do_receive method by overwriting socket.recv ''' - self.client.do_connect() - sock = self.client.socket - - # transport shall receive data - data = "Please don't fail" - sock._recv = lambda buffer: data - sock.onreceive(self.expect_receive(data)) - sock._do_receive() - self.assertTrue(self.have_received_expected(), msg='Did not receive data') - self.assertEqual(self.client.socket.state, 'CONNECTED') - - # transport shall do nothing as an non-fatal SSL is simulated - sock._recv = lambda buffer: None - sock.onreceive(self.assertFalse) # we did not receive anything... - sock._do_receive() - self.assertEqual(self.client.socket.state, 'CONNECTED') - - # transport shall disconnect as remote side closed the connection - sock._recv = lambda buffer: '' - sock.onreceive(self.assertFalse) # we did not receive anything... - sock._do_receive() - self.assertEqual(self.client.socket.state, 'DISCONNECTED') - - def test_do_send(self): - ''' Test _do_send method by overwriting socket.send ''' - self.client.do_connect() - sock = self.client.socket - - outgoing = [] # what we have actually send to our socket.socket - data_part1 = "Please don't " - data_part2 = "fail!" - data_complete = data_part1 + data_part2 - - # Simulate everything could be send in one go - def _send_all(data): - outgoing.append(data) - return len(data) - sock._send = _send_all - sock.send(data_part1) - sock.send(data_part2) - sock._do_send() - sock._do_send() - self.assertTrue(self.client.socket.state == 'CONNECTED') - self.assertTrue(data_part1.encode('utf8') in outgoing) - self.assertTrue(data_part2.encode('utf8') in outgoing) - self.assertFalse(sock.sendqueue and sock.sendbuff, - msg='There is still unsend data in buffers') - - # Simulate data could only be sent in chunks - self.chunk_count = 0 - outgoing = [] - def _send_chunks(data): - if self.chunk_count == 0: - outgoing.append(data_part1) - self.chunk_count += 1 - return len(data_part1) - else: - outgoing.append(data_part2) - return len(data_part2) - sock._send = _send_chunks - sock.send(data_complete) - sock._do_send() # process first chunk - sock._do_send() # process the second one - self.assertTrue(self.client.socket.state == 'CONNECTED') - self.assertTrue(data_part1 in outgoing and data_part2 in outgoing) - self.assertFalse(sock.sendqueue and sock.sendbuff, - msg='There is still unsend data in buffers') - - -class TestNonBlockingHTTP(AbstractTransportTest): - ''' Test class for NonBlockingHTTP transport''' - - bosh_http_dict = { - 'http_uri': 'http://gajim.org:5280/http-bind', - 'http_version': 'HTTP/1.1', - 'http_persistent': True, - 'add_proxy_headers': False - } - - def _get_transport(self, http_dict, proxy_dict=None): - return transports.NonBlockingHTTP( - raise_event=None, - on_disconnect=None, - idlequeue=self.idlequeue_thread.iq, - estabilish_tls=False, - certs=None, - tls_version=None, - cipher_list=None, - on_http_request_possible=lambda: None, - on_persistent_fallback=None, - http_dict=http_dict, - proxy_dict=proxy_dict, - ) - - def test_parse_own_http_message(self): - ''' Build a HTTP message and try to parse it afterwards ''' - transport = self._get_transport(self.bosh_http_dict) - - data = "<test>Please don't fail!</test>" - http_message = transport.build_http_message(data.encode('utf-8')) - statusline, headers, http_body, buffer_rest = \ - transport.parse_http_message(http_message.decode('utf-8')) - - self.assertFalse(bool(buffer_rest)) - self.assertTrue(statusline and isinstance(statusline, list)) - self.assertTrue(headers and isinstance(headers, dict)) - self.assertEqual(data, http_body, msg='Input and output are different') - - def test_receive_http_message(self): - ''' Let _on_receive handle some http messages ''' - transport = self._get_transport(self.bosh_http_dict) - - header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" + - "Content-Length: 88\r\n\r\n") - payload = "<test>Please don't fail!</test>" - body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \ - % payload - message = "%s%s" % (header, body) - - # try to receive in one go - transport.onreceive(self.expect_receive(body, msg='Failed: In one go')) - transport._on_receive(message) - self.assertTrue(self.have_received_expected(), msg='Failed: In one go') - - def test_receive_http_message_in_chunks(self): - ''' Let _on_receive handle some chunked http messages ''' - transport = self._get_transport(self.bosh_http_dict) - - payload = "<test>Please don't fail!\n\n</test>" - body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \ - % payload - header = "HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +\ - "Content-Length: %i\r\n\r\n" % len(body) - message = "%s%s" % (header, body) - - chunk1, chunk2, chunk3, chunk4 = message[:20], message[20:73], \ - message[73:85], message[85:] - nextmessage_chunk = "HTTP/1.1 200 OK\r\nContent-Type: text/x" - chunks = (chunk1, chunk2, chunk3, chunk4, nextmessage_chunk) - - transport.onreceive(self.expect_receive(body, msg='Failed: In chunks')) - for chunk in chunks: - transport._on_receive(chunk) - self.assertTrue(self.have_received_expected(), msg='Failed: In chunks') - -if __name__ == '__main__': - unittest.main() |