dev.gajim.org/gajim/gajim.git
author     Yann Leboulanger <asterix@lagaule.org>   2011-08-22 11:45:51 +0400
committer  Yann Leboulanger <asterix@lagaule.org>   2011-08-22 11:45:51 +0400
commit     f0a0929d5c44fc2114fd6ce83e85f7a7cc8d38e9 (patch)
tree       f606b611829891192c2f36512abe1042bdf1d139 /src/common/xmpp
parent     a66b2795187d5ac04a05ea4ed02b092548f12efb (diff)
parent     4971f7d8cc03e94b409d278c4be09720082dcbec (diff)
Merge XEP-0198 implementation from the gajim-XEP-198 branch into trunk. Thanks, Jefry, for your work during GSoC.
Diffstat (limited to 'src/common/xmpp')
-rw-r--r--  src/common/xmpp/__init__.py         1
-rw-r--r--  src/common/xmpp/auth_nb.py        113
-rw-r--r--  src/common/xmpp/client_nb.py       11
-rw-r--r--  src/common/xmpp/dispatcher_nb.py   17
-rw-r--r--  src/common/xmpp/protocol.py        29
-rw-r--r--  src/common/xmpp/smacks.py         129
6 files changed, 257 insertions, 43 deletions
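
For orientation, the feature merged here reduces to a short XML exchange layered on the regular stream. The following is a rough, standalone sketch of that exchange (plain Python, not code from this patch); the element names and the urn:xmpp:sm:2 namespace match the diff below, while the id and counter values are placeholders.

    # Rough illustration of the XEP-0198 exchange implemented by this patch.
    NS_STREAM_MGMT = 'urn:xmpp:sm:2'

    # 1. After resource binding the client asks to enable stream management
    #    (resume='true' additionally requests session resumption).
    print("C: <enable xmlns='%s' resume='true'/>" % NS_STREAM_MGMT)

    # 2. The server confirms and returns an id usable for later resumption.
    print("S: <enabled xmlns='%s' id='placeholder-id' resume='true'/>" % NS_STREAM_MGMT)

    # 3. Either side may request an ack at any time ...
    print("C: <r xmlns='%s'/>" % NS_STREAM_MGMT)

    # 4. ... and the peer answers with the number of stanzas it has handled.
    print("S: <a xmlns='%s' h='42'/>" % NS_STREAM_MGMT)

    # 5. After a dropped connection the client may try to resume the old
    #    session instead of binding a new resource.
    print("C: <resume xmlns='%s' h='12' previd='placeholder-id'/>" % NS_STREAM_MGMT)
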
diff --git a/src/common/xmpp/__init__.py b/src/common/xmpp/__init__.py
index 4ebc4cfa3..46037a1cb 100644
--- a/src/common/xmpp/__init__.py
+++ b/src/common/xmpp/__init__.py
@@ -15,3 +15,4 @@ import simplexml, protocol, auth_nb, transports_nb, roster_nb
import dispatcher_nb, features_nb, idlequeue, bosh, tls_nb, proxy_connectors
from client_nb import NonBlockingClient
from plugin import PlugIn
+from smacks import Smacks
diff --git a/src/common/xmpp/auth_nb.py b/src/common/xmpp/auth_nb.py
index fbe30aef8..3f4fcb015 100644
--- a/src/common/xmpp/auth_nb.py
+++ b/src/common/xmpp/auth_nb.py
@@ -22,8 +22,10 @@ See client_nb.py
"""
from protocol import NS_SASL, NS_SESSION, NS_STREAMS, NS_BIND, NS_AUTH
+from protocol import NS_STREAM_MGMT
from protocol import Node, NodeProcessed, isResultNode, Iq, Protocol, JID
from plugin import PlugIn
+from smacks import Smacks
import base64
import random
import itertools
@@ -142,7 +144,7 @@ class SASL(PlugIn):
elif self._owner.Dispatcher.Stream.features:
try:
self.FeaturesHandler(self._owner.Dispatcher,
- self._owner.Dispatcher.Stream.features)
+ self._owner.Dispatcher.Stream.features)
except NodeProcessed:
pass
else:
@@ -154,16 +156,16 @@ class SASL(PlugIn):
"""
if 'features' in self._owner.__dict__:
self._owner.UnregisterHandler('features', self.FeaturesHandler,
- xmlns=NS_STREAMS)
+ xmlns=NS_STREAMS)
if 'challenge' in self._owner.__dict__:
self._owner.UnregisterHandler('challenge', self.SASLHandler,
- xmlns=NS_SASL)
+ xmlns=NS_SASL)
if 'failure' in self._owner.__dict__:
self._owner.UnregisterHandler('failure', self.SASLHandler,
- xmlns=NS_SASL)
+ xmlns=NS_SASL)
if 'success' in self._owner.__dict__:
self._owner.UnregisterHandler('success', self.SASLHandler,
- xmlns=NS_SASL)
+ xmlns=NS_SASL)
def auth(self):
"""
@@ -178,12 +180,12 @@ class SASL(PlugIn):
elif self._owner.Dispatcher.Stream.features:
try:
self.FeaturesHandler(self._owner.Dispatcher,
- self._owner.Dispatcher.Stream.features)
+ self._owner.Dispatcher.Stream.features)
except NodeProcessed:
pass
else:
self._owner.RegisterHandler('features',
- self.FeaturesHandler, xmlns=NS_STREAMS)
+ self.FeaturesHandler, xmlns=NS_STREAMS)
def FeaturesHandler(self, conn, feats):
"""
@@ -198,7 +200,8 @@ class SASL(PlugIn):
'mechanism'):
self.mecs.append(mec.getData())
- self._owner.RegisterHandler('challenge', self.SASLHandler, xmlns=NS_SASL)
+ self._owner.RegisterHandler('challenge', self.SASLHandler,
+ xmlns=NS_SASL)
self._owner.RegisterHandler('failure', self.SASLHandler, xmlns=NS_SASL)
self._owner.RegisterHandler('success', self.SASLHandler, xmlns=NS_SASL)
self.MechanismHandler()
@@ -206,7 +209,8 @@ class SASL(PlugIn):
def MechanismHandler(self):
if 'ANONYMOUS' in self.mecs and self.username is None:
self.mecs.remove('ANONYMOUS')
- node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'ANONYMOUS'})
+ node = Node('auth', attrs={'xmlns': NS_SASL,
+ 'mechanism': 'ANONYMOUS'})
self.mechanism = 'ANONYMOUS'
self.startsasl = SASL_IN_PROCESS
self._owner.send(str(node))
@@ -226,11 +230,11 @@ class SASL(PlugIn):
self.mecs.remove('GSSAPI')
try:
self.gss_vc = kerberos.authGSSClientInit('xmpp@' + \
- self._owner.xmpp_hostname)[1]
+ self._owner.xmpp_hostname)[1]
kerberos.authGSSClientStep(self.gss_vc, '')
response = kerberos.authGSSClientResponse(self.gss_vc)
- node=Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'GSSAPI'},
- payload=(response or ''))
+ node=Node('auth', attrs={'xmlns': NS_SASL,
+ 'mechanism': 'GSSAPI'}, payload=(response or ''))
self.mechanism = 'GSSAPI'
self.gss_step = GSS_STATE_STEP
self.startsasl = SASL_IN_PROCESS
@@ -247,7 +251,8 @@ class SASL(PlugIn):
raise NodeProcessed
if 'DIGEST-MD5' in self.mecs:
self.mecs.remove('DIGEST-MD5')
- node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'DIGEST-MD5'})
+ node = Node('auth', attrs={'xmlns': NS_SASL,
+ 'mechanism': 'DIGEST-MD5'})
self.mechanism = 'DIGEST-MD5'
self.startsasl = SASL_IN_PROCESS
self._owner.send(str(node))
@@ -307,13 +312,13 @@ class SASL(PlugIn):
handlers = self._owner.Dispatcher.dumpHandlers()
# Bosh specific dispatcher replugging
- # save old features. They will be used in case we won't get response on
- # stream restart after SASL auth (happens with XMPP over BOSH with
- # Openfire)
+ # save old features. They will be used in case we won't get response
+ # on stream restart after SASL auth (happens with XMPP over BOSH
+ # with Openfire)
old_features = self._owner.Dispatcher.Stream.features
self._owner.Dispatcher.PlugOut()
dispatcher_nb.Dispatcher.get_instance().PlugIn(self._owner,
- after_SASL=True, old_features=old_features)
+ after_SASL=True, old_features=old_features)
self._owner.Dispatcher.restoreHandlers(handlers)
self._owner.User = self.username
@@ -333,12 +338,12 @@ class SASL(PlugIn):
rc = kerberos.authGSSClientUnwrap(self.gss_vc, incoming_data)
response = kerberos.authGSSClientResponse(self.gss_vc)
rc = kerberos.authGSSClientWrap(self.gss_vc, response,
- kerberos.authGSSClientUserName(self.gss_vc))
+ kerberos.authGSSClientUserName(self.gss_vc))
response = kerberos.authGSSClientResponse(self.gss_vc)
if not response:
response = ''
self._owner.send(Node('response', attrs={'xmlns': NS_SASL},
- payload=response).__str__())
+ payload=response).__str__())
raise NodeProcessed
if self.mechanism == 'SCRAM-SHA-1':
hashfn = hashlib.sha1
@@ -416,8 +421,8 @@ class SASL(PlugIn):
else:
self.resp['realm'] = self._owner.Server
self.resp['nonce'] = chal['nonce']
- self.resp['cnonce'] = ''.join("%x" % randint(0, 2**28) for randint in
- itertools.repeat(random.randint, 7))
+ self.resp['cnonce'] = ''.join("%x" % randint(0, 2**28) for randint \
+ in itertools.repeat(random.randint, 7))
self.resp['nc'] = ('00000001')
self.resp['qop'] = 'auth'
self.resp['digest-uri'] = 'xmpp/' + self._owner.Server
@@ -477,14 +482,15 @@ class SASL(PlugIn):
else:
sasl_data += u'%s="%s",' % (key, self.resp[key])
sasl_data = sasl_data[:-1].encode('utf-8').encode('base64').replace(
- '\r', '').replace('\n', '')
- node = Node('response', attrs={'xmlns':NS_SASL}, payload=[sasl_data])
+ '\r', '').replace('\n', '')
+ node = Node('response', attrs={'xmlns': NS_SASL},
+ payload=[sasl_data])
elif self.mechanism == 'PLAIN':
sasl_data = u'\x00%s\x00%s' % (self.username, self.password)
sasl_data = sasl_data.encode('utf-8').encode('base64').replace(
- '\n', '')
+ '\n', '')
node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'PLAIN'},
- payload=[sasl_data])
+ payload=[sasl_data])
self._owner.send(str(node))
@@ -516,8 +522,8 @@ class NonBlockingNonSASL(PlugIn):
self.owner = owner
owner.Dispatcher.SendAndWaitForResponse(
- Iq('get', NS_AUTH, payload=[Node('username', payload=[self.user])]),
- func=self._on_username)
+ Iq('get', NS_AUTH, payload=[Node('username', payload=[self.user])]),
+ func=self._on_username)
def _on_username(self, resp):
if not isResultNode(resp):
@@ -532,8 +538,8 @@ class NonBlockingNonSASL(PlugIn):
if query.getTag('digest'):
log.info("Performing digest authentication")
query.setTagData('digest',
- hashlib.sha1(self.owner.Dispatcher.Stream._document_attrs['id']
- + self.password).hexdigest())
+ hashlib.sha1(self.owner.Dispatcher.Stream._document_attrs['id']
+ + self.password).hexdigest())
if query.getTag('password'):
query.delChild('password')
self._method = 'digest'
@@ -548,23 +554,25 @@ class NonBlockingNonSASL(PlugIn):
def hash_n_times(s, count):
return count and hasher(hash_n_times(s, count-1)) or s
- hash_ = hash_n_times(hasher(hasher(self.password) + token), int(seq))
+ hash_ = hash_n_times(hasher(hasher(self.password) + token),
+ int(seq))
query.setTagData('hash', hash_)
self._method='0k'
else:
log.warn("Secure methods unsupported, performing plain text \
- authentication")
+ authentication")
query.setTagData('password', self.password)
self._method = 'plain'
- resp = self.owner.Dispatcher.SendAndWaitForResponse(iq, func=self._on_auth)
+ resp = self.owner.Dispatcher.SendAndWaitForResponse(iq,
+ func=self._on_auth)
def _on_auth(self, resp):
if isResultNode(resp):
log.info('Sucessfully authenticated with remote host.')
self.owner.User = self.user
self.owner.Resource = self.resource
- self.owner._registered_name = self.owner.User+'@'+self.owner.Server+\
- '/'+self.owner.Resource
+ self.owner._registered_name = self.owner.User + '@' + \
+ self.owner.Server+ '/' + self.owner.Resource
return self.on_auth(self._method)
log.info('Authentication failed!')
return self.on_auth(None)
@@ -579,24 +587,34 @@ class NonBlockingBind(PlugIn):
def __init__(self):
PlugIn.__init__(self)
self.bound = None
+ self.supports_sm = False
+ self.resuming = False
def plugin(self, owner):
''' Start resource binding, if allowed at this time. Used internally. '''
if self._owner.Dispatcher.Stream.features:
try:
self.FeaturesHandler(self._owner.Dispatcher,
- self._owner.Dispatcher.Stream.features)
+ self._owner.Dispatcher.Stream.features)
except NodeProcessed:
pass
else:
self._owner.RegisterHandler('features', self.FeaturesHandler,
- xmlns=NS_STREAMS)
+ xmlns=NS_STREAMS)
def FeaturesHandler(self, conn, feats):
"""
Determine if server supports resource binding and set some internal
- attributes accordingly
+ attributes accordingly.
+
+ It also checks if server supports stream management
"""
+
+ if feats.getTag('sm', namespace=NS_STREAM_MGMT):
+ self.supports_sm = True # server supports stream management
+ if self.resuming:
+ self._owner._caller.sm.resume_request()
+
if not feats.getTag('bind', namespace=NS_BIND):
log.info('Server does not requested binding.')
# we try to bind resource anyway
@@ -614,12 +632,14 @@ class NonBlockingBind(PlugIn):
Remove Bind handler from owner's dispatcher. Used internally
"""
self._owner.UnregisterHandler('features', self.FeaturesHandler,
- xmlns=NS_STREAMS)
+ xmlns=NS_STREAMS)
def NonBlockingBind(self, resource=None, on_bound=None):
"""
Perform binding. Use provided resource name or random (if not provided).
"""
+ if self.resuming: # We don't bind if we resume the stream
+ return
self.on_bound = on_bound
self._resource = resource
if self._resource:
@@ -629,8 +649,9 @@ class NonBlockingBind(PlugIn):
self._owner.onreceive(None)
self._owner.Dispatcher.SendAndWaitForResponse(
- Protocol('iq', typ='set', payload=[Node('bind', attrs={'xmlns':NS_BIND},
- payload=self._resource)]), func=self._on_bound)
+ Protocol('iq', typ='set', payload=[Node('bind',
+ attrs={'xmlns': NS_BIND}, payload=self._resource)]),
+ func=self._on_bound)
def _on_bound(self, resp):
if isResultNode(resp):
@@ -640,14 +661,22 @@ class NonBlockingBind(PlugIn):
jid = JID(resp.getTag('bind').getTagData('jid'))
self._owner.User = jid.getNode()
self._owner.Resource = jid.getResource()
+ # Only negociate stream management after bounded
+ sm = self._owner._caller.sm
+ if self.supports_sm:
+ # starts negociation
+ sm.set_owner(self._owner)
+ sm.negociate()
+ self._owner.Dispatcher.sm = sm
+
if hasattr(self, 'session') and self.session == -1:
# Server don't want us to initialize a session
log.info('No session required.')
self.on_bound('ok')
else:
self._owner.SendAndWaitForResponse(Protocol('iq', typ='set',
- payload=[Node('session', attrs={'xmlns':NS_SESSION})]),
- func=self._on_session)
+ payload=[Node('session', attrs={'xmlns':NS_SESSION})]),
+ func=self._on_session)
return
if resp:
log.info('Binding failed: %s.' % resp.getTag('error'))
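
The auth_nb.py changes above boil down to an ordering rule: stream management is only negotiated after the resource is bound, and binding is skipped entirely when an earlier session is being resumed. A simplified model of that rule follows; the function, its arguments and bind_resource() are illustrative stand-ins, not Gajim APIs.

    # Simplified model of the bind / stream-management ordering added above.
    def bind_resource():
        # placeholder for the real iq-set resource-binding round trip
        return 'user@example.org/gajim'

    def after_sasl(stream_features, sm, resuming):
        # <sm xmlns='urn:xmpp:sm:2'/> advertised in the stream features?
        supports_sm = 'sm' in stream_features
        if resuming and supports_sm:
            sm.resume_request()      # try to resume the old session; no new bind
            return None
        jid = bind_resource()        # bind a resource first
        if supports_sm:
            sm.negociate()           # then negotiate SM (spelling as in the diff)
        return jid
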
diff --git a/src/common/xmpp/client_nb.py b/src/common/xmpp/client_nb.py
index dc4c4476b..92b182765 100644
--- a/src/common/xmpp/client_nb.py
+++ b/src/common/xmpp/client_nb.py
@@ -521,7 +521,16 @@ class NonBlockingClient:
self.connected = None # FIXME: is this intended? We use ''elsewhere
self._on_sasl_auth(None)
elif self.SASL.startsasl == 'success':
- auth_nb.NonBlockingBind.get_instance().PlugIn(self)
+ nb_bind = auth_nb.NonBlockingBind.get_instance()
+ sm = self._caller.sm
+ if sm._owner and sm.resumption:
+ nb_bind.resuming = True
+ sm.set_owner(self)
+ self.Dispatcher.sm = sm
+ nb_bind.PlugIn(self)
+ return
+
+ nb_bind.PlugIn(self)
self.onreceive(self._on_auth_bind)
return True
diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py
index cca56f33a..9d54822f3 100644
--- a/src/common/xmpp/dispatcher_nb.py
+++ b/src/common/xmpp/dispatcher_nb.py
@@ -90,6 +90,9 @@ class XMPPDispatcher(PlugIn):
self.SendAndWaitForResponse, self.SendAndCallForResponse,
self.getAnID, self.Event, self.send]
+ # Let the dispatcher know if there is support for stream management
+ self.sm = None
+
def getAnID(self):
global outgoingID
outgoingID += 1
@@ -417,6 +420,12 @@ class XMPPDispatcher(PlugIn):
stanza.props = stanza.getProperties()
ID = stanza.getID()
+ # If server supports stream management
+ if self.sm and self.sm.enabled and (stanza.getName() != 'r' and
+ stanza.getName() != 'a' and stanza.getName() != 'enabled' and
+ stanza.getName() != 'resumed'):
+ # increments the number of stanzas that has been handled
+ self.sm.in_h = self.sm.in_h + 1
list_ = ['default'] # we will use all handlers:
if typ in self.handlers[xmlns][name]:
list_.append(typ) # from very common...
@@ -525,6 +534,14 @@ class XMPPDispatcher(PlugIn):
ID = stanza.getID()
if self._owner._registered_name and not stanza.getAttr('from'):
stanza.setAttr('from', self._owner._registered_name)
+
+ # If no ID then it is a whitespace
+ if self.sm and self.sm.enabled and ID:
+ self.sm.uqueue.append(stanza)
+ self.sm.out_h = self.sm.out_h + 1
+ if len(self.sm.uqueue) > self.sm.max_queue:
+ self.sm.request_ack()
+
self._owner.Connection.send(stanza, now)
return ID
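
The dispatcher hooks above do the per-stanza bookkeeping: every incoming stanza that is not one of the stream-management elements bumps in_h, every outgoing stanza carrying an id is queued and bumps out_h, and an ack is requested once the queue grows past max_queue. Below is a standalone model of that accounting; only the names in_h, out_h, uqueue and max_queue mirror the diff, the class itself is illustrative.

    # Standalone model of the counters maintained by the dispatcher changes above.
    SM_ELEMENTS = ('r', 'a', 'enabled', 'resumed')

    class SmCounters(object):
        def __init__(self, max_queue=5):
            self.in_h = 0        # incoming stanzas handled
            self.out_h = 0       # outgoing stanzas sent
            self.uqueue = []     # outgoing stanzas not yet acknowledged
            self.max_queue = max_queue

        def on_incoming(self, name):
            # stream-management bookkeeping elements are not counted
            if name not in SM_ELEMENTS:
                self.in_h += 1

        def on_outgoing(self, stanza, has_id):
            # stanzas without an id (whitespace keepalives) are not tracked
            if not has_id:
                return False
            self.uqueue.append(stanza)
            self.out_h += 1
            # True means "now is the time to send an <r/> ack request"
            return len(self.uqueue) > self.max_queue

    counters = SmCounters(max_queue=2)
    counters.on_incoming('message')                        # in_h == 1
    for i in range(3):
        need_ack = counters.on_outgoing('<message/>', True)
    # need_ack is now True: three unacked stanzas exceed max_queue == 2
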
diff --git a/src/common/xmpp/protocol.py b/src/common/xmpp/protocol.py
index 45453310e..58c99c608 100644
--- a/src/common/xmpp/protocol.py
+++ b/src/common/xmpp/protocol.py
@@ -149,6 +149,7 @@ NS_DATA_LAYOUT = 'http://jabber.org/protocol/xdata-layout' # XEP-0141
NS_DATA_VALIDATE = 'http://jabber.org/protocol/xdata-validate' # XEP-0122
NS_XMPP_STREAMS = 'urn:ietf:params:xml:ns:xmpp-streams'
NS_RECEIPTS = 'urn:xmpp:receipts'
+NS_STREAM_MGMT = 'urn:xmpp:sm:2' # XEP-198
xmpp_stream_error_conditions = '''
bad-format -- -- -- The entity has sent XML that cannot be processed.
@@ -984,6 +985,34 @@ class Iq(Protocol):
iq.setQueryNS(self.getQueryNS())
return iq
+class Acks(Node):
+ """
+ Acknowledgement elements for Stream Management
+ """
+ def __init__(self, nsp=NS_STREAM_MGMT):
+ Node.__init__(self, None, {}, [], None, None,False, None)
+ self.setNamespace(nsp)
+
+ def buildAnswer(self, handled):
+ """
+ handled is the number of stanzas handled
+ """
+ self.setName('a')
+ self.setAttr('h', handled)
+
+ def buildRequest(self):
+ self.setName('r')
+
+ def buildEnable(self, resume=False):
+ self.setName('enable')
+ if resume:
+ self.setAttr('resume', 'true')
+
+ def buildResume(self, handled, previd):
+ self.setName('resume')
+ self.setAttr('h', handled)
+ self.setAttr('previd', previd)
+
class ErrorNode(Node):
"""
XMPP-style error element
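
For reference, the four builders on the new Acks class correspond to the wire elements shown below. The snippet reproduces them with ElementTree so it runs outside the Gajim tree; the attribute values are placeholders.

    # Wire elements produced by the Acks builders added above, reproduced
    # with ElementTree for illustration; attribute values are placeholders.
    from xml.etree.ElementTree import Element, tostring

    NS_STREAM_MGMT = 'urn:xmpp:sm:2'

    def show(name, **attrs):
        element = Element(name, xmlns=NS_STREAM_MGMT, **attrs)
        print(tostring(element).decode('utf-8'))

    show('a', h='5')                      # Acks().buildAnswer(5)
    show('r')                             # Acks().buildRequest()
    show('enable', resume='true')         # Acks().buildEnable(resume=True)
    show('resume', h='5', previd='x-1')   # Acks().buildResume(5, 'x-1')
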
diff --git a/src/common/xmpp/smacks.py b/src/common/xmpp/smacks.py
new file mode 100644
index 000000000..c26fb1c05
--- /dev/null
+++ b/src/common/xmpp/smacks.py
@@ -0,0 +1,129 @@
+from protocol import Acks
+from protocol import NS_STREAM_MGMT
+import logging
+log = logging.getLogger('gajim.c.x.smacks')
+
+class Smacks():
+ '''
+ This is Smacks is the Stream Management class. It takes care of requesting
+ and sending acks. Also, it keeps track of the unhandled outgoing stanzas.
+
+ The dispatcher has to be able to access this class to increment the
+ number of handled stanzas
+ '''
+
+ def __init__(self, con):
+ self.con = con # Connection object
+ self.out_h = 0 # Outgoing stanzas handled
+ self.in_h = 0 # Incoming stanzas handled
+ self.uqueue = [] # Unhandled stanzas queue
+ self.session_id = None
+ self.resumption = False # If server supports resume
+ # Max number of stanzas in queue before making a request
+ self.max_queue = 5
+ self._owner = None
+ self.resuming = False
+ self.enabled = False # If SM is enabled
+ self.location = None
+
+ def set_owner(self, owner):
+ self._owner = owner
+
+ # Register handlers
+ owner.Dispatcher.RegisterNamespace(NS_STREAM_MGMT)
+ owner.Dispatcher.RegisterHandler('enabled', self._neg_response,
+ xmlns=NS_STREAM_MGMT)
+ owner.Dispatcher.RegisterHandler('r', self.send_ack,
+ xmlns=NS_STREAM_MGMT)
+ owner.Dispatcher.RegisterHandler('a', self.check_ack,
+ xmlns=NS_STREAM_MGMT)
+ owner.Dispatcher.RegisterHandler('resumed', self.check_ack,
+ xmlns=NS_STREAM_MGMT)
+ owner.Dispatcher.RegisterHandler('failed', self.error_handling,
+ xmlns=NS_STREAM_MGMT)
+
+ def _neg_response(self, disp, stanza):
+ r = stanza.getAttr('resume')
+ if r == 'true' or r == 'True' or r == '1':
+ self.resumption = True
+ self.session_id = stanza.getAttr('id')
+
+ if r == 'false' or r == 'False' or r == '0':
+ self.negociate(False)
+
+ l = stanza.getAttr('location')
+ if l:
+ self.location = l
+
+ def negociate(self, resume=True):
+ # Every time we attempt to negociate, we must erase all previous info
+ # about any previous session
+ self.uqueue = []
+ self.in_h = 0
+ self.out_h = 0
+ self.session_id = None
+ self.enabled = True
+
+ stanza = Acks()
+ stanza.buildEnable(resume)
+ self._owner.Connection.send(stanza, now=True)
+
+ def resume_request(self):
+ if not self.session_id:
+ self.resuming = False
+ log.error('Attempted to resume without a valid session id ')
+ return
+ resume = Acks()
+ resume.buildResume(self.in_h, self.session_id)
+ self._owner.Connection.send(resume, False)
+
+ def send_ack(self, disp, stanza):
+ ack = Acks()
+ ack.buildAnswer(self.in_h)
+ self._owner.Connection.send(ack, False)
+
+ def request_ack(self):
+ r = Acks()
+ r.buildRequest()
+ self._owner.Connection.send(r, False)
+
+ def check_ack(self, disp, stanza):
+ '''
+ Checks if the number of stanzas sent are the same as the
+ number of stanzas received by the server. Pops stanzas that were
+ handled by the server from the queue.
+ '''
+ h = int(stanza.getAttr('h'))
+ diff = self.out_h - h
+
+ if len(self.uqueue) < diff or diff < 0:
+ log.error('Server and client number of stanzas handled mismatch ')
+ else:
+ while (len(self.uqueue) > diff):
+ self.uqueue.pop(0)
+
+ if stanza.getName() == 'resumed':
+ self.resuming = True
+ self.con.set_oldst()
+ if self.uqueue != []:
+ for i in self.uqueue:
+ self._owner.Connection.send(i, False)
+
+ def error_handling(self, disp, stanza):
+ # If the server doesn't recognize previd, forget about resuming
+ # Ask for service discovery, etc..
+ if stanza.getTag('item-not-found'):
+ self.resuming = False
+ self.negociate()
+ self.con._discover_server_at_connection(self.con.connection)
+ return
+
+ # Doesn't support resumption
+ if stanza.getTag('feature-not-implemented'):
+ self.negociate(False)
+ return
+
+ if stanza.getTag('unexpected-request'):
+ self.enabled = False
+ log.error('Gajim failed to negociate Stream Management')
+ return
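
The queue handling in check_ack() can be exercised in isolation. The helper below is a simplified restatement of that logic (a sketch, not the Gajim class): the acknowledged prefix of the queue is dropped, and whatever remains would be re-sent after a successful <resumed/>.

    # Simplified restatement of the check_ack() accounting above.
    def drop_acked(uqueue, out_h, h):
        """Return the stanzas still unacknowledged after an <a h='...'/> answer."""
        diff = out_h - h                 # stanzas the server has not handled yet
        if diff < 0 or len(uqueue) < diff:
            raise ValueError('client/server handled-stanza counters disagree')
        # keep only the last `diff` stanzas; the rest were acknowledged
        return uqueue[len(uqueue) - diff:]

    queue = ['<message id="1"/>', '<message id="2"/>', '<message id="3"/>']
    unacked = drop_acked(queue, out_h=3, h=2)
    print(unacked)   # ['<message id="3"/>'] -- re-sent if the stream is resumed
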