Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mrDoctorWho/xmpppy.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoralkorgun <alkorgun@gmail.com>2014-02-16 00:35:04 +0400
committeralkorgun <alkorgun@gmail.com>2014-02-16 00:35:04 +0400
commita538e8b141dbac5762a50b82d66c4b7eede6cc4d (patch)
treedcc9eed5e6a3c605012474f481494ddbe6c56270
parent7c407a470a6ab7eb53bdf3727ff591c4e49c7ab4 (diff)
send logic reimagined
-rw-r--r--xmpp/dispatcher.py25
-rw-r--r--xmpp/transports.py61
2 files changed, 58 insertions, 28 deletions
diff --git a/xmpp/dispatcher.py b/xmpp/dispatcher.py
index fb6710c..fea280a 100644
--- a/xmpp/dispatcher.py
+++ b/xmpp/dispatcher.py
@@ -35,6 +35,16 @@ ID = 0
DBG_LINE = "dispatcher"
+if sys.hexversion >= 0x30000F0:
+
+ def deferredRaise(e):
+ raise e[0](e[1]).with_traceback(e[2])
+
+else:
+
+ def deferredRaise(e):
+ raise e[0], e[1], e[2]
+
class Dispatcher(PlugIn):
"""
Ancestor of PlugIn class. Handles XMPP stream, i.e. aware of stream headers.
@@ -150,16 +160,10 @@ class Dispatcher(PlugIn):
for handler in self._cycleHandlers:
handler(self)
if self._pendingExceptions:
- e = self._pendingExceptions.pop()
- raise e[0](e[1]).with_traceback(e[2])
- conn = self._owner.Connection
- recv, send = select([conn._sock], [conn._sock] if conn._send_queue else [], [], timeout)[:2]
- if send:
- while conn._send_queue:
- conn.send_now(conn._send_queue.pop(0))
- if recv:
+ deferredRaise(self._pendingExceptions.pop())
+ if self._owner.Connection.pending_data(timeout):
try:
- data = conn.receive()
+ data = self._owner.Connection.receive()
except IOError:
return None
try:
@@ -167,8 +171,7 @@ class Dispatcher(PlugIn):
except ExpatError:
pass
if self._pendingExceptions:
- e = self._pendingExceptions.pop()
- raise e[0](e[1]).with_traceback(e[2])
+ deferredRaise(self._pendingExceptions.pop())
if data:
return len(data)
return "0"
diff --git a/xmpp/transports.py b/xmpp/transports.py
index 47aa50d..3900d79 100644
--- a/xmpp/transports.py
+++ b/xmpp/transports.py
@@ -31,6 +31,8 @@ import sys
import socket
if sys.hexversion >= 0x20600F0:
import ssl
+import thread
+import time
from . import dispatcher
from base64 import encodestring
@@ -49,7 +51,33 @@ DATA_RECEIVED = 'DATA RECEIVED'
DATA_SENT = 'DATA SENT'
DBG_CONNECT_PROXY = 'CONNECTproxy'
-BUFLEN = 1024
+BUFLEN = 2024
+SEND_INTERVAL = 0
+
+class SendSemaphore(object):
+
+ def __init__(self):
+ self.__lock = thread.allocate_lock()
+ self.__released = 0
+ self.interval = SEND_INTERVAL
+
+ def set_inteval(self, interval):
+ self.interval = interval
+
+ def acquire(self, blocking=1):
+ rc = self.__lock.acquire(blocking)
+ if blocking and self.interval and time.time() - self.__released < self.interval:
+ time.sleep(self.interval)
+ return rc
+
+ __enter__ = acquire
+
+ def release(self):
+ self.__released = time.time()
+ self.__lock.release()
+
+ def __exit__(self, *args):
+ self.release()
class error:
"""
@@ -80,9 +108,10 @@ class TCPsocket(PlugIn):
"""
PlugIn.__init__(self)
self.DBG_LINE = "socket"
- self._exported_methods = [self.send, self.disconnect]
+ self._sequence = SendSemaphore()
+ self.set_send_interval = self._sequence.set_inteval
+ self._exported_methods = [self.send, self.disconnect, self.set_send_interval]
self._server, self.use_srv = server, use_srv
- self._send_queue = []
def srv_lookup(self, server):
"""
@@ -221,9 +250,6 @@ class TCPsocket(PlugIn):
return data
def send(self, data):
- self._send_queue.append(data)
-
- def send_now(self, data, timeout=0.002):
"""
Writes raw outgoing data. Blocks until done.
If supplied data is unicode string, encodes it to utf-8 before send.
@@ -232,17 +258,18 @@ class TCPsocket(PlugIn):
data = data.encode("utf-8")
elif not isinstance(data, str):
data = ustr(data).encode("utf-8")
- try:
- self._send(data)
- except Exception:
- self.DEBUG("Socket error while sending data.", "error")
- self._owner.disconnected()
- else:
- if not data.strip():
- data = repr(data)
- self.DEBUG(data, "sent")
- if hasattr(self._owner, "Dispatcher"):
- self._owner.Dispatcher.Event("", DATA_SENT, data)
+ with self._sequence:
+ try:
+ self._send(data)
+ except Exception:
+ self.DEBUG("Socket error while sending data.", "error")
+ self._owner.disconnected()
+ else:
+ if not data.strip():
+ data = repr(data)
+ self.DEBUG(data, "sent")
+ if hasattr(self._owner, "Dispatcher"):
+ self._owner.Dispatcher.Event("", DATA_SENT, data)
def pending_data(self, timeout=0):
"""