Welcome to mirror list, hosted at ThFree Co, Russian Federation.

xmpp_mocks.py « lib « test - dev.gajim.org/gajim/python-nbxmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: e461d10ca809b2a7f67d7639e23f34b5ddf78dc6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
'''
Module with dummy classes for unit testing of XMPP and related code.
'''

import threading, time

from test.lib.mock import Mock

from nbxmpp import idlequeue

# Pause in seconds between two idlequeue processing passes in
# IdleQueueThread.run(). 200ms is used in Gajim as default.
IDLEQUEUE_INTERVAL = 0.2
# Upper bound in seconds that IdleMock.wait() blocks while waiting for an
# event before it gives up.
IDLEMOCK_TIMEOUT = 30

class IdleQueueThread(threading.Thread):
    '''
    Thread for regular processing of idlequeue.

    The thread creates its own SelectIdleQueue (available as ``iq``) and
    calls its process() method once per IDLEQUEUE_INTERVAL seconds until
    stop_thread() is called.
    '''
    def __init__(self):
        self.iq = idlequeue.SelectIdleQueue()
        # Event used to stop the thread main loop. A freshly created Event
        # is unset, so no explicit clear() is required.
        self.stop = threading.Event()
        threading.Thread.__init__(self)

    def run(self):
        '''
        Thread main loop: process the idlequeue until a stop is requested.
        '''
        while not self.stop.is_set():
            self.iq.process()
            # Pause by waiting on the stop event instead of sleeping, so that
            # stop_thread() ends the pause right away rather than after the
            # rest of the polling interval.
            self.stop.wait(IDLEQUEUE_INTERVAL)

    def stop_thread(self):
        '''
        Request the main loop to finish. Does not wait for the thread to
        exit; use join() for that.
        '''
        self.stop.set()


class IdleMock:
    '''
    Serves as template for testing objects that are normally controlled by GUI.
    Allows to wait for asynchronous callbacks with wait() method.
    '''
    def __init__(self):
        # A freshly created Event is unset, so no explicit clear() is needed.
        self._event = threading.Event()

    def wait(self, timeout=None):
        '''
        Block until some callback sets the event, then clear the event again
        so that the next wait() blocks for a new callback.

        :Parameters:
                timeout : number or None
                        maximum time to block in seconds; None (the default)
                        means IDLEMOCK_TIMEOUT

        Returns True if event was set, False on timeout
        '''
        if timeout is None:
            timeout = IDLEMOCK_TIMEOUT
        # Event.wait() itself reports whether the flag was set in time.
        if not self._event.wait(timeout):
            return False
        self._event.clear()
        return True

    def set_event(self):
        '''
        Signal the event, which wakes up a pending (or the next) wait().
        '''
        self._event.set()


class MockConnection(IdleMock, Mock):
    '''
    Stand-in for the Connection class from src/common/connection.py

    Inheriting from Mock spares us from defining every method of the real
    Connection that NBClient or Dispatcher call (_event_dispatcher for
    example).
    '''

    def __init__(self, *args):
        # Start out as "succeeded"; on_connect() overwrites this with the
        # actual result.
        self.connect_succeeded = True
        IdleMock.__init__(self)
        Mock.__init__(self, *args)

    def on_connect(self, success, *args):
        '''
        Callback invoked once connecting has finished - either after
        <stream:features> was received from the server (NOT after TLS stream
        restart) or after the connect failed.

        Stores the result in connect_succeeded and signals the event. Any
        extra positional arguments are ignored.
        '''
        self.connect_succeeded = success
        self.set_event()

    def on_auth(self, con, auth):
        '''
        Callback invoked after authentication, whatever the outcome.

        Stores both arguments on the instance and signals the event.

        :Parameters:
                con : NonBlockingClient
                        reference to authenticated object
                auth : string
                        type of authentication in case of success ('old_auth',
                        'sasl') or None in case of auth failure
        '''
        self.auth_connection, self.auth = con, auth
        self.set_event()