Package nbxmpp :: Module client_nb
[hide private]
[frames] | no frames]

Source Code for Module nbxmpp.client_nb

  1  ##   client_nb.py 
  2  ##         based on client.py, changes backported up to revision 1.60 
  3  ## 
  4  ##   Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov 
  5  ##         modified by Dimitur Kirov <dkirov@gmail.com> 
  6  ## 
  7  ##   This program is free software; you can redistribute it and/or modify 
  8  ##   it under the terms of the GNU General Public License as published by 
  9  ##   the Free Software Foundation; either version 2, or (at your option) 
 10  ##   any later version. 
 11  ## 
 12  ##   This program is distributed in the hope that it will be useful, 
 13  ##   but WITHOUT ANY WARRANTY; without even the implied warranty of 
 14  ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 15  ##   GNU General Public License for more details. 
 16   
 17  # $Id: client.py,v 1.52 2006/01/02 19:40:55 normanr Exp $ 
 18   
 19  """ 
 20  Client class establishs connection to XMPP Server and handles authentication 
 21  """ 
 22   
 23  import socket 
 24  import transports_nb, dispatcher_nb, auth_nb, roster_nb, protocol, bosh 
 25  from protocol import NS_TLS 
 26   
 27  import logging 
 28  log = logging.getLogger('nbxmpp.client_nb') 
 29   
 30   
31 -class NonBlockingClient:
32 """ 33 Client class is XMPP connection mountpoint. Objects for authentication, 34 network communication, roster, xml parsing ... are plugged to client object. 35 Client implements the abstract behavior - mostly negotioation and callbacks 36 handling, whereas underlying modules take care of feature-specific logic 37 """ 38
39 - def __init__(self, domain, idlequeue, caller=None):
40 """ 41 Caches connection data 42 43 :param domain: domain - for to: attribute (from account info) 44 :param idlequeue: processing idlequeue 45 :param caller: calling object - it has to implement methods 46 _event_dispatcher which is called from dispatcher instance 47 """ 48 self.Namespace = protocol.NS_CLIENT 49 self.defaultNamespace = self.Namespace 50 51 self.idlequeue = idlequeue 52 self.disconnect_handlers = [] 53 54 self.Server = domain 55 self.xmpp_hostname = None # FQDN hostname to connect to 56 57 # caller is who initiated this client, it is in needed to register 58 # the EventDispatcher 59 self._caller = caller 60 self._owner = self 61 self._registered_name = None # our full jid, set after successful auth 62 self.connected = '' 63 self.ip_addresses = [] 64 self.socket = None 65 self.on_connect = None 66 self.on_proxy_failure = None 67 self.on_connect_failure = None 68 self.proxy = None 69 self.got_features = False 70 self.stream_started = False 71 self.disconnecting = False 72 self.protocol_type = 'XMPP'
73
74 - def disconnect(self, message=''):
75 """ 76 Called on disconnection - disconnect callback is picked based on state of 77 the client. 78 """ 79 # to avoid recursive calls 80 if self.ip_addresses: 81 self._try_next_ip() 82 return 83 if self.disconnecting: return 84 85 log.info('Disconnecting NBClient: %s' % message) 86 87 if 'NonBlockingRoster' in self.__dict__: 88 self.NonBlockingRoster.PlugOut() 89 if 'NonBlockingBind' in self.__dict__: 90 self.NonBlockingBind.PlugOut() 91 if 'NonBlockingNonSASL' in self.__dict__: 92 self.NonBlockingNonSASL.PlugOut() 93 if 'SASL' in self.__dict__: 94 self.SASL.PlugOut() 95 if 'NonBlockingTCP' in self.__dict__: 96 self.NonBlockingTCP.PlugOut() 97 if 'NonBlockingHTTP' in self.__dict__: 98 self.NonBlockingHTTP.PlugOut() 99 if 'NonBlockingBOSH' in self.__dict__: 100 self.NonBlockingBOSH.PlugOut() 101 # FIXME: we never unplug dispatcher, only on next connect 102 # See _xmpp_connect_machine and SASLHandler 103 104 connected = self.connected 105 stream_started = self.stream_started 106 107 self.connected = '' 108 self.stream_started = False 109 110 self.disconnecting = True 111 112 log.debug('Client disconnected..') 113 if connected == '': 114 # if we're disconnecting before connection to XMPP sever is opened, 115 # we don't call disconnect handlers but on_connect_failure callback 116 if self.proxy: 117 # with proxy, we have different failure callback 118 log.debug('calling on_proxy_failure cb') 119 self.on_proxy_failure(reason=message) 120 else: 121 log.debug('calling on_connect_failure cb') 122 self.on_connect_failure() 123 else: 124 # we are connected to XMPP server 125 if not stream_started: 126 # if error occur before XML stream was opened, e.g. no response on 127 # init request, we call the on_connect_failure callback because 128 # proper connection is not established yet and it's not a proxy 129 # issue 130 log.debug('calling on_connect_failure cb') 131 self._caller.streamError = message 132 self.on_connect_failure() 133 else: 134 # with open connection, we are calling the disconnect handlers 135 for i in reversed(self.disconnect_handlers): 136 log.debug('Calling disconnect handler %s' % i) 137 i() 138 self.disconnecting = False
139
140 - def connect(self, on_connect, on_connect_failure, hostname=None, port=5222, 141 on_proxy_failure=None, proxy=None, secure_tuple=('plain', None, 142 None)):
143 """ 144 Open XMPP connection (open XML streams in both directions) 145 146 :param on_connect: called after stream is successfully opened 147 :param on_connect_failure: called when error occures during connection 148 :param hostname: hostname of XMPP server from SRV request 149 :param port: port number of XMPP server 150 :param on_proxy_failure: called if error occurres during TCP connection to 151 proxy server or during proxy connecting process 152 :param proxy: dictionary with proxy data. It should contain at least 153 values for keys 'host' and 'port' - connection details for proxy serve 154 and optionally keys 'user' and 'pass' as proxy credentials 155 :param secure_tuple: tuple of (desired connection type, cacerts, mycerts) 156 connection type can be 'ssl' - TLS established after TCP connection, 157 'tls' - TLS established after negotiation with starttls, or 'plain'. 158 cacerts, mycerts - see tls_nb.NonBlockingTLS constructor for more 159 details 160 """ 161 self.on_connect = on_connect 162 self.on_connect_failure=on_connect_failure 163 self.on_proxy_failure = on_proxy_failure 164 self.desired_security, self.cacerts, self.mycerts = secure_tuple 165 self.Connection = None 166 self.Port = port 167 self.proxy = proxy 168 169 if hostname: 170 self.xmpp_hostname = hostname 171 else: 172 self.xmpp_hostname = self.Server 173 174 # We only check for SSL here as for TLS we will first have to start a 175 # PLAIN connection and negotiate TLS afterwards. 176 # establish_tls will instruct transport to start secure connection 177 # directly 178 establish_tls = self.desired_security == 'ssl' 179 certs = (self.cacerts, self.mycerts) 180 181 proxy_dict = {} 182 tcp_host = self.xmpp_hostname 183 tcp_port = self.Port 184 185 if proxy: 186 # with proxies, client connects to proxy instead of directly to 187 # XMPP server ((hostname, port)) 188 # tcp_host is hostname of machine used for socket connection 189 # (DNS request will be done for proxy or BOSH CM hostname) 190 tcp_host, tcp_port, proxy_user, proxy_pass = \ 191 transports_nb.get_proxy_data_from_dict(proxy) 192 193 if proxy['type'] == 'bosh': 194 # Setup BOSH transport 195 self.socket = bosh.NonBlockingBOSH.get_instance( 196 on_disconnect=self.disconnect, 197 raise_event=self.raise_event, 198 idlequeue=self.idlequeue, 199 estabilish_tls=establish_tls, 200 certs=certs, 201 proxy_creds=(proxy_user, proxy_pass), 202 xmpp_server=(self.xmpp_hostname, self.Port), 203 domain=self.Server, 204 bosh_dict=proxy) 205 self.protocol_type = 'BOSH' 206 self.wait_for_restart_response = \ 207 proxy['bosh_wait_for_restart_response'] 208 else: 209 # http proxy 210 proxy_dict['type'] = proxy['type'] 211 proxy_dict['xmpp_server'] = (self.xmpp_hostname, self.Port) 212 proxy_dict['credentials'] = (proxy_user, proxy_pass) 213 214 if not proxy or proxy['type'] != 'bosh': 215 # Setup ordinary TCP transport 216 self.socket = transports_nb.NonBlockingTCP.get_instance( 217 on_disconnect=self.disconnect, 218 raise_event=self.raise_event, 219 idlequeue=self.idlequeue, 220 estabilish_tls=establish_tls, 221 certs=certs, 222 proxy_dict=proxy_dict) 223 224 # plug transport into client as self.Connection 225 self.socket.PlugIn(self) 226 227 self._resolve_hostname( 228 hostname=tcp_host, 229 port=tcp_port, 230 on_success=self._try_next_ip)
231
232 - def _resolve_hostname(self, hostname, port, on_success):
233 """ 234 Wrapper for getaddinfo call 235 236 FIXME: getaddinfo blocks 237 """ 238 try: 239 self.ip_addresses = socket.getaddrinfo(hostname, port, 240 socket.AF_UNSPEC, socket.SOCK_STREAM) 241 except socket.gaierror, (errnum, errstr): 242 self.disconnect(message='Lookup failure for %s:%s, hostname: %s - %s' % 243 (self.Server, self.Port, hostname, errstr)) 244 else: 245 on_success()
246
247 - def _try_next_ip(self, err_message=None):
248 """ 249 Iterate over IP addresses tries to connect to it 250 """ 251 if err_message: 252 log.debug('While looping over DNS A records: %s' % err_message) 253 if self.ip_addresses == []: 254 msg = 'Run out of hosts for name %s:%s.' % (self.Server, self.Port) 255 msg = msg + ' Error for last IP: %s' % err_message 256 self.disconnect(msg) 257 else: 258 self.current_ip = self.ip_addresses.pop(0) 259 self.socket.connect( 260 conn_5tuple=self.current_ip, 261 on_connect=lambda: self._xmpp_connect(), 262 on_connect_failure=self._try_next_ip)
263
264 - def incoming_stream_version(self):
265 """ 266 Get version of xml stream 267 """ 268 if 'version' in self.Dispatcher.Stream._document_attrs: 269 return self.Dispatcher.Stream._document_attrs['version'] 270 else: 271 return None
272
273 - def _xmpp_connect(self, socket_type=None):
274 """ 275 Start XMPP connecting process - open the XML stream. Is called after TCP 276 connection is established or after switch to TLS when successfully 277 negotiated with <starttls>. 278 """ 279 # socket_type contains info which transport connection was established 280 if not socket_type: 281 if self.Connection.ssl_lib: 282 # When ssl_lib is set we connected via SSL 283 socket_type = 'ssl' 284 else: 285 # PLAIN is default 286 socket_type = 'plain' 287 self.connected = socket_type 288 self._xmpp_connect_machine()
289
290 - def _xmpp_connect_machine(self, mode=None, data=None):
291 """ 292 Finite automaton taking care of stream opening and features tag handling. 293 Calls _on_stream_start when stream is started, and disconnect() on 294 failure. 295 """ 296 log.info('-------------xmpp_connect_machine() >> mode: %s, data: %s...' % 297 (mode, str(data)[:20])) 298 299 def on_next_receive(mode): 300 """ 301 Set desired on_receive callback on transport based on the state of 302 connect_machine. 303 """ 304 log.info('setting %s on next receive' % mode) 305 if mode is None: 306 self.onreceive(None) # switch to Dispatcher.ProcessNonBlocking 307 else: 308 self.onreceive(lambda _data:self._xmpp_connect_machine(mode, _data))
309 310 if not mode: 311 # starting state 312 if self.__dict__.has_key('Dispatcher'): 313 self.Dispatcher.PlugOut() 314 self.got_features = False 315 dispatcher_nb.Dispatcher.get_instance().PlugIn(self) 316 on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES') 317 318 elif mode == 'FAILURE': 319 self.disconnect('During XMPP connect: %s' % data) 320 321 elif mode == 'RECEIVE_DOCUMENT_ATTRIBUTES': 322 if data: 323 self.Dispatcher.ProcessNonBlocking(data) 324 self.ip_addresses = [] 325 if not hasattr(self, 'Dispatcher') or \ 326 self.Dispatcher.Stream._document_attrs is None: 327 self._xmpp_connect_machine( 328 mode='FAILURE', 329 data='Error on stream open') 330 return 331 332 # if terminating stanza was received after init request then client gets 333 # disconnected from bosh transport plugin and we have to end the stream 334 # negotiating process straight away. 335 # fixes #4657 336 if not self.connected: return 337 338 if self.incoming_stream_version() == '1.0': 339 if not self.got_features: 340 on_next_receive('RECEIVE_STREAM_FEATURES') 341 else: 342 log.info('got STREAM FEATURES in first recv') 343 self._xmpp_connect_machine(mode='STREAM_STARTED') 344 else: 345 log.info('incoming stream version less than 1.0') 346 self._xmpp_connect_machine(mode='STREAM_STARTED') 347 348 elif mode == 'RECEIVE_STREAM_FEATURES': 349 if data: 350 # sometimes <features> are received together with document 351 # attributes and sometimes on next receive... 352 self.Dispatcher.ProcessNonBlocking(data) 353 if not self.got_features: 354 self._xmpp_connect_machine( 355 mode='FAILURE', 356 data='Missing <features> in 1.0 stream') 357 else: 358 log.info('got STREAM FEATURES in second recv') 359 self._xmpp_connect_machine(mode='STREAM_STARTED') 360 361 elif mode == 'STREAM_STARTED': 362 self._on_stream_start()
363
364 - def _on_stream_start(self):
365 """ 366 Called after XMPP stream is opened. TLS negotiation may follow if 367 supported and desired. 368 """ 369 self.stream_started = True 370 if not hasattr(self, 'onreceive'): 371 # we may already have been disconnected 372 return 373 self.onreceive(None) 374 375 if self.connected == 'plain': 376 if self.desired_security == 'plain': 377 # if we want and have plain connection, we're done now 378 self._on_connect() 379 else: 380 # try to negotiate TLS 381 if self.incoming_stream_version() != '1.0': 382 # if stream version is less than 1.0, we can't do more 383 log.info('While connecting with type = "tls": stream version ' + 384 'is less than 1.0') 385 self._on_connect() 386 return 387 if self.Dispatcher.Stream.features.getTag('starttls'): 388 # Server advertises TLS support, start negotiation 389 self.stream_started = False 390 log.info('TLS supported by remote server. Requesting TLS start.') 391 self._tls_negotiation_handler() 392 else: 393 log.info('While connecting with type = "tls": TLS unsupported ' + 394 'by remote server') 395 self._on_connect() 396 397 elif self.connected in ['ssl', 'tls']: 398 self._on_connect() 399 else: 400 assert False, 'Stream opened for unsupported connection'
401
402 - def _tls_negotiation_handler(self, con=None, tag=None):
403 """ 404 Take care of TLS negotioation with <starttls> 405 """ 406 log.info('-------------tls_negotiaton_handler() >> tag: %s' % tag) 407 if not con and not tag: 408 # starting state when we send the <starttls> 409 self.RegisterHandlerOnce('proceed', self._tls_negotiation_handler, 410 xmlns=NS_TLS) 411 self.RegisterHandlerOnce('failure', self._tls_negotiation_handler, 412 xmlns=NS_TLS) 413 self.send('<starttls xmlns="%s"/>' % NS_TLS) 414 else: 415 # we got <proceed> or <failure> 416 if tag.getNamespace() != NS_TLS: 417 self.disconnect('Unknown namespace: %s' % tag.getNamespace()) 418 return 419 tagname = tag.getName() 420 if tagname == 'failure': 421 self.disconnect('TLS <failure> received: %s' % tag) 422 return 423 log.info('Got starttls proceed response. Switching to TLS/SSL...') 424 # following call wouldn't work for BOSH transport but it doesn't matter 425 # because <starttls> negotiation with BOSH is forbidden 426 self.Connection.tls_init( 427 on_succ = lambda: self._xmpp_connect(socket_type='tls'), 428 on_fail = lambda: self.disconnect('error while etabilishing TLS'))
429
430 - def _on_connect(self):
431 """ 432 Preceed call of on_connect callback 433 """ 434 self.onreceive(None) 435 self.on_connect(self, self.connected)
436
437 - def raise_event(self, event_type, data):
438 """ 439 Raise event to connection instance. DATA_SENT and DATA_RECIVED events 440 are used in XML console to show XMPP traffic 441 """ 442 log.info('raising event from transport: :::::%s::::\n_____________\n%s\n_____________\n' % (event_type, data)) 443 if hasattr(self, 'Dispatcher'): 444 self.Dispatcher.Event('', event_type, data)
445 446 ############################################################################### 447 ### follows code for authentication, resource bind, session and roster download 448 ############################################################################### 449
450 - def auth(self, user, password, resource='', sasl=True, on_auth=None):
451 """ 452 Authenticate connnection and bind resource. If resource is not provided 453 random one or library name used 454 455 :param user: XMPP username 456 :param password: XMPP password 457 :param resource: resource that shall be used for auth/connecting 458 :param sasl: Boolean indicating if SASL shall be used. (default: True) 459 :param on_auth: Callback, called after auth. On auth failure, argument 460 is None. 461 """ 462 self._User, self._Password = user, password 463 self._Resource, self._sasl = resource, sasl 464 self.on_auth = on_auth 465 self._on_doc_attrs() 466 return
467
468 - def _on_old_auth(self, res):
469 """ 470 Callback used by NON-SASL auth. On auth failure, res is None 471 """ 472 if res: 473 self.connected += '+old_auth' 474 self.on_auth(self, 'old_auth') 475 else: 476 self.on_auth(self, None)
477
478 - def _on_sasl_auth(self, res):
479 """ 480 Used internally. On auth failure, res is None 481 """ 482 self.onreceive(None) 483 if res: 484 self.connected += '+sasl' 485 self.on_auth(self, 'sasl') 486 else: 487 self.on_auth(self, None)
488
489 - def _on_doc_attrs(self):
490 """ 491 Plug authentication objects and start auth 492 """ 493 if self._sasl: 494 auth_nb.SASL.get_instance(self._User, self._Password, 495 self._on_start_sasl).PlugIn(self) 496 if not hasattr(self, 'SASL'): 497 return 498 if not self._sasl or self.SASL.startsasl == 'not-supported': 499 if not self._Resource: 500 self._Resource = 'xmpppy' 501 auth_nb.NonBlockingNonSASL.get_instance(self._User, self._Password, 502 self._Resource, self._on_old_auth).PlugIn(self) 503 return 504 self.SASL.auth() 505 return True
506
507 - def _on_start_sasl(self, data=None):
508 """ 509 Callback used by SASL, called on each auth step 510 """ 511 if data: 512 self.Dispatcher.ProcessNonBlocking(data) 513 if not 'SASL' in self.__dict__: 514 # SASL is pluged out, possible disconnect 515 return 516 if self.SASL.startsasl == 'in-process': 517 return 518 self.onreceive(None) 519 if self.SASL.startsasl == 'failure': 520 # wrong user/pass, stop auth 521 if 'SASL' in self.__dict__: 522 self.SASL.PlugOut() 523 self.connected = None # FIXME: is this intended? We use ''elsewhere 524 self._on_sasl_auth(None) 525 elif self.SASL.startsasl == 'success': 526 nb_bind = auth_nb.NonBlockingBind.get_instance() 527 sm = self._caller.sm 528 if sm._owner and sm.resumption: 529 nb_bind.resuming = True 530 sm.set_owner(self) 531 self.Dispatcher.sm = sm 532 nb_bind.PlugIn(self) 533 self.on_auth(self, 'sasl') 534 return 535 536 nb_bind.PlugIn(self) 537 self.onreceive(self._on_auth_bind) 538 return True
539
540 - def _on_auth_bind(self, data):
541 # FIXME: Why use this callback and not bind directly? 542 if data: 543 self.Dispatcher.ProcessNonBlocking(data) 544 if self.NonBlockingBind.bound is None: 545 return 546 self.NonBlockingBind.NonBlockingBind(self._Resource, self._on_sasl_auth) 547 return True
548
549 - def initRoster(self, version=''):
550 """ 551 Plug in the roster 552 """ 553 if not self.__dict__.has_key('NonBlockingRoster'): 554 return roster_nb.NonBlockingRoster.get_instance(version=version).PlugIn(self)
555
556 - def getRoster(self, on_ready=None, force=False):
557 """ 558 Return the Roster instance, previously plugging it in and requesting 559 roster from server if needed 560 """ 561 if self.__dict__.has_key('NonBlockingRoster'): 562 return self.NonBlockingRoster.getRoster(on_ready, force) 563 return None
564
565 - def sendPresence(self, jid=None, typ=None, requestRoster=0):
566 """ 567 Send some specific presence state. Can also request roster from server if 568 according agrument is set 569 """ 570 if requestRoster: 571 # FIXME: used somewhere? 572 roster_nb.NonBlockingRoster.get_instance().PlugIn(self) 573 self.send(dispatcher_nb.Presence(to=jid, typ=typ))
574 575 ############################################################################### 576 ### following methods are moved from blocking client class of xmpppy 577 ############################################################################### 578
579 - def RegisterDisconnectHandler(self, handler):
580 """ 581 Register handler that will be called on disconnect 582 """ 583 self.disconnect_handlers.append(handler)
584
585 - def UnregisterDisconnectHandler(self, handler):
586 """ 587 Unregister handler that is called on disconnect 588 """ 589 self.disconnect_handlers.remove(handler)
590
591 - def DisconnectHandler(self):
592 """ 593 Default disconnect handler. Just raises an IOError. If you choosed to use 594 this class in your production client, override this method or at least 595 unregister it. 596 """ 597 raise IOError('Disconnected from server.')
598
599 - def get_connect_type(self):
600 """ 601 Return connection state. F.e.: None / 'tls' / 'plain+non_sasl' 602 """ 603 return self.connected
604
605 - def get_peerhost(self):
606 """ 607 Gets the ip address of the account, from which is made connection to the 608 server (e.g. IP and port of socket) 609 610 We will create listening socket on the same ip 611 """ 612 # FIXME: tuple (ip, port) is expected (and checked for) but port num is 613 # useless 614 return self.socket.peerhost
615