Welcome to mirror list, hosted at ThFree Co, Russian Federation.

test_xmpp_transports_nb.py « integration « test - dev.gajim.org/gajim/gajim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: f6cbab5102b6d731ae929be184d180e22beb28bf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
'''
Integration test for tranports classes. See unit for the ordinary
unit tests of this module.
'''

import unittest
import socket

import lib
lib.setup_env()

from xmpp_mocks import IdleQueueThread, IdleMock
from common.xmpp import transports_nb


class AbstractTransportTest(unittest.TestCase):
	''' Encapsulates Idlequeue instantiation for transports and more...'''

	def setUp(self):
		''' IdleQueue thread is run and dummy connection is created. '''
		self.idlequeue_thread = IdleQueueThread()
		self.idlequeue_thread.start()
		self._setup_hook()

	def tearDown(self):
		''' IdleQueue thread is stopped. '''
		self._teardown_hook()
		self.idlequeue_thread.stop_thread()
		self.idlequeue_thread.join()

	def _setup_hook(self):
		pass

	def _teardown_hook(self):
		pass

	def expect_receive(self, expected, count=1, msg=None):
		'''
		Returns a callback function that will assert whether the data passed to 
		it equals the one specified when calling this function.

		Can be used to make sure transport dispatch correct data.
		'''
		def receive(data, *args, **kwargs):
			self.assertEqual(data, expected, msg=msg)
			self._expected_count -= 1
		self._expected_count = count
		return receive

	def have_received_expected(self):
		'''
		Plays together with expect_receive(). Will return true if expected_rcv
		callback was called as often as specified
		'''
		return self._expected_count == 0


class TestNonBlockingTCP(AbstractTransportTest):
	'''
	Test class for NonBlockingTCP. Will actually try to connect to an existing
	XMPP server.
	'''
	class MockClient(IdleMock):
		''' Simple client to test transport functionality '''
		def __init__(self, idlequeue, testcase):
			self.idlequeue = idlequeue
			self.testcase = testcase
			IdleMock.__init__(self)

		def do_connect(self, establish_tls=False, proxy_dict=None):
			try:
				ips = socket.getaddrinfo('gajim.org', 5222,
					socket.AF_UNSPEC,socket.SOCK_STREAM) 
				ip = ips[0]
			except socket.error, e:
				self.testcase.fail(msg=str(e))

			self.socket = transports_nb.NonBlockingTCP(
				raise_event=lambda event_type, data: self.testcase.assertTrue(
					event_type and data),
				on_disconnect=lambda: self.on_success(mode='SocketDisconnect'),
				idlequeue=self.idlequeue,
				estabilish_tls=establish_tls,
				certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'),
				proxy_dict=proxy_dict)

			self.socket.PlugIn(self)

			self.socket.connect(conn_5tuple=ip,
				on_connect=lambda: self.on_success(mode='TCPconnect'),
				on_connect_failure=self.on_failure)
			self.testcase.assertTrue(self.wait(), msg='Connection timed out')

		def do_disconnect(self):
			self.socket.disconnect()
			self.testcase.assertTrue(self.wait(), msg='Disconnect timed out')

		def on_failure(self, err_message):
			self.set_event()
			self.testcase.fail(msg=err_message)

		def on_success(self, mode, data=None):
			if mode == "TCPconnect":
				pass
			if mode == "SocketDisconnect":
				pass
			self.set_event()

	def _setup_hook(self):
		self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq,
			testcase=self)
	
	def _teardown_hook(self):
		if self.client.socket.state == 'CONNECTED':
			self.client.do_disconnect()

	def test_connect_disconnect_plain(self):
		''' Establish plain connection '''
		self.client.do_connect(establish_tls=False)
		self.assertEquals(self.client.socket.state, 'CONNECTED')
		self.client.do_disconnect()
		self.assertEquals(self.client.socket.state, 'DISCONNECTED')
	
#	def test_connect_disconnect_ssl(self):
#		''' Establish SSL (not TLS) connection '''
#		self.client.do_connect(establish_tls=True)
#		self.assertEquals(self.client.socket.state, 'CONNECTED')
#		self.client.do_disconnect()
#		self.assertEquals(self.client.socket.state, 'DISCONNECTED')

	def test_do_receive(self):
		''' Test _do_receive method by overwriting socket.recv '''
		self.client.do_connect()
		sock = self.client.socket

		# transport shall receive data
		data = "Please don't fail"
		sock._recv = lambda buffer: data
		sock.onreceive(self.expect_receive(data))
		sock._do_receive()
		self.assertTrue(self.have_received_expected(), msg='Did not receive data')
		self.assert_(self.client.socket.state == 'CONNECTED')

		# transport shall do nothing as an non-fatal SSL is simulated
		sock._recv = lambda buffer: None
		sock.onreceive(self.assertFalse) # we did not receive anything...
		sock._do_receive()
		self.assert_(self.client.socket.state == 'CONNECTED')

		# transport shall disconnect as remote side closed the connection 
		sock._recv = lambda buffer: ''
		sock.onreceive(self.assertFalse) # we did not receive anything...
		sock._do_receive()
		self.assert_(self.client.socket.state == 'DISCONNECTED')

	def test_do_send(self):
		''' Test _do_send method by overwriting socket.send '''
		self.client.do_connect()
		sock = self.client.socket

		outgoing = [] # what we have actually send to our socket.socket
		data_part1 = "Please don't "
		data_part2 = "fail!"
		data_complete = data_part1 + data_part2
	
		# Simulate everything could be send in one go
		def _send_all(data):
			outgoing.append(data)
			return len(data)
		sock._send = _send_all
		sock.send(data_part1)
		sock.send(data_part2)
		sock._do_send()
		sock._do_send()
		self.assertTrue(self.client.socket.state == 'CONNECTED')
		self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
		self.assertFalse(sock.sendqueue and sock.sendbuff,
			msg='There is still unsend data in buffers')

		# Simulate data could only be sent in chunks
		self.chunk_count = 0
		outgoing = []
		def _send_chunks(data):
			if self.chunk_count == 0:
				outgoing.append(data_part1)
				self.chunk_count += 1
				return len(data_part1)
			else:
				outgoing.append(data_part2)
				return len(data_part2)
		sock._send = _send_chunks
		sock.send(data_complete)
		sock._do_send() # process first chunk
		sock._do_send() # process the second one
		self.assertTrue(self.client.socket.state == 'CONNECTED')
		self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
		self.assertFalse(sock.sendqueue and sock.sendbuff,
			msg='There is still unsend data in buffers')


class TestNonBlockingHTTP(AbstractTransportTest):
	''' Test class for NonBlockingHTTP transport'''

	bosh_http_dict = {
		'http_uri': 'http://gajim.org:5280/http-bind',			
		'http_version': 'HTTP/1.1',
		'http_persistent': True,
		'add_proxy_headers': False
	}

	def _get_transport(self, http_dict, proxy_dict=None):
		return transports_nb.NonBlockingHTTP(
			raise_event=None,
			on_disconnect=None,
			idlequeue=self.idlequeue_thread.iq,
			estabilish_tls=False,
			certs=None,
			on_http_request_possible=lambda: None,
			on_persistent_fallback=None,
			http_dict=http_dict,
			proxy_dict=proxy_dict,
			)
	
	def test_parse_own_http_message(self):
		''' Build a HTTP message and try to parse it afterwards '''
		transport = self._get_transport(self.bosh_http_dict)

		data = "<test>Please don't fail!</test>"
		http_message = transport.build_http_message(data)
		statusline, headers, http_body = transport.parse_http_message(
			http_message)

		self.assertTrue(statusline and isinstance(statusline, list))
		self.assertTrue(headers and isinstance(headers, dict))
		self.assertEqual(data, http_body, msg='Input and output are different')

	def test_receive_http_message(self):
		''' Let _on_receive handle some http messages '''
		transport = self._get_transport(self.bosh_http_dict)
		
		header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
			"Content-Length: 88\r\n\r\n")
		payload = "<test>Please don't fail!</test>"
		body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
			% payload
		message = "%s%s" % (header, body)

		# try to receive in one go
		transport.onreceive(self.expect_receive(body, msg='Failed: In one go'))
		transport._on_receive(message)
		self.assertTrue(self.have_received_expected(), msg='Failed: In one go')

# FIXME: Not yet implemented.
#	def test_receive_http_message_in_chunks(self):
#		''' Let _on_receive handle some chunked http messages  '''
#		transport = self._get_transport(self.bosh_http_dict)
#		
#		header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
#			"Content-Length: 88\r\n\r\n")
#		payload = "<test>Please don't fail!</test>"
#		body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
#			% payload
#		message = "%s%s" % (header, body)
#
#		chunk1, chunk2, chunk3  = message[:20], message[20:73], message[73:]
#		nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x"
#		chunks = (chunk1, chunk2, chunk3, nextmessage_chunk)
#
#		transport.onreceive(self.expect_receive(body, msg='Failed: In chunks'))
#		for chunk in chunks:
#			transport._on_receive(chunk)
#		self.assertTrue(self.have_received_expected(), msg='Failed: In chunks')

if __name__ == '__main__':
	unittest.main()

# vim: se ts=3: