Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/python-nbxmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'nbxmpp/transports.py')
-rw-r--r--nbxmpp/transports.py848
1 files changed, 0 insertions, 848 deletions
diff --git a/nbxmpp/transports.py b/nbxmpp/transports.py
deleted file mode 100644
index 901bc51..0000000
--- a/nbxmpp/transports.py
+++ /dev/null
@@ -1,848 +0,0 @@
-## transports.py
-##
-## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
-## modified by Dimitur Kirov <dkirov@gmail.com>
-## modified by Tomas Karasek <tom.to.the.k@gmail.com>
-##
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2, or (at your option)
-## any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-
-"""
-Transports are objects responsible for connecting to XMPP server and putting
-data to wrapped sockets in in desired form (SSL, TLS, TCP, for HTTP proxy,
-for SOCKS5 proxy...)
-
-Transports are not aware of XMPP stanzas and only responsible for low-level
-connection handling.
-"""
-
-import socket
-import errno
-import time
-import traceback
-import base64
-import sys
-import locale
-import logging
-from urllib.parse import urlparse
-
-from .plugin import PlugIn
-from .idlequeue import IdleObject
-from . import proxy_connectors
-from . import tls
-
-log = logging.getLogger('nbxmpp.transports')
-
-def urisplit(uri):
- """
- Function for splitting URI string to tuple (protocol, host, port, path).
- e.g. urisplit('http://httpcm.jabber.org:123/webclient') returns ('http',
- 'httpcm.jabber.org', 123, '/webclient') return 443 as default port if proto
- is https else 80
- """
- splitted = urlparse(uri)
- proto, host, path = splitted.scheme, splitted.hostname, splitted.path
- try:
- port = splitted.port
- except ValueError:
- log.warning('port cannot be extracted from BOSH URL %s, using default port' \
- % uri)
- port = ''
- if not port:
- if proto == 'https':
- port = 443
- else:
- port = 80
- return proto, host, port, path
-
-def get_proxy_data_from_dict(proxy):
- tcp_host, tcp_port, proxy_user, proxy_pass = None, None, None, None
- proxy_type = proxy['type']
- if proxy_type == 'bosh' and not proxy['bosh_useproxy']:
- # with BOSH not over proxy we have to parse the hostname from BOSH URI
- proto, tcp_host, tcp_port, path = urisplit(proxy['bosh_uri'])
- else:
- # with proxy!=bosh or with bosh over HTTP proxy we're connecting to proxy
- # machine
- tcp_host, tcp_port = proxy['host'], proxy['port']
- if proxy.get('useauth', False):
- proxy_user, proxy_pass = proxy['user'], proxy['pass']
- return tcp_host, tcp_port, proxy_user, proxy_pass
-
-def decode_py2(string, encoding):
- # decodes string into unicode if in py2
- # py3 has unicode strings by default
- try:
- string = string.decode(encoding)
- except AttributeError:
- pass
- return string
-
-#: timeout to connect to the server socket, it doesn't include auth
-CONNECT_TIMEOUT_SECONDS = 30
-
-#: how long to wait for a disconnect to complete
-DISCONNECT_TIMEOUT_SECONDS = 5
-
-#: size of the buffer which reads data from server
-# if lower, more stanzas will be fragmented and processed twice
-RECV_BUFSIZE = 32768 # 2x maximum size of ssl packet, should be plenty
-# it's inefficient but should work. Problem is that connect machine makes wrong
-# assumptions and that we only check for pending data in sockets but not in SSL
-# buffer...
-
-DATA_RECEIVED = 'DATA RECEIVED'
-DATA_SENT = 'DATA SENT'
-DATA_ERROR = 'DATA ERROR'
-
-DISCONNECTED = 'DISCONNECTED'
-DISCONNECTING = 'DISCONNECTING'
-CONNECTING = 'CONNECTING'
-PROXY_CONNECTING = 'PROXY_CONNECTING'
-CONNECTED = 'CONNECTED'
-STATES = (DISCONNECTED, CONNECTING, PROXY_CONNECTING, CONNECTED, DISCONNECTING)
-
-class NonBlockingTransport(PlugIn):
- """
- Abstract class representing a transport
-
- Subclasses CAN have different constructor signature but connect method SHOULD
- be the same.
- """
-
- def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
- certs, tls_version, cipher_list):
- """
- Each trasport class can have different constructor but it has to have at
- least all the arguments of NonBlockingTransport constructor
-
- :param raise_event: callback for monitoring of sent and received data
- :param on_disconnect: callback called on disconnection during runtime
- :param idlequeue: processing idlequeue
- :param estabilish_tls: boolean whether to estabilish TLS connection
- after TCP connection is done
- :param certs: tuple of (cacerts, mycerts) see constructor
- of tls.NonBlockingTLS for more details
- :param tls_version: The lowest supported TLS version.
- :param cipher_list: list of ciphers used to connect to server
- """
- PlugIn.__init__(self)
- self.raise_event = raise_event
- self.on_disconnect = on_disconnect
- self.on_connect = None
- self.on_connect_failure = None
- self.idlequeue = idlequeue
- self.on_receive = None
- self.server = None
- self.port = None
- self.conn_5tuple = None
- self.set_state(DISCONNECTED)
- self.estabilish_tls = estabilish_tls
- self.certs = certs
- self.tls_version = tls_version
- self.cipher_list = cipher_list
- # type of used ssl lib (if any) will be assigned to this member var
- self.ssl_lib = None
- self._exported_methods=[self.onreceive, self.set_send_timeout,
- self.set_send_timeout2, self.set_timeout, self.remove_timeout,
- self.start_disconnect]
-
- # time to wait for SOME stanza to come and then send keepalive
- self.sendtimeout = 0
-
- # in case we want to something different than sending keepalives
- self.on_timeout = None
- self.on_timeout2 = None
-
- def plugin(self, owner):
- owner.Connection = self
-
- def plugout(self):
- self._owner.Connection = None
- self._owner = None
- self.disconnect(do_callback=False)
-
- def connect(self, conn_5tuple, on_connect, on_connect_failure):
- """
- Creates and connects transport to server and port defined in conn_5tuple
- which should be item from list returned from getaddrinfo
-
- :param conn_5tuple: 5-tuple returned from getaddrinfo
- :param on_connect: callback called on successful connect to the server
- :param on_connect_failure: callback called on failure when connecting
- """
- self.on_connect = on_connect
- self.on_connect_failure = on_connect_failure
- self.server, self.port = conn_5tuple[4][:2]
- self.conn_5tuple = conn_5tuple
-
- def set_state(self, newstate):
- assert(newstate in STATES)
- self.state = newstate
-
- def get_state(self):
- return self.state
-
- def _on_connect(self):
- """
- Preceeds call of on_connect callback
- """
- # data is reference to socket wrapper instance. We don't need it in client
- # because
- self.set_state(CONNECTED)
- self.on_connect()
-
- def _on_connect_failure(self, err_message):
- """
- Preceeds call of on_connect_failure callback
- """
- # In case of error while connecting we need to disconnect transport
- # but we don't want to call DisconnectHandlers from client,
- # thus the do_callback=False
- self.disconnect(do_callback=False)
- self.on_connect_failure(err_message=err_message)
-
- def send(self, raw_data, now=False):
- if self.get_state() == DISCONNECTED:
- log.error('Unable to send %s \n because state is %s.' %
- (raw_data, self.get_state()))
-
- def disconnect(self, do_callback=True):
- self.set_state(DISCONNECTED)
- if do_callback:
- # invoke callback given in __init__
- self.on_disconnect()
-
- def onreceive(self, recv_handler):
- """
- Set the on_receive callback.
-
- onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is
- the default one that will decide what to do with received stanza based on
- its tag name and namespace.
-
- Do not confuse it with on_receive() method, which is the callback
- itself.
- """
- if not recv_handler:
- if hasattr(self, '_owner') and hasattr(self._owner, 'Dispatcher'):
- self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
- else:
- log.warning('No Dispatcher plugged. Received data will not be processed')
- self.on_receive = None
- return
- self.on_receive = recv_handler
-
- def _tcp_connecting_started(self):
- self.set_state(CONNECTING)
-
- def read_timeout(self):
- """
- Called when there's no response from server in defined timeout
- """
- if self.on_timeout:
- self.on_timeout()
- self.renew_send_timeout()
-
- def read_timeout2(self):
- """
- called when there's no response from server in defined timeout
- """
- if self.on_timeout2:
- self.on_timeout2()
- self.renew_send_timeout2()
-
- def renew_send_timeout(self):
- if self.on_timeout and self.sendtimeout > 0:
- self.set_timeout(self.sendtimeout)
-
- def renew_send_timeout2(self):
- if self.on_timeout2 and self.sendtimeout2 > 0:
- self.set_timeout2(self.sendtimeout2)
-
- def set_timeout(self, timeout):
- self.idlequeue.set_read_timeout(self.fd, timeout)
-
- def set_timeout2(self, timeout2):
- self.idlequeue.set_read_timeout(self.fd, timeout2, self.read_timeout2)
-
- def get_fd(self):
- pass
-
- def remove_timeout(self):
- self.idlequeue.remove_timeout(self.fd)
-
- def set_send_timeout(self, timeout, on_timeout):
- self.sendtimeout = timeout
- if self.sendtimeout > 0:
- self.on_timeout = on_timeout
- else:
- self.on_timeout = None
-
- def set_send_timeout2(self, timeout2, on_timeout2):
- self.sendtimeout2 = timeout2
- if self.sendtimeout2 > 0:
- self.on_timeout2 = on_timeout2
- else:
- self.on_timeout2 = None
-
- # FIXME: where and why does this need to be called
- def start_disconnect(self):
- self.set_state(DISCONNECTING)
- if hasattr(self._owner, 'Smacks'):
- self._owner.Smacks.send_closing_ack()
-
-
-class NonBlockingTCP(NonBlockingTransport, IdleObject):
- """
- Non-blocking TCP socket wrapper
-
- It is used for simple XMPP connection. Can be connected via proxy and can
- estabilish TLS connection.
- """
- def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
- certs, tls_version, cipher_list, alpn, proxy_dict=None):
- """
- :param proxy_dict: dictionary with proxy data as loaded from config file
- """
- NonBlockingTransport.__init__(self, raise_event, on_disconnect,
- idlequeue, estabilish_tls, certs, tls_version, cipher_list)
- IdleObject.__init__(self)
-
- # queue with messages to be send
- self.sendqueue = []
-
- # bytes remained from the last send message
- self.sendbuff = ''
- self.sent_bytes_buff = b''
-
- # bytes remained from the last received message
- self.received_bytes_buff = b''
-
- self.proxy_dict = proxy_dict
- self.on_remote_disconnect = self.disconnect
-
- # ssl variables
- self.ssl_certificate = None
- # first ssl error
- self.ssl_errnum = 0
- # all ssl errors
- self.ssl_errors = []
-
- self.alpn = alpn
-
- # FIXME: transport should not be aware xmpp
- def start_disconnect(self):
- NonBlockingTransport.start_disconnect(self)
- self.send('</stream:stream>', now=True)
- self.disconnect()
-
- def connect(self, conn_5tuple, on_connect, on_connect_failure):
- NonBlockingTransport.connect(self, conn_5tuple, on_connect,
- on_connect_failure)
- log.info('NonBlockingTCP Connect :: About to connect to %s:%s' %
- (self.server, self.port))
-
- try:
- self._sock = socket.socket(*conn_5tuple[:3])
- except socket.error as e:
- self._on_connect_failure('NonBlockingTCP Connect: Error while creating\
- socket: %s' % str(e))
- return
-
- self._send = self._sock.send
- self._recv = self._sock.recv
- self.fd = self._sock.fileno()
-
- # we want to be notified when send is possible to connected socket because
- # it means the TCP connection is estabilished
- self._plug_idle(writable=True, readable=False)
- self.peerhost = None
-
- # variable for errno symbol that will be found from exception raised
- # from connect()
- errnum = 0
- errstr = str()
-
- # set timeout for TCP connecting - if nonblocking connect() fails, pollend
- # is called. If if succeeds pollout is called.
- self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT_SECONDS)
-
- try:
- self._sock.setblocking(False)
- self._sock.connect((self.server, self.port))
- except Exception as exc:
- errnum, errstr = exc.errno, \
- decode_py2(exc.strerror, locale.getpreferredencoding())
-
- if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
- # connecting in progress
- log.info('After NB connect() of %s. "%s" raised => CONNECTING' %
- (id(self), errstr))
- self._tcp_connecting_started()
- return
-
- # if there was some other exception, call failure callback and unplug
- # transport which will also remove read_timeouts for descriptor
- self._on_connect_failure('Exception while connecting to %s:%s - %s %s' %
- (self.server, self.port, errnum, errstr))
-
- def _connect_to_proxy(self):
- self.set_state(PROXY_CONNECTING)
- if self.proxy_dict['type'] == 'socks5':
- proxyclass = proxy_connectors.SOCKS5Connector
- elif self.proxy_dict['type'] == 'http' :
- proxyclass = proxy_connectors.HTTPCONNECTConnector
- proxyclass.get_instance(
- send_method=self.send,
- onreceive=self.onreceive,
- old_on_receive=self.on_receive,
- on_success=self._on_connect,
- on_failure=self._on_connect_failure,
- xmpp_server=self.proxy_dict['xmpp_server'],
- proxy_creds=self.proxy_dict['credentials'])
-
- def _on_connect(self):
- """
- Preceed invoking of on_connect callback. TCP connection is already
- estabilished by this time
- """
- if self.estabilish_tls:
- self.tls_init(
- on_succ = lambda: NonBlockingTransport._on_connect(self),
- on_fail = lambda: self._on_connect_failure(
- 'error while estabilishing TLS'))
- else:
- NonBlockingTransport._on_connect(self)
-
- def tls_init(self, on_succ, on_fail):
- """
- Estabilishes TLS/SSL using this TCP connection by plugging a
- NonBlockingTLS module
- """
- cacerts, mycerts = self.certs
- result = tls.NonBlockingTLS.get_instance(cacerts, mycerts,
- self.tls_version, self.cipher_list, self.alpn).PlugIn(self)
- if result:
- on_succ()
- else:
- on_fail()
-
- def pollin(self):
- """
- Called by idlequeu when receive on plugged socket is possible
- """
- log.info('pollin called, state == %s' % self.get_state())
- self._do_receive()
-
- def pollout(self):
- """
- Called by idlequeu when send to plugged socket is possible
- """
- log.info('pollout called, state == %s' % self.get_state())
-
- if self.get_state() == CONNECTING:
- log.info('%s socket wrapper connected' % id(self))
- self.idlequeue.remove_timeout(self.fd)
- self._plug_idle(writable=False, readable=False)
- self.peerhost = self._sock.getsockname()
- if self.proxy_dict:
- self._connect_to_proxy()
- else:
- self._on_connect()
- elif self.get_state() != DISCONNECTED:
- self._do_send()
-
- def pollend(self):
- """
- Called by idlequeue on TCP connection errors
- """
- log.info('pollend called, state == %s' % self.get_state())
-
- if self.get_state() == CONNECTING:
- self._on_connect_failure('Error during connect to %s:%s' %
- (self.server, self.port))
- else:
- self.disconnect()
-
- def disconnect(self, do_callback=True):
- if self.get_state() == DISCONNECTED:
- return
- self.set_state(DISCONNECTED)
- self.idlequeue.unplug_idle(self.fd)
- if 'NonBlockingTLS' in self.__dict__:
- self.NonBlockingTLS.PlugOut()
- try:
- self._sock.shutdown(socket.SHUT_RDWR)
- self._sock.close()
- except socket.error as e:
- errstr = decode_py2(e.strerror, locale.getpreferredencoding())
- log.info('Error while disconnecting socket: %s' % errstr)
- self.fd = -1
- NonBlockingTransport.disconnect(self, do_callback)
-
- def read_timeout(self):
- log.info('read_timeout called, state == %s' % self.get_state())
- if self.get_state() == CONNECTING:
- # if read_timeout is called during connecting, connect() didn't end yet
- # thus we have to call the tcp failure callback
- self._on_connect_failure('Error during connect to %s:%s' %
- (self.server, self.port))
- else:
- NonBlockingTransport.read_timeout(self)
-
- def set_timeout(self, timeout):
- if self.get_state() != DISCONNECTED and self.fd != -1:
- NonBlockingTransport.set_timeout(self, timeout)
- else:
- log.warning('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' %
- (self.get_state(), self.fd))
-
- def remove_timeout(self):
- if self.fd:
- NonBlockingTransport.remove_timeout(self)
- else:
- log.warning('remove_timeout: no self.fd state is %s' % self.get_state())
-
- def send(self, raw_data, now=False):
- """
- Append raw_data to the queue of messages to be send. If supplied data is
- unicode string, encode it to utf-8.
- """
- NonBlockingTransport.send(self, raw_data, now)
-
- if isinstance(raw_data, bytes):
- r = raw_data
- else:
- r = self.encode_stanza(raw_data)
-
- if now:
- self.sendqueue.insert(0, r)
- self._do_send()
- else:
- self.sendqueue.append(r)
-
- self._plug_idle(writable=True, readable=True)
-
- def encode_stanza(self, stanza):
- """
- Encode str or unicode to utf-8
- """
- if isinstance(stanza, str):
- stanza = stanza.encode('utf-8')
- elif not isinstance(stanza, str):
- stanza = str(stanza).encode('utf-8')
- return stanza
-
- def _plug_idle(self, writable, readable):
- """
- Plug file descriptor of socket to Idlequeue
-
- Plugged socket will be watched for "send possible" or/and "recv possible"
- events. pollin() callback is invoked on "recv possible", pollout() on
- "send_possible".
-
- Plugged socket will always be watched for "error" event - in that case,
- pollend() is called.
- """
- log.info('Plugging fd %d, W:%s, R:%s' % (self.fd, writable, readable))
- self.idlequeue.plug_idle(self, writable, readable)
-
- def _do_send(self):
- """
- Called when send() to connected socket is possible. First message from
- sendqueue will be sent
- """
- if not self.sendbuff:
- if not self.sendqueue:
- log.warning('calling send on empty buffer and queue')
- self._plug_idle(writable=False, readable=True)
- return None
- self.sendbuff = self.sendqueue.pop(0)
- try:
- send_count = self._send(self.sendbuff)
- if send_count:
- sent_data = self.sendbuff[:send_count]
- self.sendbuff = self.sendbuff[send_count:]
- self._plug_idle(writable=((self.sendqueue != []) or (len(self.sendbuff) != 0)), readable=True)
-
- if self.sent_bytes_buff:
- sent_data = self.sent_bytes_buff + sent_data
- self.sent_bytes_buff = b''
- # try to decode sent data
- try:
- sent_data = decode_py2(sent_data, 'utf-8')
- except UnicodeDecodeError:
- for i in range(-1, -4, -1):
- char = sent_data[i]
- if ord(char) & 0xc0 == 0xc0:
- self.sent_bytes_buff = sent_data[i:]
- sent_data = sent_data[:i]
- break
- sent_data = decode_py2(sent_data, 'utf-8')
- self.raise_event(DATA_SENT, sent_data)
-
- except Exception:
- log.error('_do_send:', exc_info=True)
- traceback.print_exc()
- self.disconnect()
-
- def _do_receive(self):
- """
- Reads all pending incoming data. Will call owner's disconnected() method
- if appropriate
- """
- received = None
- errnum = 0
- errstr = 'No Error Set'
-
- try:
- # get as many bites, as possible, but not more than RECV_BUFSIZE
- received = self._recv(RECV_BUFSIZE)
- except tls.SSLWrapper.Error as e:
- log.info("_do_receive, caught SSL error, got %s:" % received,
- exc_info=True)
- errnum, errstr = e.errno,\
- decode_py2(e.strerror, locale.getpreferredencoding())
- except socket.error as e:
- log.info("_do_receive: got %s:" % received, exc_info=True)
-
- if received == '':
- errstr = 'zero bytes on recv'
-
- if (self.ssl_lib is None and received == '') or \
- (self.ssl_lib == tls.PYOPENSSL and errnum == -1 ):
- # -1 in pyopenssl: errstr == Unexpected EOF
- log.info("Disconnected by remote server: #%s, %s" % (errnum, errstr))
- self.on_remote_disconnect()
- return
-
- if errnum:
- log.info("Connection to %s:%s lost: %s %s" % (self.server, self.port,
- errnum, errstr), exc_info=True)
- self.disconnect()
- return
-
- # this branch is for case of non-fatal SSL errors - None is returned from
- # recv() but no errnum is set
- if received is None:
- return
-
- # we have received some bytes, stop the timeout!
- self.remove_timeout()
- self.renew_send_timeout()
- self.renew_send_timeout2()
-
- if self.received_bytes_buff:
- received = self.received_bytes_buff + received
- self.received_bytes_buff = b''
-
- if self.state != PROXY_CONNECTING or self.proxy_dict['type'] != \
- 'socks5':
- # try to decode data
- try:
- received = decode_py2(received, 'utf-8')
- except UnicodeDecodeError:
- for i in range(-1, -4, -1):
- char = received[i]
- if char & 0xc0 == 0xc0:
- self.received_bytes_buff = received[i:]
- received = received[:i]
- break
- received = decode_py2(received, 'utf-8')
-
- # pass received data to owner
- if self.on_receive:
- self.raise_event(DATA_RECEIVED, received)
- self._on_receive(received)
- else:
- # This should never happen, so we need the debug.
- # (If there is no handler on receive specified, data is passed to
- # Dispatcher.ProcessNonBlocking)
- log.error('SOCKET %s Unhandled data received: %s' % (id(self),
- received))
- self.disconnect()
-
- def _on_receive(self, data):
- """
- Preceeds on_receive callback. It peels off and checks HTTP headers in
- HTTP classes, in here it just calls the callback
- """
- self.on_receive(data)
-
-
-class NonBlockingHTTP(NonBlockingTCP):
- """
- Socket wrapper that creates HTTP message out of sent data and peels-off HTTP
- headers from incoming messages
- """
-
- def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
- certs, tls_version, cipher_list, on_http_request_possible, on_persistent_fallback,
- http_dict, proxy_dict=None):
- """
- :param on_http_request_possible: method to call when HTTP request to
- socket owned by transport is possible.
- :param on_persistent_fallback: callback called when server ends TCP
- connection. It doesn't have to be fatal for HTTP session.
- :param http_dict: dictionary with data for HTTP request and headers
- """
- NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue,
- estabilish_tls, certs, tls_version, cipher_list, False, proxy_dict)
-
- self.http_protocol, self.http_host, self.http_port, self.http_path = \
- urisplit(http_dict['http_uri'])
- self.http_protocol = self.http_protocol or 'http'
- self.http_path = self.http_path or '/'
- self.http_version = http_dict['http_version']
- self.http_persistent = http_dict['http_persistent']
- self.add_proxy_headers = http_dict['add_proxy_headers']
-
- if 'proxy_user' in http_dict and 'proxy_pass' in http_dict:
- self.proxy_user, self.proxy_pass = http_dict['proxy_user'], http_dict[
- 'proxy_pass']
- else:
- self.proxy_user, self.proxy_pass = None, None
-
- # buffer for partial responses
- self.recvbuff = ''
- self.expected_length = 0
- self.pending_requests = 0
- self.on_http_request_possible = on_http_request_possible
- self.last_recv_time = 0
- self.close_current_connection = False
- self.on_remote_disconnect = lambda: on_persistent_fallback(self)
-
- def http_send(self, raw_data, now=False):
- self.send(self.build_http_message(raw_data), now)
-
- def _on_receive(self, data):
- """
- Preceeds passing received data to owner class. Gets rid of HTTP headers
- and checks them.
- """
- if self.get_state() == PROXY_CONNECTING:
- NonBlockingTCP._on_receive(self, data)
- return
-
- # append currently received data to HTTP msg in buffer
- self.recvbuff = '%s%s' % (self.recvbuff or '', data)
- statusline, headers, httpbody, buffer_rest = self.parse_http_message(
- self.recvbuff)
-
- if not (statusline and headers and httpbody):
- log.debug('Received incomplete HTTP response')
- return
-
- if statusline[1] != '200':
- log.error('HTTP Error: %s %s' % (statusline[1], statusline[2]))
- self.disconnect()
- return
- self.expected_length = int(headers['Content-Length'])
- if 'Connection' in headers and headers['Connection'].strip()=='close':
- self.close_current_connection = True
-
- if self.expected_length > len(httpbody.encode('utf-8')):
- # If we haven't received the whole HTTP mess yet, let's end the thread.
- # It will be finnished from one of following recvs on plugged socket.
- log.info('not enough bytes in HTTP response - %d expected, got %d' %
- (self.expected_length, len(httpbody.encode('utf-8'))))
- else:
- # First part of buffer has been extraced and is going to be handled,
- # remove it from buffer
- self.recvbuff = buffer_rest
-
- # everything was received
- self.expected_length = 0
-
- if not self.http_persistent or self.close_current_connection:
- # not-persistent connections disconnect after response
- self.disconnect(do_callback=False)
- self.close_current_connection = False
- self.last_recv_time = time.time()
- self.on_receive(data=httpbody, socket=self)
- self.on_http_request_possible()
-
- def build_http_message(self, httpbody, method='POST'):
- """
- Builds http message with given body. Values for headers and status line
- fields are taken from class variables
- """
- headers = ['%s %s %s' % (method, self.http_path, self.http_version),
- 'Host: %s:%s' % (self.http_host, self.http_port),
- 'User-Agent: Gajim',
- 'Content-Type: text/xml; charset=utf-8',
- 'Content-Length: %s' % len(httpbody)]
- if self.add_proxy_headers:
- headers.append('Proxy-Connection: keep-alive')
- headers.append('Pragma: no-cache')
- if self.proxy_user and self.proxy_pass:
- credentials = '%s:%s' % (self.proxy_user, self.proxy_pass)
- credentials = base64.encodestring(credentials).strip()
- headers.append('Proxy-Authorization: Basic %s' % credentials)
- else:
- headers.append('Connection: Keep-Alive')
- headers.append('\r\n')
- headers = '\r\n'.join(headers)
- return b'%s%s' % (headers.encode('utf-8'), httpbody)
-
- def parse_http_message(self, message):
- """
- Split http message into a tuple:
- - (statusline - list of e.g. ['HTTP/1.1', '200', 'OK'],
- - headers - dictionary of headers e.g. {'Content-Length': '604',
- 'Content-Type': 'text/xml; charset=utf-8'},
- - httpbody - string with http body)
- - http_rest - what is left in the message after a full HTTP header + body
- """
- splitted = message.split('\r\n\r\n')
- if len(splitted) < 2:
- # no complete http message. Keep filling the buffer until we find one
- buffer_rest = message
- return ('', '', '', buffer_rest)
- else:
- (header, httpbody) = splitted[:2]
- header = header.replace('\r', '')
- header = header.lstrip('\n')
- header = header.split('\n')
- statusline = header[0].split(' ', 2)
- header = header[1:]
- headers = {}
- for dummy in header:
- row = dummy.split(' ', 1)
- headers[row[0][:-1]] = row[1]
- body_size = int(headers['Content-Length'])
- rest_splitted = splitted[2:]
- while (len(httpbody) < body_size) and rest_splitted:
- # Complete httpbody until it has the announced size
- httpbody = '\n\n'.join([httpbody, rest_splitted.pop(0)])
- buffer_rest = "\n\n".join(rest_splitted)
- return (statusline, headers, httpbody, buffer_rest)
-
-
-class NonBlockingHTTPBOSH(NonBlockingHTTP):
- """
- Class for BOSH HTTP connections. Slightly redefines HTTP transport by
- calling bosh bodytag generating callback before putting data on wire
- """
-
- def set_stanza_build_cb(self, build_cb):
- self.build_cb = build_cb
-
- def _do_send(self):
- if self.state == PROXY_CONNECTING:
- NonBlockingTCP._do_send(self)
- return
- if not self.sendbuff:
- stanza = self.build_cb(socket=self)
- stanza = self.encode_stanza(stanza)
- stanza = self.build_http_message(httpbody=stanza)
- self.sendbuff = stanza
- NonBlockingTCP._do_send(self)