Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/gajim.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortomk <tomk@no-mail.com>2008-07-14 02:22:58 +0400
committertomk <tomk@no-mail.com>2008-07-14 02:22:58 +0400
commit3d860f40a6dc99853684e7e9f1277b12207b40b9 (patch)
treee7039a24444c2bfbb9f9b179198b86f5cfea70d0
parente1899f34dc808e3b08f90d11e1e66fce646a1f80 (diff)
BOSHClient transformed to NonBlockingBOSH transport - it's easier to maintain more connections from below, implemented handling of non-persistent HTTP connections - it runs with ejabberd, improved NonBlockingTransport interface, minor changes in BOSHDispatcher
-rw-r--r--src/common/connection.py46
-rw-r--r--src/common/xmpp/bosh.py377
-rw-r--r--src/common/xmpp/client.py2
-rw-r--r--src/common/xmpp/client_nb.py82
-rw-r--r--src/common/xmpp/dispatcher_nb.py85
-rw-r--r--src/common/xmpp/idlequeue.py11
-rw-r--r--src/common/xmpp/protocol.py14
-rw-r--r--src/common/xmpp/transports_nb.py181
-rw-r--r--test/test_nonblockingtcp.py8
9 files changed, 452 insertions, 354 deletions
diff --git a/src/common/connection.py b/src/common/connection.py
index 649baf178..36e7719b6 100644
--- a/src/common/connection.py
+++ b/src/common/connection.py
@@ -521,24 +521,19 @@ class Connection(ConnectionHandlers):
self.connection = None
if self._current_type == 'ssl':
+ # SSL (force TLS on different port than plain)
port = self._current_host['ssl_port']
- secur = 1
+ secure = 'force'
else:
port = self._current_host['port']
if self._current_type == 'plain':
- secur = 0
+ # plain connection
+ secure = None
else:
- secur = None
+ # TLS (on the same port as plain)
+ secure = 'negotiate'
- if self._proxy and self._proxy['type'] == 'bosh':
- clientClass = common.xmpp.bosh.BOSHClient
- else:
- clientClass = common.xmpp.NonBlockingClient
-
- # there was:
- # "if gajim.verbose:"
- # here
- con = clientClass(
+ con = common.xmpp.NonBlockingClient(
domain=self._hostname,
caller=self,
idlequeue=gajim.idlequeue)
@@ -550,11 +545,11 @@ class Connection(ConnectionHandlers):
if self.on_connect_success == self._on_new_account:
con.RegisterDisconnectHandler(self._on_new_account)
- # FIXME: BOSH properties should be in proxy dictionary - loaded from
- # config
- if self._proxy and self._proxy['type'] == 'bosh':
+ # FIXME: BOSH properties should be loaded from config
+ if self._proxy and self._proxy['type'] == 'bosh':
self._proxy['bosh_hold'] = '1'
self._proxy['bosh_wait'] = '60'
+ self._proxy['bosh_content'] = 'text/xml; charset=utf-8'
log.info('Connecting to %s: [%s:%d]', self.name,
@@ -566,7 +561,7 @@ class Connection(ConnectionHandlers):
on_proxy_failure=self.on_proxy_failure,
on_connect_failure=self.connect_to_next_type,
proxy=self._proxy,
- secure = secur)
+ secure = secure)
else:
self.connect_to_next_host(retry)
@@ -578,9 +573,11 @@ class Connection(ConnectionHandlers):
'connection_types').split()
else:
self._connection_types = ['tls', 'ssl', 'plain']
-
+
# FIXME: remove after tls and ssl will be degubbed
- #self._connection_types = ['plain']
+ self._connection_types = ['plain']
+
+
host = self.select_next_host(self._hosts)
self._current_host = host
self._hosts.remove(host)
@@ -619,6 +616,8 @@ class Connection(ConnectionHandlers):
if _con_type == 'tcp':
_con_type = 'plain'
if _con_type != self._current_type:
+ log.info('Connecting to next type beacuse desired is %s and returned is %s'
+ % (self._current_type, _con_type))
self.connect_to_next_type()
return
if _con_type == 'plain' and gajim.config.get_per('accounts', self.name,
@@ -662,7 +661,12 @@ class Connection(ConnectionHandlers):
(con.Connection.ssl_fingerprint_sha1,))
return True
self._register_handlers(con, con_type)
- con.auth(name, self.password, self.server_resource, 1, self.__on_auth)
+ con.auth(
+ user=name,
+ password=self.password,
+ resource=self.server_resource,
+ sasl=1,
+ on_auth=self.__on_auth)
def ssl_certificate_accepted(self):
name = gajim.config.get_per('accounts', self.name, 'name')
@@ -997,7 +1001,7 @@ class Connection(ConnectionHandlers):
self.time_to_reconnect = None
self.connection.RegisterDisconnectHandler(self._on_disconnected)
- self.connection.send(p)
+ self.connection.send(p, now=True)
self.connection.StreamTerminate()
#self.connection.start_disconnect(p, self._on_disconnected)
else:
@@ -1554,7 +1558,7 @@ class Connection(ConnectionHandlers):
def gc_got_disconnected(self, room_jid):
''' A groupchat got disconnected. This can be or purpose or not.
Save the time we quit to avoid duplicate logs AND be faster than get that
- date from DB. Save it in mem AND in a small table (with fast access)
+ date from DB. Save it in mem AND in a small table (with fast access)
'''
log_time = time_time()
self.last_history_time[room_jid] = log_time
diff --git a/src/common/xmpp/bosh.py b/src/common/xmpp/bosh.py
index 851d7cc4c..3d53bdf40 100644
--- a/src/common/xmpp/bosh.py
+++ b/src/common/xmpp/bosh.py
@@ -1,235 +1,260 @@
-import protocol, locale, random, dispatcher_nb
-from client_nb import NBCommonClient
-import transports_nb
-import logging
+import locale, random
+from transports_nb import NonBlockingTransport, NonBlockingHTTP, CONNECTED, CONNECTING, DISCONNECTED
+from protocol import BOSHBody
from simplexml import Node
+
+import logging
log = logging.getLogger('gajim.c.x.bosh')
-class BOSHClient(NBCommonClient):
- '''
- Client class implementing BOSH. Extends common XMPP
- '''
- def __init__(self, domain, idlequeue, caller=None):
- '''Preceeds constructor of NBCommonClient and sets some of values that will
- be used as attributes in <body> tag'''
- self.bosh_sid=None
+FAKE_DESCRIPTOR = -1337
+'''Fake file descriptor - it's used for setting read_timeout in idlequeue for
+BOSH Transport. Timeouts in queue are saved by socket descriptor.
+In TCP-derived transports it is file descriptor of socket'''
+
+
+class NonBlockingBOSH(NonBlockingTransport):
+ def __init__(self, raise_event, on_disconnect, idlequeue, xmpp_server, domain,
+ bosh_dict):
+ NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue)
# with 50-bit random initial rid, session would have to go up
# to 7881299347898368 messages to raise rid over 2**53
# (see http://www.xmpp.org/extensions/xep-0124.html#rids)
r = random.Random()
r.seed()
+ global FAKE_DESCRIPTOR
+ FAKE_DESCRIPTOR = FAKE_DESCRIPTOR - 1
+ self.fake_fd = FAKE_DESCRIPTOR
self.bosh_rid = r.getrandbits(50)
self.bosh_sid = None
-
if locale.getdefaultlocale()[0]:
self.bosh_xml_lang = locale.getdefaultlocale()[0].split('_')[0]
else:
self.bosh_xml_lang = 'en'
self.http_version = 'HTTP/1.1'
+ self.http_persistent = False
+ self.http_pipelining = False
self.bosh_to = domain
- #self.Namespace = protocol.NS_HTTP_BIND
- #self.defaultNamespace = self.Namespace
- self.bosh_session_on = False
+ self.route_host, self.route_port = xmpp_server
- NBCommonClient.__init__(self, domain, idlequeue, caller)
+ self.bosh_wait = bosh_dict['bosh_wait']
+ self.bosh_hold = bosh_dict['bosh_hold']
+ self.bosh_host = bosh_dict['host']
+ self.bosh_port = bosh_dict['port']
+ self.bosh_content = bosh_dict['bosh_content']
+ self.http_socks = []
+ self.stanzas_to_send = []
+ self.prio_bosh_stanza = None
+ self.current_recv_handler = None
+ # if proxy_host .. do sth about HTTP proxy etc.
+
- def connect(self, on_connect, on_connect_failure, proxy, hostname=None, port=5222,
- on_proxy_failure=None, secure=None):
- '''
- Open XMPP connection (open XML streams in both directions).
- :param hostname: hostname of XMPP server from SRV request
- :param port: port number of XMPP server
- :param on_connect: called after stream is successfully opened
- :param on_connect_failure: called when error occures during connection
- :param on_proxy_failure: called if error occurres during TCP connection to
- proxy server or during connection to the proxy
- :param proxy: dictionary with bosh-related paramters. It should contain at
- least values for keys 'host' and 'port' - connection details for proxy
- server and optionally keys 'user' and 'pass' as proxy credentials
- :param secure: if
- '''
- NBCommonClient.connect(self, on_connect, on_connect_failure, hostname, port,
- on_proxy_failure, proxy, secure)
+ def connect(self, conn_5tuple, on_connect, on_connect_failure):
+ NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure)
+ self.http_socks.append(self.get_http_socket())
+ self.tcp_connection_started()
- if hostname:
- self.route_host = hostname
- else:
- self.route_host = self.Server
-
- assert(proxy.has_key('type'))
- assert(proxy['type']=='bosh')
-
- self.bosh_wait = proxy['bosh_wait']
- self.bosh_hold = proxy['bosh_hold']
- self.bosh_host = proxy['host']
- self.bosh_port = proxy['port']
- self.bosh_content = proxy['bosh_content']
-
- # _on_tcp_failure is callback for errors which occur during name resolving or
- # TCP connecting.
- self._on_tcp_failure = self.on_proxy_failure
-
-
-
- # in BOSH, client connects to Connection Manager instead of directly to
- # XMPP server ((hostname, port)). If HTTP Proxy is specified, client connects
- # to HTTP proxy and Connection Manager is specified at URI and Host header
- # in HTTP message
-
- # tcp_host, tcp_port is hostname and port for socket connection - Connection
- # Manager or HTTP proxy
- if proxy.has_key('proxy_host') and proxy['proxy_host'] and \
- proxy.has_key('proxy_port') and proxy['proxy_port']:
-
- tcp_host=proxy['proxy_host']
- tcp_port=proxy['proxy_port']
-
- # user and password for HTTP proxy
- if proxy.has_key('user') and proxy['user'] and \
- proxy.has_key('pass') and proxy['pass']:
-
- proxy_creds=(proxy['user'],proxy['pass'])
- else:
- proxy_creds=(None, None)
+ # this connect() is not needed because sockets can be connected on send but
+ # we need to know if host is reachable in order to invoke callback for
+ # failure when connecting (it's different than callback for errors
+ # occurring after connection is etabilished)
- else:
- tcp_host = transports_nb.urisplit(proxy['host'])[1]
- tcp_port=proxy['port']
+ self.http_socks[0].connect(
+ conn_5tuple = conn_5tuple,
+ on_connect = lambda: self._on_connect(self.http_socks[0]),
+ on_connect_failure = self._on_connect_failure)
- if tcp_host is None:
- self._on_connect_failure("Invalid BOSH URI")
- return
- self.socket = self.get_socket()
- self._resolve_hostname(
- hostname=tcp_host,
- port=tcp_port,
- on_success=self._try_next_ip,
- on_failure=self._on_tcp_failure)
+ def get_fd(self):
+ return self.fake_fd
- def _on_stream_start(self):
+ def on_http_request_possible(self):
'''
- Called after XMPP stream is opened. In BOSH, TLS is negotiated on socket
- connect so success callback can be invoked after TCP connect.
- (authentication is started from auth() method)
+ Called after HTTP response is received - another request is possible.
+ There should be always one pending request on BOSH CM.
'''
- self.onreceive(None)
- if self.connected == 'tcp':
- self._on_connect()
+ log.info('on_http_req possible, stanzas in list: %s, state:\n%s' %
+ (self.stanzas_to_send, self.get_current_state()))
+ # if one of sockets is connecting, sth is about to be sent - we don't have to
+ # send request from here
+ for s in self.http_socks:
+ if s.state==CONNECTING or s.pending_requests>0: return
+ self.flush_stanzas()
+
+
+ def flush_stanzas(self):
+ # another to-be-locked candidate
+ log.info('flushing stanzas')
+ if self.prio_bosh_stanza:
+ tmp = self.prio_bosh_stanza
+ self.prio_bosh_stanza = None
+ else:
+ tmp = self.stanzas_to_send
+ self.stanzas_to_send = []
+ self.send_http(tmp)
- def get_socket(self):
- tmp = transports_nb.NonBlockingHTTP(
- raise_event=self.raise_event,
- on_disconnect=self.on_http_disconnect,
- http_uri = self.bosh_host,
- http_port = self.bosh_port,
- http_version = self.http_version
- )
- tmp.PlugIn(self)
- return tmp
-
- def on_http_disconnect(self):
- log.info('HTTP socket disconnected')
- #import traceback
- #traceback.print_stack()
- if self.bosh_session_on:
- self.socket.connect(
- conn_5tuple=self.current_ip,
- on_connect=self.on_http_reconnect,
- on_connect_failure=self.on_disconnect)
+
+ def send(self, stanza, now=False):
+ # body tags should be send only via send_http()
+ assert(not isinstance(stanza, BOSHBody))
+ now = True
+ if now:
+ self.send_http([stanza])
else:
- self.on_disconnect()
+ self.stanzas_to_send.append(stanza)
- def on_http_reconnect(self):
- self.socket._plug_idle()
- log.info('Connected to BOSH CM again')
- pass
+ def send_http(self, payload):
+ # "Protocol" and string/unicode stanzas should be sent via send()
+ # (only initiating and terminating BOSH stanzas should be send via send_http)
+ assert(isinstance(payload, list) or isinstance(payload, BOSHBody))
+ log.warn('send_http: stanzas: %s\n%s' % (payload, self.get_current_state()))
+
+ if isinstance(payload, list):
+ bosh_stanza = self.boshify_stanzas(payload)
+ else:
+ # bodytag_payload is <body ...>, we don't boshify, only add the rid
+ bosh_stanza = payload
+ picked_sock = self.pick_socket()
+ if picked_sock:
+ log.info('sending to socket %s' % id(picked_sock))
+ bosh_stanza.setAttr('rid', self.get_rid())
+ picked_sock.send(bosh_stanza)
+ else:
+ # no socket was picked but one is about to connect - save the stanza and
+ # return
+ if self.prio_bosh_stanza:
+ payload = self.merge_stanzas(payload, self.prio_bosh_stanza)
+ if payload is None:
+ log.error('Error in BOSH socket handling - unable to send %s because %s\
+ is already about to be sent' % (payload, self.prio_bosh_stanza))
+ self.disconnect()
+ self.prio_bosh_stanza = payload
+
+ def merge_stanzas(self, s1, s2):
+ if isinstance(s1, BOSHBody):
+ if isinstance(s2, BOSHBody):
+ # both are boshbodies
+ return
+ else:
+ s1.setPayload(s2, add=True)
+ return s1
+ elif isinstance(s2, BOSHBody):
+ s2.setPayload(s1, add=True)
+ return s2
+ else:
+ #both are lists
+ s1.extend(s2)
+ return s1
- def on_http_reconnect_fail(self):
- log.error('Error when reconnecting to BOSH CM')
- self.on_disconnect()
- def send(self, stanza, now = False):
- (id, stanza_to_send) = self.Dispatcher.assign_id(stanza)
+ def get_current_state(self):
+ t = '\t\tSOCKET_ID\tSOCKET_STATE\tPENDING_REQS\n'
+ for s in self.http_socks:
+ t = '%s\t\t%s\t%s\t%s\n' % (t,id(s), s.state, s.pending_requests)
+ return t
+
- self.socket.send(
- self.boshify_stanza(stanza_to_send),
- now = now)
- return id
+ def pick_socket(self):
+ # try to pick connected socket with no pending reqs
+ for s in self.http_socks:
+ if s.state == CONNECTED and s.pending_requests == 0:
+ return s
- def get_rid(self):
- # does this need a lock??"
- self.bosh_rid = self.bosh_rid + 1
- return str(self.bosh_rid)
+ # try to connect some disconnected socket
+ for s in self.http_socks:
+ if s.state==DISCONNECTED:
+ self.connect_and_flush(s)
+ return
+
+ # if there is any just-connecting socket, it will send the data in its
+ # connect callback
+ for s in self.http_socks:
+ if s.state==CONNECTING:
+ return
+ # being here means there are only CONNECTED scokets with pending requests.
+ # Lets create and connect another one
+ s = self.get_http_socket()
+ self.http_socks.append(s)
+ self.connect_and_flush(s)
+ return
+
+
+ def connect_and_flush(self, socket):
+ socket.connect(
+ conn_5tuple = self.conn_5tuple,
+ on_connect = self.flush_stanzas,
+ on_connect_failure = self.disconnect)
+
+
+ def boshify_stanzas(self, stanzas=[], body_attrs=None):
+ ''' wraps zero to many stanzas by body tag with xmlns and sid '''
+ log.debug('boshify_staza - type is: %s, stanza is %s' % (type(stanzas), stanzas))
+ tag = BOSHBody(attrs={'sid': self.bosh_sid})
+ tag.setPayload(stanzas)
+ return tag
- def get_bodytag(self):
- # this should be called not until after session creation response so sid has
- # to be initialized.
- assert(hasattr(self, 'bosh_sid'))
- return protocol.BOSHBody(
- attrs={ 'rid': self.get_rid(),
- 'sid': self.bosh_sid})
def get_initial_bodytag(self, after_SASL=False):
- tag = protocol.BOSHBody(
+ return BOSHBody(
attrs={'content': self.bosh_content,
'hold': str(self.bosh_hold),
- 'route': '%s:%s' % (self.route_host, self.Port),
+ 'route': '%s:%s' % (self.route_host, self.route_port),
'to': self.bosh_to,
'wait': str(self.bosh_wait),
- 'rid': self.get_rid(),
'xml:lang': self.bosh_xml_lang,
'xmpp:version': '1.0',
'ver': '1.6',
'xmlns:xmpp': 'urn:xmpp:xbosh'})
- if after_SASL:
- tag.delAttr('content')
- tag.delAttr('hold')
- tag.delAttr('route')
- tag.delAttr('wait')
- tag.delAttr('ver')
- # xmpp:restart attribute is essential for stream restart request
- tag.setAttr('xmpp:restart','true')
- tag.setAttr('sid',self.bosh_sid)
-
- return tag
+ def get_after_SASL_bodytag(self):
+ return BOSHBody(
+ attrs={ 'to': self.bosh_to,
+ 'sid': self.bosh_sid,
+ 'xml:lang': self.bosh_xml_lang,
+ 'xmpp:version': '1.0',
+ 'xmpp:restart': 'true',
+ 'xmlns:xmpp': 'urn:xmpp:xbosh'})
def get_closing_bodytag(self):
- closing_bodytag = self.get_bodytag()
- closing_bodytag.setAttr('type', 'terminate')
- return closing_bodytag
+ return BOSHBody(attrs={'sid': self.bosh_sid, 'type': 'terminate'})
-
- def boshify_stanza(self, stanza=None, body_attrs=None):
- ''' wraps stanza by body tag with rid and sid '''
- #log.info('boshify_staza - type is: %s, stanza is %s' % (type(stanza), stanza))
- tag = self.get_bodytag()
- tag.setPayload([stanza])
- return tag
+ def get_rid(self):
+ self.bosh_rid = self.bosh_rid + 1
+ return str(self.bosh_rid)
- def on_bodytag_attrs(self, body_attrs):
- #log.info('on_bodytag_attrs: %s' % body_attrs)
- if body_attrs.has_key('type'):
- if body_attrs['type']=='terminated':
- # BOSH session terminated
- self.bosh_session_on = False
- elif body_attrs['type']=='error':
- # recoverable error
- pass
- if not self.bosh_sid:
- # initial response - when bosh_sid is set
- self.bosh_session_on = True
- self.bosh_sid = body_attrs['sid']
- self.Dispatcher.Stream._document_attrs['id']=body_attrs['authid']
+ def get_http_socket(self):
+ s = NonBlockingHTTP(
+ raise_event=self.raise_event,
+ on_disconnect=self.disconnect,
+ idlequeue = self.idlequeue,
+ on_http_request_possible = self.on_http_request_possible,
+ http_uri = self.bosh_host,
+ http_port = self.bosh_port,
+ http_version = self.http_version)
+ if self.current_recv_handler:
+ s.onreceive(self.current_recv_handler)
+ return s
+
+ def onreceive(self, recv_handler):
+ if recv_handler is None:
+ recv_handler = self._owner.Dispatcher.ProcessNonBlocking
+ self.current_recv_handler = recv_handler
+ for s in self.http_socks:
+ s.onreceive(recv_handler)
+
+ def disconnect(self, do_callback=True):
+ if self.state == DISCONNECTED: return
+
+ for s in self.http_socks:
+ s.disconnect(do_callback=False)
+ NonBlockingTransport.disconnect(self, do_callback)
diff --git a/src/common/xmpp/client.py b/src/common/xmpp/client.py
index f9f200dac..52c6bc2f2 100644
--- a/src/common/xmpp/client.py
+++ b/src/common/xmpp/client.py
@@ -48,7 +48,7 @@ class PlugIn:
else:
owner.__dict__[self.__class__.__name__]=self
- # following will not work for classes inheriting plugin()
+ # following commented line will not work for classes inheriting plugin()
#if self.__class__.__dict__.has_key('plugin'): return self.plugin(owner)
if hasattr(self,'plugin'): return self.plugin(owner)
diff --git a/src/common/xmpp/client_nb.py b/src/common/xmpp/client_nb.py
index c21437de5..d22905580 100644
--- a/src/common/xmpp/client_nb.py
+++ b/src/common/xmpp/client_nb.py
@@ -23,7 +23,7 @@ These classes can be used for simple applications "AS IS" though.
import socket
-import transports_nb, tls_nb, dispatcher_nb, auth_nb, roster_nb, protocol
+import transports_nb, tls_nb, dispatcher_nb, auth_nb, roster_nb, protocol, bosh
from client import *
import logging
@@ -49,7 +49,7 @@ class NBCommonClient:
self.Server = domain
- # caller is who initiated this client, it is sed to register the EventDispatcher
+ # caller is who initiated this client, it is needed to register the EventDispatcher
self._caller = caller
self._owner = self
self._registered_name = None
@@ -92,16 +92,8 @@ class NBCommonClient:
self.NonBlockingTCP.PlugOut()
if self.__dict__.has_key('NonBlockingHTTP'):
self.NonBlockingHTTP.PlugOut()
-
-
- def send(self, stanza, now = False):
- ''' interface for putting stanzas on wire. Puts ID to stanza if needed and
- sends it via socket wrapper'''
- (id, stanza_to_send) = self.Dispatcher.assign_id(stanza)
-
- self.Connection.send(stanza_to_send, now = now)
- return id
-
+ if self.__dict__.has_key('NonBlockingBOSH'):
+ self.NonBlockingBOSH.PlugOut()
def connect(self, on_connect, on_connect_failure, hostname=None, port=5222,
@@ -177,7 +169,7 @@ class NBCommonClient:
started, and _on_connect_failure on failure.
'''
#FIXME: use RegisterHandlerOnce instead of onreceive
- log.info('========xmpp_connect_machine() >> mode: %s, data: %s' % (mode,str(data)[:20] ))
+ log.info('-------------xmpp_connect_machine() >> mode: %s, data: %s' % (mode,str(data)[:20] ))
def on_next_receive(mode):
log.info('setting %s on next receive' % mode)
@@ -187,7 +179,8 @@ class NBCommonClient:
self.onreceive(lambda _data:self._xmpp_connect_machine(mode, _data))
if not mode:
- dispatcher_nb.Dispatcher().PlugIn(self)
+ # starting state
+ d=dispatcher_nb.Dispatcher().PlugIn(self)
on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES')
elif mode == 'FAILURE':
@@ -205,7 +198,7 @@ class NBCommonClient:
if not self.Dispatcher.Stream.features:
on_next_receive('RECEIVE_STREAM_FEATURES')
else:
- log.info('got STREAM FEATURES in first read')
+ log.info('got STREAM FEATURES in first recv')
self._xmpp_connect_machine(mode='STREAM_STARTED')
else:
@@ -222,7 +215,7 @@ class NBCommonClient:
mode='FAILURE',
data='Missing <features> in 1.0 stream')
else:
- log.info('got STREAM FEATURES in second read')
+ log.info('got STREAM FEATURES in second recv')
self._xmpp_connect_machine(mode='STREAM_STARTED')
elif mode == 'STREAM_STARTED':
@@ -244,7 +237,7 @@ class NBCommonClient:
self.on_connect(self, self.connected)
def raise_event(self, event_type, data):
- log.info('raising event from transport: %s %s' % (event_type,data))
+ log.info('raising event from transport: :::::%s::::\n_____________\n%s\n_____________\n' % (event_type,data))
if hasattr(self, 'Dispatcher'):
self.Dispatcher.Event('', event_type, data)
@@ -272,8 +265,9 @@ class NBCommonClient:
''' get the ip address of the account, from which is made connection
to the server , (e.g. me).
We will create listening socket on the same ip '''
- if hasattr(self, 'Connection'):
- return self.Connection._sock.getsockname()
+ # FIXME: tuple (ip, port) is expected (and checked for) but port num is useless
+ if hasattr(self, 'socket'):
+ return self.socket.peerhost
def auth(self, user, password, resource = '', sasl = 1, on_auth = None):
@@ -364,6 +358,7 @@ class NonBlockingClient(NBCommonClient):
def __init__(self, domain, idlequeue, caller=None):
NBCommonClient.__init__(self, domain, idlequeue, caller)
+ self.protocol_type = 'XMPP'
def connect(self, on_connect, on_connect_failure, hostname=None, port=5222,
on_proxy_failure=None, proxy=None, secure=None):
@@ -379,35 +374,33 @@ class NonBlockingClient(NBCommonClient):
if proxy:
# with proxies, client connects to proxy instead of directly to
# XMPP server ((hostname, port))
- # tcp_host is machine used for socket connection
- tcp_host=proxy['host']
- tcp_port=proxy['port']
+ # tcp_host is hostname of machine used for socket connection
+ # (DNS request will be done for this hostname)
+ tcp_host, tcp_port, proxy_user, proxy_pass = \
+ transports_nb.get_proxy_data_from_dict(proxy)
+
self._on_tcp_failure = self.on_proxy_failure
- if proxy.has_key('type'):
- assert(proxy['type']!='bosh')
- if proxy.has_key('user') and proxy.has_key('pass'):
- proxy_creds=(proxy['user'],proxy['pass'])
- else:
- proxy_creds=(None, None)
-
- type_ = proxy['type']
- if type_ == 'socks5':
- # SOCKS5 proxy
- self.socket = transports_nb.NBSOCKS5ProxySocket(
- on_disconnect=self.on_disconnect,
- proxy_creds=proxy_creds,
- xmpp_server=(xmpp_hostname, self.Port))
- elif type_ == 'http':
- # HTTP CONNECT to proxy
- self.socket = transports_nb.NBHTTPProxySocket(
+
+ if proxy['type'] == 'bosh':
+ self.socket = bosh.NonBlockingBOSH(
on_disconnect=self.on_disconnect,
- proxy_creds=proxy_creds,
- xmpp_server=(xmpp_hostname, self.Port))
+ raise_event = self.raise_event,
+ idlequeue = self.idlequeue,
+ xmpp_server=(xmpp_hostname, self.Port),
+ domain = self.Server,
+ bosh_dict = proxy)
+ self.protocol_type = 'BOSH'
+
else:
- # HTTP CONNECT to proxy from environment variables
- self.socket = transports_nb.NBHTTPProxySocket(
+ if proxy['type'] == 'socks5':
+ proxy_class = transports_nb.NBSOCKS5ProxySocket
+ elif proxy['type'] == 'http':
+ proxy_class = transports_nb.NBHTTPProxySocket
+ self.socket = proxy_class(
on_disconnect=self.on_disconnect,
- proxy_creds=(None, None),
+ raise_event = self.raise_event,
+ idlequeue = self.idlequeue,
+ proxy_creds=(proxy_user, proxy_pass),
xmpp_server=(xmpp_hostname, self.Port))
else:
self._on_tcp_failure = self._on_connect_failure
@@ -415,6 +408,7 @@ class NonBlockingClient(NBCommonClient):
tcp_port=self.Port
self.socket = transports_nb.NonBlockingTCP(
raise_event = self.raise_event,
+ idlequeue = self.idlequeue,
on_disconnect = self.on_disconnect)
self.socket.PlugIn(self)
diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py
index 1c6f04a8b..818ae2790 100644
--- a/src/common/xmpp/dispatcher_nb.py
+++ b/src/common/xmpp/dispatcher_nb.py
@@ -42,8 +42,6 @@ XML_DECLARATION = '<?xml version=\'1.0\'?>'
# FIXME: ugly
-from client_nb import NonBlockingClient
-from bosh import BOSHClient
class Dispatcher():
# Why is this here - I needed to redefine Dispatcher for BOSH and easiest way
# was to inherit original Dispatcher (now renamed to XMPPDispatcher). Trouble
@@ -53,9 +51,9 @@ class Dispatcher():
# If having two kinds of dispatcher will go well, I will rewrite the
def PlugIn(self, client_obj, after_SASL=False):
- if isinstance(client_obj, NonBlockingClient):
+ if client_obj.protocol_type == 'XMPP':
XMPPDispatcher().PlugIn(client_obj)
- elif isinstance(client_obj, BOSHClient):
+ elif client_obj.protocol_type == 'BOSH':
BOSHDispatcher().PlugIn(client_obj, after_SASL)
@@ -76,8 +74,8 @@ class XMPPDispatcher(PlugIn):
self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler, \
self.RegisterEventHandler, self.UnregisterCycleHandler, self.RegisterCycleHandler, \
self.RegisterHandlerOnce, self.UnregisterHandler, self.RegisterProtocol, \
- self.SendAndWaitForResponse, self.assign_id, self.StreamTerminate, \
- self.SendAndCallForResponse, self.getAnID, self.Event]
+ self.SendAndWaitForResponse, self.StreamTerminate, \
+ self.SendAndCallForResponse, self.getAnID, self.Event, self.send]
def getAnID(self):
global ID
@@ -112,10 +110,7 @@ class XMPPDispatcher(PlugIn):
self._owner.lastErrNode = None
self._owner.lastErr = None
self._owner.lastErrCode = None
- if hasattr(self._owner, 'StreamInit'):
- self._owner.StreamInit()
- else:
- self.StreamInit()
+ self.StreamInit()
def plugout(self):
''' Prepares instance to be destructed. '''
@@ -165,6 +160,7 @@ class XMPPDispatcher(PlugIn):
self.Stream.Parse(data)
# end stream:stream tag received
if self.Stream and self.Stream.has_received_endtag():
+ # FIXME call client method
self._owner.Connection.disconnect()
return 0
except ExpatError:
@@ -414,25 +410,19 @@ class XMPPDispatcher(PlugIn):
''' Put stanza on the wire and call back when recipient replies.
Additional callback arguments can be specified in args. '''
self.SendAndWaitForResponse(stanza, 0, func, args)
-
- def assign_id(self, stanza):
- ''' Assign an unique ID to stanza and return assigned ID.'''
- if type(stanza) in [type(''), type(u'')]:
- return (None, stanza)
- if not isinstance(stanza, Protocol):
- _ID=None
- elif not stanza.getID():
- global ID
- ID+=1
- _ID=`ID`
- stanza.setID(_ID)
- else:
- _ID=stanza.getID()
- if self._owner._registered_name and not stanza.getAttr('from'):
- stanza.setAttr('from', self._owner._registered_name)
- stanza.setNamespace(self._owner.Namespace)
- stanza.setParent(self._metastream)
- return (_ID, stanza)
+
+ def send(self, stanza, now=False):
+ id = None
+ if type(stanza) not in [type(''), type(u'')]:
+ if isinstance(stanza, Protocol):
+ id = stanza.getID()
+ if id is None:
+ stanza.setID(self.getAnID())
+ id = stanza.getID()
+ if self._owner._registered_name and not stanza.getAttr('from'):
+ stanza.setAttr('from', self._owner._registered_name)
+ self._owner.Connection.send(stanza, now)
+ return id
class BOSHDispatcher(XMPPDispatcher):
@@ -458,12 +448,16 @@ class BOSHDispatcher(XMPPDispatcher):
locale.getdefaultlocale()[0].split('_')[0])
self.restart = True
- self._owner.Connection.send(self._owner.get_initial_bodytag(self.after_SASL))
+ if self.after_SASL:
+ self._owner.Connection.send_http(self._owner.Connection.get_after_SASL_bodytag())
+ else:
+ self._owner.Connection.send_http(self._owner.Connection.get_initial_bodytag())
+
def StreamTerminate(self):
''' Send a stream terminator. '''
- self._owner.Connection.send(self._owner.get_closing_bodytag())
+ self._owner.Connection.send_http(self._owner.Connection.get_closing_bodytag())
def ProcessNonBlocking(self, data=None):
@@ -478,10 +472,31 @@ class BOSHDispatcher(XMPPDispatcher):
def dispatch(self, stanza, session=None, direct=0):
if stanza.getName()=='body' and stanza.getNamespace()==NS_HTTP_BIND:
- self._owner.on_bodytag_attrs(stanza.getAttrs())
- #self._owner.send_empty_bodytag()
- for child in stanza.getChildren():
- XMPPDispatcher.dispatch(self, child, session, direct)
+
+ stanza_attrs = stanza.getAttrs()
+
+ if stanza_attrs.has_key('authid'):
+ # should be only in init response
+ # auth module expects id of stream in document attributes
+ self.Stream._document_attrs['id'] = stanza_attrs['authid']
+
+ if stanza_attrs.has_key('sid'):
+ # session ID should be only in init response
+ self._owner.Connection.bosh_sid = stanza_attrs['sid']
+
+ if stanza_attrs.has_key('terminate'):
+ # staznas under body still should be passed to XMPP dispatcher
+ self._owner.on_disconnect()
+
+ if stanza_attrs.has_key('error'):
+ # recoverable error
+ pass
+
+ children = stanza.getChildren()
+
+ if children:
+ for child in children:
+ XMPPDispatcher.dispatch(self, child, session, direct)
else:
XMPPDispatcher.dispatch(self, stanza, session, direct)
diff --git a/src/common/xmpp/idlequeue.py b/src/common/xmpp/idlequeue.py
index 2ca1b0bd3..66b40299f 100644
--- a/src/common/xmpp/idlequeue.py
+++ b/src/common/xmpp/idlequeue.py
@@ -15,6 +15,7 @@
import select
import logging
log = logging.getLogger('gajim.c.x.idlequeue')
+log.setLevel(logging.DEBUG)
class IdleObject:
''' base class for all idle listeners, these are the methods, which are called from IdleQueue
@@ -36,7 +37,7 @@ class IdleObject:
pass
def read_timeout(self):
- ''' called when timeout has happend '''
+ ''' called when timeout happened '''
pass
class IdleQueue:
@@ -55,7 +56,8 @@ class IdleQueue:
self.selector = select.poll()
def remove_timeout(self, fd):
- log.debug('read timeout removed for fd %s' % fd)
+ #log.debug('read timeout removed for fd %s' % fd)
+ print 'read timeout removed for fd %s' % fd
if self.read_timeouts.has_key(fd):
del(self.read_timeouts[fd])
@@ -71,11 +73,13 @@ class IdleQueue:
def set_read_timeout(self, fd, seconds):
''' set a new timeout, if it is not removed after 'seconds',
then obj.read_timeout() will be called '''
- log.debug('read timeout set for fd %s on %s seconds' % (fd, seconds))
+ #log.debug('read timeout set for fd %s on %s seconds' % (fd, seconds))
+ print 'read timeout set for fd %s on %s seconds' % (fd, seconds)
timeout = self.current_time() + seconds
self.read_timeouts[fd] = timeout
def check_time_events(self):
+ print 'check time evs'
current_time = self.current_time()
for fd, timeout in self.read_timeouts.items():
if timeout > current_time:
@@ -134,6 +138,7 @@ class IdleQueue:
return False
if flags & 3: # waiting read event
+ #print 'waiting read on %d, flags are %d' % (fd, flags)
obj.pollin()
return True
diff --git a/src/common/xmpp/protocol.py b/src/common/xmpp/protocol.py
index 797a8e9af..c80870a33 100644
--- a/src/common/xmpp/protocol.py
+++ b/src/common/xmpp/protocol.py
@@ -300,6 +300,13 @@ class JID:
""" Produce hash of the JID, Allows to use JID objects as keys of the dictionary. """
return hash(self.__str__())
+class BOSHBody(Node):
+ '''
+ <body> tag that wraps usual XMPP stanzas in XMPP over BOSH
+ '''
+ def __init__(self, attrs={}, payload=[], node=None):
+ Node.__init__(self, tag='body', attrs=attrs, payload=payload, node=node)
+ self.setNamespace(NS_HTTP_BIND)
class Protocol(Node):
@@ -400,13 +407,6 @@ class Protocol(Node):
if item in ['to','from']: val=JID(val)
return self.setAttr(item,val)
-class BOSHBody(Protocol):
- '''
- <body> tag that wraps usual XMPP stanzas in BOSH
- '''
- def __init__(self, to=None, frm=None, attrs={}, payload=[], node=None):
- Protocol.__init__(self, name='body', to=to, frm=frm, attrs=attrs,
- payload=payload, xmlns=NS_HTTP_BIND, node=node)
class Message(Protocol):
""" XMPP Message stanza - "push" mechanism."""
diff --git a/src/common/xmpp/transports_nb.py b/src/common/xmpp/transports_nb.py
index a4e35656f..e30a8aa90 100644
--- a/src/common/xmpp/transports_nb.py
+++ b/src/common/xmpp/transports_nb.py
@@ -3,7 +3,7 @@
##
## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
## modified by Dimitur Kirov <dkirov@gmail.com>
-## modified by Dimitur Kirov <dkirov@gmail.com>
+## modified by Tomas Karasek <tom.to.the.k@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
@@ -45,6 +45,34 @@ def urisplit(uri):
proto, host, path = grouped[1], grouped[3], grouped[4]
return proto, host, path
+def get_proxy_data_from_dict(proxy):
+ type = proxy['type']
+ # with http-connect/socks5 proxy, we do tcp connecting to the proxy machine
+ tcp_host, tcp_port = proxy['host'], proxy['port']
+ if type == 'bosh':
+ # in ['host'] is whole URI
+ tcp_host = urisplit(proxy['host'])[1]
+ # in BOSH, client connects to Connection Manager instead of directly to
+ # XMPP server ((hostname, port)). If HTTP Proxy is specified, client connects
+ # to HTTP proxy and Connection Manager is specified at URI and Host header
+ # in HTTP message
+ if proxy.has_key('proxy_host') and proxy.has_key('proxy_port'):
+ tcp_host, tcp_port = proxy['proxy_host'], proxy['proxy_port']
+
+ # user and pass for socks5/http_connect proxy. In case of BOSH, it's user and
+ # pass for http proxy - If there's no proxy_host they won't be used
+ if proxy.has_key('user'):
+ proxy_user = proxy['user']
+ else:
+ proxy_user = None
+ if proxy.has_key('pass'):
+ proxy_pass = proxy['pass']
+ else:
+ proxy_pass = None
+ return tcp_host, tcp_port, proxy_user, proxy_pass
+
+
+
# timeout to connect to the server socket, it doesn't include auth
CONNECT_TIMEOUT_SECONDS = 30
@@ -63,62 +91,72 @@ DATA_SENT='DATA SENT'
DISCONNECTED ='DISCONNECTED'
CONNECTING ='CONNECTING'
CONNECTED ='CONNECTED'
-DISCONNECTING ='DISCONNECTING'
-
-
+# transports have different constructor and same connect
class NonBlockingTransport(PlugIn):
- def __init__(self, raise_event, on_disconnect):
+ def __init__(self, raise_event, on_disconnect, idlequeue):
PlugIn.__init__(self)
self.raise_event = raise_event
self.on_disconnect = on_disconnect
self.on_connect = None
self.on_connect_failure = None
- self.idlequeue = None
+ self.idlequeue = idlequeue
self.on_receive = None
self.server = None
self.port = None
self.state = DISCONNECTED
- self._exported_methods=[self.disconnect, self.onreceive]
+ self._exported_methods=[self.disconnect, self.onreceive, self.set_send_timeout,
+ self.set_timeout, self.remove_timeout]
+
+ # time to wait for SOME stanza to come and then send keepalive
+ self.sendtimeout = 0
+
+ # in case we want to something different than sending keepalives
+ self.on_timeout = None
def plugin(self, owner):
owner.Connection=self
- self.idlequeue = owner.idlequeue
-
def plugout(self):
self._owner.Connection = None
self._owner = None
def connect(self, conn_5tuple, on_connect, on_connect_failure):
+ '''
+ connect method should have the same declaration in all derived transports
+
+ '''
+ assert(self.state == DISCONNECTED)
self.on_connect = on_connect
self.on_connect_failure = on_connect_failure
(self.server, self.port) = conn_5tuple[4][:2]
- log.info('NonBlocking Connect :: About tot connect to %s:%s' % (self.server, self.port))
+ self.conn_5tuple = conn_5tuple
+ log.info('NonBlocking Connect :: About to connect to %s:%s' % (self.server, self.port))
def set_state(self, newstate):
- assert(newstate in [DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING])
- if (self.state, newstate) in [(CONNECTING, DISCONNECTING), (DISCONNECTED, DISCONNECTING)]:
- log.info('strange move: %s -> %s' % (self.state, newstate))
+ assert(newstate in [DISCONNECTED, CONNECTING, CONNECTED])
self.state = newstate
def _on_connect(self, data):
''' preceeds call of on_connect callback '''
+ # data is reference to socket wrapper instance. We don't need it in client
+ # because
+ self.peerhost = data._sock.getsockname()
self.set_state(CONNECTED)
self.on_connect()
def _on_connect_failure(self,err_message):
''' preceeds call of on_connect_failure callback '''
- # In case of error while connecting we need to close socket
+ # In case of error while connecting we need to disconnect transport
# but we don't want to call DisconnectHandlers from client,
# thus the do_callback=False
self.disconnect(do_callback=False)
self.on_connect_failure(err_message=err_message)
def send(self, raw_data, now=False):
- if self.state not in [CONNECTED, DISCONNECTING]:
+ if self.state not in [CONNECTED]:
# FIXME better handling needed
log.error('Trying to send %s when transport is %s.' %
(raw_data, self.state))
@@ -139,24 +177,49 @@ class NonBlockingTransport(PlugIn):
else:
self.on_receive = None
return
- log.info('setting onreceive on %s' % recv_handler)
self.on_receive = recv_handler
def tcp_connection_started(self):
self.set_state(CONNECTING)
# on_connect/on_conn_failure will be called from self.pollin/self.pollout
+ def read_timeout(self):
+ if self.on_timeout:
+ self.on_timeout()
+ self.renew_send_timeout()
+
+ def renew_send_timeout(self):
+ if self.on_timeout and self.sendtimeout > 0:
+ self.set_timeout(self.sendtimeout)
+ else:
+ self.remove_timeout()
+
+ def set_timeout(self, timeout):
+ self.idlequeue.set_read_timeout(self.get_fd(), timeout)
+
+ def get_fd(self):
+ pass
+
+ def remove_timeout(self):
+ self.idlequeue.remove_timeout(self.get_fd())
+
+ def set_send_timeout(self, timeout, on_timeout):
+ self.sendtimeout = timeout
+ if self.sendtimeout > 0:
+ self.on_timeout = on_timeout
+ else:
+ self.on_timeout = None
class NonBlockingTCP(NonBlockingTransport, IdleObject):
'''
Non-blocking TCP socket wrapper
'''
- def __init__(self, raise_event, on_disconnect):
+ def __init__(self, raise_event, on_disconnect, idlequeue):
'''
Class constructor.
'''
- NonBlockingTransport.__init__(self, raise_event, on_disconnect)
+ NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue)
# writable, readable - keep state of the last pluged flags
# This prevents replug of same object with the same flags
self.writable = True
@@ -165,23 +228,16 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
# queue with messages to be send
self.sendqueue = []
- # time to wait for SOME stanza to come and then send keepalive
- self.sendtimeout = 0
-
- # in case we want to something different than sending keepalives
- self.on_timeout = None
-
# bytes remained from the last send message
self.sendbuff = ''
- self._exported_methods=[self.disconnect, self.onreceive, self.set_send_timeout,
- self.set_timeout, self.remove_timeout]
def get_fd(self):
try:
tmp = self._sock.fileno()
return tmp
- except:
+ except socket.error, (errnum, errstr):
+ log.error('Trying to get file descriptor of not-connected socket: %s' % errstr )
return 0
def connect(self, conn_5tuple, on_connect, on_connect_failure):
@@ -205,6 +261,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._recv = self._sock.recv
self.fd = self._sock.fileno()
self.idlequeue.plug_idle(self, True, False)
+ self.peerhost = None
errnum = 0
''' variable for errno symbol that will be found from exception raised from connect() '''
@@ -221,11 +278,11 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
# connecting in progress
- log.info('After connect. "%s" raised => CONNECTING' % errstr)
+ log.info('After NB connect() of %s. "%s" raised => CONNECTING' % (id(self),errstr))
self.tcp_connection_started()
return
elif errnum in (0, 10056, errno.EISCONN):
- # already connected - this branch is very unlikely, nonblocking connect() will
+ # already connected - this branch is probably useless, nonblocking connect() will
# return EINPROGRESS exception in most cases. When here, we don't need timeout
# on connected descriptor and success callback can be called.
log.info('After connect. "%s" raised => CONNECTED' % errstr)
@@ -240,6 +297,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
def _on_connect(self, data):
''' with TCP socket, we have to remove send-timeout '''
self.idlequeue.remove_timeout(self.get_fd())
+
NonBlockingTransport._on_connect(self, data)
@@ -253,6 +311,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
log.info('pollout called, state == %s' % self.state)
if self.state==CONNECTING:
+ log.info('%s socket wrapper connected' % id(self))
self._on_connect(self)
return
self._do_send()
@@ -288,30 +347,17 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._on_connect_failure('Error during connect to %s:%s' %
(self.server, self.port))
else:
- if self.on_timeout:
- self.on_timeout()
- self.renew_send_timeout()
+ NonBlockingTransport.read_timeout(self)
- def renew_send_timeout(self):
- if self.on_timeout and self.sendtimeout > 0:
- self.set_timeout(self.sendtimeout)
- else:
- self.remove_timeout()
- def set_send_timeout(self, timeout, on_timeout):
- self.sendtimeout = timeout
- if self.sendtimeout > 0:
- self.on_timeout = on_timeout
- else:
- self.on_timeout = None
def set_timeout(self, timeout):
if self.state in [CONNECTING, CONNECTED] and self.get_fd() > 0:
- self.idlequeue.set_read_timeout(self.get_fd(), timeout)
+ NonBlockingTransport.set_timeout(self, timeout)
def remove_timeout(self):
if self.get_fd():
- self.idlequeue.remove_timeout(self.get_fd())
+ NonBlockingTransport.remove_timeout(self)
def send(self, raw_data, now=False):
'''Append raw_data to the queue of messages to be send.
@@ -415,46 +461,50 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
else:
# This should never happen, so we need the debug. (If there is no handler
# on receive spacified, data are passed to Dispatcher.ProcessNonBlocking)
- log.error('SOCKET Unhandled data received: %s' % received)
+ log.error('SOCKET %s Unhandled data received: %s' % (id(self), received))
+ import traceback
+ traceback.print_stack()
self.disconnect()
def _on_receive(self,data):
- '''Preceeds passing received data to Client class. Gets rid of HTTP headers
- and checks them.'''
+ ''' preceeds on_receive callback. It peels off and checks HTTP headers in
+ class, in here it just calls the callback.'''
self.on_receive(data)
-
class NonBlockingHTTP(NonBlockingTCP):
'''
Socket wrapper that cretes HTTP message out of sent data and peels-off
HTTP headers from incoming messages
'''
- def __init__(self, raise_event, on_disconnect, http_uri, http_port, http_version=None):
+ def __init__(self, raise_event, on_disconnect, idlequeue, on_http_request_possible,
+ http_uri, http_port, http_version='HTTP/1.1'):
+
self.http_protocol, self.http_host, self.http_path = urisplit(http_uri)
if self.http_protocol is None:
self.http_protocol = 'http'
if self.http_path == '':
http_path = '/'
self.http_port = http_port
- if http_version:
- self.http_version = http_version
- else:
- self.http_version = 'HTTP/1.1'
+ self.http_version = http_version
# buffer for partial responses
self.recvbuff = ''
self.expected_length = 0
- NonBlockingTCP.__init__(self, raise_event, on_disconnect)
+ self.pending_requests = 0
+ self.on_http_request_possible = on_http_request_possible
+ NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue)
def send(self, raw_data, now=False):
NonBlockingTCP.send(
self,
self.build_http_message(raw_data),
now)
+ self.pending_requests += 1
+
def _on_receive(self,data):
- '''Preceeds passing received data to Client class. Gets rid of HTTP headers
+ '''Preceeds passing received data to owner class. Gets rid of HTTP headers
and checks them.'''
if not self.recvbuff:
# recvbuff empty - fresh HTTP message was received
@@ -470,7 +520,8 @@ class NonBlockingHTTP(NonBlockingTCP):
if self.expected_length > len(self.recvbuff):
# If we haven't received the whole HTTP mess yet, let's end the thread.
- # It will be finnished from one of following poll calls on plugged socket.
+ # It will be finnished from one of following polls (io_watch) on plugged socket.
+ log.info('not enough bytes - %d expected, %d got' % (self.expected_length, len(self.recvbuff)))
return
# FIXME the reassembling doesn't work - Connection Manager on jabbim.cz
@@ -481,8 +532,13 @@ class NonBlockingHTTP(NonBlockingTCP):
self.recvbuff=''
self.expected_length=0
+ self.pending_requests -= 1
+ assert(self.pending_requests >= 0)
+ # not-persistent connections
+ self.disconnect(do_callback = False)
self.on_receive(httpbody)
-
+ self.on_http_request_possible()
+
def build_http_message(self, httpbody, method='POST'):
'''
@@ -512,7 +568,7 @@ class NonBlockingHTTP(NonBlockingTCP):
message = message.replace('\r','')
(header, httpbody) = message.split('\n\n',1)
header = header.split('\n')
- statusline = header[0].split(' ')
+ statusline = header[0].split(' ',2)
header = header[1:]
headers = {}
for dummy in header:
@@ -521,16 +577,16 @@ class NonBlockingHTTP(NonBlockingTCP):
return (statusline, headers, httpbody)
-
class NBProxySocket(NonBlockingTCP):
'''
Interface for proxy socket wrappers - when tunnneling XMPP over proxies,
some connecting process usually has to be done before opening stream.
'''
- def __init__(self, raise_event, on_disconnect, xmpp_server, proxy_creds=(None,None)):
+ def __init__(self, raise_event, on_disconnect, idlequeue, xmpp_server,
+ proxy_creds=(None,None)):
self.proxy_user, self.proxy_pass = proxy_creds
self.xmpp_server = xmpp_server
- NonBlockingTCP.__init__(self, raise_event, on_disconnect)
+ NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue)
def connect(self, conn_5tuple, on_connect, on_connect_failure):
@@ -552,7 +608,6 @@ class NBProxySocket(NonBlockingTCP):
pass
-
class NBHTTPProxySocket(NBProxySocket):
''' This class can be used instead of NonBlockingTCP
HTTP (CONNECT) proxy connection class. Allows to use HTTP proxies like squid with
diff --git a/test/test_nonblockingtcp.py b/test/test_nonblockingtcp.py
index 7987d3278..cca0b0a26 100644
--- a/test/test_nonblockingtcp.py
+++ b/test/test_nonblockingtcp.py
@@ -1,5 +1,5 @@
'''
-Unit test for NonBlockingTcp tranport.
+Unit test for NonBlockingTCP tranport.
'''
import unittest
@@ -38,7 +38,7 @@ class MockClient(IdleMock):
IdleMock.__init__(self)
def do_connect(self):
- self.socket=transports_nb.NonBlockingTcp(
+ self.socket=transports_nb.NonBlockingTCP(
on_disconnect=lambda: self.on_success(mode='SocketDisconnect')
)
@@ -73,7 +73,7 @@ class MockClient(IdleMock):
-class TestNonBlockingTcp(unittest.TestCase):
+class TestNonBlockingTCP(unittest.TestCase):
def setUp(self):
self.idlequeue_thread = IdleQueueThread()
self.idlequeue_thread.start()
@@ -100,6 +100,6 @@ class TestNonBlockingTcp(unittest.TestCase):
if __name__ == '__main__':
- suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingTcp)
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingTCP)
unittest.TextTestRunner(verbosity=2).run(suite)