Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/gajim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortomk <tomk@no-mail.com>2008-08-18 02:57:48 +0400
committertomk <tomk@no-mail.com>2008-08-18 02:57:48 +0400
commitacdf4ff1434c6f9f7b9f477d9d19887918b816e2 (patch)
tree57c6b94830fb209b87a8da7aa666d33b0cede97f
parenta76c173816c2692ef62f91e56a507bc7810f2d8f (diff)
improved disconnect handling, added comments, fixed minor bugs
-rw-r--r--src/common/connection.py17
-rw-r--r--src/common/xmpp/bosh.py58
-rw-r--r--src/common/xmpp/client_nb.py383
-rw-r--r--src/common/xmpp/proxy_connectors.py1
-rw-r--r--src/common/xmpp/tls_nb.py11
-rw-r--r--src/common/xmpp/transports_nb.py156
6 files changed, 363 insertions, 263 deletions
diff --git a/src/common/connection.py b/src/common/connection.py
index d191c8237..cb09ee795 100644
--- a/src/common/connection.py
+++ b/src/common/connection.py
@@ -581,8 +581,8 @@ class Connection(ConnectionHandlers):
if self.on_connect_success == self._on_new_account:
con.RegisterDisconnectHandler(self._on_new_account)
- log.info('Connecting to %s: [%s:%d]', self.name,
- self._current_host['host'], port)
+ self.log_hosttype_info(port)
+
con.connect(
hostname=self._current_host['host'],
port=port,
@@ -594,6 +594,19 @@ class Connection(ConnectionHandlers):
else:
self.connect_to_next_host(retry)
+ def log_hosttype_info(self, port):
+ msg = '>>>>>> Connecting to %s [%s:%d], type = %s' % (self.name,
+ self._current_host['host'], port, self._current_type)
+ log.info(msg)
+ if self._proxy:
+ msg = '>>>>>> '
+ if self._proxy['type']=='bosh':
+ msg = '%s over BOSH %s:%s' % (msg, self._proxy['bosh_uri'], self._proxy['bosh_port'])
+ if self._proxy['type'] in ['http','socks5'] or self._proxy['bosh_useproxy']:
+ msg = '%s over proxy %s:%s' % (msg, self._proxy['host'], self._proxy['port'])
+ log.info(msg)
+
+
def _connect_failure(self, con_type = None):
if not con_type:
diff --git a/src/common/xmpp/bosh.py b/src/common/xmpp/bosh.py
index 5d1796a99..c6ae1beba 100644
--- a/src/common/xmpp/bosh.py
+++ b/src/common/xmpp/bosh.py
@@ -30,10 +30,9 @@ log = logging.getLogger('gajim.c.x.bosh')
KEY_COUNT = 10
+# Fake file descriptor - it's used for setting read_timeout in idlequeue for
+# BOSH Transport. In TCP-derived transports this is file descriptor of socket.
FAKE_DESCRIPTOR = -1337
-'''Fake file descriptor - it's used for setting read_timeout in idlequeue for
-BOSH Transport.
-In TCP-derived transports it is file descriptor of socket'''
class NonBlockingBOSH(NonBlockingTransport):
@@ -126,10 +125,11 @@ class NonBlockingBOSH(NonBlockingTransport):
def on_http_request_possible(self):
'''
- Called after HTTP response is received - when another request is possible.
+ Called when HTTP request it's possible to send a HTTP request. It can be when
+ socket is connected or when HTTP response arrived.
There should be always one pending request to BOSH CM.
'''
- log.info('on_http_req possible, state:\n%s' % self.get_current_state())
+ log.debug('on_http_req possible, state:\n%s' % self.get_current_state())
if self.get_state()==DISCONNECTED: return
#Hack for making the non-secure warning dialog work
@@ -137,6 +137,10 @@ class NonBlockingBOSH(NonBlockingTransport):
if (hasattr(self._owner, 'NonBlockingNonSASL') or hasattr(self._owner, 'SASL')):
self.send_BOSH(None)
else:
+ # If we already got features and no auth module was plugged yet, we are
+ # probably waiting for confirmation of the "not-secure-connection" dialog.
+ # We don't send HTTP request in that case.
+ # see http://lists.jabber.ru/pipermail/ejabberd/2008-August/004027.html
return
else:
self.send_BOSH(None)
@@ -144,18 +148,20 @@ class NonBlockingBOSH(NonBlockingTransport):
def get_socket_in(self, state):
+ ''' gets sockets in desired state '''
for s in self.http_socks:
if s.get_state()==state: return s
return None
def get_free_socket(self):
+ ''' Selects and returns socket eligible for sending a data to.'''
if self.http_pipelining:
return self.get_socket_in(CONNECTED)
else:
last_recv_time, tmpsock = 0, None
for s in self.http_socks:
- # we're interested only into CONNECTED socket with no req pending
+ # we're interested only in CONNECTED socket with no requests pending
if s.get_state()==CONNECTED and s.pending_requests==0:
# if there's more of them, we want the one with the least recent data receive
# (lowest last_recv_time)
@@ -169,6 +175,10 @@ class NonBlockingBOSH(NonBlockingTransport):
def send_BOSH(self, payload):
+ '''
+ Tries to send a stanza in payload by appeding it to a buffer and plugging a
+ free socket for writing.
+ '''
total_pending_reqs = sum([s.pending_requests for s in self.http_socks])
# when called after HTTP response (Payload=None) and when there are already
@@ -192,7 +202,8 @@ class NonBlockingBOSH(NonBlockingTransport):
self.get_current_state())
return
- # when there's free CONNECTED socket, we flush the data
+ # when there's free CONNECTED socket, we plug it for write and the data will
+ # be sent when write is possible
if self.get_free_socket():
self.plug_socket()
return
@@ -209,8 +220,7 @@ class NonBlockingBOSH(NonBlockingTransport):
if s:
self.connect_and_flush(s)
else:
- #if len(self.http_socks) > 1: return
- print 'connecting sock'
+ # otherwise create and connect a new one
ss = self.get_new_http_socket()
self.http_socks.append(ss)
self.connect_and_flush(ss)
@@ -225,6 +235,15 @@ class NonBlockingBOSH(NonBlockingTransport):
log.error('=====!!!!!!!!====> Couldn\'t get free socket in plug_socket())')
def build_stanza(self, socket):
+ '''
+ Builds a BOSH body tag from data in buffers and adds key, rid and ack
+ attributes to it.
+ This method is called from _do_send() of underlying transport. This is to
+ ensure rid and keys will be processed in correct order. If I generate them
+ before plugging a socket for write (and did it for two sockets/HTTP
+ connections) in parallel, they might be sent in wrong order, which results
+ in violating the BOSH session and server-side disconnect.
+ '''
if self.prio_bosh_stanzas:
stanza, add_payload = self.prio_bosh_stanzas.pop(0)
if add_payload:
@@ -244,7 +263,6 @@ class NonBlockingBOSH(NonBlockingTransport):
log.info('sending msg with rid=%s to sock %s' % (stanza.getAttr('rid'), id(socket)))
- #socket.send(stanza)
self.renew_bosh_wait_timeout(self.bosh_wait + 3)
return stanza
@@ -266,8 +284,12 @@ class NonBlockingBOSH(NonBlockingTransport):
self.wait_cb_time)
def on_persistent_fallback(self, socket):
- log.warn('Fallback to nonpersistent HTTP (no pipelining as well)')
+ '''
+ Called from underlying transport when server closes TCP connection.
+ :param socket: disconnected transport object
+ '''
if socket.http_persistent:
+ log.warn('Fallback to nonpersistent HTTP (no pipelining as well)')
socket.http_persistent = False
self.http_persistent = False
self.http_pipelining = False
@@ -279,6 +301,9 @@ class NonBlockingBOSH(NonBlockingTransport):
def handle_body_attrs(self, stanza_attrs):
+ '''
+ Called for each incoming body stanza from dispatcher. Checks body attributes.
+ '''
self.remove_bosh_wait_timeout()
if self.after_init:
@@ -315,11 +340,13 @@ class NonBlockingBOSH(NonBlockingTransport):
def append_stanza(self, stanza):
+ ''' appends stanza to a buffer to send '''
if stanza:
if isinstance(stanza, tuple):
# stanza is tuple of BOSH stanza and bool value for whether to add payload
self.prio_bosh_stanzas.append(stanza)
else:
+ # stanza is XMPP stanza. Will be boshified before send.
self.stanza_buffer.append(stanza)
@@ -391,7 +418,6 @@ class NonBlockingBOSH(NonBlockingTransport):
if self.use_proxy_auth:
http_dict['proxy_user'], http_dict['proxy_pass'] = self.proxy_creds
-
s = NonBlockingHTTPBOSH(
raise_event=self.raise_event,
on_disconnect=self.disconnect,
@@ -402,6 +428,7 @@ class NonBlockingBOSH(NonBlockingTransport):
http_dict = http_dict,
proxy_dict = self.proxy_dict,
on_persistent_fallback = self.on_persistent_fallback)
+
s.onreceive(self.on_received_http)
s.set_stanza_build_cb(self.build_stanza)
return s
@@ -439,6 +466,10 @@ def get_rand_number():
class AckChecker():
+ '''
+ Class for generating rids and generating and checking acknowledgements in
+ BOSH messages.
+ '''
def __init__(self):
self.rid = get_rand_number()
self.ack = 1
@@ -481,6 +512,9 @@ class AckChecker():
class KeyStack():
+ '''
+ Class implementing key sequences for BOSH messages
+ '''
def __init__(self, count):
self.count = count
self.keys = []
diff --git a/src/common/xmpp/client_nb.py b/src/common/xmpp/client_nb.py
index 35d5079f4..b7603cc08 100644
--- a/src/common/xmpp/client_nb.py
+++ b/src/common/xmpp/client_nb.py
@@ -16,11 +16,6 @@
# $Id: client.py,v 1.52 2006/01/02 19:40:55 normanr Exp $
-'''
-Provides Client classes implementations as examples of xmpppy structures usage.
-These classes can be used for simple applications "AS IS" though.
-'''
-
import socket
import transports_nb, dispatcher_nb, auth_nb, roster_nb, protocol, bosh
@@ -32,16 +27,19 @@ import logging
log = logging.getLogger('gajim.c.x.client_nb')
-class NBCommonClient:
- ''' Base for Client and Component classes.'''
+class NonBlockingClient:
+ '''
+ Client class is XMPP connection mountpoint. Objects for authentication,
+ network communication, roster, xml parsing ... are plugged to client object.
+ Client implements the abstract behavior - mostly negotioation and callbacks
+ handling, whereas underlying modules take care of feature-specific logic.
+ '''
def __init__(self, domain, idlequeue, caller=None):
-
- ''' Caches connection data:
+ '''
+ Caches connection data:
:param domain: domain - for to: attribute (from account info)
:param idlequeue: processing idlequeue
- :param port: port of listening XMPP server
- :param caller: calling object - it has to implement certain methods (necessary?)
-
+ :param caller: calling object - it has to implement method _event_dispatcher
'''
self.Namespace = protocol.NS_CLIENT
self.defaultNamespace = self.Namespace
@@ -62,19 +60,22 @@ class NBCommonClient:
self.on_connect_failure = None
self.proxy = None
self.got_features = False
+ self.stream_started = False
+ self.disconnecting = False
+ self.protocol_type = 'XMPP'
- def on_disconnect(self):
+ def disconnect(self, message=''):
'''
- Called on disconnection - when connect failure occurs on running connection
- (after stream is successfully opened).
- Calls disconnect handlers and cleans things up.
+ Called on disconnection - disconnect callback is picked based on state of the
+ client.
'''
-
- self.connected=''
- for i in reversed(self.disconnect_handlers):
- log.debug('Calling disconnect handler %s' % i)
- i()
+
+ # to avoid recursive calls
+ if self.disconnecting: return
+
+ log.warn('Disconnecting NBClient: %s' % message)
+
if self.__dict__.has_key('NonBlockingRoster'):
self.NonBlockingRoster.PlugOut()
if self.__dict__.has_key('NonBlockingBind'):
@@ -89,7 +90,41 @@ class NBCommonClient:
self.NonBlockingHTTP.PlugOut()
if self.__dict__.has_key('NonBlockingBOSH'):
self.NonBlockingBOSH.PlugOut()
+
+ connected = self.connected
+ stream_started = self.stream_started
+
+ self.connected = ''
+ self.stream_started = False
+
+ self.disconnecting = True
+
log.debug('Client disconnected..')
+ if connected == '':
+ # if we're disconnecting before connection to XMPP sever is opened, we don't
+ # call disconnect handlers but on_connect_failure callback
+ if self.proxy:
+ # with proxy, we have different failure callback
+ log.debug('calling on_proxy_failure cb')
+ self.on_proxy_failure(reason=message)
+ else:
+ log.debug('ccalling on_connect_failure cb')
+ self.on_connect_failure()
+ else:
+ # we are connected to XMPP server
+ if not stream_started:
+ # if error occur before XML stream was opened, e.g. no response on init
+ # request, we call the on_connect_failure callback because proper
+ # connection is not estabilished yet and it's not a proxy issue
+ log.debug('calling on_connect_failure cb')
+ self.on_connect_failure()
+ else:
+ # with open connection, we are calling the disconnect handlers
+ for i in reversed(self.disconnect_handlers):
+ log.debug('Calling disconnect handler %s' % i)
+ i()
+ self.disconnecting = False
+
def connect(self, on_connect, on_connect_failure, hostname=None, port=5222,
@@ -101,11 +136,14 @@ class NBCommonClient:
:param on_connect: called after stream is successfully opened
:param on_connect_failure: called when error occures during connection
:param on_proxy_failure: called if error occurres during TCP connection to
- proxy server or during connection to the proxy
+ proxy server or during proxy connecting process
:param proxy: dictionary with proxy data. It should contain at least values
for keys 'host' and 'port' - connection details for proxy server and
optionally keys 'user' and 'pass' as proxy credentials
- :param secure_tuple:
+ :param secure_tuple: tuple of (desired connection type, cacerts and mycerts)
+ connection type can be 'ssl' - TLS estabilished after TCP connection,
+ 'tls' - TLS estabilished after negotiation with starttls, or 'plain'.
+ cacerts, mycerts - see tls_nb.NonBlockingTLS constructor for more details
'''
self.on_connect = on_connect
self.on_connect_failure=on_connect_failure
@@ -113,16 +151,72 @@ class NBCommonClient:
self.secure, self.cacerts, self.mycerts = secure_tuple
self.Connection = None
self.Port = port
+ self.proxy = proxy
+
+ if hostname:
+ xmpp_hostname = hostname
+ else:
+ xmpp_hostname = self.Server
+
+ estabilish_tls = self.secure == 'ssl'
+ certs = (self.cacerts, self.mycerts)
+ proxy_dict = {}
+ tcp_host=xmpp_hostname
+ tcp_port=self.Port
+
+ if proxy:
+ # with proxies, client connects to proxy instead of directly to
+ # XMPP server ((hostname, port))
+ # tcp_host is hostname of machine used for socket connection
+ # (DNS request will be done for proxy or BOSH CM hostname)
+ tcp_host, tcp_port, proxy_user, proxy_pass = \
+ transports_nb.get_proxy_data_from_dict(proxy)
+
+
+ if proxy['type'] == 'bosh':
+ self.socket = bosh.NonBlockingBOSH(
+ on_disconnect = self.disconnect,
+ raise_event = self.raise_event,
+ idlequeue = self.idlequeue,
+ estabilish_tls = estabilish_tls,
+ certs = certs,
+ proxy_creds = (proxy_user, proxy_pass),
+ xmpp_server = (xmpp_hostname, self.Port),
+ domain = self.Server,
+ bosh_dict = proxy)
+ self.protocol_type = 'BOSH'
+ self.wait_for_restart_response = proxy['bosh_wait_for_restart_response']
+
+ else:
+ proxy_dict['type'] = proxy['type']
+ proxy_dict['xmpp_server'] = (xmpp_hostname, self.Port)
+ proxy_dict['credentials'] = (proxy_user, proxy_pass)
+
+ if not proxy or proxy['type'] != 'bosh':
+ self.socket = transports_nb.NonBlockingTCP(
+ on_disconnect = self.disconnect,
+ raise_event = self.raise_event,
+ idlequeue = self.idlequeue,
+ estabilish_tls = estabilish_tls,
+ certs = certs,
+ proxy_dict = proxy_dict)
+
+ self.socket.PlugIn(self)
+
+ self._resolve_hostname(
+ hostname=tcp_host,
+ port=tcp_port,
+ on_success=self._try_next_ip)
- def _resolve_hostname(self, hostname, port, on_success, on_failure):
- ''' wrapper of getaddinfo call. FIXME: getaddinfo blocks'''
+ def _resolve_hostname(self, hostname, port, on_success):
+ ''' wrapper for getaddinfo call. FIXME: getaddinfo blocks'''
try:
self.ip_addresses = socket.getaddrinfo(hostname,port,
socket.AF_UNSPEC,socket.SOCK_STREAM)
except socket.gaierror, (errnum, errstr):
- on_failure('Lookup failure for %s:%s, hostname: %s - %s' %
+ self.disconnect(message= 'Lookup failure for %s:%s, hostname: %s - %s' %
(self.Server, self.Port, hostname, errstr))
else:
on_success()
@@ -130,12 +224,13 @@ class NBCommonClient:
def _try_next_ip(self, err_message=None):
- '''iterates over IP addresses from getaddinfo'''
+ '''iterates over IP addresses from getaddrinfo'''
if err_message:
log.debug('While looping over DNS A records: %s' % err_message)
if self.ip_addresses == []:
- self._on_tcp_failure('Run out of hosts for name %s:%s' %
- (self.Server, self.Port))
+ msg = 'Run out of hosts for name %s:%s.' % (self.Server, self.Port)
+ msg = msg + ' Error for last IP: %s' % err_message
+ self.disconnect(msg)
else:
self.current_ip = self.ip_addresses.pop(0)
self.socket.connect(
@@ -152,19 +247,23 @@ class NBCommonClient:
return None
def _xmpp_connect(self, socket_type):
- if socket_type == 'plain' and self.Connection.ssl_lib:
- socket_type = 'ssl'
+ '''
+ Starts XMPP connecting process - opens the XML stream. Is called after TCP
+ connection is estabilished or after switch to TLS when successfully
+ negotiated with <starttls>.
+ '''
+ if socket_type == 'plain' and self.Connection.ssl_lib: socket_type = 'ssl'
self.connected = socket_type
self._xmpp_connect_machine()
def _xmpp_connect_machine(self, mode=None, data=None):
'''
- Finite automaton called after TCP connecting. Takes care of stream opening
- and features tag handling. Calls _on_stream_start when stream is
- started, and _on_connect_failure on failure.
+ Finite automaton taking care of stream opening and features tag
+ handling. Calls _on_stream_start when stream is started, and disconnect()
+ on failure.
'''
- log.info('-------------xmpp_connect_machine() >> mode: %s, data: %s' % (mode,str(data)[:20] ))
+ log.info('-------------xmpp_connect_machine() >> mode: %s, data: %s...' % (mode,str(data)[:20] ))
def on_next_receive(mode):
log.info('setting %s on next receive' % mode)
@@ -182,7 +281,7 @@ class NBCommonClient:
on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES')
elif mode == 'FAILURE':
- self._on_connect_failure(err_message='During XMPP connect: %s' % data)
+ self.disconnect('During XMPP connect: %s' % data)
elif mode == 'RECEIVE_DOCUMENT_ATTRIBUTES':
if data:
@@ -219,8 +318,38 @@ class NBCommonClient:
elif mode == 'STREAM_STARTED':
self._on_stream_start()
+ def _on_stream_start(self):
+ '''
+ Called after XMPP stream is opened.
+ TLS negotiation may follow after esabilishing a stream.
+ '''
+ self.stream_started = True
+ self.onreceive(None)
+ if self.connected == 'plain':
+ if self.secure == 'plain':
+ # if we want plain connection, we're done now
+ self._on_connect()
+ return
+ if not self.Dispatcher.Stream.features.getTag('starttls'):
+ # if server doesn't advertise TLS in init response, we can't do more
+ log.warn('While connecting with type = "tls": TLS unsupported by remote server')
+ self._on_connect()
+ return
+ if self.incoming_stream_version() != '1.0':
+ # if stream version is less than 1.0, we can't do more
+ log.warn('While connecting with type = "tls": stream version is less than 1.0')
+ self._on_connect()
+ return
+ # otherwise start TLS negotioation
+ self.stream_started = False
+ log.info("TLS supported by remote server. Requesting TLS start.")
+ self._tls_negotiation_handler()
+ elif self.connected in ['ssl', 'tls']:
+ self._on_connect()
+
def _tls_negotiation_handler(self, con=None, tag=None):
+ ''' takes care of TLS negotioation with <starttls> '''
log.info('-------------tls_negotiaton_handler() >> tag: %s' % tag)
if not con and not tag:
# starting state when we send the <starttls>
@@ -232,72 +361,44 @@ class NBCommonClient:
else:
# we got <proceed> or <failure>
if tag.getNamespace() <> NS_TLS:
- self._on_connect_failure('Unknown namespace: %s' % tag.getNamespace())
+ self.disconnect('Unknown namespace: %s' % tag.getNamespace())
return
tagname = tag.getName()
if tagname == 'failure':
- self._on_connect_failure('TLS <failure> received: %s' % tag)
+ self.disconnect('TLS <failure> received: %s' % tag)
return
log.info('Got starttls proceed response. Switching to TLS/SSL...')
# following call wouldn't work for BOSH transport but it doesn't matter
- # because TLS negotiation with BOSH is forbidden
+ # because <starttls> negotiation with BOSH is forbidden
self.Connection.tls_init(
on_succ = lambda: self._xmpp_connect(socket_type='tls'),
- on_fail = lambda: self._on_connect_failure('error while etabilishing TLS'))
+ on_fail = lambda: self.disconnect('error while etabilishing TLS'))
- def _on_stream_start(self):
- '''Called when stream is opened. To be overriden in derived classes.'''
-
- def _on_connect_failure(self, retry=None, err_message=None):
- self.connected = ''
- if err_message:
- log.debug('While connecting: %s' % err_message)
- if self.socket:
- self.socket.disconnect()
- self.on_connect_failure(retry)
-
def _on_connect(self):
+ ''' preceeds call of on_connect callback '''
self.onreceive(None)
self.on_connect(self, self.connected)
def raise_event(self, event_type, data):
+ '''
+ raises event to connection instance - DATA_SENT and DATA_RECIVED events are
+ used in XML console to show XMPP traffic
+ '''
log.info('raising event from transport: >>>>>%s<<<<<\n_____________\n%s\n_____________\n' % (event_type,data))
if hasattr(self, 'Dispatcher'):
self.Dispatcher.Event('', event_type, data)
- # moved from client.CommonClient (blocking client from xmpppy):
- def RegisterDisconnectHandler(self,handler):
- """ Register handler that will be called on disconnect."""
- self.disconnect_handlers.append(handler)
-
- def UnregisterDisconnectHandler(self,handler):
- """ Unregister handler that is called on disconnect."""
- self.disconnect_handlers.remove(handler)
-
- def DisconnectHandler(self):
- """ Default disconnect handler. Just raises an IOError.
- If you choosed to use this class in your production client,
- override this method or at least unregister it. """
- raise IOError('Disconnected from server.')
-
- def get_connect_type(self):
- """ Returns connection state. F.e.: None / 'tls' / 'plain+non_sasl' . """
- return self.connected
-
- def get_peerhost(self):
- ''' get the ip address of the account, from which is made connection
- to the server , (e.g. me).
- We will create listening socket on the same ip '''
- # FIXME: tuple (ip, port) is expected (and checked for) but port num is useless
- return self.socket.peerhost
-
+ # follows code for authentication, resource bind, session and roster download
+ #
def auth(self, user, password, resource = '', sasl = 1, on_auth = None):
- ''' Authenticate connnection and bind resource. If resource is not provided
- random one or library name used. '''
+ '''
+ Authenticate connnection and bind resource. If resource is not provided
+ random one or library name used.
+ '''
self._User, self._Password, self._Resource, self._sasl = user, password, resource, sasl
self.on_auth = on_auth
self._on_doc_attrs()
@@ -318,7 +419,6 @@ class NBCommonClient:
self._Resource = 'xmpppy'
auth_nb.NonBlockingNonSASL(self._User, self._Password, self._Resource, self._on_old_auth).PlugIn(self)
return
- #self.onreceive(self._on_start_sasl)
self.SASL.auth()
return True
@@ -349,7 +449,8 @@ class NBCommonClient:
self.onreceive(self._on_auth_bind)
return
return
-
+
+
def _on_auth_bind(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
@@ -386,107 +487,35 @@ class NBCommonClient:
self.send(dispatcher_nb.Presence(to=jid, typ=typ))
-
-class NonBlockingClient(NBCommonClient):
- ''' Example client class, based on CommonClient. '''
-
- def __init__(self, domain, idlequeue, caller=None):
- NBCommonClient.__init__(self, domain, idlequeue, caller)
- self.protocol_type = 'XMPP'
-
- def connect(self, on_connect, on_connect_failure, hostname=None, port=5222,
- on_proxy_failure=None, proxy=None, secure_tuple=None):
-
- NBCommonClient.connect(self, on_connect, on_connect_failure, hostname, port,
- on_proxy_failure, proxy, secure_tuple)
-
- if hostname:
- xmpp_hostname = hostname
- else:
- xmpp_hostname = self.Server
-
- estabilish_tls = self.secure == 'ssl'
- certs = (self.cacerts, self.mycerts)
-
- self._on_tcp_failure = self._on_connect_failure
- proxy_dict = {}
- tcp_host=xmpp_hostname
- tcp_port=self.Port
-
- if proxy:
- # with proxies, client connects to proxy instead of directly to
- # XMPP server ((hostname, port))
- # tcp_host is hostname of machine used for socket connection
- # (DNS request will be done for proxy or BOSH CM hostname)
- tcp_host, tcp_port, proxy_user, proxy_pass = \
- transports_nb.get_proxy_data_from_dict(proxy)
-
- if proxy['type'] == 'bosh':
- self.socket = bosh.NonBlockingBOSH(
- on_disconnect = self.on_disconnect,
- raise_event = self.raise_event,
- idlequeue = self.idlequeue,
- estabilish_tls = estabilish_tls,
- certs = certs,
- proxy_creds = (proxy_user, proxy_pass),
- xmpp_server = (xmpp_hostname, self.Port),
- domain = self.Server,
- bosh_dict = proxy)
- self.protocol_type = 'BOSH'
- self.wait_for_restart_response = proxy['bosh_wait_for_restart_response']
-
- else:
- self._on_tcp_failure = self.on_proxy_failure
- proxy_dict['type'] = proxy['type']
- proxy_dict['xmpp_server'] = (xmpp_hostname, self.Port)
- proxy_dict['credentials'] = (proxy_user, proxy_pass)
-
- if not proxy or proxy['type'] != 'bosh':
- self.socket = transports_nb.NonBlockingTCP(
- on_disconnect = self.on_disconnect,
- raise_event = self.raise_event,
- idlequeue = self.idlequeue,
- estabilish_tls = estabilish_tls,
- certs = certs,
- proxy_dict = proxy_dict)
-
- self.socket.PlugIn(self)
-
- self._resolve_hostname(
- hostname=tcp_host,
- port=tcp_port,
- on_success=self._try_next_ip,
- on_failure=self._on_tcp_failure)
+ # following methods are moved from blocking client class from xmpppy:
+ def RegisterDisconnectHandler(self,handler):
+ ''' Register handler that will be called on disconnect.'''
+ self.disconnect_handlers.append(handler)
+ def UnregisterDisconnectHandler(self,handler):
+ ''' Unregister handler that is called on disconnect.'''
+ self.disconnect_handlers.remove(handler)
- def _on_stream_start(self):
+ def DisconnectHandler(self):
'''
- Called after XMPP stream is opened.
- In pure XMPP client, TLS negotiation may follow after esabilishing a stream.
+ Default disconnect handler. Just raises an IOError. If you choosed to use
+ this class in your production client, override this method or at least
+ unregister it.
'''
- self.onreceive(None)
- if self.connected == 'plain':
- if self.secure == 'plain':
- # if we want plain connection, we're done now
- self._on_connect()
- return
- if not self.Dispatcher.Stream.features.getTag('starttls'):
- # if server doesn't advertise TLS in init response, we can't do more
- log.warn('While connecting with type = "tls": TLS unsupported by remote server')
- self._on_connect()
- return
- if self.incoming_stream_version() != '1.0':
- # if stream version is less than 1.0, we can't do more
- log.warn('While connecting with type = "tls": stream version is less than 1.0')
- self._on_connect()
- return
- # otherwise start TLS
- log.info("TLS supported by remote server. Requesting TLS start.")
- self._tls_negotiation_handler()
- elif self.connected in ['ssl', 'tls']:
- self._on_connect()
+ raise IOError('Disconnected from server.')
-
+ def get_connect_type(self):
+ ''' Returns connection state. F.e.: None / 'tls' / 'plain+non_sasl'. '''
+ return self.connected
+ def get_peerhost(self):
+ '''
+ Gets the ip address of the account, from which is made connection to the
+ server , (e.g. IP and port of gajim's socket. We will create listening socket
+ on the same ip
+ '''
+ # FIXME: tuple (ip, port) is expected (and checked for) but port num is
+ # useless
+ return self.socket.peerhost
diff --git a/src/common/xmpp/proxy_connectors.py b/src/common/xmpp/proxy_connectors.py
index 1b769288a..1cb30d3b1 100644
--- a/src/common/xmpp/proxy_connectors.py
+++ b/src/common/xmpp/proxy_connectors.py
@@ -19,6 +19,7 @@ import struct, socket, base64
'''
Module containing classes for proxy connecting. So far its HTTP CONNECT
and SOCKS5 proxy.
+Authentication to NTLM (Microsoft implementation) proxies can be next.
'''
import logging
diff --git a/src/common/xmpp/tls_nb.py b/src/common/xmpp/tls_nb.py
index 72faab425..08f00a7f9 100644
--- a/src/common/xmpp/tls_nb.py
+++ b/src/common/xmpp/tls_nb.py
@@ -3,6 +3,7 @@
##
## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
## modified by Dimitur Kirov <dkirov@gmail.com>
+## modified by Tomas Karasek <tom.to.the.k@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
@@ -190,7 +191,7 @@ class PyOpenSSLWrapper(SSLWrapper):
return 0
class StdlibSSLWrapper(SSLWrapper):
- '''Wrapper class for Python's socket.ssl read() and write() methods'''
+ '''Wrapper class for Python socket.ssl read() and write() methods'''
def __init__(self, *args):
self.parent = SSLWrapper
@@ -221,6 +222,10 @@ class NonBlockingTLS(PlugIn):
''' TLS connection used to encrypts already estabilished tcp connection.'''
def __init__(self, cacerts, mycerts):
+ '''
+ :param cacerts: path to pem file with certificates of known XMPP servers
+ :param mycerts: path to pem file with certificates of user trusted servers
+ '''
PlugIn.__init__(self)
self.cacerts = cacerts
self.mycerts = mycerts
@@ -239,11 +244,9 @@ class NonBlockingTLS(PlugIn):
log.info('Starting TLS estabilishing')
PlugIn.PlugIn(self, owner)
try:
- self._owner._plug_idle(writable=False, readable=False)
res = self._startSSL()
except Exception, e:
log.error("PlugIn: while trying _startSSL():", exc_info=True)
- #traceback.print_exc()
return False
return res
@@ -278,7 +281,6 @@ class NonBlockingTLS(PlugIn):
if result:
log.debug("Synchronous handshake completed")
- self._owner._plug_idle(writable=True, readable=False)
return True
else:
return False
@@ -361,7 +363,6 @@ class NonBlockingTLS(PlugIn):
def _ssl_verify_callback(self, sslconn, cert, errnum, depth, ok):
# Exceptions can't propagate up through this callback, so print them here.
try:
- print 'in ssl verify callback'
self._owner.ssl_fingerprint_sha1 = cert.digest('sha1')
if errnum == 0:
return True
diff --git a/src/common/xmpp/transports_nb.py b/src/common/xmpp/transports_nb.py
index 166dd2956..840adcbb7 100644
--- a/src/common/xmpp/transports_nb.py
+++ b/src/common/xmpp/transports_nb.py
@@ -84,11 +84,28 @@ CONNECTING = 'CONNECTING'
PROXY_CONNECTING = 'PROXY_CONNECTING'
CONNECTED = 'CONNECTED'
STATES = [DISCONNECTED, CONNECTING, PROXY_CONNECTING, CONNECTED, DISCONNECTING]
-# transports have different arguments in constructor and same in connect()
-# method
+
+# Transports have different arguments in constructor and same in connect()
+# method.
class NonBlockingTransport(PlugIn):
+ '''
+ Abstract class representing a trasport - object responsible for connecting to
+ XMPP server and putting stanzas on wire in desired form.
+ '''
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs):
+ '''
+ Each trasport class can have different constructor but it has to have at
+ least all the arguments of NonBlockingTransport constructor.
+
+ :param raise_event: callback for monitoring of sent and received data
+ :param on_disconnect: callback called on disconnection during runtime
+ :param idlequeue: processing idlequeue
+ :param estabilish_tls: boolean whether to estabilish TLS connection after TCP
+ connection is done
+ :param certs: tuple of (cacerts, mycerts) see tls_nb.NonBlockingTLS
+ constructor for more details
+ '''
PlugIn.__init__(self)
self.raise_event = raise_event
self.on_disconnect = on_disconnect
@@ -103,7 +120,7 @@ class NonBlockingTransport(PlugIn):
self.certs = certs
# type of used ssl lib (if any) will be assigned to this member var
self.ssl_lib = None
- self._exported_methods=[self.disconnect, self.onreceive, self.set_send_timeout,
+ self._exported_methods=[self.onreceive, self.set_send_timeout,
self.set_timeout, self.remove_timeout, self.start_disconnect]
# time to wait for SOME stanza to come and then send keepalive
@@ -118,10 +135,15 @@ class NonBlockingTransport(PlugIn):
def plugout(self):
self._owner.Connection = None
self._owner = None
+ self.disconnect(do_callback=False)
def connect(self, conn_5tuple, on_connect, on_connect_failure):
'''
- connect method should have the same declaration in all derived transports
+ Creates and connects transport to server and port defined in conn_5tupe which
+ should be item from list returned from getaddrinfo.
+ :param conn_5tuple: 5-tuple returned from getaddrinfo
+ :param on_connect: callback called on successful connect to the server
+ :param on_connect_failure: callback called on failure when connecting
'''
self.on_connect = on_connect
self.on_connect_failure = on_connect_failure
@@ -164,8 +186,13 @@ class NonBlockingTransport(PlugIn):
self.on_disconnect()
def onreceive(self, recv_handler):
- ''' Sets the on_receive callback. Do not confuse it with
- on_receive() method, which is the callback itself.'''
+ '''
+ Sets the on_receive callback. Do not confuse it with on_receive() method,
+ which is the callback itself.
+ onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is the
+ default one that will decide what to do with received stanza based on its
+ tag name and namespace.
+ '''
if not recv_handler:
if hasattr(self._owner, 'Dispatcher'):
self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
@@ -176,9 +203,9 @@ class NonBlockingTransport(PlugIn):
def tcp_connecting_started(self):
self.set_state(CONNECTING)
- # on_connect/on_conn_failure will be called from self.pollin/self.pollout
def read_timeout(self):
+ ''' called when there's no response from server in defined timeout '''
if self.on_timeout:
self.on_timeout()
self.renew_send_timeout()
@@ -212,12 +239,13 @@ class NonBlockingTransport(PlugIn):
class NonBlockingTCP(NonBlockingTransport, IdleObject):
'''
- Non-blocking TCP socket wrapper
+ Non-blocking TCP socket wrapper. It is used for simple XMPP connection. Can be
+ connected via proxy and can estabilish TLS connection.
'''
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs,
proxy_dict=None):
'''
- Class constructor.
+ :param proxy_dict: dictionary with proxy data as loaded from config file
'''
NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue,
estabilish_tls, certs)
@@ -227,7 +255,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
# bytes remained from the last send message
self.sendbuff = ''
self.proxy_dict = proxy_dict
- self.on_remote_disconnect = self.disconnect()
+ self.on_remote_disconnect = self.disconnect
def start_disconnect(self):
@@ -236,14 +264,6 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self.disconnect()
def connect(self, conn_5tuple, on_connect, on_connect_failure):
- '''
- Creates and connects socket to server and port defined in conn_5tupe which
- should be list item returned from getaddrinfo.
- :param conn_5tuple: 5-tuple returned from getaddrinfo
- :param on_connect: callback called on successful tcp connection
- :param on_connect_failure: callback called on failure when estabilishing tcp
- connection
- '''
NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure)
log.info('NonBlockingTCP Connect :: About to connect to %s:%s' % (self.server, self.port))
@@ -258,12 +278,13 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._recv = self._sock.recv
self.fd = self._sock.fileno()
- # we want to be notified when send is possible to connected socket
+ # we want to be notified when send is possible to connected socket because
+ # it means the TCP connection is estabilished
self._plug_idle(writable=True, readable=False)
self.peerhost = None
+ #variable for errno symbol that will be found from exception raised from connect()
errnum = 0
- ''' variable for errno symbol that will be found from exception raised from connect() '''
# set timeout for TCP connecting - if nonblocking connect() fails, pollend
# is called. If if succeeds pollout is called.
@@ -280,15 +301,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
log.info('After NB connect() of %s. "%s" raised => CONNECTING' % (id(self),errstr))
self.tcp_connecting_started()
return
- elif errnum in (0, 10056, errno.EISCONN):
- # already connected - this branch is probably useless, nonblocking connect() will
- # return EINPROGRESS exception in most cases. When here, we don't need timeout
- # on connected descriptor and success callback can be called.
- log.info('After connect. "%s" raised => CONNECTED' % errstr)
- self._on_connect(self)
- return
- # if there was some other error, call failure callback and unplug transport
+ # if there was some other exception, call failure callback and unplug transport
# which will also remove read_timeouts for descriptor
self._on_connect_failure('Exception while connecting to %s:%s - %s %s' %
(self.server, self.port, errnum, errstr))
@@ -312,8 +326,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
def _on_connect(self):
'''
- Preceeds invoking of on_connect callback. TCP connection is estabilished at
- this time.
+ Preceeds invoking of on_connect callback. TCP connection is already
+ estabilished by this this time.
'''
if self.estabilish_tls:
self.tls_init(
@@ -324,6 +338,9 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
def tls_init(self, on_succ, on_fail):
+ '''
+ Estabilishes a TLS/SSL on TCP connection by plugging a NonBlockingTLS module
+ '''
cacerts, mycerts = self.certs
result = tls_nb.NonBlockingTLS(cacerts, mycerts).PlugIn(self)
if result: on_succ()
@@ -342,6 +359,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
if self.get_state()==CONNECTING:
log.info('%s socket wrapper connected' % id(self))
self.idlequeue.remove_timeout(self.fd)
+ self._plug_idle(writable=False, readable=False)
self.peerhost = self._sock.getsockname()
if self.proxy_dict: self._connect_to_proxy()
else: self._on_connect()
@@ -349,6 +367,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._do_send()
def pollend(self):
+ '''called on error on TCP connection'''
log.info('pollend called, state == %s' % self.get_state())
if self.get_state()==CONNECTING:
@@ -358,8 +377,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self.disconnect()
def disconnect(self, do_callback=True):
- if self.get_state() == DISCONNECTED:
- return
+ if self.get_state() == DISCONNECTED: return
self.set_state(DISCONNECTED)
self.idlequeue.unplug_idle(self.fd)
if self.__dict__.has_key('NonBlockingTLS'): self.NonBlockingTLS.PlugOut()
@@ -367,14 +385,12 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close()
except socket.error, (errnum, errstr):
- log.error('Error while disconnecting a socket: %s %s' % (errnum,errstr))
+ log.error('Error while disconnecting socket: %s' % errstr)
self.fd = -1
NonBlockingTransport.disconnect(self, do_callback)
def read_timeout(self):
- '''
- Implemntation of IdleObject function called on timeouts from IdleQueue.
- '''
+ ''' method called when timeout passed '''
log.warn('read_timeout called, state == %s' % self.get_state())
if self.get_state()==CONNECTING:
# if read_timeout is called during connecting, connect() didn't end yet
@@ -403,11 +419,9 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
If supplied data is unicode string, encode it to utf-8.
'''
NonBlockingTransport.send(self, raw_data, now)
- r = raw_data
- if isinstance(r, unicode):
- r = r.encode('utf-8')
- elif not isinstance(r, str):
- r = ustr(r).encode('utf-8')
+
+ r = self.encode_stanza(raw_data)
+
if now:
self.sendqueue.insert(0, r)
self._do_send()
@@ -416,6 +430,12 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._plug_idle(writable=True, readable=True)
+ def encode_stanza(self, stanza):
+ if isinstance(stanza, unicode):
+ stanza = stanza.encode('utf-8')
+ elif not isinstance(stanza, str):
+ stanza = ustr(stanza).encode('utf-8')
+ return stanza
def _plug_idle(self, writable, readable):
@@ -433,12 +453,14 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
def _do_send(self):
+ '''
+ Called when send() to connected socket is possible. First message from
+ sendqueue will be sent.
+ '''
if not self.sendbuff:
if not self.sendqueue:
log.warn('calling send on empty buffer and queue')
- self._plug_idle(
- writable= ((self.sendqueue!=[]) or (self.sendbuff!='')),
- readable=True)
+ self._plug_idle(writable=False, readable=True)
return None
self.sendbuff = self.sendqueue.pop(0)
try:
@@ -471,13 +493,16 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
except tls_nb.SSLWrapper.Error, e:
log.info("_do_receive, caught SSL error, got %s:" % received , exc_info=True)
errnum, errstr = e.exc
-
+
+ if received == '': errstr = 'zero bytes on recv'
+
if (self.ssl_lib is None and received == '') or \
(self.ssl_lib == tls_nb.PYSTDLIB and errnum == 8 ) or \
(self.ssl_lib == tls_nb.PYOPENSSL and errnum == -1 ):
# 8 in stdlib: errstr == EOF occured in violation of protocol
# -1 in pyopenssl: errstr == Unexpected EOF
- log.info("Disconnected by remote server: %s %s" % (errnum, errstr), exc_info=True)
+ log.info("Disconnected by remote server: #%s, %s" % (errnum, errstr))
+ print self.on_remote_disconnect
self.on_remote_disconnect()
return
@@ -489,8 +514,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
# this branch is for case of non-fatal SSL errors - None is returned from
# recv() but no errnum is set
- if received is None:
- return
+ if received is None: return
# we have received some bytes, stop the timeout!
self.renew_send_timeout()
@@ -519,6 +543,13 @@ class NonBlockingHTTP(NonBlockingTCP):
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs,
on_http_request_possible, on_persistent_fallback, http_dict, proxy_dict = None):
+ '''
+ :param on_http_request_possible: method to call when HTTP request to socket
+ owned by transport is possible.
+ :param on_persistent_fallback: callback called when server ends TCP
+ connection. It doesn't have to be fatal for HTTP session.
+ :param http_dict: dictionary with data for HTTP request and headers
+ '''
NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue,
estabilish_tls, certs, proxy_dict)
@@ -551,8 +582,10 @@ class NonBlockingHTTP(NonBlockingTCP):
def _on_receive(self,data):
- '''Preceeds passing received data to owner class. Gets rid of HTTP headers
- and checks them.'''
+ '''
+ Preceeds passing received data to owner class. Gets rid of HTTP headers and
+ checks them.
+ '''
if self.get_state() == PROXY_CONNECTING:
NonBlockingTCP._on_receive(self, data)
return
@@ -648,7 +681,10 @@ class NonBlockingHTTP(NonBlockingTCP):
class NonBlockingHTTPBOSH(NonBlockingHTTP):
-
+ '''
+ Class for BOSH HTTP connections. Slightly redefines HTTP transport by calling
+ bosh bodytag generating callback before putting data on wire.
+ '''
def set_stanza_build_cb(self, build_cb):
self.build_cb = build_cb
@@ -659,24 +695,10 @@ class NonBlockingHTTPBOSH(NonBlockingHTTP):
return
if not self.sendbuff:
stanza = self.build_cb(socket=self)
+ stanza = self.encode_stanza(stanza)
stanza = self.build_http_message(httpbody=stanza)
- if isinstance(stanza, unicode):
- stanza = stanza.encode('utf-8')
- elif not isinstance(stanza, str):
- stanza = ustr(stanza).encode('utf-8')
self.sendbuff = stanza
- try:
- send_count = self._send(self.sendbuff)
- if send_count:
- sent_data = self.sendbuff[:send_count]
- self.sendbuff = self.sendbuff[send_count:]
- self._plug_idle(writable = self.sendbuff != '', readable = True)
- self.raise_event(DATA_SENT, sent_data)
-
- except socket.error, e:
- log.error('_do_send:', exc_info=True)
- traceback.print_exc()
- self.disconnect()
+ NonBlockingTCP._do_send(self)