Package nbxmpp :: Module bosh
[hide private]
[frames] | no frames]

Source Code for Module nbxmpp.bosh

  1  ## bosh.py 
  2  ## 
  3  ## 
  4  ## Copyright (C) 2008 Tomas Karasek <tom.to.the.k@gmail.com> 
  5  ## 
  6  ## This file is part of Gajim. 
  7  ## 
  8  ## Gajim is free software; you can redistribute it and/or modify 
  9  ## it under the terms of the GNU General Public License as published 
 10  ## by the Free Software Foundation; version 3 only. 
 11  ## 
 12  ## Gajim is distributed in the hope that it will be useful, 
 13  ## but WITHOUT ANY WARRANTY; without even the implied warranty of 
 14  ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 15  ## GNU General Public License for more details. 
 16  ## 
 17  ## You should have received a copy of the GNU General Public License 
 18  ## along with Gajim.  If not, see <http://www.gnu.org/licenses/>. 
 19   
 20   
 21  import locale, random 
 22  from hashlib import sha1 
 23  from transports_nb import NonBlockingTransport, NonBlockingHTTPBOSH,\ 
 24          CONNECTED, CONNECTING, DISCONNECTED, DISCONNECTING,\ 
 25          urisplit, DISCONNECT_TIMEOUT_SECONDS 
 26  from protocol import BOSHBody, Protocol, NS_CLIENT 
 27  from simplexml import Node 
 28   
 29  import logging 
 30  log = logging.getLogger('nbxmpp.bosh') 
 31   
 32  KEY_COUNT = 10 
 33   
 34  # Fake file descriptor - it's used for setting read_timeout in idlequeue for 
 35  # BOSH Transport. In TCP-derived transports this is file descriptor of socket. 
 36  FAKE_DESCRIPTOR = -1337 
 37   
 38   
39 -class NonBlockingBOSH(NonBlockingTransport):
40 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs, 41 xmpp_server, domain, bosh_dict, proxy_creds):
42 NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue, 43 estabilish_tls, certs) 44 45 self.bosh_sid = None 46 if locale.getdefaultlocale()[0]: 47 self.bosh_xml_lang = locale.getdefaultlocale()[0].split('_')[0] 48 else: 49 self.bosh_xml_lang = 'en' 50 51 self.http_version = 'HTTP/1.1' 52 self.http_persistent = True 53 self.http_pipelining = bosh_dict['bosh_http_pipelining'] 54 self.bosh_to = domain 55 56 self.route_host, self.route_port = xmpp_server 57 58 self.bosh_wait = bosh_dict['bosh_wait'] 59 if not self.http_pipelining: 60 self.bosh_hold = 1 61 else: 62 self.bosh_hold = bosh_dict['bosh_hold'] 63 self.bosh_requests = self.bosh_hold 64 self.bosh_uri = bosh_dict['bosh_uri'] 65 self.bosh_content = bosh_dict['bosh_content'] 66 self.over_proxy = bosh_dict['bosh_useproxy'] 67 if estabilish_tls: 68 self.bosh_secure = 'true' 69 else: 70 self.bosh_secure = 'false' 71 self.use_proxy_auth = bosh_dict['useauth'] 72 self.proxy_creds = proxy_creds 73 self.wait_cb_time = None 74 self.http_socks = [] 75 self.stanza_buffer = [] 76 self.prio_bosh_stanzas = [] 77 self.current_recv_handler = None 78 self.current_recv_socket = None 79 self.key_stack = None 80 self.ack_checker = None 81 self.after_init = False 82 self.proxy_dict = {} 83 if self.over_proxy and self.estabilish_tls: 84 self.proxy_dict['type'] = 'http' 85 # with SSL over proxy, we do HTTP CONNECT to proxy to open a channel to 86 # BOSH Connection Manager 87 host, port = urisplit(self.bosh_uri)[1:3] 88 self.proxy_dict['xmpp_server'] = (host, port) 89 self.proxy_dict['credentials'] = self.proxy_creds
90 91
92 - def connect(self, conn_5tuple, on_connect, on_connect_failure):
93 NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure) 94 95 global FAKE_DESCRIPTOR 96 FAKE_DESCRIPTOR = FAKE_DESCRIPTOR - 1 97 self.fd = FAKE_DESCRIPTOR 98 99 self.stanza_buffer = [] 100 self.prio_bosh_stanzas = [] 101 102 self.key_stack = KeyStack(KEY_COUNT) 103 self.ack_checker = AckChecker() 104 self.after_init = True 105 106 self.http_socks.append(self.get_new_http_socket()) 107 self._tcp_connecting_started() 108 109 self.http_socks[0].connect( 110 conn_5tuple = conn_5tuple, 111 on_connect = self._on_connect, 112 on_connect_failure = self._on_connect_failure)
113
114 - def _on_connect(self):
115 self.peerhost = self.http_socks[0].peerhost 116 self.ssl_lib = self.http_socks[0].ssl_lib 117 NonBlockingTransport._on_connect(self)
118 119 120
121 - def set_timeout(self, timeout):
122 if self.get_state() != DISCONNECTED and self.fd != -1: 123 NonBlockingTransport.set_timeout(self, timeout) 124 else: 125 log.warn('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' % (self.get_state(), self.fd))
126
127 - def on_http_request_possible(self):
128 """ 129 Called when HTTP request it's possible to send a HTTP request. It can be when 130 socket is connected or when HTTP response arrived 131 132 There should be always one pending request to BOSH CM. 133 """ 134 log.debug('on_http_req possible, state:\n%s' % self.get_current_state()) 135 if self.get_state()==DISCONNECTED: return 136 137 #Hack for making the non-secure warning dialog work 138 if self._owner.got_features: 139 if (hasattr(self._owner, 'NonBlockingNonSASL') or hasattr(self._owner, 'SASL')): 140 self.send_BOSH(None) 141 else: 142 # If we already got features and no auth module was plugged yet, we are 143 # probably waiting for confirmation of the "not-secure-connection" dialog. 144 # We don't send HTTP request in that case. 145 # see http://lists.jabber.ru/pipermail/ejabberd/2008-August/004027.html 146 return 147 else: 148 self.send_BOSH(None)
149 150 151
152 - def get_socket_in(self, state):
153 """ 154 Get sockets in desired state 155 """ 156 for s in self.http_socks: 157 if s.get_state()==state: return s 158 return None
159 160
161 - def get_free_socket(self):
162 """ 163 Select and returns socket eligible for sending a data to 164 """ 165 if self.http_pipelining: 166 return self.get_socket_in(CONNECTED) 167 else: 168 last_recv_time, tmpsock = 0, None 169 for s in self.http_socks: 170 # we're interested only in CONNECTED socket with no requests pending 171 if s.get_state()==CONNECTED and s.pending_requests==0: 172 # if there's more of them, we want the one with the least recent data receive 173 # (lowest last_recv_time) 174 if (last_recv_time==0) or (s.last_recv_time < last_recv_time): 175 last_recv_time = s.last_recv_time 176 tmpsock = s 177 if tmpsock: 178 return tmpsock 179 else: 180 return None
181 182
183 - def send_BOSH(self, payload):
184 """ 185 Tries to send a stanza in payload by appeding it to a buffer and plugging a 186 free socket for writing. 187 """ 188 total_pending_reqs = sum([s.pending_requests for s in self.http_socks]) 189 190 # when called after HTTP response (Payload=None) and when there are already 191 # some pending requests and no data to send, or when the socket is 192 # disconnected, we do nothing 193 if payload is None and \ 194 total_pending_reqs > 0 and \ 195 self.stanza_buffer == [] and \ 196 self.prio_bosh_stanzas == [] or \ 197 self.get_state()==DISCONNECTED: 198 return 199 200 # Add xmlns to stanza to help ejabberd server 201 if payload and isinstance(payload, Protocol): 202 if not payload.getNamespace(): 203 payload.setNamespace(NS_CLIENT) 204 205 # now the payload is put to buffer and will be sent at some point 206 self.append_stanza(payload) 207 208 # if we're about to make more requests than allowed, we don't send - stanzas will be 209 # sent after HTTP response from CM, exception is when we're disconnecting - then we 210 # send anyway 211 if total_pending_reqs >= self.bosh_requests and self.get_state()!=DISCONNECTING: 212 log.warn('attemp to make more requests than allowed by Connection Manager:\n%s' % 213 self.get_current_state()) 214 return 215 216 # when there's free CONNECTED socket, we plug it for write and the data will 217 # be sent when write is possible 218 if self.get_free_socket(): 219 self.plug_socket() 220 return 221 222 # if there is a connecting socket, we just wait for when it connects, 223 # payload will be sent in a sec when the socket connects 224 if self.get_socket_in(CONNECTING): return 225 226 # being here means there are either DISCONNECTED sockets or all sockets are 227 # CONNECTED with too many pending requests 228 s = self.get_socket_in(DISCONNECTED) 229 230 # if we have DISCONNECTED socket, lets connect it and plug for send 231 if s: 232 self.connect_and_flush(s) 233 else: 234 # otherwise create and connect a new one 235 ss = self.get_new_http_socket() 236 self.http_socks.append(ss) 237 self.connect_and_flush(ss) 238 return
239
240 - def plug_socket(self):
241 stanza = None 242 s = self.get_free_socket() 243 if s: 244 s._plug_idle(writable=True, readable=True) 245 else: 246 log.error('=====!!!!!!!!====> Couldn\'t get free socket in plug_socket())')
247
248 - def build_stanza(self, socket):
249 """ 250 Build a BOSH body tag from data in buffers and adds key, rid and ack 251 attributes to it 252 253 This method is called from _do_send() of underlying transport. This is to 254 ensure rid and keys will be processed in correct order. If I generate 255 them before plugging a socket for write (and did it for two sockets/HTTP 256 connections) in parallel, they might be sent in wrong order, which 257 results in violating the BOSH session and server-side disconnect. 258 """ 259 if self.prio_bosh_stanzas: 260 stanza, add_payload = self.prio_bosh_stanzas.pop(0) 261 if add_payload: 262 stanza.setPayload(self.stanza_buffer) 263 self.stanza_buffer = [] 264 else: 265 stanza = self.boshify_stanzas(self.stanza_buffer) 266 self.stanza_buffer = [] 267 268 stanza = self.ack_checker.backup_stanza(stanza, socket) 269 270 key, newkey = self.key_stack.get() 271 if key: 272 stanza.setAttr('key', key) 273 if newkey: 274 stanza.setAttr('newkey', newkey) 275 276 277 log.info('sending msg with rid=%s to sock %s' % (stanza.getAttr('rid'), id(socket))) 278 self.renew_bosh_wait_timeout(self.bosh_wait + 3) 279 return stanza
280 281
282 - def on_bosh_wait_timeout(self):
283 log.error('Connection Manager didn\'t respond within %s + 3 seconds --> forcing disconnect' % self.bosh_wait) 284 self.disconnect()
285 286
287 - def renew_bosh_wait_timeout(self, timeout):
288 if self.wait_cb_time is not None: 289 self.remove_bosh_wait_timeout() 290 sched_time = self.idlequeue.set_alarm(self.on_bosh_wait_timeout, timeout) 291 self.wait_cb_time = sched_time
292
293 - def remove_bosh_wait_timeout(self):
294 self.idlequeue.remove_alarm( 295 self.on_bosh_wait_timeout, 296 self.wait_cb_time)
297
298 - def on_persistent_fallback(self, socket):
299 """ 300 Called from underlying transport when server closes TCP connection 301 302 :param socket: disconnected transport object 303 """ 304 if socket.http_persistent: 305 log.warn('Fallback to nonpersistent HTTP (no pipelining as well)') 306 socket.http_persistent = False 307 self.http_persistent = False 308 self.http_pipelining = False 309 socket.disconnect(do_callback=False) 310 self.connect_and_flush(socket) 311 else: 312 socket.disconnect()
313 314 315
316 - def handle_body_attrs(self, stanza_attrs):
317 """ 318 Called for each incoming body stanza from dispatcher. Checks body 319 attributes. 320 """ 321 self.remove_bosh_wait_timeout() 322 323 if self.after_init: 324 if stanza_attrs.has_key('sid'): 325 # session ID should be only in init response 326 self.bosh_sid = stanza_attrs['sid'] 327 328 if stanza_attrs.has_key('requests'): 329 self.bosh_requests = int(stanza_attrs['requests']) 330 331 if stanza_attrs.has_key('wait'): 332 self.bosh_wait = int(stanza_attrs['wait']) 333 self.after_init = False 334 335 ack = None 336 if stanza_attrs.has_key('ack'): 337 ack = stanza_attrs['ack'] 338 self.ack_checker.process_incoming_ack(ack=ack, 339 socket=self.current_recv_socket) 340 341 if stanza_attrs.has_key('type'): 342 if stanza_attrs['type'] in ['terminate', 'terminal']: 343 condition = 'n/a' 344 if stanza_attrs.has_key('condition'): 345 condition = stanza_attrs['condition'] 346 if condition == 'n/a': 347 log.info('Received sesion-ending terminating stanza') 348 else: 349 log.error('Received terminating stanza: %s - %s' % (condition, 350 bosh_errors[condition])) 351 self.disconnect() 352 return 353 354 if stanza_attrs['type'] == 'error': 355 # recoverable error 356 pass 357 return
358 359
360 - def append_stanza(self, stanza):
361 """ 362 Append stanza to a buffer to send 363 """ 364 if stanza: 365 if isinstance(stanza, tuple): 366 # stanza is tuple of BOSH stanza and bool value for whether to add payload 367 self.prio_bosh_stanzas.append(stanza) 368 else: 369 # stanza is XMPP stanza. Will be boshified before send. 370 self.stanza_buffer.append(stanza)
371 372
373 - def send(self, stanza, now=False):
374 self.send_BOSH(stanza)
375 376 377
378 - def get_current_state(self):
379 t = '------ SOCKET_ID\tSOCKET_STATE\tPENDING_REQS\n' 380 for s in self.http_socks: 381 t = '%s------ %s\t%s\t%s\n' % (t, id(s), s.get_state(), s.pending_requests) 382 t = '%s------ prio stanzas: %s, queued XMPP stanzas: %s, not_acked stanzas: %s' \ 383 % (t, self.prio_bosh_stanzas, self.stanza_buffer, 384 self.ack_checker.get_not_acked_rids()) 385 return t
386 387
388 - def connect_and_flush(self, socket):
389 socket.connect( 390 conn_5tuple = self.conn_5tuple, 391 on_connect = self.on_http_request_possible, 392 on_connect_failure = self.disconnect)
393 394
395 - def boshify_stanzas(self, stanzas=[], body_attrs=None):
396 """ 397 Wraps zero to many stanzas by body tag with xmlns and sid 398 """ 399 log.debug('boshify_staza - type is: %s, stanza is %s' % (type(stanzas), stanzas)) 400 tag = BOSHBody(attrs={'sid': self.bosh_sid}) 401 tag.setPayload(stanzas) 402 return tag
403 404
405 - def send_init(self, after_SASL=False):
406 if after_SASL: 407 t = BOSHBody( 408 attrs={ 'to': self.bosh_to, 409 'sid': self.bosh_sid, 410 'xml:lang': self.bosh_xml_lang, 411 'xmpp:restart': 'true', 412 'secure': self.bosh_secure, 413 'xmlns:xmpp': 'urn:xmpp:xbosh'}) 414 else: 415 t = BOSHBody( 416 attrs={ 'content': self.bosh_content, 417 'hold': str(self.bosh_hold), 418 'route': 'xmpp:%s:%s' % (self.route_host, self.route_port), 419 'to': self.bosh_to, 420 'wait': str(self.bosh_wait), 421 'xml:lang': self.bosh_xml_lang, 422 'xmpp:version': '1.0', 423 'ver': '1.6', 424 'xmlns:xmpp': 'urn:xmpp:xbosh'}) 425 self.send_BOSH((t, True))
426
427 - def start_disconnect(self):
428 NonBlockingTransport.start_disconnect(self) 429 self.renew_bosh_wait_timeout(DISCONNECT_TIMEOUT_SECONDS) 430 self.send_BOSH( 431 (BOSHBody(attrs={'sid': self.bosh_sid, 'type': 'terminate'}), True))
432 433
434 - def get_new_http_socket(self):
435 http_dict = {'http_uri': self.bosh_uri, 436 'http_version': self.http_version, 437 'http_persistent': self.http_persistent, 438 'add_proxy_headers': self.over_proxy and not self.estabilish_tls} 439 if self.use_proxy_auth: 440 http_dict['proxy_user'], http_dict['proxy_pass'] = self.proxy_creds 441 442 s = NonBlockingHTTPBOSH( 443 raise_event=self.raise_event, 444 on_disconnect=self.disconnect, 445 idlequeue = self.idlequeue, 446 estabilish_tls = self.estabilish_tls, 447 certs = self.certs, 448 on_http_request_possible = self.on_http_request_possible, 449 http_dict = http_dict, 450 proxy_dict = self.proxy_dict, 451 on_persistent_fallback = self.on_persistent_fallback) 452 453 s.onreceive(self.on_received_http) 454 s.set_stanza_build_cb(self.build_stanza) 455 return s
456 457
458 - def onreceive(self, recv_handler):
459 if recv_handler is None: 460 recv_handler = self._owner.Dispatcher.ProcessNonBlocking 461 self.current_recv_handler = recv_handler
462 463
464 - def on_received_http(self, data, socket):
465 self.current_recv_socket = socket 466 self.current_recv_handler(data)
467 468
469 - def disconnect(self, do_callback=True):
470 self.remove_bosh_wait_timeout() 471 if self.get_state() == DISCONNECTED: return 472 self.fd = -1 473 for s in self.http_socks: 474 s.disconnect(do_callback=False) 475 NonBlockingTransport.disconnect(self, do_callback)
476 477
478 -def get_rand_number():
479 # with 50-bit random initial rid, session would have to go up 480 # to 7881299347898368 messages to raise rid over 2**53 481 # (see http://www.xmpp.org/extensions/xep-0124.html#rids) 482 # it's also used for sequence key initialization 483 r = random.Random() 484 r.seed() 485 return r.getrandbits(50)
486 487 488
489 -class AckChecker():
490 """ 491 Class for generating rids and generating and checking acknowledgements in 492 BOSH messages 493 """
494 - def __init__(self):
495 self.rid = get_rand_number() 496 self.ack = 1 497 self.last_rids = {} 498 self.not_acked = []
499 500
501 - def get_not_acked_rids(self): return [rid for rid, st in self.not_acked]
502
503 - def backup_stanza(self, stanza, socket):
504 socket.pending_requests += 1 505 rid = self.get_rid() 506 self.not_acked.append((rid, stanza)) 507 stanza.setAttr('rid', str(rid)) 508 self.last_rids[socket]=rid 509 510 if self.rid != self.ack + 1: 511 stanza.setAttr('ack', str(self.ack)) 512 return stanza
513
514 - def process_incoming_ack(self, socket, ack=None):
515 socket.pending_requests -= 1 516 if ack: 517 ack = int(ack) 518 else: 519 ack = self.last_rids[socket] 520 521 i = len([rid for rid, st in self.not_acked if ack >= rid]) 522 self.not_acked = self.not_acked[i:] 523 524 self.ack = ack
525 526
527 - def get_rid(self):
528 self.rid = self.rid + 1 529 return self.rid
530 531 532 533 534
535 -class KeyStack():
536 """ 537 Class implementing key sequences for BOSH messages 538 """
539 - def __init__(self, count):
540 self.count = count 541 self.keys = [] 542 self.reset() 543 self.first_call = True
544
545 - def reset(self):
546 seed = str(get_rand_number()) 547 self.keys = [sha1(seed).hexdigest()] 548 for i in range(self.count-1): 549 curr_seed = self.keys[i] 550 self.keys.append(sha1(curr_seed).hexdigest())
551
552 - def get(self):
553 if self.first_call: 554 self.first_call = False 555 return (None, self.keys.pop()) 556 557 if len(self.keys)>1: 558 return (self.keys.pop(), None) 559 else: 560 last_key = self.keys.pop() 561 self.reset() 562 new_key = self.keys.pop() 563 return (last_key, new_key)
564 565 # http://www.xmpp.org/extensions/xep-0124.html#errorstatus-terminal 566 bosh_errors = { 567 'n/a': 'none or unknown condition in terminating body stanza', 568 'bad-request': 'The format of an HTTP header or binding element received from the client is unacceptable (e.g., syntax error), or Script Syntax is not supported.', 569 'host-gone': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is no longer serviced by the connection manager.', 570 'host-unknown': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is unknown to the connection manager.', 571 'improper-addressing': 'The initialization element lacks a "to" or "route" attribute (or the attribute has no value) but the connection manager requires one.', 572 'internal-server-error': 'The connection manager has experienced an internal error that prevents it from servicing the request.', 573 'item-not-found': '(1) "sid" is not valid, (2) "stream" is not valid, (3) "rid" is larger than the upper limit of the expected window, (4) connection manager is unable to resend response, (5) "key" sequence is invalid', 574 'other-request': 'Another request being processed at the same time as this request caused the session to terminate.', 575 'policy-violation': 'The client has broken the session rules (polling too frequently, requesting too frequently, too many simultaneous requests).', 576 'remote-connection-failed': 'The connection manager was unable to connect to, or unable to connect securely to, or has lost its connection to, the server.', 577 'remote-stream-error': 'Encapsulates an error in the protocol being transported.', 578 'see-other-uri': 'The connection manager does not operate at this URI (e.g., the connection manager accepts only SSL or TLS connections at some https: URI rather than the http: URI requested by the client). The client may try POSTing to the URI in the content of the <uri/> child element.', 579 'system-shutdown': 'The connection manager is being shut down. All active HTTP sessions are being terminated. No new sessions can be created.', 580 'undefined-condition': 'The error is not one of those defined herein; the connection manager SHOULD include application-specific information in the content of the <body/> wrapper.' 581 } 582