Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/python-nbxmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/epydoc.conf26
-rwxr-xr-xdoc/examples/xsend.py158
-rw-r--r--nbxmpp/bind.py133
-rw-r--r--nbxmpp/bosh.py581
-rw-r--r--nbxmpp/client.py639
-rw-r--r--nbxmpp/features.py216
-rw-r--r--nbxmpp/proxy_connectors.py247
-rw-r--r--nbxmpp/roster.py367
-rw-r--r--nbxmpp/tls.py432
-rw-r--r--nbxmpp/transports.py848
10 files changed, 0 insertions, 3647 deletions
diff --git a/doc/epydoc.conf b/doc/epydoc.conf
deleted file mode 100644
index 018bffb..0000000
--- a/doc/epydoc.conf
+++ /dev/null
@@ -1,26 +0,0 @@
-[epydoc]
-
-# Information about the project.
-name: python-nbxmpp
-url: http://python-nbxmpp.gajim.org
-
-verbosity: 3
-imports: yes
-redundant-details: yes
-docformat: restructuredtext
-
-# The list of modules to document. Modules can be named using
-# dotted names, module filenames, or package directory names.
-# This option may be repeated.
-modules: nbxmpp/*
-
-# Write html output to the directory "apidocs"
-output: html
-target: doc/apidocs/
-
-# Include all automatically generated graphs. These graphs are
-# generated using Graphviz dot.
-graph: all
-dotpath: /usr/bin/dot
-graph-font: Sans
-graph-font-size: 10
diff --git a/doc/examples/xsend.py b/doc/examples/xsend.py
deleted file mode 100755
index 743f191..0000000
--- a/doc/examples/xsend.py
+++ /dev/null
@@ -1,158 +0,0 @@
-#!/usr/bin/python3
-
-import sys
-import os
-import logging
-
-import nbxmpp
-from nbxmpp.const import Realm
-from nbxmpp.const import Event
-from gi.repository import GLib
-
-if sys.platform in ('win32', 'darwin'):
- import certifi
-
-consoleloghandler = logging.StreamHandler()
-log = logging.getLogger('nbxmpp')
-log.setLevel('INFO')
-log.addHandler(consoleloghandler)
-
-if len(sys.argv) < 2:
- print("Syntax: xsend JID text")
- sys.exit(0)
-
-to_jid = sys.argv[1]
-text = ' '.join(sys.argv[2:])
-
-jidparams = {}
-if os.access(os.environ['HOME'] + '/.xsend', os.R_OK):
- for ln in open(os.environ['HOME'] + '/.xsend').readlines():
- if not ln[0] in ('#', ';'):
- key, val = ln.strip().split('=', 1)
- jidparams[key.lower()] = val
-for mandatory in ['jid', 'password']:
- if mandatory not in jidparams.keys():
- open(os.environ['HOME']+'/.xsend','w').write('#Uncomment fields before use and type in correct credentials.\n#JID=romeo@montague.net/resource (/resource is optional)\n#PASSWORD=juliet\n')
- print('Please point ~/.xsend config file to valid JID for sending messages.')
- sys.exit(0)
-
-
-class Connection:
- def __init__(self):
- self.jid = nbxmpp.protocol.JID(jidparams['jid'])
- self.password = jidparams['password']
- self.client_cert = None
- self.idle_queue = nbxmpp.idlequeue.get_idlequeue()
- self.client = None
-
- def _on_auth_successful(self):
- print('authenticated')
- self.client.bind()
-
- def _on_connection_active(self):
- print('Connection active')
- self.client.RegisterHandler('message', self._on_message)
- self.send_presence()
-
- def _on_auth_failed(self, reason, text):
- log.debug("Couldn't authenticate")
- log.error(reason, text)
- exit()
-
- def _on_message(self, con, stanza):
- print('message received')
- print(stanza.getBody())
-
- def _on_connected(self, con, con_type):
- print('connected with ' + con_type)
- self.client.auth(self.jid.getNode(),
- get_password=self._get_password,
- resource=self.jid.getResource())
-
- def _get_password(self, mech, password_cb):
- password_cb(self.password)
-
- def _on_connection_failed(self):
- print('could not connect!')
-
- def _event_dispatcher(self, realm, event, data):
- if realm == Realm.CONNECTING:
- if event == Event.AUTH_SUCCESSFUL:
- log.info(event)
- self._on_auth_successful()
-
- elif event == Event.AUTH_FAILED:
- log.error(event)
- log.error(data)
- self._on_auth_failed(*data)
-
- elif event == Event.SESSION_FAILED:
- log.error(event)
-
- elif event == Event.BIND_FAILED:
- log.error(event)
-
- elif event == Event.CONNECTION_ACTIVE:
- log.info(event)
- self._on_connection_active()
- return
-
- def connect(self):
- cacerts = ''
- if sys.platform in ('win32', 'darwin'):
- cacerts = certifi.where()
-
- self.client = nbxmpp.NonBlockingClient(self.jid.getDomain(),
- self.idle_queue,
- caller=self)
-
- self.client.connect(self._on_connected,
- self._on_connection_failed,
- secure_tuple=('tls', cacerts, '', None, None, False))
-
- if sys.platform == 'win32':
- timeout, in_seconds = 20, None
- else:
- timeout, in_seconds = 100, False
-
- if in_seconds:
- GLib.timeout_add_seconds(timeout, self.process_connections)
- else:
- GLib.timeout_add(timeout, self.process_connections)
-
- def send_presence(self):
- presence = nbxmpp.Presence()
- self.client.send(presence)
-
- def quit(self):
- self.disconnect()
- ml.quit()
-
- def disconnect(self):
- self.client.start_disconnect()
-
- def process_connections(self):
- try:
- self.idle_queue.process()
- except Exception:
- # Otherwise, an exception will stop our loop
-
- if sys.platform == 'win32':
- # On Windows process() calls select.select(), so we need this
- # executed as often as possible.
- timeout, in_seconds = 1, None
- else:
- timeout, in_seconds = 100, False
-
- if in_seconds:
- GLib.timeout_add_seconds(timeout, self.process_connections)
- else:
- GLib.timeout_add(timeout, self.process_connections)
- raise
- return True # renew timeout (loop for ever)
-
-
-con = Connection()
-con.connect()
-ml = GLib.MainLoop()
-ml.run()
diff --git a/nbxmpp/bind.py b/nbxmpp/bind.py
deleted file mode 100644
index 207fd87..0000000
--- a/nbxmpp/bind.py
+++ /dev/null
@@ -1,133 +0,0 @@
-# Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
-# Copyright (C) Dimitur Kirov <dkirov AT gmail.com>
-# Copyright (C) 2018 Philipp Hörist <philipp AT hoerist.com>
-#
-# This file is part of nbxmpp.
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; If not, see <http://www.gnu.org/licenses/>.
-
-import logging
-
-from nbxmpp.plugin import PlugIn
-from nbxmpp.protocol import NS_BIND
-from nbxmpp.protocol import NS_SESSION
-from nbxmpp.protocol import NS_STREAMS
-from nbxmpp.protocol import NS_STREAM_MGMT
-from nbxmpp.protocol import Node
-from nbxmpp.protocol import isResultNode
-from nbxmpp.protocol import Protocol
-from nbxmpp.const import Realm
-from nbxmpp.const import Event
-
-log = logging.getLogger('nbxmpp.bind')
-
-
-class NonBlockingBind(PlugIn):
- """
- Bind some JID to the current connection to allow router know of our
- location. Must be plugged after successful SASL auth
- """
-
- def __init__(self):
- PlugIn.__init__(self)
- self._session_required = False
-
- def plugin(self, _owner):
- self._owner.RegisterHandler(
- 'features', self._on_features, xmlns=NS_STREAMS)
-
- feats = self._owner.Dispatcher.Stream.features
- if feats is not None:
- if feats.getTag('bind', namespace=NS_BIND) is not None:
- # We already received the features
- self._on_features(None, feats)
-
- def _on_features(self, _con, feats):
- """
- Determine if server supports resource binding and set some internal
- attributes accordingly.
- """
- if not feats or not feats.getTag('bind', namespace=NS_BIND):
- return
-
- session = feats.getTag('session', namespace=NS_SESSION)
- if session is not None:
- if session.getTag('optional') is None:
- self._session_required = True
-
- self._bind()
-
- def plugout(self):
- """
- Remove Bind handler from owner's dispatcher. Used internally
- """
- self._owner.UnregisterHandler(
- 'features', self._on_features, xmlns=NS_STREAMS)
-
- def _bind(self):
- """
- Perform binding. Use provided resource name or random (if not provided).
- """
- log.info('Send bind')
- resource = []
- if self._owner._Resource:
- resource = [Node('resource', payload=[self._owner._Resource])]
-
- payload = Node('bind', attrs={'xmlns': NS_BIND}, payload=resource)
- node = Protocol('iq', typ='set', payload=[payload])
-
- self._owner.Dispatcher.SendAndCallForResponse(node, func=self._on_bind)
-
- def _on_bind(self, _client, stanza):
- if isResultNode(stanza):
- bind = stanza.getTag('bind')
- if bind is not None:
- jid = bind.getTagData('jid')
- log.info('Successfully bound %s', jid)
- self._owner.set_bound_jid(jid)
-
- if not self._session_required:
- # Server don't want us to initialize a session
- log.info('No session required')
- self._on_bind_successful()
- else:
- node = Node('session', attrs={'xmlns':NS_SESSION})
- iq = Protocol('iq', typ='set', payload=[node])
- self._owner.SendAndCallForResponse(
- iq, func=self._on_session)
- return
- if stanza:
- log.error('Binding failed: %s.', stanza.getTag('error'))
- else:
- log.error('Binding failed: timeout expired')
- self._owner.Connection.start_disconnect()
- self._owner.Dispatcher.Event(Realm.CONNECTING, Event.BIND_FAILED)
- self.PlugOut()
-
- def _on_session(self, _client, stanza):
- if isResultNode(stanza):
- log.info('Successfully started session')
- self._on_bind_successful()
- else:
- log.error('Session open failed')
- self._owner.Connection.start_disconnect()
- self._owner.Dispatcher.Event(Realm.CONNECTING, Event.SESSION_FAILED)
- self.PlugOut()
-
- def _on_bind_successful(self):
- feats = self._owner.Dispatcher.Stream.features
- if feats.getTag('sm', namespace=NS_STREAM_MGMT):
- self._owner.Smacks.send_enable()
- self._owner.Dispatcher.Event(Realm.CONNECTING, Event.CONNECTION_ACTIVE)
- self.PlugOut()
diff --git a/nbxmpp/bosh.py b/nbxmpp/bosh.py
deleted file mode 100644
index a6889dd..0000000
--- a/nbxmpp/bosh.py
+++ /dev/null
@@ -1,581 +0,0 @@
-## bosh.py
-##
-##
-## Copyright (C) 2008 Tomas Karasek <tom.to.the.k@gmail.com>
-##
-## This file is part of Gajim.
-##
-## Gajim is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published
-## by the Free Software Foundation; version 3 only.
-##
-## Gajim is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-##
-## You should have received a copy of the GNU General Public License
-## along with Gajim. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import binascii
-import locale
-import logging
-from hashlib import sha1
-
-from .transports import NonBlockingTransport, NonBlockingHTTPBOSH,\
- CONNECTED, CONNECTING, DISCONNECTED, DISCONNECTING,\
- urisplit, DISCONNECT_TIMEOUT_SECONDS
-from .protocol import BOSHBody, Protocol, NS_CLIENT
-
-
-log = logging.getLogger('nbxmpp.bosh')
-
-KEY_COUNT = 10
-
-# Fake file descriptor - it's used for setting read_timeout in idlequeue for
-# BOSH Transport. In TCP-derived transports this is file descriptor of socket.
-FAKE_DESCRIPTOR = -1337
-
-
-class NonBlockingBOSH(NonBlockingTransport):
- def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
- certs, tls_version, cipher_list, xmpp_server, domain, bosh_dict, proxy_creds):
- NonBlockingTransport.__init__(self, raise_event, on_disconnect,
- idlequeue, estabilish_tls, certs, tls_version, cipher_list)
-
- self.bosh_sid = None
-
- self.http_version = 'HTTP/1.1'
- self.http_persistent = True
- self.http_pipelining = bosh_dict['bosh_http_pipelining']
- self.bosh_to = domain
-
- self.route_host, self.route_port = xmpp_server
-
- self.bosh_wait = bosh_dict['bosh_wait']
- if not self.http_pipelining:
- self.bosh_hold = 1
- else:
- self.bosh_hold = bosh_dict['bosh_hold']
- self.bosh_requests = self.bosh_hold
- self.bosh_uri = bosh_dict['bosh_uri']
- self.bosh_content = bosh_dict['bosh_content']
- self.over_proxy = bosh_dict['bosh_useproxy']
- if estabilish_tls:
- self.bosh_secure = 'true'
- else:
- self.bosh_secure = 'false'
- self.tls_version = tls_version
- self.cipher_list = cipher_list
- self.use_proxy_auth = bosh_dict['useauth']
- self.proxy_creds = proxy_creds
- self.wait_cb_time = None
- self.http_socks = []
- self.stanza_buffer = []
- self.prio_bosh_stanzas = []
- self.current_recv_handler = None
- self.current_recv_socket = None
- self.key_stack = None
- self.ack_checker = None
- self.after_init = False
- self.proxy_dict = {}
- if self.over_proxy and self.estabilish_tls:
- self.proxy_dict['type'] = 'http'
- # with SSL over proxy, we do HTTP CONNECT to proxy to open a channel to
- # BOSH Connection Manager
- host, port = urisplit(self.bosh_uri)[1:3]
- self.proxy_dict['xmpp_server'] = (host, port)
- self.proxy_dict['credentials'] = self.proxy_creds
-
- # ssl variables
- self.ssl_certificate = None
- # first ssl error
- self.ssl_errnum = 0
- # all ssl errors
- self.ssl_errors = []
-
- def connect(self, conn_5tuple, on_connect, on_connect_failure):
- NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure)
-
- global FAKE_DESCRIPTOR
- FAKE_DESCRIPTOR -= 1
- self.fd = FAKE_DESCRIPTOR
-
- self.stanza_buffer = []
- self.prio_bosh_stanzas = []
-
- self.key_stack = KeyStack(KEY_COUNT)
- self.ack_checker = AckChecker()
- self.after_init = True
-
- self.http_socks.append(self.get_new_http_socket())
- self._tcp_connecting_started()
-
- self.http_socks[0].connect(
- conn_5tuple = conn_5tuple,
- on_connect = self._on_connect,
- on_connect_failure = self._on_connect_failure)
-
- def _on_connect(self):
- self.peerhost = self.http_socks[0].peerhost
- self.ssl_lib = self.http_socks[0].ssl_lib
- NonBlockingTransport._on_connect(self)
-
-
-
- def set_timeout(self, timeout):
- if self.get_state() != DISCONNECTED and self.fd != -1:
- NonBlockingTransport.set_timeout(self, timeout)
- else:
- log.warning('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' % (self.get_state(), self.fd))
-
- def on_http_request_possible(self):
- """
- Called when HTTP request it's possible to send a HTTP request. It can be when
- socket is connected or when HTTP response arrived
-
- There should be always one pending request to BOSH CM.
- """
- log.debug('on_http_req possible, state:\n%s' % self.get_current_state())
- if self.get_state()==DISCONNECTED: return
-
- #Hack for making the non-secure warning dialog work
- if self._owner.got_features:
- if hasattr(self._owner, 'SASL'):
- self.send_BOSH(None)
- else:
- # If we already got features and no auth module was plugged yet, we are
- # probably waiting for confirmation of the "not-secure-connection" dialog.
- # We don't send HTTP request in that case.
- # see http://lists.jabber.ru/pipermail/ejabberd/2008-August/004027.html
- return
- else:
- self.send_BOSH(None)
-
-
-
- def get_socket_in(self, state):
- """
- Get sockets in desired state
- """
- for s in self.http_socks:
- if s.get_state()==state: return s
- return None
-
-
- def get_free_socket(self):
- """
- Select and returns socket eligible for sending a data to
- """
- if self.http_pipelining:
- return self.get_socket_in(CONNECTED)
- else:
- last_recv_time, tmpsock = 0, None
- for s in self.http_socks:
- # we're interested only in CONNECTED socket with no requests pending
- if s.get_state()==CONNECTED and s.pending_requests==0:
- # if there's more of them, we want the one with the least recent data receive
- # (lowest last_recv_time)
- if (last_recv_time==0) or (s.last_recv_time < last_recv_time):
- last_recv_time = s.last_recv_time
- tmpsock = s
- if tmpsock:
- return tmpsock
- else:
- return None
-
-
- def send_BOSH(self, payload):
- """
- Tries to send a stanza in payload by appeding it to a buffer and plugging a
- free socket for writing.
- """
- total_pending_reqs = sum([s.pending_requests for s in self.http_socks])
-
- # when called after HTTP response (Payload=None) and when there are already
- # some pending requests and no data to send, or when the socket is
- # disconnected, we do nothing
- if payload is None and \
- total_pending_reqs > 0 and \
- self.stanza_buffer == [] and \
- self.prio_bosh_stanzas == [] or \
- self.get_state()==DISCONNECTED:
- return
-
- # Add xmlns to stanza to help ejabberd server
- if payload and isinstance(payload, Protocol):
- if not payload.getNamespace():
- payload.setNamespace(NS_CLIENT)
-
- # now the payload is put to buffer and will be sent at some point
- self.append_stanza(payload)
-
- # if we're about to make more requests than allowed, we don't send - stanzas will be
- # sent after HTTP response from CM, exception is when we're disconnecting - then we
- # send anyway
- if total_pending_reqs >= self.bosh_requests and self.get_state()!=DISCONNECTING:
- log.warning('attemp to make more requests than allowed by Connection Manager:\n%s' %
- self.get_current_state())
- return
-
- # when there's free CONNECTED socket, we plug it for write and the data will
- # be sent when write is possible
- if self.get_free_socket():
- self.plug_socket()
- return
-
- # if there is a connecting socket, we just wait for when it connects,
- # payload will be sent in a sec when the socket connects
- if self.get_socket_in(CONNECTING): return
-
- # being here means there are either DISCONNECTED sockets or all sockets are
- # CONNECTED with too many pending requests
- s = self.get_socket_in(DISCONNECTED)
-
- # if we have DISCONNECTED socket, lets connect it and plug for send
- if s:
- self.connect_and_flush(s)
- else:
- # otherwise create and connect a new one
- ss = self.get_new_http_socket()
- self.http_socks.append(ss)
- self.connect_and_flush(ss)
- return
-
- def plug_socket(self):
- stanza = None
- s = self.get_free_socket()
- if s:
- s._plug_idle(writable=True, readable=True)
- else:
- log.error('=====!!!!!!!!====> Couldn\'t get free socket in plug_socket())')
-
- def build_stanza(self, socket):
- """
- Build a BOSH body tag from data in buffers and adds key, rid and ack
- attributes to it
-
- This method is called from _do_send() of underlying transport. This is to
- ensure rid and keys will be processed in correct order. If I generate
- them before plugging a socket for write (and did it for two sockets/HTTP
- connections) in parallel, they might be sent in wrong order, which
- results in violating the BOSH session and server-side disconnect.
- """
- if self.prio_bosh_stanzas:
- stanza, add_payload = self.prio_bosh_stanzas.pop(0)
- if add_payload:
- stanza.setPayload(self.stanza_buffer)
- self.stanza_buffer = []
- else:
- stanza = self.boshify_stanzas(self.stanza_buffer)
- self.stanza_buffer = []
-
- stanza = self.ack_checker.backup_stanza(stanza, socket)
-
- key, newkey = self.key_stack.get()
- if key:
- stanza.setAttr('key', key)
- if newkey:
- stanza.setAttr('newkey', newkey)
-
-
- log.info('sending msg with rid=%s to sock %s' % (stanza.getAttr('rid'), id(socket)))
- self.renew_bosh_wait_timeout(self.bosh_wait + 3)
- return stanza
-
-
- def on_bosh_wait_timeout(self):
- log.error('Connection Manager didn\'t respond within %s + 3 seconds --> forcing disconnect' % self.bosh_wait)
- self.disconnect()
-
-
- def renew_bosh_wait_timeout(self, timeout):
- if self.wait_cb_time is not None:
- self.remove_bosh_wait_timeout()
- sched_time = self.idlequeue.set_alarm(self.on_bosh_wait_timeout, timeout)
- self.wait_cb_time = sched_time
-
- def remove_bosh_wait_timeout(self):
- self.idlequeue.remove_alarm(
- self.on_bosh_wait_timeout,
- self.wait_cb_time)
-
- def on_persistent_fallback(self, socket):
- """
- Called from underlying transport when server closes TCP connection
-
- :param socket: disconnected transport object
- """
- if socket.http_persistent:
- log.warning('Fallback to nonpersistent HTTP (no pipelining as well)')
- socket.http_persistent = False
- self.http_persistent = False
- self.http_pipelining = False
- socket.disconnect(do_callback=False)
- self.connect_and_flush(socket)
- else:
- socket.disconnect()
-
-
-
- def handle_body_attrs(self, stanza_attrs):
- """
- Called for each incoming body stanza from dispatcher. Checks body
- attributes.
- """
- self.remove_bosh_wait_timeout()
-
- if self.after_init:
- if 'sid' in stanza_attrs:
- # session ID should be only in init response
- self.bosh_sid = stanza_attrs['sid']
-
- if 'requests' in stanza_attrs:
- self.bosh_requests = int(stanza_attrs['requests'])
-
- if 'wait' in stanza_attrs:
- self.bosh_wait = int(stanza_attrs['wait'])
- self.after_init = False
-
- ack = None
- if 'ack' in stanza_attrs:
- ack = stanza_attrs['ack']
- self.ack_checker.process_incoming_ack(ack=ack,
- socket=self.current_recv_socket)
-
- if 'type' in stanza_attrs:
- if stanza_attrs['type'] in ['terminate', 'terminal']:
- condition = 'n/a'
- if 'condition' in stanza_attrs:
- condition = stanza_attrs['condition']
- if condition == 'n/a':
- log.info('Received sesion-ending terminating stanza')
- else:
- log.error('Received terminating stanza: %s - %s' % (condition,
- bosh_errors[condition]))
- self.disconnect()
- return
-
- if stanza_attrs['type'] == 'error':
- # recoverable error
- pass
- return
-
-
- def append_stanza(self, stanza):
- """
- Append stanza to a buffer to send
- """
- if stanza:
- if isinstance(stanza, tuple):
- # stanza is tuple of BOSH stanza and bool value for whether to add payload
- self.prio_bosh_stanzas.append(stanza)
- else:
- # stanza is XMPP stanza. Will be boshified before send.
- self.stanza_buffer.append(stanza)
-
-
- def send(self, stanza, now=False):
- self.send_BOSH(stanza)
-
-
-
- def get_current_state(self):
- t = '------ SOCKET_ID\tSOCKET_STATE\tPENDING_REQS\n'
- for s in self.http_socks:
- t = '%s------ %s\t%s\t%s\n' % (t, id(s), s.get_state(), s.pending_requests)
- t = '%s------ prio stanzas: %s, queued XMPP stanzas: %s, not_acked stanzas: %s' \
- % (t, self.prio_bosh_stanzas, self.stanza_buffer,
- self.ack_checker.get_not_acked_rids())
- return t
-
-
- def connect_and_flush(self, socket):
- socket.connect(
- conn_5tuple = self.conn_5tuple,
- on_connect = self.on_http_request_possible,
- on_connect_failure = self.disconnect)
-
-
- def boshify_stanzas(self, stanzas=[], body_attrs=None):
- """
- Wraps zero to many stanzas by body tag with xmlns and sid
- """
- log.debug('boshify_staza - type is: %s, stanza is %s' % (type(stanzas), stanzas))
- tag = BOSHBody(attrs={'sid': self.bosh_sid})
- tag.setPayload(stanzas)
- return tag
-
-
- def send_init(self, after_SASL=False):
- if after_SASL:
- t = BOSHBody(
- attrs={ 'to': self.bosh_to,
- 'sid': self.bosh_sid,
- 'xml:lang': self._owner.lang,
- 'xmpp:restart': 'true',
- 'secure': self.bosh_secure,
- 'xmlns:xmpp': 'urn:xmpp:xbosh'})
- else:
- t = BOSHBody(
- attrs={ 'content': self.bosh_content,
- 'hold': str(self.bosh_hold),
- 'route': 'xmpp:%s:%s' % (self.route_host, self.route_port),
- 'to': self.bosh_to,
- 'wait': str(self.bosh_wait),
- 'xml:lang': self._owner.lang,
- 'xmpp:version': '1.0',
- 'ver': '1.6',
- 'xmlns:xmpp': 'urn:xmpp:xbosh'})
- self.send_BOSH((t, True))
-
- def start_disconnect(self):
- NonBlockingTransport.start_disconnect(self)
- self.renew_bosh_wait_timeout(DISCONNECT_TIMEOUT_SECONDS)
- self.send_BOSH(
- (BOSHBody(attrs={'sid': self.bosh_sid, 'type': 'terminate'}), True))
-
-
- def get_new_http_socket(self):
- http_dict = {'http_uri': self.bosh_uri,
- 'http_version': self.http_version,
- 'http_persistent': self.http_persistent,
- 'add_proxy_headers': self.over_proxy and not self.estabilish_tls}
- if self.use_proxy_auth:
- http_dict['proxy_user'], http_dict['proxy_pass'] = self.proxy_creds
-
- s = NonBlockingHTTPBOSH(
- raise_event=self.raise_event,
- on_disconnect=self.disconnect,
- idlequeue = self.idlequeue,
- estabilish_tls = self.estabilish_tls,
- tls_version = self.tls_version,
- cipher_list = self.cipher_list,
- certs = self.certs,
- on_http_request_possible = self.on_http_request_possible,
- http_dict = http_dict,
- proxy_dict = self.proxy_dict,
- on_persistent_fallback = self.on_persistent_fallback)
-
- s.onreceive(self.on_received_http)
- s.set_stanza_build_cb(self.build_stanza)
- return s
-
-
- def onreceive(self, recv_handler):
- if recv_handler is None:
- recv_handler = self._owner.Dispatcher.ProcessNonBlocking
- self.current_recv_handler = recv_handler
-
-
- def on_received_http(self, data, socket):
- self.current_recv_socket = socket
- self.current_recv_handler(data)
-
-
- def disconnect(self, do_callback=True):
- self.remove_bosh_wait_timeout()
- if self.get_state() == DISCONNECTED: return
- self.fd = -1
- for s in self.http_socks:
- s.disconnect(do_callback=False)
- NonBlockingTransport.disconnect(self, do_callback)
-
-
-def get_rand_number():
- # (see http://www.xmpp.org/extensions/xep-0124.html#rids)
- # it's also used for sequence key initialization
- return int(binascii.hexlify(os.urandom(6)), 16)
-
-
-class AckChecker:
- """
- Class for generating rids and generating and checking acknowledgements in
- BOSH messages
- """
- def __init__(self):
- self.rid = get_rand_number()
- self.ack = 1
- self.last_rids = {}
- self.not_acked = []
-
-
- def get_not_acked_rids(self): return [rid for rid, st in self.not_acked]
-
- def backup_stanza(self, stanza, socket):
- socket.pending_requests += 1
- rid = self.get_rid()
- self.not_acked.append((rid, stanza))
- stanza.setAttr('rid', str(rid))
- self.last_rids[socket]=rid
-
- if self.rid != self.ack + 1:
- stanza.setAttr('ack', str(self.ack))
- return stanza
-
- def process_incoming_ack(self, socket, ack=None):
- socket.pending_requests -= 1
- if ack:
- ack = int(ack)
- else:
- ack = self.last_rids[socket]
-
- i = len([rid for rid, st in self.not_acked if ack >= rid])
- self.not_acked = self.not_acked[i:]
-
- self.ack = ack
-
-
- def get_rid(self):
- self.rid = self.rid + 1
- return self.rid
-
-
-class KeyStack:
- """
- Class implementing key sequences for BOSH messages
- """
- def __init__(self, count):
- self.count = count
- self.keys = []
- self.reset()
- self.first_call = True
-
- def reset(self):
- seed = str(get_rand_number()).encode('utf-8')
- self.keys = [sha1(seed).hexdigest()]
- for i in range(self.count-1):
- curr_seed = self.keys[i].encode('utf-8')
- self.keys.append(sha1(curr_seed).hexdigest())
-
- def get(self):
- if self.first_call:
- self.first_call = False
- return (None, self.keys.pop())
-
- if len(self.keys)>1:
- return (self.keys.pop(), None)
- else:
- last_key = self.keys.pop()
- self.reset()
- new_key = self.keys.pop()
- return (last_key, new_key)
-
-# http://www.xmpp.org/extensions/xep-0124.html#errorstatus-terminal
-bosh_errors = {
- 'n/a': 'none or unknown condition in terminating body stanza',
- 'bad-request': 'The format of an HTTP header or binding element received from the client is unacceptable (e.g., syntax error), or Script Syntax is not supported.',
- 'host-gone': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is no longer serviced by the connection manager.',
- 'host-unknown': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is unknown to the connection manager.',
- 'improper-addressing': 'The initialization element lacks a "to" or "route" attribute (or the attribute has no value) but the connection manager requires one.',
- 'internal-server-error': 'The connection manager has experienced an internal error that prevents it from servicing the request.',
- 'item-not-found': '(1) "sid" is not valid, (2) "stream" is not valid, (3) "rid" is larger than the upper limit of the expected window, (4) connection manager is unable to resend response, (5) "key" sequence is invalid',
- 'other-request': 'Another request being processed at the same time as this request caused the session to terminate.',
- 'policy-violation': 'The client has broken the session rules (polling too frequently, requesting too frequently, too many simultaneous requests).',
- 'remote-connection-failed': 'The connection manager was unable to connect to, or unable to connect securely to, or has lost its connection to, the server.',
- 'remote-stream-error': 'Encapsulates an error in the protocol being transported.',
- 'see-other-uri': 'The connection manager does not operate at this URI (e.g., the connection manager accepts only SSL or TLS connections at some https: URI rather than the http: URI requested by the client). The client may try POSTing to the URI in the content of the <uri/> child element.',
- 'system-shutdown': 'The connection manager is being shut down. All active HTTP sessions are being terminated. No new sessions can be created.',
- 'undefined-condition': 'The error is not one of those defined herein; the connection manager SHOULD include application-specific information in the content of the <body/> wrapper.'
-}
diff --git a/nbxmpp/client.py b/nbxmpp/client.py
deleted file mode 100644
index d01af4e..0000000
--- a/nbxmpp/client.py
+++ /dev/null
@@ -1,639 +0,0 @@
-## client.py
-##
-## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
-## modified by Dimitur Kirov <dkirov@gmail.com>
-##
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2, or (at your option)
-## any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-
-"""
-Client class establishes connection to XMPP Server and handles authentication
-"""
-
-import socket
-import logging
-
-from . import transports, dispatcher, roster, protocol, bosh
-from .protocol import NS_TLS
-from .protocol import JID
-from .auth import SASL
-from .bind import NonBlockingBind
-from .smacks import Smacks
-from .const import Realm
-from .const import Event
-
-
-log = logging.getLogger('nbxmpp.client')
-
-
-class NonBlockingClient:
- """
- Client class is XMPP connection mountpoint. Objects for authentication,
- network communication, roster, xml parsing ... are plugged to client object.
- Client implements the abstract behavior - mostly negotiation and callbacks
- handling, whereas underlying modules take care of feature-specific logic
- """
-
- def __init__(self, domain, idlequeue, caller=None, lang='en'):
- """
- Caches connection data
-
- :param domain: domain - for to: attribute (from account info)
- :param idlequeue: processing idlequeue
- :param caller: calling object - it has to implement methods
- _event_dispatcher which is called from dispatcher instance
- :param lang: the preferred stream language
- """
- self.Namespace = protocol.NS_CLIENT
- self.defaultNamespace = self.Namespace
- self.lang = lang
-
- self.idlequeue = idlequeue
- self.disconnect_handlers = []
-
- self.Server = domain
- self.xmpp_hostname = None # FQDN hostname to connect to
-
- # caller is who initiated this client, it is in needed to register
- # the EventDispatcher
- self._caller = caller
- self._owner = self
- self._registered_name = None # our full jid, set after successful bind
- self.connected = ''
- self.ip_addresses = []
- self.socket = None
- self.on_connect = None
- self.on_proxy_failure = None
- self.on_connect_failure = None
- self.proxy = None
- self.got_features = False
- self.got_see_other_host = None
- self.stream_started = False
- self.disconnecting = False
- self.protocol_type = 'XMPP'
- self.alpn = False
-
- # Smacks must retain data over multiple connects/disconnects
- Smacks.get_instance().PlugIn(self)
-
- def set_resume_data(self, data):
- '''
- Dict with values we pass to Smacks for session resumption
- This is only needed if between reconnects a new NonBlockingClient
- instance is created
- '''
- self.Smacks.set_resume_data(data)
-
- def get_resume_data(self):
- '''
- returns a dict with values that are necessary to resume a stream
- see set_resume_data()
- '''
- if not self.sm_enabled or not self.resume_supported:
- return {}
- return self.Smacks.get_resume_data()
-
- @property
- def sm_enabled(self):
- return self.Smacks.enabled
-
- @property
- def resume_supported(self):
- return self.Smacks.resume_supported
-
- def set_bound_jid(self, jid):
- if jid is None:
- return
- jid = JID(jid)
- self._registered_name = jid
- self.User = jid.getNode()
- self.Resource = jid.getResource()
-
- def get_bound_jid(self):
- return self._registered_name
-
- def get_ssl_connection(self):
- if 'NonBlockingTCP' in self.__dict__:
- return self.NonBlockingTCP.get_ssl_connection()
-
- def disconnect(self, message=''):
- """
- Called on disconnection - disconnect callback is picked based on state of
- the client.
- """
- # to avoid recursive calls
- if self.ip_addresses:
- self._try_next_ip()
- return
- if self.disconnecting: return
-
- log.info('Disconnecting NBClient: %s' % message)
-
- sasl_failed = False
- if 'NonBlockingRoster' in self.__dict__:
- self.NonBlockingRoster.PlugOut()
- if 'SASL' in self.__dict__:
- self.SASL.PlugOut()
- if 'NonBlockingTCP' in self.__dict__:
- self.NonBlockingTCP.PlugOut()
- if 'NonBlockingHTTP' in self.__dict__:
- self.NonBlockingHTTP.PlugOut()
- if 'NonBlockingBOSH' in self.__dict__:
- self.NonBlockingBOSH.PlugOut()
- # FIXME: we never unplug dispatcher, only on next connect
- # See _xmpp_connect_machine and SASLHandler
-
- connected = self.connected
- stream_started = self.stream_started
-
- self.connected = ''
- self.stream_started = False
-
- self.disconnecting = True
-
- log.debug('Client disconnected..')
- # Don't call any callback when it's a SASL failure.
- # SASL handler is already called
- if connected == '' and not sasl_failed:
- # if we're disconnecting before connection to XMPP sever is opened,
- # we don't call disconnect handlers but on_connect_failure callback
- if self.proxy:
- # with proxy, we have different failure callback
- log.debug('calling on_proxy_failure cb')
- self.on_proxy_failure(reason=message)
- else:
- log.debug('calling on_connect_failure cb')
- self.on_connect_failure()
- elif not sasl_failed:
- # we are connected to XMPP server
- if not stream_started:
- # if error occur before XML stream was opened, e.g. no response on
- # init request, we call the on_connect_failure callback because
- # proper connection is not established yet and it's not a proxy
- # issue
- log.debug('calling on_connect_failure cb')
- self._caller.streamError = message
- self.on_connect_failure()
- else:
- # with open connection, we are calling the disconnect handlers
- for i in reversed(self.disconnect_handlers):
- log.debug('Calling disconnect handler %s' % i)
- i()
- self.disconnecting = False
-
- def connect(self, on_connect, on_connect_failure, hostname=None, port=5222,
- on_proxy_failure=None, on_stream_error_cb=None, proxy=None,
- secure_tuple=('tls', None, None, None, None, False)):
- """
- Open XMPP connection (open XML streams in both directions)
-
- :param on_connect: called after stream is successfully opened
- :param on_connect_failure: called when error occurs during connection
- :param hostname: hostname of XMPP server from SRV request
- :param port: port number of XMPP server
- :param on_proxy_failure: called if error occurs during TCP connection
- to proxy server or during proxy connecting process
- :param on_stream_error_cb: called if error occurs
- :param proxy: dictionary with proxy data. It should contain at least
- values for keys 'host' and 'port' - connection details for proxy
- serve and optionally keys 'user' and 'pass' as proxy credentials
- :param secure_tuple: tuple of (desired connection type, cacerts,
- mycerts, tls_version, cipher_list, alpn)
- connection type can be 'ssl' - TLS established after TCP connection,
- 'tls' - TLS established after negotiation with starttls, or
- 'plain'.
- cacerts, mycerts, tls_version, cipher_list, alpn
- see tls.NonBlockingTLS constructor for more details
- """
- self.on_connect = on_connect
- self.on_connect_failure=on_connect_failure
- self.on_proxy_failure = on_proxy_failure
- self.on_stream_error_cb = on_stream_error_cb
- self.desired_security, self.cacerts, self.mycerts, self.tls_version, \
- self.cipher_list = secure_tuple[:5]
- if len(secure_tuple) == 6:
- # ALPN support was added in version 0.6.3
- self.alpn = secure_tuple[5]
- self.Connection = None
- self.Port = port
- self.proxy = proxy
-
- if hostname:
- self.xmpp_hostname = hostname
- else:
- self.xmpp_hostname = self.Server
-
- # We only check for SSL here as for TLS we will first have to start a
- # PLAIN connection and negotiate TLS afterwards.
- # establish_tls will instruct transport to start secure connection
- # directly
- establish_tls = self.desired_security == 'ssl'
- certs = (self.cacerts, self.mycerts)
-
- proxy_dict = {}
- tcp_host = self.xmpp_hostname
- tcp_port = self.Port
-
- if proxy:
- # with proxies, client connects to proxy instead of directly to
- # XMPP server ((hostname, port))
- # tcp_host is hostname of machine used for socket connection
- # (DNS request will be done for proxy or BOSH CM hostname)
- tcp_host, tcp_port, proxy_user, proxy_pass = \
- transports.get_proxy_data_from_dict(proxy)
-
- if proxy['type'] == 'bosh':
- # Setup BOSH transport
- self.socket = bosh.NonBlockingBOSH.get_instance(
- on_disconnect=self.disconnect,
- raise_event=self.raise_event,
- idlequeue=self.idlequeue,
- estabilish_tls=establish_tls,
- certs=certs,
- tls_version = self.tls_version,
- cipher_list = self.cipher_list,
- proxy_creds=(proxy_user, proxy_pass),
- xmpp_server=(self.xmpp_hostname, self.Port),
- domain=self.Server,
- bosh_dict=proxy)
- self.protocol_type = 'BOSH'
- self.wait_for_restart_response = \
- proxy['bosh_wait_for_restart_response']
- else:
- # http proxy
- proxy_dict['type'] = proxy['type']
- proxy_dict['xmpp_server'] = (self.xmpp_hostname, self.Port)
- proxy_dict['credentials'] = (proxy_user, proxy_pass)
-
- if not proxy or proxy['type'] != 'bosh':
- # Setup ordinary TCP transport
- self.socket = transports.NonBlockingTCP.get_instance(
- on_disconnect=self.disconnect,
- raise_event=self.raise_event,
- idlequeue=self.idlequeue,
- estabilish_tls=establish_tls,
- certs=certs,
- tls_version = self.tls_version,
- cipher_list = self.cipher_list,
- alpn=self.alpn,
- proxy_dict=proxy_dict)
-
- # plug transport into client as self.Connection
- self.socket.PlugIn(self)
-
- self._resolve_hostname(
- hostname=tcp_host,
- port=tcp_port,
- on_success=self._try_next_ip)
-
- def _resolve_hostname(self, hostname, port, on_success):
- """
- Wrapper for getaddinfo call
-
- FIXME: getaddinfo blocks
- """
- try:
- self.ip_addresses = socket.getaddrinfo(hostname, port,
- socket.AF_UNSPEC, socket.SOCK_STREAM)
- except socket.gaierror as exc:
- self.disconnect(message='Lookup failure for %s:%s, hostname: %s - %s' %
- (self.Server, self.Port, hostname, str(exc)))
- except socket.error as exc:
- errnum, errstr = exc.errno, exc.strerror
- self.disconnect(message='General socket error for %s:%s, hostname: '
- '%s - %s' % (self.Server, self.Port, hostname, errstr))
- else:
- on_success()
-
- def _try_next_ip(self, err_message=None):
- """
- Iterate over IP addresses tries to connect to it
- """
- if err_message:
- log.debug('While looping over DNS A records: %s' % err_message)
- if not self.ip_addresses:
- msg = 'Run out of hosts for name %s:%s.' % (self.Server, self.Port)
- msg += ' Error for last IP: %s' % err_message
- self.disconnect(msg)
- else:
- self.current_ip = self.ip_addresses.pop(0)
- self.socket.connect(
- conn_5tuple=self.current_ip,
- on_connect=lambda: self._xmpp_connect(),
- on_connect_failure=self._try_next_ip)
-
- def incoming_stream_version(self):
- """
- Get version of xml stream
- """
- if 'version' in self.Dispatcher.Stream._document_attrs:
- return self.Dispatcher.Stream._document_attrs['version']
- else:
- return None
-
- def _xmpp_connect(self, socket_type=None):
- """
- Start XMPP connecting process - open the XML stream. Is called after TCP
- connection is established or after switch to TLS when successfully
- negotiated with <starttls>.
- """
- # socket_type contains info which transport connection was established
- if not socket_type:
- if self.Connection.ssl_lib:
- # When ssl_lib is set we connected via SSL
- socket_type = 'ssl'
- else:
- # PLAIN is default
- socket_type = 'plain'
- self.connected = socket_type
- self._xmpp_connect_machine()
-
- def _xmpp_connect_machine(self, mode=None, data=None):
- """
- Finite automaton taking care of stream opening and features tag handling.
- Calls _on_stream_start when stream is started, and disconnect() on
- failure.
- """
- log.info('-------------xmpp_connect_machine() >> mode: %s, data: %s...' %
- (mode, str(data)[:20]))
-
- def on_next_receive(mode):
- """
- Set desired on_receive callback on transport based on the state of
- connect_machine.
- """
- log.info('setting %s on next receive' % mode)
- if mode is None:
- self.onreceive(None) # switch to Dispatcher.ProcessNonBlocking
- else:
- self.onreceive(lambda _data:self._xmpp_connect_machine(mode, _data))
-
- if not mode:
- # starting state
- if 'Dispatcher' in self.__dict__:
- self.Dispatcher.PlugOut()
- self.got_features = False
- dispatcher.Dispatcher.get_instance().PlugIn(self)
- on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES')
-
- elif mode == 'FAILURE':
- self.disconnect('During XMPP connect: %s' % data)
-
- elif mode == 'RECEIVE_DOCUMENT_ATTRIBUTES':
- if data:
- self.Dispatcher.ProcessNonBlocking(data)
- self.ip_addresses = []
- if not hasattr(self, 'Dispatcher') or \
- self.Dispatcher.Stream._document_attrs is None:
- self._xmpp_connect_machine(mode='FAILURE',
- data='Error on stream open')
- return
-
- # if terminating stanza was received after init request then client gets
- # disconnected from bosh transport plugin and we have to end the stream
- # negotiating process straight away.
- # fixes #4657
- if not self.connected: return
-
- if self.incoming_stream_version() == '1.0':
- if not self.got_features:
- on_next_receive('RECEIVE_STREAM_FEATURES')
- else:
- log.info('got STREAM FEATURES in first recv')
- self._xmpp_connect_machine(mode='STREAM_STARTED')
- else:
- log.info('incoming stream version less than 1.0')
- self._xmpp_connect_machine(mode='STREAM_STARTED')
-
- elif mode == 'RECEIVE_STREAM_FEATURES':
- if data:
- # sometimes <features> are received together with document
- # attributes and sometimes on next receive...
- self.Dispatcher.ProcessNonBlocking(data)
- if self.got_see_other_host:
- log.info('got see-other-host')
- self.onreceive(None)
- self.on_stream_error_cb(self, self.got_see_other_host)
- elif not self.got_features:
- self._xmpp_connect_machine(mode='FAILURE',
- data='Missing <features> in 1.0 stream')
- else:
- log.info('got STREAM FEATURES in second recv')
- self._xmpp_connect_machine(mode='STREAM_STARTED')
-
- elif mode == 'STREAM_STARTED':
- self._on_stream_start()
-
- def _on_stream_start(self):
- """
- Called after XMPP stream is opened. TLS negotiation may follow if
- supported and desired.
- """
- self.stream_started = True
- if not hasattr(self, 'onreceive'):
- # we may already have been disconnected
- return
- self.onreceive(None)
-
- if self.connected == 'plain':
- if self.desired_security == 'plain':
- # if we want and have plain connection, we're done now
- self._on_connect()
- else:
- # try to negotiate TLS
- if self.incoming_stream_version() != '1.0':
- # if stream version is less than 1.0, we can't do more
- log.info('While connecting with type = "tls": stream version ' +
- 'is less than 1.0')
- self._on_connect()
- return
- if self.Dispatcher.Stream.features.getTag('starttls'):
- # Server advertises TLS support, start negotiation
- self.stream_started = False
- log.info('TLS supported by remote server. Requesting TLS start.')
- self._tls_negotiation_handler()
- else:
- log.info('While connecting with type = "tls": TLS unsupported ' +
- 'by remote server')
- self._on_connect()
-
- elif self.connected in ['ssl', 'tls']:
- self._on_connect()
- else:
- assert False, 'Stream opened for unsupported connection'
-
- def _tls_negotiation_handler(self, con=None, tag=None):
- """
- Take care of TLS negotiation with <starttls>
- """
- log.info('-------------tls_negotiaton_handler() >> tag: %s' % tag)
- if not con and not tag:
- # starting state when we send the <starttls>
- self.RegisterHandlerOnce('proceed', self._tls_negotiation_handler,
- xmlns=NS_TLS)
- self.RegisterHandlerOnce('failure', self._tls_negotiation_handler,
- xmlns=NS_TLS)
- self.send('<starttls xmlns="%s"/>' % NS_TLS)
- else:
- # we got <proceed> or <failure>
- if tag.getNamespace() != NS_TLS:
- self.disconnect('Unknown namespace: %s' % tag.getNamespace())
- return
- tagname = tag.getName()
- if tagname == 'failure':
- self.disconnect('TLS <failure> received: %s' % tag)
- return
- log.info('Got starttls proceed response. Switching to TLS/SSL...')
- # following call wouldn't work for BOSH transport but it doesn't matter
- # because <starttls> negotiation with BOSH is forbidden
- self.Connection.tls_init(
- on_succ = lambda: self._xmpp_connect(socket_type='tls'),
- on_fail = lambda: self.disconnect('error while establishing TLS'))
-
- def _on_connect(self):
- """
- Preceed call of on_connect callback
- """
- self.onreceive(None)
- self.on_connect(self, self.connected)
-
- def raise_event(self, event_type, data):
- """
- Raise event to connection instance. DATA_SENT and DATA_RECIVED events
- are used in XML console to show XMPP traffic
- """
- e_t = event_type
- if type(event_type) != str:
- e_t = event_type.encode('utf-8')
- log.info('raising event from transport: :::::%s::::\n_____________\n%s\n_____________\n' % (e_t, data))
- if hasattr(self, 'Dispatcher'):
- self.Dispatcher.Event('', event_type, data)
-
-###############################################################################
-### follows code for authentication, resource bind, session and roster download
-###############################################################################
-
- def auth(self, user, get_password=None, resource='', auth_mechs=None):
- """
- Authenticate connnection and bind resource. If resource is not provided
- random one or library name used
-
- :param user: XMPP username
- :param get_password: Callback that must return the password for the
- chosen mechanism
- :param resource: resource that shall be used for auth/connecting
- :param auth_mechs: Set of valid authentification mechanisms. If None all
- authentification mechanisms will be allowed.
- See the auth module for possible values
- """
- if 'SASL' in self.__dict__:
- log.error('Auth not possible while another auth is in progress')
- return
-
- self._User = user
- self._Resource = resource
-
- self.onreceive(None)
- SASL.get_instance(self._User,
- auth_mechs,
- get_password,
- self._on_sasl_finished).PlugIn(self)
-
- def _on_sasl_finished(self, success, reason, text):
- if success:
- self.SASL.PlugOut()
- if self.protocol_type == 'BOSH':
- self.Dispatcher.after_SASL = True
- self.Dispatcher.StreamInit()
-
- self.connected += '+sasl'
- self.Dispatcher.Event(Realm.CONNECTING,
- Event.AUTH_SUCCESSFUL)
- self._owner.Smacks.register_handlers()
- else:
- self.Dispatcher.Event(Realm.CONNECTING,
- Event.AUTH_FAILED,
- (reason, text))
-
- def bind(self):
- # Check if we can resume
- if self.Smacks.resume_supported:
- self.Smacks.resume_request()
- else:
- # If we cant resume we bind and enable sm afterwards
- NonBlockingBind.get_instance().PlugIn(self)
-
- def initRoster(self, version='', request=True):
- """
- Plug in the roster
- """
- if 'NonBlockingRoster' not in self.__dict__:
- return roster.NonBlockingRoster.get_instance(
- version=version).PlugIn(self, request=request)
-
- def getRoster(self, on_ready=None, force=False):
- """
- Return the Roster instance, previously plugging it in and requesting
- roster from server if needed
- """
- if 'NonBlockingRoster' in self.__dict__:
- return self.NonBlockingRoster.getRoster(on_ready, force)
- return None
-
- def sendPresence(self, jid=None, typ=None, requestRoster=0):
- """
- Send some specific presence state. Can also request roster from server if
- according agrument is set
- """
- if requestRoster:
- # FIXME: used somewhere?
- roster.NonBlockingRoster.get_instance().PlugIn(self)
- self.send(dispatcher.Presence(to=jid, typ=typ))
-
-###############################################################################
-### following methods are moved from blocking client class of xmpppy
-###############################################################################
-
- def RegisterDisconnectHandler(self, handler):
- """
- Register handler that will be called on disconnect
- """
- self.disconnect_handlers.append(handler)
-
- def UnregisterDisconnectHandler(self, handler):
- """
- Unregister handler that is called on disconnect
- """
- self.disconnect_handlers.remove(handler)
-
- def DisconnectHandler(self):
- """
- Default disconnect handler. Just raises an IOError. If you choosed to use
- this class in your production client, override this method or at least
- unregister it.
- """
- raise IOError('Disconnected from server.')
-
- def get_connect_type(self):
- """
- Return connection state. F.e.: None / 'tls' / 'plain+non_sasl'
- """
- return self.connected
-
- def get_peerhost(self):
- """
- Gets the ip address of the account, from which is made connection to the
- server (e.g. IP and port of socket)
-
- We will create listening socket on the same ip
- """
- # FIXME: tuple (ip, port) is expected (and checked for) but port num is
- # useless
- return self.socket.peerhost
diff --git a/nbxmpp/features.py b/nbxmpp/features.py
deleted file mode 100644
index 69b8b6e..0000000
--- a/nbxmpp/features.py
+++ /dev/null
@@ -1,216 +0,0 @@
-## features.py
-##
-## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
-## Copyright (C) 2007 Julien Pivotto <roidelapluie@gmail.com>
-##
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2, or (at your option)
-## any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-
-"""
-Different stuff that wasn't worth separating it into modules
-(Registration, Privacy Lists, ...)
-"""
-
-from .protocol import NS_REGISTER, NS_PRIVACY, NS_DATA, Iq, isResultNode, Node
-
-def _on_default_response(disp, iq, cb):
- def _on_response(_client, resp):
- if isResultNode(resp):
- if cb:
- cb(True)
- elif cb:
- cb(False)
- disp.SendAndCallForResponse(iq, _on_response)
-
-###############################################################################
-### Registration
-###############################################################################
-
-REGISTER_DATA_RECEIVED = 'REGISTER DATA RECEIVED'
-
-def getRegInfo(disp, host, info={}, sync=True):
- """
- Get registration form from remote host. Info dict can be prefilled
- :param disp: plugged dispatcher instance
- :param info: dict, like {'username':'joey'}.
-
- See JEP-0077 for details.
- """
- iq=Iq('get', NS_REGISTER, to=host)
- for i in info.keys():
- iq.setTagData(i, info[i])
- if sync:
- disp.SendAndCallForResponse(iq, lambda _client, resp:
- _ReceivedRegInfo(disp.Dispatcher, resp, host))
- else:
- disp.SendAndCallForResponse(iq, _ReceivedRegInfo, {'agent': host })
-
-def _ReceivedRegInfo(con, resp, agent):
- Iq('get', NS_REGISTER, to=agent)
- if not isResultNode(resp):
- error_msg = resp.getErrorMsg()
- con.Event(NS_REGISTER, REGISTER_DATA_RECEIVED, (agent, None, False, error_msg, ''))
- return
- tag=resp.getTag('query', namespace=NS_REGISTER)
- if not tag:
- error_msg = resp.getErrorMsg()
- con.Event(NS_REGISTER, REGISTER_DATA_RECEIVED, (agent, None, False, error_msg, ''))
- return
- df=tag.getTag('x', namespace=NS_DATA)
- if df:
- con.Event(NS_REGISTER, REGISTER_DATA_RECEIVED, (agent, df, True, '',
- tag))
- return
- df={}
- for i in resp.getQueryPayload():
- if not isinstance(i, Node):
- continue
- df[i.getName()] = i.getData()
- con.Event(NS_REGISTER, REGISTER_DATA_RECEIVED, (agent, df, False, '', ''))
-
-def register(disp, host, info, cb, args=None):
- """
- Perform registration on remote server with provided info
-
- If registration fails you can get additional info from the dispatcher's
- owner attributes lastErrNode, lastErr and lastErrCode.
- """
- iq=Iq('set', NS_REGISTER, to=host)
- if not isinstance(info, dict):
- info=info.asDict()
- for i in info.keys():
- iq.setTag('query').setTagData(i, info[i])
- disp.SendAndCallForResponse(iq, cb, args)
-
-def unregister(disp, host, cb):
- """
- Unregisters with host (permanently removes account). Returns true on success
- """
- iq = Iq('set', NS_REGISTER, to=host, payload=[Node('remove')])
- _on_default_response(disp, iq, cb)
-
-def changePasswordTo(disp, newpassword, host=None, cb = None):
- """
- Changes password on specified or current (if not specified) server. Returns
- true on success.
- """
- if not host:
- host = disp._owner.Server
- iq = Iq('set', NS_REGISTER, to=host, payload=[Node('username',
- payload=[disp._owner.Server]), Node('password', payload=[newpassword])])
- _on_default_response(disp, iq, cb)
-
-###############################################################################
-### Privacy List
-###############################################################################
-
-PL_TYPE_JID = 'jid'
-PL_TYPE_GROUP = 'group'
-PL_TYPE_SUBC = 'subscription'
-PL_ACT_ALLOW = 'allow'
-PL_ACT_DENY = 'deny'
-
-PRIVACY_LISTS_RECEIVED = 'PRIVACY LISTS RECEIVED'
-PRIVACY_LIST_RECEIVED = 'PRIVACY LIST RECEIVED'
-PRIVACY_LISTS_ACTIVE_DEFAULT = 'PRIVACY LISTS ACTIVE DEFAULT'
-
-def getPrivacyLists(disp):
- """
- Request privacy lists from connected server. Returns dictionary of existing
- lists on success.
- """
- iq = Iq('get', NS_PRIVACY)
- def _on_response(_client, resp):
- dict_ = {'lists': []}
- if not isResultNode(resp):
- disp.Event(NS_PRIVACY, PRIVACY_LISTS_RECEIVED, False)
- return
- for list_ in resp.getQueryPayload():
- if list_.getName()=='list':
- dict_['lists'].append(list_.getAttr('name'))
- else:
- dict_[list_.getName()]=list_.getAttr('name')
- disp.Event(NS_PRIVACY, PRIVACY_LISTS_RECEIVED, dict_)
- disp.SendAndCallForResponse(iq, _on_response)
-
-def getActiveAndDefaultPrivacyLists(disp):
- iq = Iq('get', NS_PRIVACY)
- def _on_response(_client, resp):
- dict_ = {'active': '', 'default': ''}
- if not isResultNode(resp):
- disp.Event(NS_PRIVACY, PRIVACY_LISTS_ACTIVE_DEFAULT, False)
- return
- for list_ in resp.getQueryPayload():
- if list_.getName() == 'active':
- dict_['active'] = list_.getAttr('name')
- elif list_.getName() == 'default':
- dict_['default'] = list_.getAttr('name')
- disp.Event(NS_PRIVACY, PRIVACY_LISTS_ACTIVE_DEFAULT, dict_)
- disp.SendAndCallForResponse(iq, _on_response)
-
-def getPrivacyList(disp, listname):
- """
- Request specific privacy list listname. Returns list of XML nodes (rules)
- taken from the server responce.
- """
- def _on_response(_client, resp):
- if not isResultNode(resp):
- disp.Event(NS_PRIVACY, PRIVACY_LIST_RECEIVED, False)
- return
- disp.Event(NS_PRIVACY, PRIVACY_LIST_RECEIVED, resp)
- iq = Iq('get', NS_PRIVACY, payload=[Node('list', {'name': listname})])
- disp.SendAndCallForResponse(iq, _on_response)
-
-def setActivePrivacyList(disp, listname=None, typ='active', cb=None):
- """
- Switch privacy list 'listname' to specified type. By default the type is
- 'active'. Returns true on success.
- """
- if listname:
- attrs={'name':listname}
- else:
- attrs={}
- iq = Iq('set', NS_PRIVACY, payload=[Node(typ, attrs)])
- _on_default_response(disp, iq, cb)
-
-def setDefaultPrivacyList(disp, listname=None):
- """
- Set the default privacy list as 'listname'. Returns true on success
- """
- return setActivePrivacyList(disp, listname, 'default')
-
-def setPrivacyList(disp, listname, tags):
- """
- Set the ruleset
-
- 'list' should be the simpleXML node formatted according to RFC 3921
- (XMPP-IM) I.e. Node('list',{'name':listname},payload=[...]).
-
- Returns true on success.
- """
- iq = Iq('set', NS_PRIVACY, xmlns = '')
- list_query = iq.getTag('query').setTag('list', {'name': listname})
- for item in tags:
- if 'type' in item and 'value' in item:
- item_tag = list_query.setTag('item', {'action': item['action'],
- 'order': item['order'], 'type': item['type'],
- 'value': item['value']})
- else:
- item_tag = list_query.setTag('item', {'action': item['action'],
- 'order': item['order']})
- if 'child' in item:
- for child_tag in item['child']:
- item_tag.setTag(child_tag)
- _on_default_response(disp, iq, None)
-
-def delPrivacyList(disp, listname, cb=None):
- """ Deletes privacy list 'listname'. Returns true on success. """
- iq = Iq('set', NS_PRIVACY, payload=[Node('list', {'name':listname})])
- _on_default_response(disp, iq, cb)
diff --git a/nbxmpp/proxy_connectors.py b/nbxmpp/proxy_connectors.py
deleted file mode 100644
index c83e1e2..0000000
--- a/nbxmpp/proxy_connectors.py
+++ /dev/null
@@ -1,247 +0,0 @@
-## proxy_connectors.py
-##
-## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
-## modified by Dimitur Kirov <dkirov@gmail.com>
-## modified by Tomas Karasek <tom.to.the.k@gmail.com>
-##
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2, or (at your option)
-## any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-
-"""
-Module containing classes for proxy connecting. So far its HTTP CONNECT and
-SOCKS5 proxy
-
-Authentication to NTLM (Microsoft implementation) proxies can be next.
-"""
-
-import struct
-import socket
-import logging
-from base64 import b64encode
-
-log = logging.getLogger('nbxmpp.proxy_connectors')
-
-
-class ProxyConnector:
- """
- Interface for proxy-connecting object - when tunnneling XMPP over proxies,
- some connecting process usually has to be done before opening stream. Proxy
- connectors are used right after TCP connection is estabilished
- """
-
- def __init__(self, send_method, onreceive, old_on_receive, on_success,
- on_failure, xmpp_server, proxy_creds=(None, None)):
- """
- Creates proxy connector, starts connecting immediately and gives control
- back to transport afterwards
-
- :param send_method: transport send method
- :param onreceive: method to set on_receive callbacks
- :param old_on_receive: on_receive callback that should be set when
- proxy connection was successful
- :param on_success: called after proxy connection was successfully opened
- :param on_failure: called when errors occured while connecting
- :param xmpp_server: tuple of (hostname, port)
- :param proxy_creds: tuple of (proxy_user, proxy_credentials)
- """
- self.send = send_method
- self.onreceive = onreceive
- self.old_on_receive = old_on_receive
- self.on_success = on_success
- self.on_failure = on_failure
- self.xmpp_server = xmpp_server
- self.proxy_user, self.proxy_pass = proxy_creds
- self.old_on_receive = old_on_receive
-
- self.start_connecting()
-
- @classmethod
- def get_instance(cls, *args, **kwargs):
- """
- Factory Method for object creation
-
- Use this instead of directly initializing the class in order to make unit
- testing much easier.
- """
- return cls(*args, **kwargs)
-
- def start_connecting(self):
- raise NotImplementedError
-
- def connecting_over(self):
- self.onreceive(self.old_on_receive)
- self.on_success()
-
-class HTTPCONNECTConnector(ProxyConnector):
- def start_connecting(self):
- """
- Connect to a proxy, supply login and password to it (if were specified
- while creating instance). Instruct proxy to make connection to the target
- server.
- """
- log.info('Proxy server contacted, performing authentification')
- connector = ['CONNECT %s:%s HTTP/1.1' % self.xmpp_server,
- 'Proxy-Connection: Keep-Alive',
- 'Pragma: no-cache',
- 'Host: %s:%s' % self.xmpp_server,
- 'User-Agent: Gajim']
- if self.proxy_user and self.proxy_pass:
- credentials = '%s:%s' % (self.proxy_user, self.proxy_pass)
- credentials = b64encode(credentials.encode('utf-8')).decode(
- 'utf-8').strip()
- connector.append('Proxy-Authorization: Basic '+credentials)
- connector.append('\r\n')
- self.onreceive(self._on_headers_sent)
- self.send('\r\n'.join(connector))
-
- def _on_headers_sent(self, reply):
- if reply is None:
- return
- self.reply = reply.replace('\r', '')
- try:
- proto, code, desc = reply.split('\n')[0].split(' ', 2)
- except:
- log.error("_on_headers_sent:", exc_info=True)
- #traceback.print_exc()
- self.on_failure('Invalid proxy reply')
- return
- if code != '200':
- log.error('Invalid proxy reply: %s %s %s' % (proto, code, desc))
- self.on_failure('Invalid proxy reply')
- return
- if len(reply) != 2:
- pass
- self.connecting_over()
-
-
-class SOCKS5Connector(ProxyConnector):
- """
- SOCKS5 proxy connection class. Allows to use SOCKS5 proxies with
- (optionally) simple authentication (only USERNAME/PASSWORD auth)
- """
-
- def start_connecting(self):
- log.info('Proxy server contacted, performing authentification')
- if self.proxy_user and self.proxy_pass:
- to_send = b'\x05\x02\x00\x02'
- else:
- to_send = b'\x05\x01\x00'
- self.onreceive(self._on_greeting_sent)
- self.send(to_send)
-
- def _to_int(self, c):
- if type(c) == str: # py2
- return ord(c)
- return c # py3
-
- def _on_greeting_sent(self, reply):
- if reply is None:
- return
- if len(reply) != 2:
- self.on_failure('Invalid proxy reply')
- return
- if self._to_int(reply[0]) != 5:
- log.info('Invalid proxy reply')
- self.on_failure('Invalid proxy reply')
- return
- if self._to_int(reply[1]) == 0:
- return self._on_proxy_auth(b'\x01\x00')
- elif self._to_int(reply[1]) == 2:
- to_send = '\x01' + chr(len(self.proxy_user)) + self.proxy_user +\
- chr(len(self.proxy_pass)) + self.proxy_pass
- self.onreceive(self._on_proxy_auth)
- self.send(to_send)
- else:
- if self._to_int(reply[1]) == 255:
- log.error('Authentification to proxy impossible: no acceptable '
- 'auth method')
- self.on_failure('Authentification to proxy impossible: no '
- 'acceptable authentification method')
- return
- log.error('Invalid proxy reply')
- self.on_failure('Invalid proxy reply')
- return
-
- def _on_proxy_auth(self, reply):
- if reply is None:
- return
- if len(reply) != 2:
- log.error('Invalid proxy reply')
- self.on_failure('Invalid proxy reply')
- return
- if reply != b'\x01\x00':
- log.error('Authentification to proxy failed')
- self.on_failure('Authentification to proxy failed')
- return
-
- log.info('Authentification successfull. Jabber server contacted.')
- # Request connection
- req = b'\x05\x01\x00'
- # If the given destination address is an IP address, we'll
- # use the IPv4 address request even if remote resolving was specified.
- try:
- self.ipaddr = socket.inet_aton(self.xmpp_server[0])
- req = req + b'\x01' + self.ipaddr
- except socket.error:
- # Well it's not an IP number, so it's probably a DNS name.
-# if self.__proxy[3]==True:
- # Resolve remotely
- self.ipaddr = None
- req = req + b'\x03' + (chr(len(self.xmpp_server[0])) + self.xmpp_server[0]).encode('utf-8')
-# else:
-# # Resolve locally
-# self.ipaddr = socket.inet_aton(socket.gethostbyname(self.xmpp_server[0]))
-# req = req + "\x01" + ipaddr
-
- req += struct.pack(">H", self.xmpp_server[1])
- self.onreceive(self._on_req_sent)
- self.send(req)
-
- def _on_req_sent(self, reply):
- if reply is None:
- return
- if len(reply) < 10:
- log.error('Invalid proxy reply')
- self.on_failure('Invalid proxy reply')
- return
- if self._to_int(reply[0]) != 5:
- log.error('Invalid proxy reply')
- self.on_failure('Invalid proxy reply')
- return
- if self._to_int(reply[1]) != 0:
- # Connection failed
- if self._to_int(reply[1]) < 9:
- errors = ['general SOCKS server failure',
- 'connection not allowed by ruleset',
- 'Network unreachable',
- 'Host unreachable',
- 'Connection refused',
- 'TTL expired',
- 'Command not supported',
- 'Address type not supported'
- ]
- txt = errors[self._to_int(reply[1])-1]
- else:
- txt = 'Invalid proxy reply'
- log.error(txt)
- self.on_failure(txt)
- return
- # Get the bound address/port
- elif self._to_int(reply[3]) == 1:
- begin, end = 3, 7
- elif self._to_int(reply[3]) == 3:
- begin, end = 4, 4 + self._to_int(reply[4])
- elif self._to_int(reply[3]) == 4:
- begin, end = 3, 19
- else:
- log.error('Invalid proxy reply')
- self.on_failure('Invalid proxy reply')
- return
- self.connecting_over()
diff --git a/nbxmpp/roster.py b/nbxmpp/roster.py
deleted file mode 100644
index a804cb3..0000000
--- a/nbxmpp/roster.py
+++ /dev/null
@@ -1,367 +0,0 @@
-## roster.py
-##
-## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
-## modified by Dimitur Kirov <dkirov@gmail.com>
-##
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2, or (at your option)
-## any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-
-"""
-Simple roster implementation. Can be used though for different tasks like
-mass-renaming of contacts.
-"""
-
-import logging
-
-from .protocol import JID, Iq, Presence, Node, NS_MUC_USER, NS_ROSTER
-from .plugin import PlugIn
-
-
-log = logging.getLogger('nbxmpp.roster')
-
-
-class NonBlockingRoster(PlugIn):
- """
- Defines a plenty of methods that will allow you to manage roster. Also
- automatically track presences from remote JIDs taking into account that
- every JID can have multiple resources connected. Does not currently support
- 'error' presences. You can also use mapping interface for access to the
- internal representation of contacts in roster
- """
-
- def __init__(self, version=None):
- """
- Init internal variables
- """
- PlugIn.__init__(self)
- self.version = version
- self._data = {}
- self._set=None
- self._exported_methods=[self.getRoster]
- self.received_from_server = False
-
- def Request(self, force=0):
- """
- Request roster from server if it were not yet requested (or if the
- 'force' argument is set)
- """
- if self._set is None:
- self._set = 0
- elif not force:
- return
-
- iq = Iq('get', NS_ROSTER)
- if self.version is not None:
- iq.setTagAttr('query', 'ver', self.version)
- id_ = self._owner.getAnID()
- iq.setID(id_)
- self._owner.send(iq)
- log.info('Roster requested from server')
- return id_
-
- def RosterIqHandler(self, dis, stanza):
- """
- Subscription tracker. Used internally for setting items state in internal
- roster representation
- """
- sender = stanza.getAttr('from')
- if sender is not None and not sender.bareMatch(
- self._owner.User + '@' + self._owner.Server):
- return
- query = stanza.getTag('query')
- if query:
- self.received_from_server = True
- self.version = stanza.getTagAttr('query', 'ver')
- if self.version is None:
- self.version = ''
- for item in query.getTags('item'):
- jid=item.getAttr('jid')
- if item.getAttr('subscription')=='remove':
- if jid in self._data: del self._data[jid]
- # Looks like we have a workaround
- # raise NodeProcessed # a MUST
- log.info('Setting roster item %s...' % jid)
- if jid not in self._data: self._data[jid]={}
- self._data[jid]['name']=item.getAttr('name')
- self._data[jid]['ask']=item.getAttr('ask')
- self._data[jid]['subscription']=item.getAttr('subscription')
- self._data[jid]['groups']=[]
- if 'resources' not in self._data[jid]: self._data[jid]['resources']={}
- for group in item.getTags('group'):
- if group.getData() not in self._data[jid]['groups']:
- self._data[jid]['groups'].append(group.getData())
- self._data[self._owner.User+'@'+self._owner.Server]={'resources': {}, 'name': None, 'ask': None, 'subscription': None, 'groups': None,}
- self._set=1
- # Looks like we have a workaround
- # raise NodeProcessed # a MUST. Otherwise you'll get back an <iq type='error'/>
-
- def PresenceHandler(self, dis, pres):
- """
- Presence tracker. Used internally for setting items' resources state in
- internal roster representation
- """
- if pres.getTag('x', namespace=NS_MUC_USER):
- return
- jid=pres.getFrom()
- if not jid:
- # If no from attribue, it's from server
- jid=self._owner.Server
- jid=JID(jid)
- if jid.getStripped() not in self._data: self._data[jid.getStripped()]={'name':None,'ask':None,'subscription':'none','groups':['Not in roster'],'resources':{}}
- if type(self._data[jid.getStripped()]['resources'])!=type(dict()):
- self._data[jid.getStripped()]['resources']={}
- item=self._data[jid.getStripped()]
- typ=pres.getType()
-
- if not typ:
- log.info('Setting roster item %s for resource %s...'%(jid.getStripped(), jid.getResource()))
- item['resources'][jid.getResource()]=res={'show':None,'status':None,'priority':'0','timestamp':None}
- if pres.getTag('show'): res['show']=pres.getShow()
- if pres.getTag('status'): res['status']=pres.getStatus()
- if pres.getTag('priority'): res['priority']=pres.getPriority()
- if not pres.getTimestamp(): pres.setTimestamp()
- res['timestamp']=pres.getTimestamp()
- elif typ=='unavailable' and jid.getResource() in item['resources']: del item['resources'][jid.getResource()]
- # Need to handle type='error' also
-
- def _getItemData(self, jid, dataname):
- """
- Return specific jid's representation in internal format. Used internally
- """
- jid = jid[:(jid+'/').find('/')]
- return self._data[jid][dataname]
-
- def _getResourceData(self, jid, dataname):
- """
- Return specific jid's resource representation in internal format. Used
- internally
- """
- if jid.find('/') + 1:
- jid, resource = jid.split('/', 1)
- if resource in self._data[jid]['resources']:
- return self._data[jid]['resources'][resource][dataname]
- elif list(self._data[jid]['resources'].keys()):
- lastpri = -129
- for r in list(self._data[jid]['resources'].keys()):
- if int(self._data[jid]['resources'][r]['priority']) > lastpri:
- resource, lastpri=r, int(self._data[jid]['resources'][r]['priority'])
- return self._data[jid]['resources'][resource][dataname]
-
- def delItem(self, jid):
- """
- Delete contact 'jid' from roster
- """
- self._owner.send(Iq('set', NS_ROSTER, payload=[Node('item', {'jid': jid, 'subscription': 'remove'})]))
-
- def getAsk(self, jid):
- """
- Return 'ask' value of contact 'jid'
- """
- return self._getItemData(jid, 'ask')
-
- def getGroups(self, jid):
- """
- Return groups list that contact 'jid' belongs to
- """
- return self._getItemData(jid, 'groups')
-
- def getName(self, jid):
- """
- Return name of contact 'jid'
- """
- return self._getItemData(jid, 'name')
-
- def getPriority(self, jid):
- """
- Return priority of contact 'jid'. 'jid' should be a full (not bare) JID
- """
- return self._getResourceData(jid, 'priority')
-
- def getRawRoster(self):
- """
- Return roster representation in internal format
- """
- return self._data
-
- def getRawItem(self, jid):
- """
- Return roster item 'jid' representation in internal format
- """
- return self._data[jid[:(jid+'/').find('/')]]
-
- def getShow(self, jid):
- """
- Return 'show' value of contact 'jid'. 'jid' should be a full (not bare)
- JID
- """
- return self._getResourceData(jid, 'show')
-
- def getStatus(self, jid):
- """
- Return 'status' value of contact 'jid'. 'jid' should be a full (not bare)
- JID
- """
- return self._getResourceData(jid, 'status')
-
- def getSubscription(self, jid):
- """
- Return 'subscription' value of contact 'jid'
- """
- return self._getItemData(jid, 'subscription')
-
- def getResources(self, jid):
- """
- Return list of connected resources of contact 'jid'
- """
- return list(self._data[jid[:(jid+'/').find('/')]]['resources'].keys())
-
- def setItem(self, jid, name=None, groups=None):
- """
- Rename contact 'jid' and sets the groups list that it now belongs to
- """
- iq = Iq('set', NS_ROSTER)
- query = iq.getTag('query')
- attrs = {'jid': jid}
- if name:
- attrs['name'] = name
- item = query.setTag('item', attrs)
- if groups is not None:
- for group in groups:
- item.addChild(node=Node('group', payload=[group]))
- self._owner.send(iq)
-
- def setItemMulti(self, items):
- """
- Rename multiple contacts and sets their group lists
- """
- for i in items:
- iq = Iq('set', NS_ROSTER)
- query = iq.getTag('query')
- attrs = {'jid': i['jid']}
- if i['name']:
- attrs['name'] = i['name']
- item = query.setTag('item', attrs)
- for group in i['groups']:
- item.addChild(node=Node('group', payload=[group]))
- self._owner.send(iq)
-
- def getItems(self):
- """
- Return list of all [bare] JIDs that the roster is currently tracks
- """
- return list(self._data.keys())
-
- def keys(self):
- """
- Same as getItems. Provided for the sake of dictionary interface
- """
- return list(self._data.keys())
-
- def __getitem__(self, item):
- """
- Get the contact in the internal format. Raises KeyError if JID 'item' is
- not in roster
- """
- return self._data[item]
-
- def getItem(self, item):
- """
- Get the contact in the internal format (or None if JID 'item' is not in
- roster)
- """
- if item in self._data:
- return self._data[item]
-
- def Subscribe(self, jid):
- """
- Send subscription request to JID 'jid'
- """
- self._owner.send(Presence(jid, 'subscribe'))
-
- def Unsubscribe(self, jid):
- """
- Ask for removing our subscription for JID 'jid'
- """
- self._owner.send(Presence(jid, 'unsubscribe'))
-
- def Authorize(self, jid):
- """
- Authorize JID 'jid'. Works only if these JID requested auth previously
- """
- self._owner.send(Presence(jid, 'subscribed'))
-
- def Unauthorize(self, jid):
- """
- Unauthorise JID 'jid'. Use for declining authorisation request or for
- removing existing authorization
- """
- self._owner.send(Presence(jid, 'unsubscribed'))
-
- def getRaw(self):
- """
- Return the internal data representation of the roster
- """
- return self._data
-
- def setRaw(self, data):
- """
- Return the internal data representation of the roster
- """
- self._data = data
- self._data[self._owner.User + '@' + self._owner.Server] = {
- 'resources': {},
- 'name': None,
- 'ask': None,
- 'subscription': None,
- 'groups': None
- }
- self._set = 1
-
- def plugin(self, owner, request=1):
- """
- Register presence and subscription trackers in the owner's dispatcher.
- Also request roster from server if the 'request' argument is set. Used
- internally
- """
- self._owner.RegisterHandler('iq', self.RosterIqHandler, 'result', NS_ROSTER)
- self._owner.RegisterHandler('iq', self.RosterIqHandler, 'set', NS_ROSTER)
- self._owner.RegisterHandler('presence', self.PresenceHandler)
- if request:
- return self.Request()
-
- def _on_roster_set(self, data):
- if data:
- self._owner.Dispatcher.ProcessNonBlocking(data)
- if not self._set:
- return
- if not hasattr(self, '_owner') or not self._owner:
- # Connection has been closed by receiving a <stream:error> for ex,
- return
- self._owner.onreceive(None)
- if self.on_ready:
- self.on_ready(self)
- self.on_ready = None
- return True
-
- def getRoster(self, on_ready=None, force=False):
- """
- Request roster from server if neccessary and returns self
- """
- return_self = True
- if not self._set:
- self.on_ready = on_ready
- self._owner.onreceive(self._on_roster_set)
- return_self = False
- elif on_ready:
- on_ready(self)
- return_self = False
- if return_self or force:
- return self
- return None
diff --git a/nbxmpp/tls.py b/nbxmpp/tls.py
deleted file mode 100644
index b29e629..0000000
--- a/nbxmpp/tls.py
+++ /dev/null
@@ -1,432 +0,0 @@
-## tls.py
-##
-## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
-## modified by Dimitur Kirov <dkirov@gmail.com>
-## modified by Tomas Karasek <tom.to.the.k@gmail.com>
-##
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2, or (at your option)
-## any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-
-import sys
-import os
-import time
-import traceback
-import logging
-
-from .plugin import PlugIn
-
-log = logging.getLogger('nbxmpp.tls')
-
-PYOPENSSL = 'PYOPENSSL'
-
-import OpenSSL.SSL
-import OpenSSL.crypto
-
-def gattr(obj, attr, default=None):
- try:
- return getattr(obj, attr)
- except AttributeError:
- return default
-
-
-class SSLWrapper:
- """
- Abstract SSLWrapper base class
- """
-
- class Error(IOError):
- """
- Generic SSL Error Wrapper
- """
-
- def __init__(self, sock=None, exc=None, errno=None, strerror=None,
- peer=None):
- self.parent = IOError
-
- errno = errno or gattr(exc, 'errno') or exc.args[0]
- strerror = strerror or gattr(exc, 'strerror') or gattr(exc, 'args')
- if not isinstance(strerror, str):
- strerror = repr(strerror)
-
- self.sock = sock
- self.exc = exc
- self.peer = peer
- self.exc_name = None
- self.exc_args = None
- self.exc_str = None
- self.exc_repr = None
-
- if self.exc is not None:
- self.exc_name = str(self.exc.__class__)
- self.exc_args = gattr(self.exc, 'args')
- self.exc_str = str(self.exc)
- self.exc_repr = repr(self.exc)
- if not errno:
- try:
- if isinstance(exc, OpenSSL.SSL.SysCallError):
- if self.exc_args[0] > 0:
- errno = self.exc_args[0]
- strerror = self.exc_args[1]
- except: pass
-
- self.parent.__init__(self, errno, strerror)
-
- if self.peer is None and sock is not None:
- try:
- ppeer = self.sock.getpeername()
- if len(ppeer) == 2 and isinstance(ppeer[0], str) \
- and isinstance(ppeer[1], int):
- self.peer = ppeer
- except:
- pass
-
- def __str__(self):
- s = str(self.__class__)
- if self.peer:
- s += " for %s:%d" % self.peer
- if self.errno is not None:
- s += ": [Errno: %d]" % self.errno
- if self.strerror:
- s += " (%s)" % self.strerror
- if self.exc_name:
- s += ", Caused by %s" % self.exc_name
- if self.exc_str:
- if self.strerror:
- s += "(%s)" % self.exc_str
- else:
- s += "(%s)" % str(self.exc_args)
- return s
-
- def __init__(self, sslobj, sock=None):
- self.sslobj = sslobj
- self.sock = sock
- log.debug("%s.__init__ called with %s", self.__class__, sslobj)
-
- def recv(self, data, flags=None):
- """
- Receive wrapper for SSL object
-
- We can return None out of this function to signal that no data is
- available right now. Better than an exception, which differs
- depending on which SSL lib we're using. Unfortunately returning ''
- can indicate that the socket has been closed, so to be sure, we avoid
- this by returning None.
- """
- raise NotImplementedError
-
- def send(self, data, flags=None, now=False):
- """
- Send wrapper for SSL object
- """
- raise NotImplementedError
-
-
-class PyOpenSSLWrapper(SSLWrapper):
- """
- Wrapper class for PyOpenSSL's recv() and send() methods
- """
-
- def __init__(self, *args):
- self.parent = SSLWrapper
- self.parent.__init__(self, *args)
-
- def is_numtoolarge(self, e):
- """ Magic methods don't need documentation """
- t = ('asn1 encoding routines', 'a2d_ASN1_OBJECT', 'first num too large')
- return (isinstance(e.args, (list, tuple)) and len(e.args) == 1 and
- isinstance(e.args[0], (list, tuple)) and len(e.args[0]) == 2 and
- e.args[0][0] == e.args[0][1] == t)
-
- def recv(self, bufsize, flags=None):
- retval = None
- try:
- if flags is None:
- retval = self.sslobj.recv(bufsize)
- else:
- retval = self.sslobj.recv(bufsize, flags)
- except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError) as e:
- log.debug("Recv: Want-error: " + repr(e))
- except OpenSSL.SSL.SysCallError as e:
- log.debug("Recv: Got OpenSSL.SSL.SysCallError: " + repr(e),
- exc_info=True)
- raise SSLWrapper.Error(self.sock or self.sslobj, e)
- except OpenSSL.SSL.ZeroReturnError as e:
- # end-of-connection raises ZeroReturnError instead of having the
- # connection's .recv() method return a zero-sized result.
- raise SSLWrapper.Error(self.sock or self.sslobj, e, -1)
- except OpenSSL.SSL.Error as e:
- if self.is_numtoolarge(e):
- # warn, but ignore this exception
- log.warning("Recv: OpenSSL: asn1enc: first num too large (ignored)")
- else:
- log.debug("Recv: Caught OpenSSL.SSL.Error:", exc_info=True)
- raise SSLWrapper.Error(self.sock or self.sslobj, e)
- return retval
-
- def send(self, data, flags=None, now=False):
- try:
- if flags is None:
- return self.sslobj.send(data)
- else:
- return self.sslobj.send(data, flags)
- except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError) as e:
- #log.debug("Send: " + repr(e))
- time.sleep(0.1) # prevent 100% CPU usage
- except OpenSSL.SSL.SysCallError as e:
- log.error("Send: Got OpenSSL.SSL.SysCallError: " + repr(e),
- exc_info=True)
- raise SSLWrapper.Error(self.sock or self.sslobj, e)
- except OpenSSL.SSL.Error as e:
- if self.is_numtoolarge(e):
- # warn, but ignore this exception
- log.warning("Send: OpenSSL: asn1enc: first num too large (ignored)")
- else:
- log.error("Send: Caught OpenSSL.SSL.Error:", exc_info=True)
- raise SSLWrapper.Error(self.sock or self.sslobj, e)
- return 0
-
-
-class NonBlockingTLS(PlugIn):
- """
- TLS connection used to encrypt already estabilished tcp connection
-
- Can be plugged into NonBlockingTCP and will make use of PyOpenSSLWrapper.
- """
-
- def __init__(self, cacerts, mycerts, tls_version, cipher_list, alpn):
- """
- :param cacerts: path to pem file with certificates of known XMPP servers
- :param mycerts: path to pem file with certificates of user trusted
- servers
- :param tls_version: The lowest supported TLS version. If None is
- provided, version 1.2 is used. For example setting to 1.3 will
- enable TLS 1.3 and all further protocols
- :param cipher_list: list of ciphers to use when connection to server. If
- None is provided, a default list is used: HIGH:!aNULL
- """
- PlugIn.__init__(self)
- self._exported_methods = [self.get_ssl_connection]
- self.cacerts = cacerts
- self.mycerts = mycerts
- if cipher_list is None:
- self.cipher_list = b'HIGH:!aNULL'
- else:
- self.cipher_list = cipher_list.encode('ascii')
- if tls_version is None:
- self.tls_version = '1.2'
- else:
- self.tls_version = tls_version
- self.alpn = alpn
-
- def plugin(self, owner):
- """
- Use to PlugIn TLS into transport and start establishing immediately.
- Returns True if TLS/SSL was established correctly, otherwise False
- """
- log.info('Starting TLS estabilishing')
- try:
- res = self._startSSL()
- except Exception as e:
- log.error("PlugIn: while trying _startSSL():", exc_info=True)
- return False
- return res
-
- def _dumpX509(self, cert, stream=sys.stderr):
- print("Digest (SHA-2 256):" + cert.digest("sha256"), file=stream)
- print("Digest (SHA-1):" + cert.digest("sha1"), file=stream)
- print("Digest (MD5):" + cert.digest("md5"), file=stream)
- print("Serial #:" + cert.get_serial_number(), file=stream)
- print("Version:" + cert.get_version(), file=stream)
- print("Expired:" + ("Yes" if cert.has_expired() else "No"), file=stream)
- print("Subject:", file=stream)
- self._dumpX509Name(cert.get_subject(), stream)
- print("Issuer:", file=stream)
- self._dumpX509Name(cert.get_issuer(), stream)
- self._dumpPKey(cert.get_pubkey(), stream)
-
- def _dumpX509Name(self, name, stream=sys.stderr):
- print("X509Name:" + str(name), file=stream)
-
- def _dumpPKey(self, pkey, stream=sys.stderr):
- typedict = {OpenSSL.crypto.TYPE_RSA: "RSA",
- OpenSSL.crypto.TYPE_DSA: "DSA"}
- print("PKey bits:" + pkey.bits(), file=stream)
- print("PKey type: %s (%d)" % (typedict.get(pkey.type(),
- "Unknown"), pkey.type()), file=stream)
-
- def _startSSL(self):
- """
- Immediatedly switch socket to TLS mode. Used internally
- """
- log.debug("_startSSL called")
-
- result = self._startSSL_pyOpenSSL()
-
- if result:
- log.debug('Synchronous handshake completed')
- return True
- else:
- return False
-
- def _load_cert_file(self, cert_path, cert_store, logg=True):
- if not os.path.isfile(cert_path):
- return
- try:
- f = open(cert_path, encoding='utf-8')
- lines = f.readlines()
- except (IOError, UnicodeError) as e:
- log.warning('Unable to open certificate file %s: %s' % \
- (cert_path, str(e)))
- return
-
- i = 0
- begin = -1
- for line in lines:
- if 'BEGIN CERTIFICATE' in line:
- begin = i
- elif 'END CERTIFICATE' in line and begin > -1:
- cert = ''.join(lines[begin:i+2])
- try:
- x509cert = OpenSSL.crypto.load_certificate(
- OpenSSL.crypto.FILETYPE_PEM, cert.encode('ascii', 'ignore'))
- cert_store.add_cert(x509cert)
- except OpenSSL.crypto.Error as exception_obj:
- if logg:
- log.warning('Unable to load a certificate from file %s: %s' %\
- (cert_path, exception_obj.args[0][0][2]))
- except:
- log.warning('Unknown error while loading certificate from file '
- '%s' % cert_path)
- begin = -1
- i += 1
- f.close()
-
- def _startSSL_pyOpenSSL(self):
- log.debug("_startSSL_pyOpenSSL called")
- tcpsock = self._owner
- # See http://docs.python.org/dev/library/ssl.html
- tcpsock._sslContext = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
- flags = OpenSSL.SSL.OP_NO_SSLv2 | OpenSSL.SSL.OP_NO_SSLv3 | \
- OpenSSL.SSL.OP_NO_TLSv1 | OpenSSL.SSL.OP_NO_TLSv1_1 | \
- OpenSSL.SSL.OP_SINGLE_DH_USE | OpenSSL.SSL.OP_NO_TICKET
-
- if self.alpn:
- # XEP-0368 set ALPN Protocol
- tcpsock._sslContext.set_alpn_protos([b'xmpp-client'])
-
- tcpsock._sslContext.set_options(flags)
-
- # Disable session resumption, protection against Triple Handshakes TLS attack
- tcpsock._sslContext.set_session_cache_mode(OpenSSL.SSL.SESS_CACHE_OFF)
-
- # NonBlockingHTTPBOSH instance has no attribute _owner
- if hasattr(tcpsock, '_owner') and tcpsock._owner._caller.client_cert \
- and os.path.exists(tcpsock._owner._caller.client_cert):
- conn = tcpsock._owner._caller
- log.debug('Using client cert and key from %s' % conn.client_cert)
- try:
- p12 = OpenSSL.crypto.load_pkcs12(open(conn.client_cert, 'rb').read(),
- conn.client_cert_passphrase)
- except OpenSSL.crypto.Error as exception_obj:
- log.warning('Unable to load client pkcs12 certificate from '
- 'file %s: %s ... Is it a valid PKCS12 cert?' % \
- (conn.client_cert, exception_obj.args))
- except:
- log.warning('Unknown error while loading certificate from file '
- '%s' % conn.client_cert)
- else:
- log.info('PKCS12 Client cert loaded OK')
- try:
- tcpsock._sslContext.use_certificate(p12.get_certificate())
- tcpsock._sslContext.use_privatekey(p12.get_privatekey())
- log.info('p12 cert and key loaded')
- except OpenSSL.crypto.Error as exception_obj:
- log.warning('Unable to extract client certificate from '
- 'file %s' % conn.client_cert)
- except Exception as msg:
- log.warning('Unknown error extracting client certificate '
- 'from file %s: %s' % (conn.client_cert, msg))
- else:
- log.info('client cert and key loaded OK')
-
- tcpsock.ssl_errnum = 0
- tcpsock._sslContext.set_verify(OpenSSL.SSL.VERIFY_PEER,
- self._ssl_verify_callback)
- tcpsock._sslContext.set_cipher_list(self.cipher_list)
- store = tcpsock._sslContext.get_cert_store()
- self._load_cert_file(self.cacerts, store)
- self._load_cert_file(self.mycerts, store)
- paths = ['/etc/ssl/certs',
- '/etc/ssl'] # FreeBSD uses this
- for path in paths:
- if not os.path.isdir(path):
- continue
- for f in os.listdir(path):
- # We don't logg because there is a lot a duplicated certs
- # in this folder
- self._load_cert_file(os.path.join(path, f), store, logg=False)
-
- tcpsock._sslObj = OpenSSL.SSL.Connection(tcpsock._sslContext,
- tcpsock._sock)
- tcpsock._sslObj.set_connect_state() # set to client mode
-
- if self.alpn:
- # Set SNI EXT on the SSL Connection object, see XEP-0368
- tcpsock._sslObj.set_tlsext_host_name(tcpsock._owner.Server.encode())
-
- wrapper = PyOpenSSLWrapper(tcpsock._sslObj)
- tcpsock._recv = wrapper.recv
- tcpsock._send = wrapper.send
-
- log.debug("Initiating handshake...")
- try:
- tcpsock._sslObj.do_handshake()
- except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError) as e:
- pass
- except:
- log.error('Error while TLS handshake: ', exc_info=True)
- return False
- self._owner.ssl_lib = PYOPENSSL
- return True
-
- def _ssl_verify_callback(self, sslconn, cert, errnum, depth, ok):
- # Exceptions can't propagate up through this callback, so print them here.
- try:
- if errnum:
- self._owner.ssl_errors.append(errnum)
- # This stores all ssl errors that are encountered while
- # the chain is verifyed
- if not self._owner.ssl_errnum:
- # This records the first ssl error that is encountered
- # we keep this because of backwards compatibility
- self._owner.ssl_errnum = errnum
- if depth == 0:
- self._owner.ssl_certificate = cert
- return True
- except Exception:
- log.exception("Exception caught in _ssl_info_callback:")
- # Make sure something is printed, even if log is disabled.
- traceback.print_exc()
-
- def get_channel_binding(self):
- """
- Get channel binding data. RFC 5929
- """
- sslObj = self._owner._sslObj
- try:
- return sslObj.get_finished()
- except AttributeError:
- raise NotImplementedError
-
- def get_ssl_connection(self):
- try:
- return self._owner._sslObj
- except Exception:
- pass
diff --git a/nbxmpp/transports.py b/nbxmpp/transports.py
deleted file mode 100644
index 901bc51..0000000
--- a/nbxmpp/transports.py
+++ /dev/null
@@ -1,848 +0,0 @@
-## transports.py
-##
-## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
-## modified by Dimitur Kirov <dkirov@gmail.com>
-## modified by Tomas Karasek <tom.to.the.k@gmail.com>
-##
-## This program is free software; you can redistribute it and/or modify
-## it under the terms of the GNU General Public License as published by
-## the Free Software Foundation; either version 2, or (at your option)
-## any later version.
-##
-## This program is distributed in the hope that it will be useful,
-## but WITHOUT ANY WARRANTY; without even the implied warranty of
-## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-## GNU General Public License for more details.
-
-"""
-Transports are objects responsible for connecting to XMPP server and putting
-data to wrapped sockets in in desired form (SSL, TLS, TCP, for HTTP proxy,
-for SOCKS5 proxy...)
-
-Transports are not aware of XMPP stanzas and only responsible for low-level
-connection handling.
-"""
-
-import socket
-import errno
-import time
-import traceback
-import base64
-import sys
-import locale
-import logging
-from urllib.parse import urlparse
-
-from .plugin import PlugIn
-from .idlequeue import IdleObject
-from . import proxy_connectors
-from . import tls
-
-log = logging.getLogger('nbxmpp.transports')
-
-def urisplit(uri):
- """
- Function for splitting URI string to tuple (protocol, host, port, path).
- e.g. urisplit('http://httpcm.jabber.org:123/webclient') returns ('http',
- 'httpcm.jabber.org', 123, '/webclient') return 443 as default port if proto
- is https else 80
- """
- splitted = urlparse(uri)
- proto, host, path = splitted.scheme, splitted.hostname, splitted.path
- try:
- port = splitted.port
- except ValueError:
- log.warning('port cannot be extracted from BOSH URL %s, using default port' \
- % uri)
- port = ''
- if not port:
- if proto == 'https':
- port = 443
- else:
- port = 80
- return proto, host, port, path
-
-def get_proxy_data_from_dict(proxy):
- tcp_host, tcp_port, proxy_user, proxy_pass = None, None, None, None
- proxy_type = proxy['type']
- if proxy_type == 'bosh' and not proxy['bosh_useproxy']:
- # with BOSH not over proxy we have to parse the hostname from BOSH URI
- proto, tcp_host, tcp_port, path = urisplit(proxy['bosh_uri'])
- else:
- # with proxy!=bosh or with bosh over HTTP proxy we're connecting to proxy
- # machine
- tcp_host, tcp_port = proxy['host'], proxy['port']
- if proxy.get('useauth', False):
- proxy_user, proxy_pass = proxy['user'], proxy['pass']
- return tcp_host, tcp_port, proxy_user, proxy_pass
-
-def decode_py2(string, encoding):
- # decodes string into unicode if in py2
- # py3 has unicode strings by default
- try:
- string = string.decode(encoding)
- except AttributeError:
- pass
- return string
-
-#: timeout to connect to the server socket, it doesn't include auth
-CONNECT_TIMEOUT_SECONDS = 30
-
-#: how long to wait for a disconnect to complete
-DISCONNECT_TIMEOUT_SECONDS = 5
-
-#: size of the buffer which reads data from server
-# if lower, more stanzas will be fragmented and processed twice
-RECV_BUFSIZE = 32768 # 2x maximum size of ssl packet, should be plenty
-# it's inefficient but should work. Problem is that connect machine makes wrong
-# assumptions and that we only check for pending data in sockets but not in SSL
-# buffer...
-
-DATA_RECEIVED = 'DATA RECEIVED'
-DATA_SENT = 'DATA SENT'
-DATA_ERROR = 'DATA ERROR'
-
-DISCONNECTED = 'DISCONNECTED'
-DISCONNECTING = 'DISCONNECTING'
-CONNECTING = 'CONNECTING'
-PROXY_CONNECTING = 'PROXY_CONNECTING'
-CONNECTED = 'CONNECTED'
-STATES = (DISCONNECTED, CONNECTING, PROXY_CONNECTING, CONNECTED, DISCONNECTING)
-
-class NonBlockingTransport(PlugIn):
- """
- Abstract class representing a transport
-
- Subclasses CAN have different constructor signature but connect method SHOULD
- be the same.
- """
-
- def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
- certs, tls_version, cipher_list):
- """
- Each trasport class can have different constructor but it has to have at
- least all the arguments of NonBlockingTransport constructor
-
- :param raise_event: callback for monitoring of sent and received data
- :param on_disconnect: callback called on disconnection during runtime
- :param idlequeue: processing idlequeue
- :param estabilish_tls: boolean whether to estabilish TLS connection
- after TCP connection is done
- :param certs: tuple of (cacerts, mycerts) see constructor
- of tls.NonBlockingTLS for more details
- :param tls_version: The lowest supported TLS version.
- :param cipher_list: list of ciphers used to connect to server
- """
- PlugIn.__init__(self)
- self.raise_event = raise_event
- self.on_disconnect = on_disconnect
- self.on_connect = None
- self.on_connect_failure = None
- self.idlequeue = idlequeue
- self.on_receive = None
- self.server = None
- self.port = None
- self.conn_5tuple = None
- self.set_state(DISCONNECTED)
- self.estabilish_tls = estabilish_tls
- self.certs = certs
- self.tls_version = tls_version
- self.cipher_list = cipher_list
- # type of used ssl lib (if any) will be assigned to this member var
- self.ssl_lib = None
- self._exported_methods=[self.onreceive, self.set_send_timeout,
- self.set_send_timeout2, self.set_timeout, self.remove_timeout,
- self.start_disconnect]
-
- # time to wait for SOME stanza to come and then send keepalive
- self.sendtimeout = 0
-
- # in case we want to something different than sending keepalives
- self.on_timeout = None
- self.on_timeout2 = None
-
- def plugin(self, owner):
- owner.Connection = self
-
- def plugout(self):
- self._owner.Connection = None
- self._owner = None
- self.disconnect(do_callback=False)
-
- def connect(self, conn_5tuple, on_connect, on_connect_failure):
- """
- Creates and connects transport to server and port defined in conn_5tuple
- which should be item from list returned from getaddrinfo
-
- :param conn_5tuple: 5-tuple returned from getaddrinfo
- :param on_connect: callback called on successful connect to the server
- :param on_connect_failure: callback called on failure when connecting
- """
- self.on_connect = on_connect
- self.on_connect_failure = on_connect_failure
- self.server, self.port = conn_5tuple[4][:2]
- self.conn_5tuple = conn_5tuple
-
- def set_state(self, newstate):
- assert(newstate in STATES)
- self.state = newstate
-
- def get_state(self):
- return self.state
-
- def _on_connect(self):
- """
- Preceeds call of on_connect callback
- """
- # data is reference to socket wrapper instance. We don't need it in client
- # because
- self.set_state(CONNECTED)
- self.on_connect()
-
- def _on_connect_failure(self, err_message):
- """
- Preceeds call of on_connect_failure callback
- """
- # In case of error while connecting we need to disconnect transport
- # but we don't want to call DisconnectHandlers from client,
- # thus the do_callback=False
- self.disconnect(do_callback=False)
- self.on_connect_failure(err_message=err_message)
-
- def send(self, raw_data, now=False):
- if self.get_state() == DISCONNECTED:
- log.error('Unable to send %s \n because state is %s.' %
- (raw_data, self.get_state()))
-
- def disconnect(self, do_callback=True):
- self.set_state(DISCONNECTED)
- if do_callback:
- # invoke callback given in __init__
- self.on_disconnect()
-
- def onreceive(self, recv_handler):
- """
- Set the on_receive callback.
-
- onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is
- the default one that will decide what to do with received stanza based on
- its tag name and namespace.
-
- Do not confuse it with on_receive() method, which is the callback
- itself.
- """
- if not recv_handler:
- if hasattr(self, '_owner') and hasattr(self._owner, 'Dispatcher'):
- self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
- else:
- log.warning('No Dispatcher plugged. Received data will not be processed')
- self.on_receive = None
- return
- self.on_receive = recv_handler
-
- def _tcp_connecting_started(self):
- self.set_state(CONNECTING)
-
- def read_timeout(self):
- """
- Called when there's no response from server in defined timeout
- """
- if self.on_timeout:
- self.on_timeout()
- self.renew_send_timeout()
-
- def read_timeout2(self):
- """
- called when there's no response from server in defined timeout
- """
- if self.on_timeout2:
- self.on_timeout2()
- self.renew_send_timeout2()
-
- def renew_send_timeout(self):
- if self.on_timeout and self.sendtimeout > 0:
- self.set_timeout(self.sendtimeout)
-
- def renew_send_timeout2(self):
- if self.on_timeout2 and self.sendtimeout2 > 0:
- self.set_timeout2(self.sendtimeout2)
-
- def set_timeout(self, timeout):
- self.idlequeue.set_read_timeout(self.fd, timeout)
-
- def set_timeout2(self, timeout2):
- self.idlequeue.set_read_timeout(self.fd, timeout2, self.read_timeout2)
-
- def get_fd(self):
- pass
-
- def remove_timeout(self):
- self.idlequeue.remove_timeout(self.fd)
-
- def set_send_timeout(self, timeout, on_timeout):
- self.sendtimeout = timeout
- if self.sendtimeout > 0:
- self.on_timeout = on_timeout
- else:
- self.on_timeout = None
-
- def set_send_timeout2(self, timeout2, on_timeout2):
- self.sendtimeout2 = timeout2
- if self.sendtimeout2 > 0:
- self.on_timeout2 = on_timeout2
- else:
- self.on_timeout2 = None
-
- # FIXME: where and why does this need to be called
- def start_disconnect(self):
- self.set_state(DISCONNECTING)
- if hasattr(self._owner, 'Smacks'):
- self._owner.Smacks.send_closing_ack()
-
-
-class NonBlockingTCP(NonBlockingTransport, IdleObject):
- """
- Non-blocking TCP socket wrapper
-
- It is used for simple XMPP connection. Can be connected via proxy and can
- estabilish TLS connection.
- """
- def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
- certs, tls_version, cipher_list, alpn, proxy_dict=None):
- """
- :param proxy_dict: dictionary with proxy data as loaded from config file
- """
- NonBlockingTransport.__init__(self, raise_event, on_disconnect,
- idlequeue, estabilish_tls, certs, tls_version, cipher_list)
- IdleObject.__init__(self)
-
- # queue with messages to be send
- self.sendqueue = []
-
- # bytes remained from the last send message
- self.sendbuff = ''
- self.sent_bytes_buff = b''
-
- # bytes remained from the last received message
- self.received_bytes_buff = b''
-
- self.proxy_dict = proxy_dict
- self.on_remote_disconnect = self.disconnect
-
- # ssl variables
- self.ssl_certificate = None
- # first ssl error
- self.ssl_errnum = 0
- # all ssl errors
- self.ssl_errors = []
-
- self.alpn = alpn
-
- # FIXME: transport should not be aware xmpp
- def start_disconnect(self):
- NonBlockingTransport.start_disconnect(self)
- self.send('</stream:stream>', now=True)
- self.disconnect()
-
- def connect(self, conn_5tuple, on_connect, on_connect_failure):
- NonBlockingTransport.connect(self, conn_5tuple, on_connect,
- on_connect_failure)
- log.info('NonBlockingTCP Connect :: About to connect to %s:%s' %
- (self.server, self.port))
-
- try:
- self._sock = socket.socket(*conn_5tuple[:3])
- except socket.error as e:
- self._on_connect_failure('NonBlockingTCP Connect: Error while creating\
- socket: %s' % str(e))
- return
-
- self._send = self._sock.send
- self._recv = self._sock.recv
- self.fd = self._sock.fileno()
-
- # we want to be notified when send is possible to connected socket because
- # it means the TCP connection is estabilished
- self._plug_idle(writable=True, readable=False)
- self.peerhost = None
-
- # variable for errno symbol that will be found from exception raised
- # from connect()
- errnum = 0
- errstr = str()
-
- # set timeout for TCP connecting - if nonblocking connect() fails, pollend
- # is called. If if succeeds pollout is called.
- self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT_SECONDS)
-
- try:
- self._sock.setblocking(False)
- self._sock.connect((self.server, self.port))
- except Exception as exc:
- errnum, errstr = exc.errno, \
- decode_py2(exc.strerror, locale.getpreferredencoding())
-
- if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
- # connecting in progress
- log.info('After NB connect() of %s. "%s" raised => CONNECTING' %
- (id(self), errstr))
- self._tcp_connecting_started()
- return
-
- # if there was some other exception, call failure callback and unplug
- # transport which will also remove read_timeouts for descriptor
- self._on_connect_failure('Exception while connecting to %s:%s - %s %s' %
- (self.server, self.port, errnum, errstr))
-
- def _connect_to_proxy(self):
- self.set_state(PROXY_CONNECTING)
- if self.proxy_dict['type'] == 'socks5':
- proxyclass = proxy_connectors.SOCKS5Connector
- elif self.proxy_dict['type'] == 'http' :
- proxyclass = proxy_connectors.HTTPCONNECTConnector
- proxyclass.get_instance(
- send_method=self.send,
- onreceive=self.onreceive,
- old_on_receive=self.on_receive,
- on_success=self._on_connect,
- on_failure=self._on_connect_failure,
- xmpp_server=self.proxy_dict['xmpp_server'],
- proxy_creds=self.proxy_dict['credentials'])
-
- def _on_connect(self):
- """
- Preceed invoking of on_connect callback. TCP connection is already
- estabilished by this time
- """
- if self.estabilish_tls:
- self.tls_init(
- on_succ = lambda: NonBlockingTransport._on_connect(self),
- on_fail = lambda: self._on_connect_failure(
- 'error while estabilishing TLS'))
- else:
- NonBlockingTransport._on_connect(self)
-
- def tls_init(self, on_succ, on_fail):
- """
- Estabilishes TLS/SSL using this TCP connection by plugging a
- NonBlockingTLS module
- """
- cacerts, mycerts = self.certs
- result = tls.NonBlockingTLS.get_instance(cacerts, mycerts,
- self.tls_version, self.cipher_list, self.alpn).PlugIn(self)
- if result:
- on_succ()
- else:
- on_fail()
-
- def pollin(self):
- """
- Called by idlequeu when receive on plugged socket is possible
- """
- log.info('pollin called, state == %s' % self.get_state())
- self._do_receive()
-
- def pollout(self):
- """
- Called by idlequeu when send to plugged socket is possible
- """
- log.info('pollout called, state == %s' % self.get_state())
-
- if self.get_state() == CONNECTING:
- log.info('%s socket wrapper connected' % id(self))
- self.idlequeue.remove_timeout(self.fd)
- self._plug_idle(writable=False, readable=False)
- self.peerhost = self._sock.getsockname()
- if self.proxy_dict:
- self._connect_to_proxy()
- else:
- self._on_connect()
- elif self.get_state() != DISCONNECTED:
- self._do_send()
-
- def pollend(self):
- """
- Called by idlequeue on TCP connection errors
- """
- log.info('pollend called, state == %s' % self.get_state())
-
- if self.get_state() == CONNECTING:
- self._on_connect_failure('Error during connect to %s:%s' %
- (self.server, self.port))
- else:
- self.disconnect()
-
- def disconnect(self, do_callback=True):
- if self.get_state() == DISCONNECTED:
- return
- self.set_state(DISCONNECTED)
- self.idlequeue.unplug_idle(self.fd)
- if 'NonBlockingTLS' in self.__dict__:
- self.NonBlockingTLS.PlugOut()
- try:
- self._sock.shutdown(socket.SHUT_RDWR)
- self._sock.close()
- except socket.error as e:
- errstr = decode_py2(e.strerror, locale.getpreferredencoding())
- log.info('Error while disconnecting socket: %s' % errstr)
- self.fd = -1
- NonBlockingTransport.disconnect(self, do_callback)
-
- def read_timeout(self):
- log.info('read_timeout called, state == %s' % self.get_state())
- if self.get_state() == CONNECTING:
- # if read_timeout is called during connecting, connect() didn't end yet
- # thus we have to call the tcp failure callback
- self._on_connect_failure('Error during connect to %s:%s' %
- (self.server, self.port))
- else:
- NonBlockingTransport.read_timeout(self)
-
- def set_timeout(self, timeout):
- if self.get_state() != DISCONNECTED and self.fd != -1:
- NonBlockingTransport.set_timeout(self, timeout)
- else:
- log.warning('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' %
- (self.get_state(), self.fd))
-
- def remove_timeout(self):
- if self.fd:
- NonBlockingTransport.remove_timeout(self)
- else:
- log.warning('remove_timeout: no self.fd state is %s' % self.get_state())
-
- def send(self, raw_data, now=False):
- """
- Append raw_data to the queue of messages to be send. If supplied data is
- unicode string, encode it to utf-8.
- """
- NonBlockingTransport.send(self, raw_data, now)
-
- if isinstance(raw_data, bytes):
- r = raw_data
- else:
- r = self.encode_stanza(raw_data)
-
- if now:
- self.sendqueue.insert(0, r)
- self._do_send()
- else:
- self.sendqueue.append(r)
-
- self._plug_idle(writable=True, readable=True)
-
- def encode_stanza(self, stanza):
- """
- Encode str or unicode to utf-8
- """
- if isinstance(stanza, str):
- stanza = stanza.encode('utf-8')
- elif not isinstance(stanza, str):
- stanza = str(stanza).encode('utf-8')
- return stanza
-
- def _plug_idle(self, writable, readable):
- """
- Plug file descriptor of socket to Idlequeue
-
- Plugged socket will be watched for "send possible" or/and "recv possible"
- events. pollin() callback is invoked on "recv possible", pollout() on
- "send_possible".
-
- Plugged socket will always be watched for "error" event - in that case,
- pollend() is called.
- """
- log.info('Plugging fd %d, W:%s, R:%s' % (self.fd, writable, readable))
- self.idlequeue.plug_idle(self, writable, readable)
-
- def _do_send(self):
- """
- Called when send() to connected socket is possible. First message from
- sendqueue will be sent
- """
- if not self.sendbuff:
- if not self.sendqueue:
- log.warning('calling send on empty buffer and queue')
- self._plug_idle(writable=False, readable=True)
- return None
- self.sendbuff = self.sendqueue.pop(0)
- try:
- send_count = self._send(self.sendbuff)
- if send_count:
- sent_data = self.sendbuff[:send_count]
- self.sendbuff = self.sendbuff[send_count:]
- self._plug_idle(writable=((self.sendqueue != []) or (len(self.sendbuff) != 0)), readable=True)
-
- if self.sent_bytes_buff:
- sent_data = self.sent_bytes_buff + sent_data
- self.sent_bytes_buff = b''
- # try to decode sent data
- try:
- sent_data = decode_py2(sent_data, 'utf-8')
- except UnicodeDecodeError:
- for i in range(-1, -4, -1):
- char = sent_data[i]
- if ord(char) & 0xc0 == 0xc0:
- self.sent_bytes_buff = sent_data[i:]
- sent_data = sent_data[:i]
- break
- sent_data = decode_py2(sent_data, 'utf-8')
- self.raise_event(DATA_SENT, sent_data)
-
- except Exception:
- log.error('_do_send:', exc_info=True)
- traceback.print_exc()
- self.disconnect()
-
- def _do_receive(self):
- """
- Reads all pending incoming data. Will call owner's disconnected() method
- if appropriate
- """
- received = None
- errnum = 0
- errstr = 'No Error Set'
-
- try:
- # get as many bites, as possible, but not more than RECV_BUFSIZE
- received = self._recv(RECV_BUFSIZE)
- except tls.SSLWrapper.Error as e:
- log.info("_do_receive, caught SSL error, got %s:" % received,
- exc_info=True)
- errnum, errstr = e.errno,\
- decode_py2(e.strerror, locale.getpreferredencoding())
- except socket.error as e:
- log.info("_do_receive: got %s:" % received, exc_info=True)
-
- if received == '':
- errstr = 'zero bytes on recv'
-
- if (self.ssl_lib is None and received == '') or \
- (self.ssl_lib == tls.PYOPENSSL and errnum == -1 ):
- # -1 in pyopenssl: errstr == Unexpected EOF
- log.info("Disconnected by remote server: #%s, %s" % (errnum, errstr))
- self.on_remote_disconnect()
- return
-
- if errnum:
- log.info("Connection to %s:%s lost: %s %s" % (self.server, self.port,
- errnum, errstr), exc_info=True)
- self.disconnect()
- return
-
- # this branch is for case of non-fatal SSL errors - None is returned from
- # recv() but no errnum is set
- if received is None:
- return
-
- # we have received some bytes, stop the timeout!
- self.remove_timeout()
- self.renew_send_timeout()
- self.renew_send_timeout2()
-
- if self.received_bytes_buff:
- received = self.received_bytes_buff + received
- self.received_bytes_buff = b''
-
- if self.state != PROXY_CONNECTING or self.proxy_dict['type'] != \
- 'socks5':
- # try to decode data
- try:
- received = decode_py2(received, 'utf-8')
- except UnicodeDecodeError:
- for i in range(-1, -4, -1):
- char = received[i]
- if char & 0xc0 == 0xc0:
- self.received_bytes_buff = received[i:]
- received = received[:i]
- break
- received = decode_py2(received, 'utf-8')
-
- # pass received data to owner
- if self.on_receive:
- self.raise_event(DATA_RECEIVED, received)
- self._on_receive(received)
- else:
- # This should never happen, so we need the debug.
- # (If there is no handler on receive specified, data is passed to
- # Dispatcher.ProcessNonBlocking)
- log.error('SOCKET %s Unhandled data received: %s' % (id(self),
- received))
- self.disconnect()
-
- def _on_receive(self, data):
- """
- Preceeds on_receive callback. It peels off and checks HTTP headers in
- HTTP classes, in here it just calls the callback
- """
- self.on_receive(data)
-
-
-class NonBlockingHTTP(NonBlockingTCP):
- """
- Socket wrapper that creates HTTP message out of sent data and peels-off HTTP
- headers from incoming messages
- """
-
- def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
- certs, tls_version, cipher_list, on_http_request_possible, on_persistent_fallback,
- http_dict, proxy_dict=None):
- """
- :param on_http_request_possible: method to call when HTTP request to
- socket owned by transport is possible.
- :param on_persistent_fallback: callback called when server ends TCP
- connection. It doesn't have to be fatal for HTTP session.
- :param http_dict: dictionary with data for HTTP request and headers
- """
- NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue,
- estabilish_tls, certs, tls_version, cipher_list, False, proxy_dict)
-
- self.http_protocol, self.http_host, self.http_port, self.http_path = \
- urisplit(http_dict['http_uri'])
- self.http_protocol = self.http_protocol or 'http'
- self.http_path = self.http_path or '/'
- self.http_version = http_dict['http_version']
- self.http_persistent = http_dict['http_persistent']
- self.add_proxy_headers = http_dict['add_proxy_headers']
-
- if 'proxy_user' in http_dict and 'proxy_pass' in http_dict:
- self.proxy_user, self.proxy_pass = http_dict['proxy_user'], http_dict[
- 'proxy_pass']
- else:
- self.proxy_user, self.proxy_pass = None, None
-
- # buffer for partial responses
- self.recvbuff = ''
- self.expected_length = 0
- self.pending_requests = 0
- self.on_http_request_possible = on_http_request_possible
- self.last_recv_time = 0
- self.close_current_connection = False
- self.on_remote_disconnect = lambda: on_persistent_fallback(self)
-
- def http_send(self, raw_data, now=False):
- self.send(self.build_http_message(raw_data), now)
-
- def _on_receive(self, data):
- """
- Preceeds passing received data to owner class. Gets rid of HTTP headers
- and checks them.
- """
- if self.get_state() == PROXY_CONNECTING:
- NonBlockingTCP._on_receive(self, data)
- return
-
- # append currently received data to HTTP msg in buffer
- self.recvbuff = '%s%s' % (self.recvbuff or '', data)
- statusline, headers, httpbody, buffer_rest = self.parse_http_message(
- self.recvbuff)
-
- if not (statusline and headers and httpbody):
- log.debug('Received incomplete HTTP response')
- return
-
- if statusline[1] != '200':
- log.error('HTTP Error: %s %s' % (statusline[1], statusline[2]))
- self.disconnect()
- return
- self.expected_length = int(headers['Content-Length'])
- if 'Connection' in headers and headers['Connection'].strip()=='close':
- self.close_current_connection = True
-
- if self.expected_length > len(httpbody.encode('utf-8')):
- # If we haven't received the whole HTTP mess yet, let's end the thread.
- # It will be finnished from one of following recvs on plugged socket.
- log.info('not enough bytes in HTTP response - %d expected, got %d' %
- (self.expected_length, len(httpbody.encode('utf-8'))))
- else:
- # First part of buffer has been extraced and is going to be handled,
- # remove it from buffer
- self.recvbuff = buffer_rest
-
- # everything was received
- self.expected_length = 0
-
- if not self.http_persistent or self.close_current_connection:
- # not-persistent connections disconnect after response
- self.disconnect(do_callback=False)
- self.close_current_connection = False
- self.last_recv_time = time.time()
- self.on_receive(data=httpbody, socket=self)
- self.on_http_request_possible()
-
- def build_http_message(self, httpbody, method='POST'):
- """
- Builds http message with given body. Values for headers and status line
- fields are taken from class variables
- """
- headers = ['%s %s %s' % (method, self.http_path, self.http_version),
- 'Host: %s:%s' % (self.http_host, self.http_port),
- 'User-Agent: Gajim',
- 'Content-Type: text/xml; charset=utf-8',
- 'Content-Length: %s' % len(httpbody)]
- if self.add_proxy_headers:
- headers.append('Proxy-Connection: keep-alive')
- headers.append('Pragma: no-cache')
- if self.proxy_user and self.proxy_pass:
- credentials = '%s:%s' % (self.proxy_user, self.proxy_pass)
- credentials = base64.encodestring(credentials).strip()
- headers.append('Proxy-Authorization: Basic %s' % credentials)
- else:
- headers.append('Connection: Keep-Alive')
- headers.append('\r\n')
- headers = '\r\n'.join(headers)
- return b'%s%s' % (headers.encode('utf-8'), httpbody)
-
- def parse_http_message(self, message):
- """
- Split http message into a tuple:
- - (statusline - list of e.g. ['HTTP/1.1', '200', 'OK'],
- - headers - dictionary of headers e.g. {'Content-Length': '604',
- 'Content-Type': 'text/xml; charset=utf-8'},
- - httpbody - string with http body)
- - http_rest - what is left in the message after a full HTTP header + body
- """
- splitted = message.split('\r\n\r\n')
- if len(splitted) < 2:
- # no complete http message. Keep filling the buffer until we find one
- buffer_rest = message
- return ('', '', '', buffer_rest)
- else:
- (header, httpbody) = splitted[:2]
- header = header.replace('\r', '')
- header = header.lstrip('\n')
- header = header.split('\n')
- statusline = header[0].split(' ', 2)
- header = header[1:]
- headers = {}
- for dummy in header:
- row = dummy.split(' ', 1)
- headers[row[0][:-1]] = row[1]
- body_size = int(headers['Content-Length'])
- rest_splitted = splitted[2:]
- while (len(httpbody) < body_size) and rest_splitted:
- # Complete httpbody until it has the announced size
- httpbody = '\n\n'.join([httpbody, rest_splitted.pop(0)])
- buffer_rest = "\n\n".join(rest_splitted)
- return (statusline, headers, httpbody, buffer_rest)
-
-
-class NonBlockingHTTPBOSH(NonBlockingHTTP):
- """
- Class for BOSH HTTP connections. Slightly redefines HTTP transport by
- calling bosh bodytag generating callback before putting data on wire
- """
-
- def set_stanza_build_cb(self, build_cb):
- self.build_cb = build_cb
-
- def _do_send(self):
- if self.state == PROXY_CONNECTING:
- NonBlockingTCP._do_send(self)
- return
- if not self.sendbuff:
- stanza = self.build_cb(socket=self)
- stanza = self.encode_stanza(stanza)
- stanza = self.build_http_message(httpbody=stanza)
- self.sendbuff = stanza
- NonBlockingTCP._do_send(self)