1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 import locale, random
22 from hashlib import sha1
23 from transports_nb import NonBlockingTransport, NonBlockingHTTPBOSH,\
24 CONNECTED, CONNECTING, DISCONNECTED, DISCONNECTING,\
25 urisplit, DISCONNECT_TIMEOUT_SECONDS
26 from protocol import BOSHBody, Protocol, NS_CLIENT
27 from simplexml import Node
28
29 import logging
30 log = logging.getLogger('nbxmpp.bosh')
31
32 KEY_COUNT = 10
33
34
35
36 FAKE_DESCRIPTOR = -1337
37
38
40 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs,
41 xmpp_server, domain, bosh_dict, proxy_creds):
42 NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue,
43 estabilish_tls, certs)
44
45 self.bosh_sid = None
46 if locale.getdefaultlocale()[0]:
47 self.bosh_xml_lang = locale.getdefaultlocale()[0].split('_')[0]
48 else:
49 self.bosh_xml_lang = 'en'
50
51 self.http_version = 'HTTP/1.1'
52 self.http_persistent = True
53 self.http_pipelining = bosh_dict['bosh_http_pipelining']
54 self.bosh_to = domain
55
56 self.route_host, self.route_port = xmpp_server
57
58 self.bosh_wait = bosh_dict['bosh_wait']
59 if not self.http_pipelining:
60 self.bosh_hold = 1
61 else:
62 self.bosh_hold = bosh_dict['bosh_hold']
63 self.bosh_requests = self.bosh_hold
64 self.bosh_uri = bosh_dict['bosh_uri']
65 self.bosh_content = bosh_dict['bosh_content']
66 self.over_proxy = bosh_dict['bosh_useproxy']
67 if estabilish_tls:
68 self.bosh_secure = 'true'
69 else:
70 self.bosh_secure = 'false'
71 self.use_proxy_auth = bosh_dict['useauth']
72 self.proxy_creds = proxy_creds
73 self.wait_cb_time = None
74 self.http_socks = []
75 self.stanza_buffer = []
76 self.prio_bosh_stanzas = []
77 self.current_recv_handler = None
78 self.current_recv_socket = None
79 self.key_stack = None
80 self.ack_checker = None
81 self.after_init = False
82 self.proxy_dict = {}
83 if self.over_proxy and self.estabilish_tls:
84 self.proxy_dict['type'] = 'http'
85
86
87 host, port = urisplit(self.bosh_uri)[1:3]
88 self.proxy_dict['xmpp_server'] = (host, port)
89 self.proxy_dict['credentials'] = self.proxy_creds
90
91
92 self.ssl_fingerprint_sha1 = []
93 self.ssl_certificate = []
94 self.ssl_errnum = []
95 self.ssl_cert_pem = []
96
97
98 - def connect(self, conn_5tuple, on_connect, on_connect_failure):
119
124
125
126
132
134 """
135 Called when HTTP request it's possible to send a HTTP request. It can be when
136 socket is connected or when HTTP response arrived
137
138 There should be always one pending request to BOSH CM.
139 """
140 log.debug('on_http_req possible, state:\n%s' % self.get_current_state())
141 if self.get_state()==DISCONNECTED: return
142
143
144 if self._owner.got_features:
145 if (hasattr(self._owner, 'NonBlockingNonSASL') or hasattr(self._owner, 'SASL')):
146 self.send_BOSH(None)
147 else:
148
149
150
151
152 return
153 else:
154 self.send_BOSH(None)
155
156
157
159 """
160 Get sockets in desired state
161 """
162 for s in self.http_socks:
163 if s.get_state()==state: return s
164 return None
165
166
168 """
169 Select and returns socket eligible for sending a data to
170 """
171 if self.http_pipelining:
172 return self.get_socket_in(CONNECTED)
173 else:
174 last_recv_time, tmpsock = 0, None
175 for s in self.http_socks:
176
177 if s.get_state()==CONNECTED and s.pending_requests==0:
178
179
180 if (last_recv_time==0) or (s.last_recv_time < last_recv_time):
181 last_recv_time = s.last_recv_time
182 tmpsock = s
183 if tmpsock:
184 return tmpsock
185 else:
186 return None
187
188
245
247 stanza = None
248 s = self.get_free_socket()
249 if s:
250 s._plug_idle(writable=True, readable=True)
251 else:
252 log.error('=====!!!!!!!!====> Couldn\'t get free socket in plug_socket())')
253
255 """
256 Build a BOSH body tag from data in buffers and adds key, rid and ack
257 attributes to it
258
259 This method is called from _do_send() of underlying transport. This is to
260 ensure rid and keys will be processed in correct order. If I generate
261 them before plugging a socket for write (and did it for two sockets/HTTP
262 connections) in parallel, they might be sent in wrong order, which
263 results in violating the BOSH session and server-side disconnect.
264 """
265 if self.prio_bosh_stanzas:
266 stanza, add_payload = self.prio_bosh_stanzas.pop(0)
267 if add_payload:
268 stanza.setPayload(self.stanza_buffer)
269 self.stanza_buffer = []
270 else:
271 stanza = self.boshify_stanzas(self.stanza_buffer)
272 self.stanza_buffer = []
273
274 stanza = self.ack_checker.backup_stanza(stanza, socket)
275
276 key, newkey = self.key_stack.get()
277 if key:
278 stanza.setAttr('key', key)
279 if newkey:
280 stanza.setAttr('newkey', newkey)
281
282
283 log.info('sending msg with rid=%s to sock %s' % (stanza.getAttr('rid'), id(socket)))
284 self.renew_bosh_wait_timeout(self.bosh_wait + 3)
285 return stanza
286
287
289 log.error('Connection Manager didn\'t respond within %s + 3 seconds --> forcing disconnect' % self.bosh_wait)
290 self.disconnect()
291
292
298
303
305 """
306 Called from underlying transport when server closes TCP connection
307
308 :param socket: disconnected transport object
309 """
310 if socket.http_persistent:
311 log.warn('Fallback to nonpersistent HTTP (no pipelining as well)')
312 socket.http_persistent = False
313 self.http_persistent = False
314 self.http_pipelining = False
315 socket.disconnect(do_callback=False)
316 self.connect_and_flush(socket)
317 else:
318 socket.disconnect()
319
320
321
322 - def handle_body_attrs(self, stanza_attrs):
323 """
324 Called for each incoming body stanza from dispatcher. Checks body
325 attributes.
326 """
327 self.remove_bosh_wait_timeout()
328
329 if self.after_init:
330 if stanza_attrs.has_key('sid'):
331
332 self.bosh_sid = stanza_attrs['sid']
333
334 if stanza_attrs.has_key('requests'):
335 self.bosh_requests = int(stanza_attrs['requests'])
336
337 if stanza_attrs.has_key('wait'):
338 self.bosh_wait = int(stanza_attrs['wait'])
339 self.after_init = False
340
341 ack = None
342 if stanza_attrs.has_key('ack'):
343 ack = stanza_attrs['ack']
344 self.ack_checker.process_incoming_ack(ack=ack,
345 socket=self.current_recv_socket)
346
347 if stanza_attrs.has_key('type'):
348 if stanza_attrs['type'] in ['terminate', 'terminal']:
349 condition = 'n/a'
350 if stanza_attrs.has_key('condition'):
351 condition = stanza_attrs['condition']
352 if condition == 'n/a':
353 log.info('Received sesion-ending terminating stanza')
354 else:
355 log.error('Received terminating stanza: %s - %s' % (condition,
356 bosh_errors[condition]))
357 self.disconnect()
358 return
359
360 if stanza_attrs['type'] == 'error':
361
362 pass
363 return
364
365
367 """
368 Append stanza to a buffer to send
369 """
370 if stanza:
371 if isinstance(stanza, tuple):
372
373 self.prio_bosh_stanzas.append(stanza)
374 else:
375
376 self.stanza_buffer.append(stanza)
377
378
379 - def send(self, stanza, now=False):
381
382
383
385 t = '------ SOCKET_ID\tSOCKET_STATE\tPENDING_REQS\n'
386 for s in self.http_socks:
387 t = '%s------ %s\t%s\t%s\n' % (t, id(s), s.get_state(), s.pending_requests)
388 t = '%s------ prio stanzas: %s, queued XMPP stanzas: %s, not_acked stanzas: %s' \
389 % (t, self.prio_bosh_stanzas, self.stanza_buffer,
390 self.ack_checker.get_not_acked_rids())
391 return t
392
393
399
400
402 """
403 Wraps zero to many stanzas by body tag with xmlns and sid
404 """
405 log.debug('boshify_staza - type is: %s, stanza is %s' % (type(stanzas), stanzas))
406 tag = BOSHBody(attrs={'sid': self.bosh_sid})
407 tag.setPayload(stanzas)
408 return tag
409
410
412 if after_SASL:
413 t = BOSHBody(
414 attrs={ 'to': self.bosh_to,
415 'sid': self.bosh_sid,
416 'xml:lang': self.bosh_xml_lang,
417 'xmpp:restart': 'true',
418 'secure': self.bosh_secure,
419 'xmlns:xmpp': 'urn:xmpp:xbosh'})
420 else:
421 t = BOSHBody(
422 attrs={ 'content': self.bosh_content,
423 'hold': str(self.bosh_hold),
424 'route': 'xmpp:%s:%s' % (self.route_host, self.route_port),
425 'to': self.bosh_to,
426 'wait': str(self.bosh_wait),
427 'xml:lang': self.bosh_xml_lang,
428 'xmpp:version': '1.0',
429 'ver': '1.6',
430 'xmlns:xmpp': 'urn:xmpp:xbosh'})
431 self.send_BOSH((t, True))
432
438
439
441 http_dict = {'http_uri': self.bosh_uri,
442 'http_version': self.http_version,
443 'http_persistent': self.http_persistent,
444 'add_proxy_headers': self.over_proxy and not self.estabilish_tls}
445 if self.use_proxy_auth:
446 http_dict['proxy_user'], http_dict['proxy_pass'] = self.proxy_creds
447
448 s = NonBlockingHTTPBOSH(
449 raise_event=self.raise_event,
450 on_disconnect=self.disconnect,
451 idlequeue = self.idlequeue,
452 estabilish_tls = self.estabilish_tls,
453 certs = self.certs,
454 on_http_request_possible = self.on_http_request_possible,
455 http_dict = http_dict,
456 proxy_dict = self.proxy_dict,
457 on_persistent_fallback = self.on_persistent_fallback)
458
459 s.onreceive(self.on_received_http)
460 s.set_stanza_build_cb(self.build_stanza)
461 return s
462
463
468
469
471 self.current_recv_socket = socket
472 self.current_recv_handler(data)
473
474
482
483
485
486
487
488
489 r = random.Random()
490 r.seed()
491 return r.getrandbits(50)
492
493
494
496 """
497 Class for generating rids and generating and checking acknowledgements in
498 BOSH messages
499 """
501 self.rid = get_rand_number()
502 self.ack = 1
503 self.last_rids = {}
504 self.not_acked = []
505
506
508
510 socket.pending_requests += 1
511 rid = self.get_rid()
512 self.not_acked.append((rid, stanza))
513 stanza.setAttr('rid', str(rid))
514 self.last_rids[socket]=rid
515
516 if self.rid != self.ack + 1:
517 stanza.setAttr('ack', str(self.ack))
518 return stanza
519
521 socket.pending_requests -= 1
522 if ack:
523 ack = int(ack)
524 else:
525 ack = self.last_rids[socket]
526
527 i = len([rid for rid, st in self.not_acked if ack >= rid])
528 self.not_acked = self.not_acked[i:]
529
530 self.ack = ack
531
532
534 self.rid = self.rid + 1
535 return self.rid
536
537
538
539
540
542 """
543 Class implementing key sequences for BOSH messages
544 """
546 self.count = count
547 self.keys = []
548 self.reset()
549 self.first_call = True
550
552 seed = str(get_rand_number())
553 self.keys = [sha1(seed).hexdigest()]
554 for i in range(self.count-1):
555 curr_seed = self.keys[i]
556 self.keys.append(sha1(curr_seed).hexdigest())
557
559 if self.first_call:
560 self.first_call = False
561 return (None, self.keys.pop())
562
563 if len(self.keys)>1:
564 return (self.keys.pop(), None)
565 else:
566 last_key = self.keys.pop()
567 self.reset()
568 new_key = self.keys.pop()
569 return (last_key, new_key)
570
571
572 bosh_errors = {
573 'n/a': 'none or unknown condition in terminating body stanza',
574 'bad-request': 'The format of an HTTP header or binding element received from the client is unacceptable (e.g., syntax error), or Script Syntax is not supported.',
575 'host-gone': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is no longer serviced by the connection manager.',
576 'host-unknown': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is unknown to the connection manager.',
577 'improper-addressing': 'The initialization element lacks a "to" or "route" attribute (or the attribute has no value) but the connection manager requires one.',
578 'internal-server-error': 'The connection manager has experienced an internal error that prevents it from servicing the request.',
579 'item-not-found': '(1) "sid" is not valid, (2) "stream" is not valid, (3) "rid" is larger than the upper limit of the expected window, (4) connection manager is unable to resend response, (5) "key" sequence is invalid',
580 'other-request': 'Another request being processed at the same time as this request caused the session to terminate.',
581 'policy-violation': 'The client has broken the session rules (polling too frequently, requesting too frequently, too many simultaneous requests).',
582 'remote-connection-failed': 'The connection manager was unable to connect to, or unable to connect securely to, or has lost its connection to, the server.',
583 'remote-stream-error': 'Encapsulates an error in the protocol being transported.',
584 'see-other-uri': 'The connection manager does not operate at this URI (e.g., the connection manager accepts only SSL or TLS connections at some https: URI rather than the http: URI requested by the client). The client may try POSTing to the URI in the content of the <uri/> child element.',
585 'system-shutdown': 'The connection manager is being shut down. All active HTTP sessions are being terminated. No new sessions can be created.',
586 'undefined-condition': 'The error is not one of those defined herein; the connection manager SHOULD include application-specific information in the content of the <body/> wrapper.'
587 }
588