Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/python-nbxmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorYann Leboulanger <asterix@lagaule.org>2013-04-08 00:07:13 +0400
committerYann Leboulanger <asterix@lagaule.org>2013-04-08 00:07:13 +0400
commitd2b60a7d9f866d60035e45ca0ea43755f91bab8c (patch)
treec019d061d828af624047b91609842e89763e0304 /test
parentdbe3e6e69a9c230a685d143b24501fd020825311 (diff)
add test suite
Diffstat (limited to 'test')
-rw-r--r--test/lib/__init__.py13
-rw-r--r--test/lib/mock.py463
-rw-r--r--test/lib/xmpp_mocks.py95
-rwxr-xr-xtest/runtests.py51
-rw-r--r--test/unit/__init__.py5
-rw-r--r--test/unit/test_xmpp_client_nb.py163
-rw-r--r--test/unit/test_xmpp_dispatcher_nb.py97
-rw-r--r--test/unit/test_xmpp_smacks.py133
-rw-r--r--test/unit/test_xmpp_transports_nb.py78
-rw-r--r--test/unit/test_xmpp_transports_nb2.py276
10 files changed, 1374 insertions, 0 deletions
diff --git a/test/lib/__init__.py b/test/lib/__init__.py
new file mode 100644
index 0000000..af637cc
--- /dev/null
+++ b/test/lib/__init__.py
@@ -0,0 +1,13 @@
+import sys
+import os
+import getopt
+
+root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..')
+
+# look for modules in the CWD, then gajim/test/lib, then gajim/src,
+# then everywhere else
+sys.path.insert(1, root)
+sys.path.insert(1, root + '/test/lib')
+
+def setup_env():
+ pass
diff --git a/test/lib/mock.py b/test/lib/mock.py
new file mode 100644
index 0000000..fdaf000
--- /dev/null
+++ b/test/lib/mock.py
@@ -0,0 +1,463 @@
+#
+# (c) Dave Kirby 2001 - 2005
+# mock@thedeveloperscoach.com
+#
+# Original call interceptor and call assertion code by Phil Dawes (pdawes@users.sourceforge.net)
+# Call interceptor code enhanced by Bruce Cropley (cropleyb@yahoo.com.au)
+#
+# This Python module and associated files are released under the FreeBSD
+# license. Essentially, you can do what you like with it except pretend you wrote
+# it yourself.
+#
+#
+# Copyright (c) 2005, Dave Kirby
+# Copyright (c) 2009, Yann Leboulanger
+#
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# * Neither the name of this library nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# mock@thedeveloperscoach.com
+
+
+"""
+Mock object library for Python. Mock objects can be used when unit testing
+to remove a dependency on another production class. They are typically used
+when the dependency would either pull in lots of other classes, or
+significantly slow down the execution of the test.
+They are also used to create exceptional conditions that cannot otherwise
+be easily triggered in the class under test.
+"""
+
+__version__ = "0.1.0"
+
+# Added in Python 2.1
+import inspect
+import re
+
+class MockInterfaceError(Exception):
+ pass
+
+class Mock(object):
+ """
+ The Mock class emulates any other class for testing purposes.
+ All method calls are stored for later examination.
+ """
+
+ def __init__(self, returnValues=None, realClass=None):
+ """
+ The Mock class constructor takes a dictionary of method names and
+ the values they return. Methods that are not in the returnValues
+ dictionary will return None.
+ You may also supply a class whose interface is being mocked.
+ All calls will be checked to see if they appear in the original
+ interface. Any calls to methods not appearing in the real class
+ will raise a MockInterfaceError. Any calls that would fail due to
+ non-matching parameter lists will also raise a MockInterfaceError.
+ Both of these help to prevent the Mock class getting out of sync
+ with the class it is Mocking.
+ """
+ self.mockCalledMethods = {}
+ self.mockAllCalledMethods = []
+ self.mockReturnValues = returnValues or {}
+ self.mockExpectations = {}
+ self.realClass = realClass
+ self.realClassMethods = None
+ if realClass:
+ self.realClassMethods = dict(inspect.getmembers(realClass, inspect.isroutine))
+ for retMethod in self.mockReturnValues.keys():
+ if retMethod not in self.realClassMethods:
+ raise MockInterfaceError("Return value supplied for method '%s' that was not in the original class" % retMethod)
+ self._setupSubclassMethodInterceptors()
+
+ def _setupSubclassMethodInterceptors(self):
+ methods = inspect.getmembers(self.realClass, inspect.isroutine)
+ baseMethods = dict(inspect.getmembers(Mock, inspect.ismethod))
+ for m in methods:
+ name = m[0]
+ # Don't record calls to methods of Mock base class.
+ if not name in baseMethods:
+ self.__dict__[name] = MockCallable(name, self, handcrafted=True)
+
+ def __getattr__(self, name):
+ return MockCallable(name, self)
+
+ def mockAddReturnValues(self, **methodReturnValues ):
+ self.mockReturnValues.update(methodReturnValues)
+
+ def mockSetExpectation(self, name, testFn, after=0, until=0):
+ self.mockExpectations.setdefault(name, []).append((testFn, after, until))
+
+ def _checkInterfaceCall(self, name, callParams, callKwParams):
+ """
+ Check that a call to a method of the given name to the original
+ class with the given parameters would not fail. If it would fail,
+ raise a MockInterfaceError.
+ Based on the Python 2.3.3 Reference Manual section 5.3.4: Calls.
+ """
+ if self.realClassMethods is None:
+ return
+ if name not in self.realClassMethods:
+ return
+
+ func = self.realClassMethods[name]
+ try:
+ args, varargs, varkw, defaults = inspect.getargspec(func)
+ except TypeError:
+ # func is not a Python function. It is probably a builtin,
+ # such as __repr__ or __coerce__. TODO: Checking?
+ # For now assume params are OK.
+ return
+
+ # callParams doesn't include self; args does include self.
+ numPosCallParams = 1 + len(callParams)
+
+ if numPosCallParams > len(args) and not varargs:
+ raise MockInterfaceError("Original %s() takes at most %s arguments (%s given)" %
+ (name, len(args), numPosCallParams))
+
+ # Get the number of positional arguments that appear in the call,
+ # also check for duplicate parameters and unknown parameters
+ numPosSeen = _getNumPosSeenAndCheck(numPosCallParams, callKwParams, args, varkw)
+
+ lenArgsNoDefaults = len(args) - len(defaults or [])
+ if numPosSeen < lenArgsNoDefaults:
+ raise MockInterfaceError("Original %s() takes at least %s arguments (%s given)" % (name, lenArgsNoDefaults, numPosSeen))
+
+ def mockGetAllCalls(self):
+ """
+ Return a list of MockCall objects,
+ representing all the methods in the order they were called.
+ """
+ return self.mockAllCalledMethods
+ getAllCalls = mockGetAllCalls # deprecated - kept for backward compatibility
+
+ def mockGetNamedCalls(self, methodName):
+ """
+ Return a list of MockCall objects,
+ representing all the calls to the named method in the order they were called.
+ """
+ return self.mockCalledMethods.get(methodName, [])
+ getNamedCalls = mockGetNamedCalls # deprecated - kept for backward compatibility
+
+ def mockCheckCall(self, index, name, *args, **kwargs):
+ '''test that the index-th call had the specified name and parameters'''
+ call = self.mockAllCalledMethods[index]
+ assert name == call.getName(), "%r != %r" % (name, call.getName())
+ call.checkArgs(*args, **kwargs)
+
+
+def _getNumPosSeenAndCheck(numPosCallParams, callKwParams, args, varkw):
+ """
+ Positional arguments can appear as call parameters either named as
+ a named (keyword) parameter, or just as a value to be matched by
+ position. Count the positional arguments that are given by either
+ keyword or position, and check for duplicate specifications.
+ Also check for arguments specified by keyword that do not appear
+ in the method's parameter list.
+ """
+ posSeen = {}
+ for arg in args[:numPosCallParams]:
+ posSeen[arg] = True
+ for kwp in callKwParams:
+ if kwp in posSeen:
+ raise MockInterfaceError("%s appears as both a positional and named parameter." % kwp)
+ if kwp in args:
+ posSeen[kwp] = True
+ elif not varkw:
+ raise MockInterfaceError("Original method does not have a parameter '%s'" % kwp)
+ return len(posSeen)
+
+class MockCall:
+ """
+ MockCall records the name and parameters of a call to an instance
+ of a Mock class. Instances of MockCall are created by the Mock class,
+ but can be inspected later as part of the test.
+ """
+ def __init__(self, name, params, kwparams ):
+ self.name = name
+ self.params = params
+ self.kwparams = kwparams
+
+ def checkArgs(self, *args, **kwargs):
+ assert args == self.params, "%r != %r" % (args, self.params)
+ assert kwargs == self.kwparams, "%r != %r" % (kwargs, self.kwparams)
+
+ def getParam( self, n ):
+ if isinstance(n, int):
+ return self.params[n]
+ elif isinstance(n, str):
+ return self.kwparams[n]
+ else:
+ raise IndexError, 'illegal index type for getParam'
+
+ def getNumParams(self):
+ return len(self.params)
+
+ def getNumKwParams(self):
+ return len(self.kwparams)
+
+ def getName(self):
+ return self.name
+
+ #pretty-print the method call
+ def __str__(self):
+ s = self.name + "("
+ sep = ''
+ for p in self.params:
+ s = s + sep + repr(p)
+ sep = ', '
+ items = sorted(self.kwparams.items())
+ for k, v in items:
+ s = s + sep + k + '=' + repr(v)
+ sep = ', '
+ s = s + ')'
+ return s
+ def __repr__(self):
+ return self.__str__()
+
+class MockCallable:
+ """
+ Intercepts the call and records it, then delegates to either the mock's
+ dictionary of mock return values that was passed in to the constructor,
+ or a handcrafted method of a Mock subclass.
+ """
+ def __init__(self, name, mock, handcrafted=False):
+ self.name = name
+ self.mock = mock
+ self.handcrafted = handcrafted
+
+ def __call__(self, *params, **kwparams):
+ self.mock._checkInterfaceCall(self.name, params, kwparams)
+ thisCall = self.recordCall(params, kwparams)
+ self.checkExpectations(thisCall, params, kwparams)
+ return self.makeCall(params, kwparams)
+
+ def recordCall(self, params, kwparams):
+ """
+ Record the MockCall in an ordered list of all calls, and an ordered
+ list of calls for that method name.
+ """
+ thisCall = MockCall(self.name, params, kwparams)
+ calls = self.mock.mockCalledMethods.setdefault(self.name, [])
+ calls.append(thisCall)
+ self.mock.mockAllCalledMethods.append(thisCall)
+ return thisCall
+
+ def makeCall(self, params, kwparams):
+ if self.handcrafted:
+ allPosParams = (self.mock,) + params
+ func = _findFunc(self.mock.realClass, self.name)
+ if not func:
+ raise NotImplementedError
+ return func(*allPosParams, **kwparams)
+ else:
+ returnVal = self.mock.mockReturnValues.get(self.name)
+ if isinstance(returnVal, ReturnValuesBase):
+ returnVal = returnVal.next()
+ return returnVal
+
+ def checkExpectations(self, thisCall, params, kwparams):
+ if self.name in self.mock.mockExpectations:
+ callsMade = len(self.mock.mockCalledMethods[self.name])
+ for (expectation, after, until) in self.mock.mockExpectations[self.name]:
+ if callsMade > after and (until==0 or callsMade < until):
+ assert expectation(self.mock, thisCall, len(self.mock.mockAllCalledMethods)-1), 'Expectation failed: '+str(thisCall)
+
+
+def _findFunc(cl, name):
+ """ Depth first search for a method with a given name. """
+ if name in cl.__dict__:
+ return cl.__dict__[name]
+ for base in cl.__bases__:
+ func = _findFunc(base, name)
+ if func:
+ return func
+ return None
+
+
+
+class ReturnValuesBase:
+ def next(self):
+ try:
+ return self.iter.next()
+ except StopIteration:
+ raise AssertionError("No more return values")
+ def __iter__(self):
+ return self
+
+class ReturnValues(ReturnValuesBase):
+ def __init__(self, *values):
+ self.iter = iter(values)
+
+
+class ReturnIterator(ReturnValuesBase):
+ def __init__(self, iterator):
+ self.iter = iter(iterator)
+
+
+def expectParams(*params, **keywords):
+ '''check that the callObj is called with specified params and keywords
+ '''
+ def fn(mockObj, callObj, idx):
+ return callObj.params == params and callObj.kwparams == keywords
+ return fn
+
+
+def expectAfter(*methods):
+ '''check that the function is only called after all the functions in 'methods'
+ '''
+ def fn(mockObj, callObj, idx):
+ calledMethods = [method.getName() for method in mockObj.mockGetAllCalls()]
+ #skip last entry, since that is the current call
+ calledMethods = calledMethods[:-1]
+ for method in methods:
+ if method not in calledMethods:
+ return False
+ return True
+ return fn
+
+def expectException(exception, *args, **kwargs):
+ ''' raise an exception when the method is called
+ '''
+ def fn(mockObj, callObj, idx):
+ raise exception(*args, **kwargs)
+ return fn
+
+
+def expectParam(paramIdx, cond):
+ '''check that the callObj is called with parameter specified by paramIdx (a position index or keyword)
+ fulfills the condition specified by cond.
+ cond is a function that takes a single argument, the value to test.
+ '''
+ def fn(mockObj, callObj, idx):
+ param = callObj.getParam(paramIdx)
+ return cond(param)
+ return fn
+
+def EQ(value):
+ def testFn(param):
+ return param == value
+ return testFn
+
+def NE(value):
+ def testFn(param):
+ return param != value
+ return testFn
+
+def GT(value):
+ def testFn(param):
+ return param > value
+ return testFn
+
+def LT(value):
+ def testFn(param):
+ return param < value
+ return testFn
+
+def GE(value):
+ def testFn(param):
+ return param >= value
+ return testFn
+
+def LE(value):
+ def testFn(param):
+ return param <= value
+ return testFn
+
+def AND(*condlist):
+ def testFn(param):
+ for cond in condlist:
+ if not cond(param):
+ return False
+ return True
+ return testFn
+
+def OR(*condlist):
+ def testFn(param):
+ for cond in condlist:
+ if cond(param):
+ return True
+ return False
+ return testFn
+
+def NOT(cond):
+ def testFn(param):
+ return not cond(param)
+ return testFn
+
+def MATCHES(regex, *args, **kwargs):
+ compiled_regex = re.compile(regex, *args, **kwargs)
+ def testFn(param):
+ return compiled_regex.match(param) is not None
+ return testFn
+
+def SEQ(*sequence):
+ iterator = iter(sequence)
+ def testFn(param):
+ try:
+ cond = iterator.next()
+ except StopIteration:
+ raise AssertionError('SEQ exhausted')
+ return cond(param)
+ return testFn
+
+def IS(instance):
+ def testFn(param):
+ return param is instance
+ return testFn
+
+def ISINSTANCE(class_):
+ def testFn(param):
+ return isinstance(param, class_)
+ return testFn
+
+def ISSUBCLASS(class_):
+ def testFn(param):
+ return issubclass(param, class_)
+ return testFn
+
+def CONTAINS(val):
+ def testFn(param):
+ return val in param
+ return testFn
+
+def IN(container):
+ def testFn(param):
+ return param in container
+ return testFn
+
+def HASATTR(attr):
+ def testFn(param):
+ return hasattr(param, attr)
+ return testFn
+
+def HASMETHOD(method):
+ def testFn(param):
+ return hasattr(param, method) and callable(getattr(param, method))
+ return testFn
+
+CALLABLE = callable
diff --git a/test/lib/xmpp_mocks.py b/test/lib/xmpp_mocks.py
new file mode 100644
index 0000000..a2b9799
--- /dev/null
+++ b/test/lib/xmpp_mocks.py
@@ -0,0 +1,95 @@
+'''
+Module with dummy classes for unit testing of XMPP and related code.
+'''
+
+import threading, time
+
+from mock import Mock
+
+from nbxmpp import idlequeue
+
+IDLEQUEUE_INTERVAL = 0.2 # polling interval. 200ms is used in Gajim as default
+IDLEMOCK_TIMEOUT = 30 # how long we wait for an event
+
+class IdleQueueThread(threading.Thread):
+ '''
+ Thread for regular processing of idlequeue.
+ '''
+ def __init__(self):
+ self.iq = idlequeue.IdleQueue()
+ self.stop = threading.Event() # Event to stop the thread main loop.
+ self.stop.clear()
+ threading.Thread.__init__(self)
+
+ def run(self):
+ while not self.stop.isSet():
+ self.iq.process()
+ time.sleep(IDLEQUEUE_INTERVAL)
+
+ def stop_thread(self):
+ self.stop.set()
+
+
+class IdleMock:
+ '''
+ Serves as template for testing objects that are normally controlled by GUI.
+ Allows to wait for asynchronous callbacks with wait() method.
+ '''
+ def __init__(self):
+ self._event = threading.Event()
+ self._event.clear()
+
+ def wait(self):
+ '''
+ Block until some callback sets the event and clearing the event
+ subsequently.
+ Returns True if event was set, False on timeout
+ '''
+ self._event.wait(IDLEMOCK_TIMEOUT)
+ if self._event.isSet():
+ self._event.clear()
+ return True
+ else:
+ return False
+
+ def set_event(self):
+ self._event.set()
+
+
+class MockConnection(IdleMock, Mock):
+ '''
+ Class simulating Connection class from src/common/connection.py
+
+ It is derived from Mock in order to avoid defining all methods
+ from real Connection that are called from NBClient or Dispatcher
+ ( _event_dispatcher for example)
+ '''
+
+ def __init__(self, *args):
+ self.connect_succeeded = True
+ IdleMock.__init__(self)
+ Mock.__init__(self, *args)
+
+ def on_connect(self, success, *args):
+ '''
+ Method called after connecting - after receiving <stream:features>
+ from server (NOT after TLS stream restart) or connect failure
+ '''
+ self.connect_succeeded = success
+ self.set_event()
+
+
+ def on_auth(self, con, auth):
+ '''
+ Method called after authentication, regardless of the result.
+
+ :Parameters:
+ con : NonBlockingClient
+ reference to authenticated object
+ auth : string
+ type of authetication in case of success ('old_auth', 'sasl') or
+ None in case of auth failure
+ '''
+ self.auth_connection = con
+ self.auth = auth
+ self.set_event()
diff --git a/test/runtests.py b/test/runtests.py
new file mode 100755
index 0000000..274a685
--- /dev/null
+++ b/test/runtests.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python
+
+
+'''
+Runs python-nbxmpp's Test Suite
+
+Unit tests tests will be run on each commit.
+'''
+
+import sys
+import unittest
+import getopt
+verbose = 1
+
+try:
+ shortargs = 'hv:'
+ longargs = 'help verbose='
+ opts, args = getopt.getopt(sys.argv[1:], shortargs, longargs.split())
+except getopt.error, msg:
+ print msg
+ print 'for help use --help'
+ sys.exit(2)
+for o, a in opts:
+ if o in ('-h', '--help'):
+ print 'runtests [--help] [--verbose level]'
+ sys.exit()
+ elif o in ('-v', '--verbose'):
+ try:
+ verbose = int(a)
+ except Exception:
+ print 'verbose must be a number >= 0'
+ sys.exit(2)
+
+# new test modules need to be added manually
+modules = ( 'unit.test_xmpp_dispatcher_nb',
+ 'unit.test_xmpp_transports_nb',
+ 'unit.test_xmpp_smacks',
+ #'unit.test_xmpp_client_nb', gajim.org only supports TLS/SSL connections
+ 'unit.test_xmpp_transports_nb2',
+ )
+
+nb_errors = 0
+nb_failures = 0
+
+for mod in modules:
+ suite = unittest.defaultTestLoader.loadTestsFromName(mod)
+ result = unittest.TextTestRunner(verbosity=verbose).run(suite)
+ nb_errors += len(result.errors)
+ nb_failures += len(result.failures)
+
+sys.exit(nb_errors + nb_failures)
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
new file mode 100644
index 0000000..0252a7b
--- /dev/null
+++ b/test/unit/__init__.py
@@ -0,0 +1,5 @@
+'''
+
+This package just contains plain unit tests
+
+'''
diff --git a/test/unit/test_xmpp_client_nb.py b/test/unit/test_xmpp_client_nb.py
new file mode 100644
index 0000000..8ccac5b
--- /dev/null
+++ b/test/unit/test_xmpp_client_nb.py
@@ -0,0 +1,163 @@
+'''
+Testing script for NonBlockingClient class (src/common/xmpp/client_nb.py)
+
+It actually connects to a xmpp server.
+'''
+
+import unittest
+
+import lib
+lib.setup_env()
+
+from xmpp_mocks import MockConnection, IdleQueueThread
+from mock import Mock
+from nbxmpp import client_nb
+
+# (XMPP server hostname, c2s port). Script will connect to the machine.
+xmpp_server_port = ('gajim.org', 5222)
+
+# [username, password, resource]. Script will authenticate to server above
+credentials = ['unittest', 'testtest', 'res']
+
+class TestNonBlockingClient(unittest.TestCase):
+ '''
+ Test Cases class for NonBlockingClient.
+ '''
+ def setUp(self):
+ ''' IdleQueue thread is run and dummy connection is created. '''
+ self.idlequeue_thread = IdleQueueThread()
+ self.connection = MockConnection() # for dummy callbacks
+ self.idlequeue_thread.start()
+
+ def tearDown(self):
+ ''' IdleQueue thread is stopped. '''
+ self.idlequeue_thread.stop_thread()
+ self.idlequeue_thread.join()
+ del self.connection
+
+ self.client = None
+
+ def open_stream(self, server_port, wrong_pass=False):
+ '''
+ Method opening the XMPP connection. It returns when <stream:features>
+ is received from server.
+
+ :param server_port: tuple of (hostname, port) for where the client should
+ connect.
+ '''
+
+ class TempConnection():
+ def get_password(self, cb, mechanism):
+ if wrong_pass:
+ cb('wrong pass')
+ else:
+ cb(credentials[1])
+ def on_connect_failure(self):
+ pass
+
+ self.client = client_nb.NonBlockingClient(
+ domain=server_port[0],
+ idlequeue=self.idlequeue_thread.iq,
+ caller=Mock(realClass=TempConnection))
+
+ self.client.connect(
+ hostname=server_port[0],
+ port=server_port[1],
+ on_connect=lambda *args: self.connection.on_connect(True, *args),
+ on_connect_failure=lambda *args: self.connection.on_connect(
+ False, *args))
+
+ self.assert_(self.connection.wait(),
+ msg='waiting for callback from client constructor')
+
+ # if on_connect was called, client has to be connected and vice versa
+ if self.connection.connect_succeeded:
+ self.assert_(self.client.get_connect_type())
+ else:
+ self.assert_(not self.client.get_connect_type())
+
+ def client_auth(self, username, password, resource, sasl):
+ '''
+ Method authenticating connected client with supplied credentials. Returns
+ when authentication is over.
+
+ :param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
+ :todo: to check and be more specific about when it returns
+ (bind, session..)
+ '''
+ self.client.auth(username, password, resource, sasl,
+ on_auth=self.connection.on_auth)
+
+ self.assert_(self.connection.wait(), msg='waiting for authentication')
+
+ def do_disconnect(self):
+ '''
+ Does disconnecting of connected client. Returns when TCP connection is
+ closed.
+ '''
+ self.client.RegisterDisconnectHandler(self.connection.set_event)
+ self.client.disconnect()
+
+ self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
+
+ def test_proper_connect_sasl(self):
+ '''
+ The ideal testcase - client is connected, authenticated with SASL and
+ then disconnected.
+ '''
+ self.open_stream(xmpp_server_port)
+
+ # if client is not connected, lets raise the AssertionError
+ self.assert_(self.client.get_connect_type())
+ # client.disconnect() is already called from NBClient via
+ # _on_connected_failure, no need to call it here
+
+ self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
+ self.assert_(self.connection.con)
+ self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL')
+
+ self.do_disconnect()
+
+ def test_proper_connect_oldauth(self):
+ '''
+ The ideal testcase - client is connected, authenticated with old auth and
+ then disconnected.
+ '''
+ self.open_stream(xmpp_server_port)
+ self.assert_(self.client.get_connect_type())
+ self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
+ self.assert_(self.connection.con)
+ features = self.client.Dispatcher.Stream.features
+ if not features.getTag('auth'):
+ print "Server doesn't support old authentication type, ignoring test"
+ else:
+ self.assert_(self.connection.auth=='old_auth',
+ msg='Unable to auth via old_auth')
+ self.do_disconnect()
+
+ def test_connect_to_nonexisting_host(self):
+ '''
+ Connect to nonexisting host. DNS request for A records should return
+ nothing.
+ '''
+ self.open_stream(('fdsfsdf.fdsf.fss', 5222))
+ self.assert_(not self.client.get_connect_type())
+
+ def test_connect_to_wrong_port(self):
+ '''
+ Connect to nonexisting server. DNS request for A records should return an
+ IP but there shouldn't be XMPP server running on specified port.
+ '''
+ self.open_stream((xmpp_server_port[0], 31337))
+ self.assert_(not self.client.get_connect_type())
+
+ def test_connect_with_wrong_creds(self):
+ '''
+ Connecting with invalid password.
+ '''
+ self.open_stream(xmpp_server_port, wrong_pass=True)
+ self.assert_(self.client.get_connect_type())
+ self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
+ self.assert_(self.connection.auth is None)
+ self.do_disconnect()
+
diff --git a/test/unit/test_xmpp_dispatcher_nb.py b/test/unit/test_xmpp_dispatcher_nb.py
new file mode 100644
index 0000000..d19bde2
--- /dev/null
+++ b/test/unit/test_xmpp_dispatcher_nb.py
@@ -0,0 +1,97 @@
+'''
+Tests for dispatcher_nb.py
+'''
+import unittest
+
+import lib
+lib.setup_env()
+
+from mock import Mock
+
+from nbxmpp import dispatcher_nb
+from nbxmpp import protocol
+
+class TestDispatcherNB(unittest.TestCase):
+ '''
+ Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
+ into a mock client
+ '''
+ def setUp(self):
+ self.dispatcher = dispatcher_nb.XMPPDispatcher()
+
+ # Setup mock client
+ self.client = Mock()
+ self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
+ self.client._caller = Mock()
+ self.client.defaultNamespace = protocol.NS_CLIENT
+ self.client.Connection = Mock() # mock transport
+ self.con = self.client.Connection
+
+ def tearDown(self):
+ # Unplug if needed
+ if hasattr(self.dispatcher, '_owner'):
+ self.dispatcher.PlugOut()
+
+ def _simulate_connect(self):
+ self.dispatcher.PlugIn(self.client) # client is owner
+ # Simulate that we have established a connection
+ self.dispatcher.StreamInit()
+ self.dispatcher.ProcessNonBlocking("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>")
+
+ def test_unbound_namespace_prefix(self):
+ '''tests our handling of a message with an unbound namespace prefix'''
+ self._simulate_connect()
+
+ msgs = []
+ def _got_message(conn, msg):
+ msgs.append(msg)
+ self.dispatcher.RegisterHandler('message', _got_message)
+
+ # should be able to parse a normal message
+ self.dispatcher.ProcessNonBlocking('<message><body>hello</body></message>')
+ self.assertEqual(1, len(msgs))
+
+ self.dispatcher.ProcessNonBlocking('<message><x:y/></message>')
+ self.assertEqual(2, len(msgs))
+ # we should not have been disconnected after that message
+ self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')))
+ self.assertEqual(0, len(self.con.mockGetNamedCalls('disconnect')))
+
+ # we should be able to keep parsing
+ self.dispatcher.ProcessNonBlocking('<message><body>still here?</body></message>')
+ self.assertEqual(3, len(msgs))
+
+ def test_process_non_blocking(self):
+ ''' Check for ProcessNonBlocking return types '''
+ self._simulate_connect()
+ process = self.dispatcher.ProcessNonBlocking
+
+ # length of data expected
+ data = "Please don't fail"
+ result = process(data)
+ self.assertEqual(result, len(data))
+
+ # no data processed, link shall still be active
+ result = process('')
+ self.assertEqual(result, '0')
+ self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')) +
+ len(self.con.mockGetNamedCalls('disconnect')))
+
+ # simulate disconnect
+ result = process('</stream:stream>')
+ self.assertEqual(1, len(self.client.mockGetNamedCalls('disconnect')))
+
+ def test_return_stanza_handler(self):
+ ''' Test sasl_error_conditions transformation in protocol.py '''
+ # quick'n dirty...I wasn't aware of it existance and thought it would
+ # always fail :-)
+ self._simulate_connect()
+ stanza = "<iq type='get' />"
+ def send(data):
+ self.assertEqual(str(data), '<iq xmlns="jabber:client" type="error"><error code="501" type="cancel"><feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" /><text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">The feature requested is not implemented by the recipient or server and therefore cannot be processed.</text></error></iq>')
+ self.client.send = send
+ self.dispatcher.ProcessNonBlocking(stanza)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/unit/test_xmpp_smacks.py b/test/unit/test_xmpp_smacks.py
new file mode 100644
index 0000000..4b10a5b
--- /dev/null
+++ b/test/unit/test_xmpp_smacks.py
@@ -0,0 +1,133 @@
+'''
+Tests for smacks.py Stream Management
+'''
+import unittest
+
+import lib
+lib.setup_env()
+
+from mock import Mock
+
+from nbxmpp import dispatcher_nb
+from nbxmpp import protocol
+from nbxmpp import smacks
+
+class TestDispatcherNB(unittest.TestCase):
+ '''
+ Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
+ into a mock client
+ '''
+ def setUp(self):
+ self.dispatcher = dispatcher_nb.XMPPDispatcher()
+
+ # Setup mock client
+ self.client = Mock()
+ self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
+ self.client._caller = Mock()
+ self.client.defaultNamespace = protocol.NS_CLIENT
+ self.client.Connection = Mock() # mock transport
+ self.con = self.client.Connection
+ self.con.sm = smacks.Smacks(self.con)
+
+ def tearDown(self):
+ # Unplug if needed
+ if hasattr(self.dispatcher, '_owner'):
+ self.dispatcher.PlugOut()
+
+ def _simulate_connect(self):
+ self.dispatcher.PlugIn(self.client) # client is owner
+ self.con.sm.set_owner(self.client)
+ self.dispatcher.sm = self.con.sm
+ # Simulate that we have established a connection
+ self.dispatcher.StreamInit()
+ self.dispatcher.ProcessNonBlocking("<stream:stream "
+ "xmlns:stream='http://etherx.jabber.org/streams' "
+ "xmlns='jabber:client'>")
+ self.dispatcher.ProcessNonBlocking("<stream:features> "
+ "<sm xmlns='urn:xmpp:sm:2'> <optional/> </sm> </stream:features>")
+ self.con.sm.negociate()
+ self.dispatcher.ProcessNonBlocking("<enabled xmlns='urn:xmpp:sm:2' "
+ "id='some-long-sm-id' resume='true'/>")
+ assert(self.con.sm.enabled)
+
+ def _simulate_resume(self):
+ self.con.sm.resume_request()
+ # Resuming acknowledging 5 stanzas
+ self.dispatcher.ProcessNonBlocking("<resumed xmlns='urn:xmpp:sm:2' "
+ "id='some-long-sm-id' h='5'/>")
+ assert(self.con.sm.resuming)
+
+ def _send(self, send, r, stanza):
+ for i in range(r):
+ send(stanza)
+ def test_messages(self):
+ message = '<message><body>Helloo </body></message>'
+ iq = '''<iq from='proxy.jabber.ru' to='j.xxxxxxxx.org/Gajim' type='error' id='18'>
+ <query xmlns='http://jabber.org/protocol/bytestreams'/>
+ <error code='403' type='auth'>
+ <forbidden xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
+ </error>
+ </iq>'''
+ presence = '''<presence from='xxxxxxxxx.com/Talk.v1044194B1E2' to='j.xxxxxxxx.org'>
+ <priority>24</priority>
+ <c node="http://www.google.com/xmpp/client/caps" ver="1.0.0.104" ext="share-v1 voice-v1" xmlns="http://jabber.org/protocol/caps"/>
+ <x stamp="20110614T23:17:51" xmlns="jabber:x:delay"/>
+ <status>In love Kakashi Sensei :P</status>
+ <x xmlns="vcard-temp:x:update">
+ <photo>db4b7c52e39ba28562c74542d5988d47f09108a3</photo>
+ </x>
+ </presence> '''
+
+ self._simulate_connect()
+ uqueue = self.con.sm.uqueue
+ self.assertEqual(self.con.sm.out_h, 0)
+ self.assertEqual(self.con.sm.in_h, 0)
+
+ # The server sends 10 stanzas
+ self._send(self.dispatcher.ProcessNonBlocking, 5, message)
+ self._send(self.dispatcher.ProcessNonBlocking, 4, iq)
+ self._send(self.dispatcher.ProcessNonBlocking, 1, presence)
+
+ # The client has recieved 10 stanzas and sent none
+ self.assertEqual(self.con.sm.in_h, 10)
+ self.assertEqual(self.con.sm.out_h, 0)
+
+ m = protocol.Message()
+
+ # The client sends 10 stanzas
+ for i in range(10):
+ m = protocol.Message(body=str(i))
+ self.dispatcher.send(m)
+
+ # Client sends 10 stanzas and put them in the queue
+ self.assertEqual(self.con.sm.out_h, 10)
+ self.assertEqual(len(uqueue), 10)
+
+ # The server acknowledges that it recieved 5 stanzas
+ self.dispatcher.ProcessNonBlocking("<a xmlns='urn:xmpp:sm:2' h='5'/>")
+ # 5 stanzas are removed from the queue, only 5 stanzas are left
+
+ self.assertEqual(len(uqueue), 5)
+
+ # Check for the right order of stanzas in the queue
+ l = ['5', '6', '7', '8', '9']
+ for i in uqueue:
+ self.assertEqual(i.getBody(), l[0])
+ l.pop(0)
+
+ def test_resumption(self):
+ self._simulate_connect()
+
+ m = protocol.Message()
+
+ # The client sends 5 stanzas
+ for i in range(5):
+ m = protocol.Message(body=str(i))
+ self.dispatcher.send(m)
+
+ self._simulate_resume()
+ # No stanzas left
+ self.assertEqual(len(self.con.sm.uqueue), 0)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/unit/test_xmpp_transports_nb.py b/test/unit/test_xmpp_transports_nb.py
new file mode 100644
index 0000000..3bc68f4
--- /dev/null
+++ b/test/unit/test_xmpp_transports_nb.py
@@ -0,0 +1,78 @@
+'''
+Unit test for tranports classes.
+'''
+
+import unittest
+
+import lib
+lib.setup_env()
+
+from nbxmpp import transports_nb
+
+
+class TestModuleLevelFunctions(unittest.TestCase):
+ '''
+ Test class for functions defined at module level
+ '''
+ def test_urisplit(self):
+ def check_uri(uri, proto, host, port, path):
+ _proto, _host, _port, _path = transports_nb.urisplit(uri)
+ self.assertEqual(proto, _proto)
+ self.assertEqual(host, _host)
+ self.assertEqual(path, _path)
+ self.assertEqual(port, _port)
+
+ check_uri('http://httpcm.jabber.org:5280/webclient', proto='http',
+ host='httpcm.jabber.org', port=5280, path='/webclient')
+
+ check_uri('http://httpcm.jabber.org/webclient', proto='http',
+ host='httpcm.jabber.org', port=80, path='/webclient')
+
+ check_uri('https://httpcm.jabber.org/webclient', proto='https',
+ host='httpcm.jabber.org', port=443, path='/webclient')
+
+ def test_get_proxy_data_from_dict(self):
+ def check_dict(proxy_dict, host, port, user, passwd):
+ _host, _port, _user, _passwd = transports_nb.get_proxy_data_from_dict(
+ proxy_dict)
+ self.assertEqual(_host, host)
+ self.assertEqual(_port, port)
+ self.assertEqual(_user, user)
+ self.assertEqual(_passwd, passwd)
+
+ bosh_dict = {'bosh_content': u'text/xml; charset=utf-8',
+ 'bosh_hold': 2,
+ 'bosh_http_pipelining': False,
+ 'bosh_uri': u'http://gajim.org:5280/http-bind',
+ 'bosh_useproxy': False,
+ 'bosh_wait': 30,
+ 'bosh_wait_for_restart_response': False,
+ 'host': u'172.16.99.11',
+ 'pass': u'pass',
+ 'port': 3128,
+ 'type': u'bosh',
+ 'useauth': True,
+ 'user': u'user'}
+ check_dict(bosh_dict, host=u'gajim.org', port=5280, user=u'user',
+ passwd=u'pass')
+
+ proxy_dict = {'bosh_content': u'text/xml; charset=utf-8',
+ 'bosh_hold': 2,
+ 'bosh_http_pipelining': False,
+ 'bosh_port': 5280,
+ 'bosh_uri': u'',
+ 'bosh_useproxy': True,
+ 'bosh_wait': 30,
+ 'bosh_wait_for_restart_response': False,
+ 'host': u'172.16.99.11',
+ 'pass': u'pass',
+ 'port': 3128,
+ 'type': 'socks5',
+ 'useauth': True,
+ 'user': u'user'}
+ check_dict(proxy_dict, host=u'172.16.99.11', port=3128, user=u'user',
+ passwd=u'pass')
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/unit/test_xmpp_transports_nb2.py b/test/unit/test_xmpp_transports_nb2.py
new file mode 100644
index 0000000..43117ab
--- /dev/null
+++ b/test/unit/test_xmpp_transports_nb2.py
@@ -0,0 +1,276 @@
+'''
+Integration test for tranports classes. See unit for the ordinary
+unit tests of this module.
+'''
+
+import unittest
+import socket
+
+import lib
+lib.setup_env()
+
+from xmpp_mocks import IdleQueueThread, IdleMock
+from nbxmpp import transports_nb
+
+
+class AbstractTransportTest(unittest.TestCase):
+ ''' Encapsulates Idlequeue instantiation for transports and more...'''
+
+ def setUp(self):
+ ''' IdleQueue thread is run and dummy connection is created. '''
+ self.idlequeue_thread = IdleQueueThread()
+ self.idlequeue_thread.start()
+ self._setup_hook()
+
+ def tearDown(self):
+ ''' IdleQueue thread is stopped. '''
+ self._teardown_hook()
+ self.idlequeue_thread.stop_thread()
+ self.idlequeue_thread.join()
+
+ def _setup_hook(self):
+ pass
+
+ def _teardown_hook(self):
+ pass
+
+ def expect_receive(self, expected, count=1, msg=None):
+ '''
+ Returns a callback function that will assert whether the data passed to
+ it equals the one specified when calling this function.
+
+ Can be used to make sure transport dispatch correct data.
+ '''
+ def receive(data, *args, **kwargs):
+ self.assertEqual(data, expected, msg=msg)
+ self._expected_count -= 1
+ self._expected_count = count
+ return receive
+
+ def have_received_expected(self):
+ '''
+ Plays together with expect_receive(). Will return true if expected_rcv
+ callback was called as often as specified
+ '''
+ return self._expected_count == 0
+
+
+class TestNonBlockingTCP(AbstractTransportTest):
+ '''
+ Test class for NonBlockingTCP. Will actually try to connect to an existing
+ XMPP server.
+ '''
+ class MockClient(IdleMock):
+ ''' Simple client to test transport functionality '''
+ def __init__(self, idlequeue, testcase):
+ self.idlequeue = idlequeue
+ self.testcase = testcase
+ IdleMock.__init__(self)
+
+ def do_connect(self, establish_tls=False, proxy_dict=None):
+ try:
+ ips = socket.getaddrinfo('gajim.org', 5222,
+ socket.AF_UNSPEC, socket.SOCK_STREAM)
+ ip = ips[0]
+ except socket.error, e:
+ self.testcase.fail(msg=str(e))
+
+ self.socket = transports_nb.NonBlockingTCP(
+ raise_event=lambda event_type, data: self.testcase.assertTrue(
+ event_type and data),
+ on_disconnect=lambda: self.on_success(mode='SocketDisconnect'),
+ idlequeue=self.idlequeue,
+ estabilish_tls=establish_tls,
+ certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'),
+ proxy_dict=proxy_dict)
+
+ self.socket.PlugIn(self)
+
+ self.socket.connect(conn_5tuple=ip,
+ on_connect=lambda: self.on_success(mode='TCPconnect'),
+ on_connect_failure=self.on_failure)
+ self.testcase.assertTrue(self.wait(), msg='Connection timed out')
+
+ def do_disconnect(self):
+ self.socket.disconnect()
+ self.testcase.assertTrue(self.wait(), msg='Disconnect timed out')
+
+ def on_failure(self, err_message):
+ self.set_event()
+ self.testcase.fail(msg=err_message)
+
+ def on_success(self, mode, data=None):
+ if mode == "TCPconnect":
+ pass
+ if mode == "SocketDisconnect":
+ pass
+ self.set_event()
+
+ def _setup_hook(self):
+ self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq,
+ testcase=self)
+
+ def _teardown_hook(self):
+ if self.client.socket.state == 'CONNECTED':
+ self.client.do_disconnect()
+
+ def test_connect_disconnect_plain(self):
+ ''' Establish plain connection '''
+ self.client.do_connect(establish_tls=False)
+ self.assertEquals(self.client.socket.state, 'CONNECTED')
+ self.client.do_disconnect()
+ self.assertEquals(self.client.socket.state, 'DISCONNECTED')
+
+# def test_connect_disconnect_ssl(self):
+# ''' Establish SSL (not TLS) connection '''
+# self.client.do_connect(establish_tls=True)
+# self.assertEquals(self.client.socket.state, 'CONNECTED')
+# self.client.do_disconnect()
+# self.assertEquals(self.client.socket.state, 'DISCONNECTED')
+
+ def test_do_receive(self):
+ ''' Test _do_receive method by overwriting socket.recv '''
+ self.client.do_connect()
+ sock = self.client.socket
+
+ # transport shall receive data
+ data = "Please don't fail"
+ sock._recv = lambda buffer: data
+ sock.onreceive(self.expect_receive(data))
+ sock._do_receive()
+ self.assertTrue(self.have_received_expected(), msg='Did not receive data')
+ self.assert_(self.client.socket.state == 'CONNECTED')
+
+ # transport shall do nothing as an non-fatal SSL is simulated
+ sock._recv = lambda buffer: None
+ sock.onreceive(self.assertFalse) # we did not receive anything...
+ sock._do_receive()
+ self.assert_(self.client.socket.state == 'CONNECTED')
+
+ # transport shall disconnect as remote side closed the connection
+ sock._recv = lambda buffer: ''
+ sock.onreceive(self.assertFalse) # we did not receive anything...
+ sock._do_receive()
+ self.assert_(self.client.socket.state == 'DISCONNECTED')
+
+ def test_do_send(self):
+ ''' Test _do_send method by overwriting socket.send '''
+ self.client.do_connect()
+ sock = self.client.socket
+
+ outgoing = [] # what we have actually send to our socket.socket
+ data_part1 = "Please don't "
+ data_part2 = "fail!"
+ data_complete = data_part1 + data_part2
+
+ # Simulate everything could be send in one go
+ def _send_all(data):
+ outgoing.append(data)
+ return len(data)
+ sock._send = _send_all
+ sock.send(data_part1)
+ sock.send(data_part2)
+ sock._do_send()
+ sock._do_send()
+ self.assertTrue(self.client.socket.state == 'CONNECTED')
+ self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
+ self.assertFalse(sock.sendqueue and sock.sendbuff,
+ msg='There is still unsend data in buffers')
+
+ # Simulate data could only be sent in chunks
+ self.chunk_count = 0
+ outgoing = []
+ def _send_chunks(data):
+ if self.chunk_count == 0:
+ outgoing.append(data_part1)
+ self.chunk_count += 1
+ return len(data_part1)
+ else:
+ outgoing.append(data_part2)
+ return len(data_part2)
+ sock._send = _send_chunks
+ sock.send(data_complete)
+ sock._do_send() # process first chunk
+ sock._do_send() # process the second one
+ self.assertTrue(self.client.socket.state == 'CONNECTED')
+ self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
+ self.assertFalse(sock.sendqueue and sock.sendbuff,
+ msg='There is still unsend data in buffers')
+
+
+class TestNonBlockingHTTP(AbstractTransportTest):
+ ''' Test class for NonBlockingHTTP transport'''
+
+ bosh_http_dict = {
+ 'http_uri': 'http://gajim.org:5280/http-bind',
+ 'http_version': 'HTTP/1.1',
+ 'http_persistent': True,
+ 'add_proxy_headers': False
+ }
+
+ def _get_transport(self, http_dict, proxy_dict=None):
+ return transports_nb.NonBlockingHTTP(
+ raise_event=None,
+ on_disconnect=None,
+ idlequeue=self.idlequeue_thread.iq,
+ estabilish_tls=False,
+ certs=None,
+ on_http_request_possible=lambda: None,
+ on_persistent_fallback=None,
+ http_dict=http_dict,
+ proxy_dict=proxy_dict,
+ )
+
+ def test_parse_own_http_message(self):
+ ''' Build a HTTP message and try to parse it afterwards '''
+ transport = self._get_transport(self.bosh_http_dict)
+
+ data = "<test>Please don't fail!</test>"
+ http_message = transport.build_http_message(data)
+ statusline, headers, http_body, buffer_rest = transport.parse_http_message(
+ http_message)
+
+ self.assertFalse(bool(buffer_rest))
+ self.assertTrue(statusline and isinstance(statusline, list))
+ self.assertTrue(headers and isinstance(headers, dict))
+ self.assertEqual(data, http_body, msg='Input and output are different')
+
+ def test_receive_http_message(self):
+ ''' Let _on_receive handle some http messages '''
+ transport = self._get_transport(self.bosh_http_dict)
+
+ header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
+ "Content-Length: 88\r\n\r\n")
+ payload = "<test>Please don't fail!</test>"
+ body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
+ % payload
+ message = "%s%s" % (header, body)
+
+ # try to receive in one go
+ transport.onreceive(self.expect_receive(body, msg='Failed: In one go'))
+ transport._on_receive(message)
+ self.assertTrue(self.have_received_expected(), msg='Failed: In one go')
+
+ def test_receive_http_message_in_chunks(self):
+ ''' Let _on_receive handle some chunked http messages '''
+ transport = self._get_transport(self.bosh_http_dict)
+
+ payload = "<test>Please don't fail!\n\n</test>"
+ body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
+ % payload
+ header = "HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +\
+ "Content-Length: %i\r\n\r\n" % len(body)
+ message = "%s%s" % (header, body)
+
+ chunk1, chunk2, chunk3, chunk4 = message[:20], message[20:73], \
+ message[73:85], message[85:]
+ nextmessage_chunk = "HTTP/1.1 200 OK\r\nContent-Type: text/x"
+ chunks = (chunk1, chunk2, chunk3, chunk4, nextmessage_chunk)
+
+ transport.onreceive(self.expect_receive(body, msg='Failed: In chunks'))
+ for chunk in chunks:
+ transport._on_receive(chunk)
+ self.assertTrue(self.have_received_expected(), msg='Failed: In chunks')
+
+if __name__ == '__main__':
+ unittest.main()