Package nbxmpp :: Module bosh
[hide private]
[frames] | no frames]

Source Code for Module nbxmpp.bosh

  1  ## bosh.py 
  2  ## 
  3  ## 
  4  ## Copyright (C) 2008 Tomas Karasek <tom.to.the.k@gmail.com> 
  5  ## 
  6  ## This file is part of Gajim. 
  7  ## 
  8  ## Gajim is free software; you can redistribute it and/or modify 
  9  ## it under the terms of the GNU General Public License as published 
 10  ## by the Free Software Foundation; version 3 only. 
 11  ## 
 12  ## Gajim is distributed in the hope that it will be useful, 
 13  ## but WITHOUT ANY WARRANTY; without even the implied warranty of 
 14  ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 15  ## GNU General Public License for more details. 
 16  ## 
 17  ## You should have received a copy of the GNU General Public License 
 18  ## along with Gajim.  If not, see <http://www.gnu.org/licenses/>. 
 19   
 20   
 21  import locale, random 
 22  from hashlib import sha1 
 23  from transports_nb import NonBlockingTransport, NonBlockingHTTPBOSH,\ 
 24          CONNECTED, CONNECTING, DISCONNECTED, DISCONNECTING,\ 
 25          urisplit, DISCONNECT_TIMEOUT_SECONDS 
 26  from protocol import BOSHBody, Protocol, NS_CLIENT 
 27  from simplexml import Node 
 28   
 29  import logging 
# Module-level logger used by every log call in this module
log = logging.getLogger('nbxmpp.bosh')

# Number of keys generated for one BOSH key sequence; it is the count handed
# to KeyStack when NonBlockingBOSH.connect() creates a new key stack
KEY_COUNT = 10

# Fake file descriptor - it's used for setting read_timeout in idlequeue for
# BOSH Transport. In TCP-derived transports this is file descriptor of socket.
# NonBlockingBOSH.connect() decrements this module-level value before using
# it, so every connect gets a descriptor different from the previous one.
FAKE_DESCRIPTOR = -1337
 37   
 38   
class NonBlockingBOSH(NonBlockingTransport):
    """
    Transport that speaks BOSH (XEP-0124) over a pool of HTTP sockets

    The object keeps a list of NonBlockingHTTPBOSH sockets (self.http_socks),
    buffers outgoing XMPP stanzas, wraps them into BOSH <body/> elements right
    before a socket writes them, and tracks rids, acks and keys through
    AckChecker and KeyStack.
    """

    def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
            certs, xmpp_server, domain, bosh_dict, proxy_creds):
        """
        Store the BOSH session parameters; no network activity happens here

        :param raise_event, on_disconnect, idlequeue, estabilish_tls, certs:
            passed unchanged to NonBlockingTransport.__init__
        :param xmpp_server: (host, port) pair used for the BOSH 'route'
            attribute
        :param domain: value of the BOSH 'to' attribute
        :param bosh_dict: dict with keys 'bosh_http_pipelining', 'bosh_wait',
            'bosh_hold', 'bosh_uri', 'bosh_content', 'bosh_useproxy' and
            'useauth'
        :param proxy_creds: proxy credentials; unpacked as (user, password)
            in get_new_http_socket() when proxy auth is enabled
        """
        NonBlockingTransport.__init__(self, raise_event, on_disconnect,
                idlequeue, estabilish_tls, certs)

        self.bosh_sid = None
        # ask for the locale only once; fall back to English when unknown
        default_locale = locale.getdefaultlocale()[0]
        if default_locale:
            self.bosh_xml_lang = default_locale.split('_')[0]
        else:
            self.bosh_xml_lang = 'en'

        self.http_version = 'HTTP/1.1'
        self.http_persistent = True
        self.http_pipelining = bosh_dict['bosh_http_pipelining']
        self.bosh_to = domain

        self.route_host, self.route_port = xmpp_server

        self.bosh_wait = bosh_dict['bosh_wait']
        # without pipelining only one request is held by the CM
        if not self.http_pipelining:
            self.bosh_hold = 1
        else:
            self.bosh_hold = bosh_dict['bosh_hold']
        self.bosh_requests = self.bosh_hold
        self.bosh_uri = bosh_dict['bosh_uri']
        self.bosh_content = bosh_dict['bosh_content']
        self.over_proxy = bosh_dict['bosh_useproxy']
        if estabilish_tls:
            self.bosh_secure = 'true'
        else:
            self.bosh_secure = 'false'
        self.use_proxy_auth = bosh_dict['useauth']
        self.proxy_creds = proxy_creds
        self.wait_cb_time = None
        self.http_socks = []
        self.stanza_buffer = []
        self.prio_bosh_stanzas = []
        self.current_recv_handler = None
        self.current_recv_socket = None
        self.key_stack = None
        self.ack_checker = None
        self.after_init = False
        self.proxy_dict = {}
        if self.over_proxy and self.estabilish_tls:
            self.proxy_dict['type'] = 'http'
            # with SSL over proxy, we do HTTP CONNECT to proxy to open a
            # channel to BOSH Connection Manager
            host, port = urisplit(self.bosh_uri)[1:3]
            self.proxy_dict['xmpp_server'] = (host, port)
            self.proxy_dict['credentials'] = self.proxy_creds

        # ssl variables
        self.ssl_fingerprint_sha1 = []
        self.ssl_certificate = []
        self.ssl_errnum = []
        self.ssl_cert_pem = []

    def connect(self, conn_5tuple, on_connect, on_connect_failure):
        """
        Reset the session state, create a HTTP socket and start connecting
        the first socket of the pool
        """
        NonBlockingTransport.connect(self, conn_5tuple, on_connect,
                on_connect_failure)

        # every connect gets a new fake descriptor
        global FAKE_DESCRIPTOR
        FAKE_DESCRIPTOR = FAKE_DESCRIPTOR - 1
        self.fd = FAKE_DESCRIPTOR

        self.stanza_buffer = []
        self.prio_bosh_stanzas = []

        self.key_stack = KeyStack(KEY_COUNT)
        self.ack_checker = AckChecker()
        self.after_init = True

        self.http_socks.append(self.get_new_http_socket())
        self._tcp_connecting_started()

        self.http_socks[0].connect(
                conn_5tuple=conn_5tuple,
                on_connect=self._on_connect,
                on_connect_failure=self._on_connect_failure)

    def _on_connect(self):
        """
        Copy peer host and ssl library from the first socket, then run the
        base class handler
        """
        self.peerhost = self.http_socks[0].peerhost
        self.ssl_lib = self.http_socks[0].ssl_lib
        NonBlockingTransport._on_connect(self)

    def set_timeout(self, timeout):
        """
        Set the timeout through the base class unless the transport is
        disconnected or its fd has been invalidated (-1)
        """
        if self.get_state() != DISCONNECTED and self.fd != -1:
            NonBlockingTransport.set_timeout(self, timeout)
        else:
            log.warning('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s',
                    self.get_state(), self.fd)

    def on_http_request_possible(self):
        """
        Called when it's possible to send a HTTP request. It can be when
        socket is connected or when HTTP response arrived

        There should be always one pending request to BOSH CM.
        """
        log.debug('on_http_req possible, state:\n%s', self.get_current_state())
        if self.get_state() == DISCONNECTED:
            return

        # Hack for making the non-secure warning dialog work:
        # If we already got features and no auth module was plugged yet, we
        # are probably waiting for confirmation of the
        # "not-secure-connection" dialog. We don't send HTTP request in that
        # case.
        # see http://lists.jabber.ru/pipermail/ejabberd/2008-August/004027.html
        if self._owner.got_features:
            auth_plugged = hasattr(self._owner, 'NonBlockingNonSASL') or \
                    hasattr(self._owner, 'SASL')
            if not auth_plugged:
                return
        self.send_BOSH(None)

    def get_socket_in(self, state):
        """
        Get the first socket in desired state, or None if there is none
        """
        for s in self.http_socks:
            if s.get_state() == state:
                return s
        return None

    def get_free_socket(self):
        """
        Select and return a socket eligible for sending data to, or None

        With pipelining any CONNECTED socket will do. Without it only a
        CONNECTED socket with no pending request qualifies, and of those the
        one with the lowest last_recv_time (least recent receive) is chosen.
        """
        if self.http_pipelining:
            return self.get_socket_in(CONNECTED)

        chosen = None
        for s in self.http_socks:
            # we're interested only in CONNECTED socket with no requests
            # pending
            if s.get_state() != CONNECTED or s.pending_requests != 0:
                continue
            # "nothing chosen yet" is tracked with None, not with a time of
            # 0, so a socket whose last_recv_time is 0 is compared fairly
            if chosen is None or s.last_recv_time < chosen.last_recv_time:
                chosen = s
        return chosen

    def send_BOSH(self, payload):
        """
        Tries to send a stanza in payload by appending it to a buffer and
        plugging a free socket for writing

        :param payload: XMPP stanza, (BOSH body, add_payload) tuple, or None
            when called only to keep a request pending
        """
        total_pending_reqs = sum(s.pending_requests for s in self.http_socks)

        # when called after HTTP response (payload=None) and when there are
        # already some pending requests and no data to send, or when the
        # socket is disconnected, we do nothing
        nothing_to_send = (payload is None and total_pending_reqs > 0
                and self.stanza_buffer == [] and self.prio_bosh_stanzas == [])
        if nothing_to_send or self.get_state() == DISCONNECTED:
            return

        # Add xmlns to stanza to help ejabberd server
        if payload and isinstance(payload, Protocol):
            if not payload.getNamespace():
                payload.setNamespace(NS_CLIENT)

        # now the payload is put to buffer and will be sent at some point
        self.append_stanza(payload)

        # if we're about to make more requests than allowed, we don't send -
        # stanzas will be sent after HTTP response from CM, exception is when
        # we're disconnecting - then we send anyway
        if total_pending_reqs >= self.bosh_requests and \
        self.get_state() != DISCONNECTING:
            log.warning('attemp to make more requests than allowed by '
                    'Connection Manager:\n%s', self.get_current_state())
            return

        # when there's free CONNECTED socket, we plug it for write and the
        # data will be sent when write is possible
        if self.get_free_socket():
            self.plug_socket()
            return

        # if there is a connecting socket, we just wait for when it connects,
        # payload will be sent in a sec when the socket connects
        if self.get_socket_in(CONNECTING):
            return

        # being here means there are either DISCONNECTED sockets or all
        # sockets are CONNECTED with too many pending requests
        s = self.get_socket_in(DISCONNECTED)
        if not s:
            # no DISCONNECTED socket to reuse: create a new one
            s = self.get_new_http_socket()
            self.http_socks.append(s)
        # connect it; the buffer is flushed once it connects
        self.connect_and_flush(s)

    def plug_socket(self):
        """
        Plug a free socket for reading and writing; log an error if none is
        available
        """
        s = self.get_free_socket()
        if s:
            s._plug_idle(writable=True, readable=True)
        else:
            log.error('=====!!!!!!!!====> Couldn\'t get free socket in plug_socket())')

    def build_stanza(self, socket):
        """
        Build a BOSH body tag from data in buffers and adds key, rid and ack
        attributes to it

        This method is called from _do_send() of underlying transport. This
        is to ensure rid and keys will be processed in correct order. If I
        generate them before plugging a socket for write (and did it for two
        sockets/HTTP connections) in parallel, they might be sent in wrong
        order, which results in violating the BOSH session and server-side
        disconnect.

        :param socket: HTTP socket the stanza is going to be written to
        :return: BOSH body ready to be sent
        """
        if self.prio_bosh_stanzas:
            # prepared BOSH bodies go first; they may carry buffered stanzas
            stanza, add_payload = self.prio_bosh_stanzas.pop(0)
            if add_payload:
                stanza.setPayload(self.stanza_buffer)
                self.stanza_buffer = []
        else:
            stanza = self.boshify_stanzas(self.stanza_buffer)
            self.stanza_buffer = []

        stanza = self.ack_checker.backup_stanza(stanza, socket)

        key, newkey = self.key_stack.get()
        if key:
            stanza.setAttr('key', key)
        if newkey:
            stanza.setAttr('newkey', newkey)

        log.info('sending msg with rid=%s to sock %s', stanza.getAttr('rid'),
                id(socket))
        # the CM has bosh_wait seconds to answer, plus 3 seconds of slack
        self.renew_bosh_wait_timeout(self.bosh_wait + 3)
        return stanza

    def on_bosh_wait_timeout(self):
        """
        Alarm callback: the CM did not answer in time, so disconnect
        """
        log.error("Connection Manager didn't respond within %s + 3 seconds "
                "--> forcing disconnect", self.bosh_wait)
        self.disconnect()

    def renew_bosh_wait_timeout(self, timeout):
        """
        Replace the pending response alarm (if any) with one firing in
        timeout seconds
        """
        if self.wait_cb_time is not None:
            self.remove_bosh_wait_timeout()
        sched_time = self.idlequeue.set_alarm(self.on_bosh_wait_timeout,
                timeout)
        self.wait_cb_time = sched_time

    def remove_bosh_wait_timeout(self):
        """
        Remove the pending response alarm; does nothing when none is set
        """
        # handle_body_attrs() and disconnect() call this unconditionally, so
        # there may be no alarm scheduled at all
        if self.wait_cb_time is None:
            return
        self.idlequeue.remove_alarm(
                self.on_bosh_wait_timeout,
                self.wait_cb_time)
        # forget the time so the same alarm is not removed twice
        self.wait_cb_time = None

    def on_persistent_fallback(self, socket):
        """
        Called from underlying transport when server closes TCP connection

        :param socket: disconnected transport object
        """
        if socket.http_persistent:
            log.warning('Fallback to nonpersistent HTTP (no pipelining as well)')
            socket.http_persistent = False
            self.http_persistent = False
            self.http_pipelining = False
            socket.disconnect(do_callback=False)
            self.connect_and_flush(socket)
        else:
            socket.disconnect()

    def handle_body_attrs(self, stanza_attrs):
        """
        Called for each incoming body stanza from dispatcher. Checks body
        attributes.

        :param stanza_attrs: mapping of the attributes of the received body
        """
        self.remove_bosh_wait_timeout()

        if self.after_init:
            if 'sid' in stanza_attrs:
                # session ID should be only in init response
                self.bosh_sid = stanza_attrs['sid']

            if 'requests' in stanza_attrs:
                self.bosh_requests = int(stanza_attrs['requests'])

            if 'wait' in stanza_attrs:
                self.bosh_wait = int(stanza_attrs['wait'])
            self.after_init = False

        ack = None
        if 'ack' in stanza_attrs:
            ack = stanza_attrs['ack']
        self.ack_checker.process_incoming_ack(ack=ack,
                socket=self.current_recv_socket)

        if 'type' in stanza_attrs:
            if stanza_attrs['type'] in ('terminate', 'terminal'):
                condition = 'n/a'
                if 'condition' in stanza_attrs:
                    condition = stanza_attrs['condition']
                if condition == 'n/a':
                    log.info('Received sesion-ending terminating stanza')
                else:
                    # a condition missing from bosh_errors must not raise
                    # here, otherwise the disconnect below would be skipped
                    log.error('Received terminating stanza: %s - %s',
                            condition,
                            bosh_errors.get(condition, bosh_errors['n/a']))
                self.disconnect()
                return

            # stanza_attrs['type'] == 'error' is a recoverable error;
            # nothing to do for it

    def append_stanza(self, stanza):
        """
        Append stanza to a buffer to send
        """
        if stanza:
            if isinstance(stanza, tuple):
                # stanza is tuple of BOSH stanza and bool value for whether
                # to add payload
                self.prio_bosh_stanzas.append(stanza)
            else:
                # stanza is XMPP stanza. Will be boshified before send.
                self.stanza_buffer.append(stanza)

    def send(self, stanza, now=False):
        """
        Queue stanza for sending; the 'now' argument is not used here
        """
        self.send_BOSH(stanza)

    def get_current_state(self):
        """
        Return a multi-line text describing sockets and buffers, for logging
        """
        parts = ['------ SOCKET_ID\tSOCKET_STATE\tPENDING_REQS\n']
        for s in self.http_socks:
            parts.append('------ %s\t%s\t%s\n' % (id(s), s.get_state(),
                    s.pending_requests))
        # ack_checker is None until connect() has been called
        if self.ack_checker is not None:
            not_acked = self.ack_checker.get_not_acked_rids()
        else:
            not_acked = []
        parts.append('------ prio stanzas: %s, queued XMPP stanzas: %s, not_acked stanzas: %s'
                % (self.prio_bosh_stanzas, self.stanza_buffer, not_acked))
        return ''.join(parts)

    def connect_and_flush(self, socket):
        """
        Connect socket; on_http_request_possible() is called once it connects
        """
        socket.connect(
                conn_5tuple=self.conn_5tuple,
                on_connect=self.on_http_request_possible,
                on_connect_failure=self.disconnect)

    def boshify_stanzas(self, stanzas=None, body_attrs=None):
        """
        Wraps zero to many stanzas by body tag with xmlns and sid

        :param stanzas: list of stanzas to wrap; None means no stanzas
        :param body_attrs: accepted for compatibility, currently not used
        :return: BOSHBody carrying the stanzas
        """
        # a fresh list per call: a shared default list would be handed to
        # setPayload() and could be changed through the returned body
        if stanzas is None:
            stanzas = []
        log.debug('boshify_staza - type is: %s, stanza is %s', type(stanzas),
                stanzas)
        tag = BOSHBody(attrs={'sid': self.bosh_sid})
        tag.setPayload(stanzas)
        return tag

    def send_init(self, after_SASL=False):
        """
        Send the session creation body, or the stream restart body when
        after_SASL is True
        """
        if after_SASL:
            t = BOSHBody(
                    attrs={'to': self.bosh_to,
                            'sid': self.bosh_sid,
                            'xml:lang': self.bosh_xml_lang,
                            'xmpp:restart': 'true',
                            'secure': self.bosh_secure,
                            'xmlns:xmpp': 'urn:xmpp:xbosh'})
        else:
            t = BOSHBody(
                    attrs={'content': self.bosh_content,
                            'hold': str(self.bosh_hold),
                            'route': 'xmpp:%s:%s' % (self.route_host,
                                    self.route_port),
                            'to': self.bosh_to,
                            'wait': str(self.bosh_wait),
                            'xml:lang': self.bosh_xml_lang,
                            'xmpp:version': '1.0',
                            'ver': '1.6',
                            'xmlns:xmpp': 'urn:xmpp:xbosh'})
        self.send_BOSH((t, True))

    def start_disconnect(self):
        """
        Send the terminating body and give the CM
        DISCONNECT_TIMEOUT_SECONDS to answer
        """
        NonBlockingTransport.start_disconnect(self)
        self.renew_bosh_wait_timeout(DISCONNECT_TIMEOUT_SECONDS)
        self.send_BOSH(
                (BOSHBody(attrs={'sid': self.bosh_sid, 'type': 'terminate'}),
                True))

    def get_new_http_socket(self):
        """
        Create a NonBlockingHTTPBOSH socket wired to this transport

        :return: the new, not yet connected socket
        """
        http_dict = {'http_uri': self.bosh_uri,
                'http_version': self.http_version,
                'http_persistent': self.http_persistent,
                'add_proxy_headers': self.over_proxy and \
                        not self.estabilish_tls}
        if self.use_proxy_auth:
            http_dict['proxy_user'], http_dict['proxy_pass'] = self.proxy_creds

        s = NonBlockingHTTPBOSH(
                raise_event=self.raise_event,
                on_disconnect=self.disconnect,
                idlequeue=self.idlequeue,
                estabilish_tls=self.estabilish_tls,
                certs=self.certs,
                on_http_request_possible=self.on_http_request_possible,
                http_dict=http_dict,
                proxy_dict=self.proxy_dict,
                on_persistent_fallback=self.on_persistent_fallback)

        s.onreceive(self.on_received_http)
        s.set_stanza_build_cb(self.build_stanza)
        return s

    def onreceive(self, recv_handler):
        """
        Set the handler for received data; None selects the dispatcher's
        ProcessNonBlocking
        """
        if recv_handler is None:
            recv_handler = self._owner.Dispatcher.ProcessNonBlocking
        self.current_recv_handler = recv_handler

    def on_received_http(self, data, socket):
        """
        Remember which socket delivered data and pass data to the current
        receive handler
        """
        self.current_recv_socket = socket
        self.current_recv_handler(data)

    def disconnect(self, do_callback=True):
        """
        Drop the response alarm, disconnect every HTTP socket and then the
        transport itself
        """
        self.remove_bosh_wait_timeout()
        if self.get_state() == DISCONNECTED:
            return
        self.fd = -1
        for s in self.http_socks:
            s.disconnect(do_callback=False)
        NonBlockingTransport.disconnect(self, do_callback)
482 483
def get_rand_number():
    """
    Return a random 50-bit non-negative integer

    It is used as the initial rid and as the seed of the key sequence.
    """
    # with 50-bit random initial rid, session would have to go up
    # to 7881299347898368 messages to raise rid over 2**53
    # (see http://www.xmpp.org/extensions/xep-0124.html#rids)
    # it's also used for sequence key initialization
    #
    # SystemRandom draws from the OS entropy source directly, so there is no
    # per-call generator to construct and seed, and the value does not come
    # from a deterministic PRNG
    return random.SystemRandom().getrandbits(50)
492 493 494
class AckChecker(object):
    """
    Class for generating rids and generating and checking acknowledgements in
    BOSH messages
    """

    def __init__(self):
        # last rid handed out; get_rid() increments it before returning
        self.rid = get_rand_number()
        # last acknowledged rid
        self.ack = 1
        # socket -> rid of the last request sent through that socket
        self.last_rids = {}
        # (rid, stanza) pairs sent but not acknowledged yet
        self.not_acked = []

    def get_not_acked_rids(self):
        """
        Return rids of all requests that were not acknowledged yet
        """
        return [rid for rid, _stanza in self.not_acked]

    def backup_stanza(self, stanza, socket):
        """
        Assign the next rid to stanza and remember it until it is acked

        Also increments socket.pending_requests and adds the 'ack' attribute
        unless the new rid directly follows the last acknowledged one.

        :return: the same stanza object with the attributes set
        """
        socket.pending_requests += 1
        rid = self.get_rid()
        self.not_acked.append((rid, stanza))
        stanza.setAttr('rid', str(rid))
        self.last_rids[socket] = rid

        if self.rid != self.ack + 1:
            stanza.setAttr('ack', str(self.ack))
        return stanza

    def process_incoming_ack(self, socket, ack=None):
        """
        Register a response received on socket

        :param socket: socket the response arrived on; its pending_requests
            counter is decremented
        :param ack: value of the 'ack' attribute of the response. When it is
            missing, the last rid sent through socket is taken as
            acknowledged.
        """
        socket.pending_requests -= 1
        if ack:
            ack = int(ack)
        else:
            # a socket with no recorded request keeps the current ack
            # instead of raising KeyError
            ack = self.last_rids.get(socket, self.ack)

        # keep only requests with rid above the ack; filtering by value does
        # not depend on the order of the list
        self.not_acked = [(rid, stanza) for rid, stanza in self.not_acked
                if rid > ack]

        self.ack = ack

    def get_rid(self):
        """
        Return the next request id
        """
        self.rid = self.rid + 1
        return self.rid
536 537 538 539 540
class KeyStack(object):
    """
    Class implementing key sequences for BOSH messages
    """

    def __init__(self, count):
        """
        :param count: number of keys generated for one sequence
        """
        self.count = count
        self.keys = []
        self.reset()
        # the very first get() only announces a new key
        self.first_call = True

    @staticmethod
    def _digest(text):
        """
        Return the hexadecimal SHA-1 digest of text
        """
        # sha1() works on bytes; encode text explicitly so the same code
        # runs where str is not a byte string
        if not isinstance(text, bytes):
            text = text.encode('utf-8')
        return sha1(text).hexdigest()

    def reset(self):
        """
        Generate a new sequence: the first key is the digest of a random
        seed, every following key is the digest of the previous key
        """
        key = self._digest(str(get_rand_number()))
        keys = [key]
        for _ in range(self.count - 1):
            key = self._digest(key)
            keys.append(key)
        self.keys = keys

    def get(self):
        """
        Return a (key, newkey) pair for the next request

        The first call returns (None, newkey). Later calls return
        (key, None) while more than one key is left. The last key of a
        sequence is returned together with the first key popped from a
        freshly generated sequence.
        """
        if self.first_call:
            self.first_call = False
            return (None, self.keys.pop())

        if len(self.keys) > 1:
            return (self.keys.pop(), None)

        # sequence exhausted: hand out the last key and announce a new one
        last_key = self.keys.pop()
        self.reset()
        new_key = self.keys.pop()
        return (last_key, new_key)
# Human-readable descriptions of the terminal binding conditions, keyed by the
# value of the 'condition' attribute of a terminating body
# http://www.xmpp.org/extensions/xep-0124.html#errorstatus-terminal
bosh_errors = {
    'n/a': 'none or unknown condition in terminating body stanza',
    'bad-request': (
        'The format of an HTTP header or binding element received from the '
        'client is unacceptable (e.g., syntax error), or Script Syntax is '
        'not supported.'),
    'host-gone': (
        'The target domain specified in the "to" attribute or the target '
        'host or port specified in the "route" attribute is no longer '
        'serviced by the connection manager.'),
    'host-unknown': (
        'The target domain specified in the "to" attribute or the target '
        'host or port specified in the "route" attribute is unknown to the '
        'connection manager.'),
    'improper-addressing': (
        'The initialization element lacks a "to" or "route" attribute (or '
        'the attribute has no value) but the connection manager requires '
        'one.'),
    'internal-server-error': (
        'The connection manager has experienced an internal error that '
        'prevents it from servicing the request.'),
    'item-not-found': (
        '(1) "sid" is not valid, (2) "stream" is not valid, (3) "rid" is '
        'larger than the upper limit of the expected window, (4) connection '
        'manager is unable to resend response, (5) "key" sequence is '
        'invalid'),
    'other-request': (
        'Another request being processed at the same time as this request '
        'caused the session to terminate.'),
    'policy-violation': (
        'The client has broken the session rules (polling too frequently, '
        'requesting too frequently, too many simultaneous requests).'),
    'remote-connection-failed': (
        'The connection manager was unable to connect to, or unable to '
        'connect securely to, or has lost its connection to, the server.'),
    'remote-stream-error': (
        'Encapsulates an error in the protocol being transported.'),
    'see-other-uri': (
        'The connection manager does not operate at this URI (e.g., the '
        'connection manager accepts only SSL or TLS connections at some '
        'https: URI rather than the http: URI requested by the client). '
        'The client may try POSTing to the URI in the content of the '
        '<uri/> child element.'),
    'system-shutdown': (
        'The connection manager is being shut down. All active HTTP '
        'sessions are being terminated. No new sessions can be created.'),
    'undefined-condition': (
        'The error is not one of those defined herein; the connection '
        'manager SHOULD include application-specific information in the '
        'content of the <body/> wrapper.'),
}