From cc42529b8865c33899ea41af39df3d43d9eb2b62 Mon Sep 17 00:00:00 2001 From: alkorgun Date: Wed, 15 Jan 2014 19:49:07 +0600 Subject: implemented send queue for synchronicity --- xmpp/dispatcher.py | 10 ++++++++-- xmpp/transports.py | 49 ++++++++++++++++++++++++++----------------------- 2 files changed, 34 insertions(+), 25 deletions(-) (limited to 'xmpp') diff --git a/xmpp/dispatcher.py b/xmpp/dispatcher.py index 62151d3..5d020d0 100644 --- a/xmpp/dispatcher.py +++ b/xmpp/dispatcher.py @@ -27,6 +27,7 @@ import time from plugin import PlugIn from protocol import * +from select import select from xml.parsers.expat import ExpatError DefaultTimeout = 25 @@ -151,9 +152,14 @@ class Dispatcher(PlugIn): if self._pendingExceptions: e = self._pendingExceptions.pop() raise e[0], e[1], e[2] - if self._owner.Connection.pending_data(timeout): + conn = self._owner.Connection + recv, send = select([conn._sock], [conn._sock], [], timeout)[:2] + if send: + while conn._send_queue: + conn.send_now(conn._send_queue.pop(0)) + if recv: try: - data = self._owner.Connection.receive() + data = conn.receive() except IOError: return None try: diff --git a/xmpp/transports.py b/xmpp/transports.py index 8082718..2d93686 100644 --- a/xmpp/transports.py +++ b/xmpp/transports.py @@ -82,6 +82,7 @@ class TCPsocket(PlugIn): self.DBG_LINE = "socket" self._exported_methods = [self.send, self.disconnect] self._server, self.use_srv = server, use_srv + self._send_queue = [] def srv_lookup(self, server): """ @@ -142,9 +143,7 @@ class TCPsocket(PlugIn): host, port = server socktype = socket.SOCK_STREAM try: - if self.use_srv: - raise Exception() - lookup = socket.getaddrinfo(host, int(port), 0, socktype)[0] + lookup = reversed(socket.getaddrinfo(host, int(port), 0, socktype)) except Exception: addr = (host, int(port)) if ":" in host: @@ -152,31 +151,34 @@ class TCPsocket(PlugIn): addr = addr.__add__((0, 0)) else: af = socket.AF_INET - else: - af, socktype, proto, cn, addr = lookup - try: - self._sock = socket.socket(af, socktype) - self._sock.connect(addr) - self._send = self._sock.sendall - self._recv = self._sock.recv - except socket.error as error: + lookup = [(af, socktype, 1, 6, addr)] + for af, socktype, proto, cn, addr in lookup: try: - code, error = error + self._sock = socket.socket(af, socktype) + self._sock.connect(addr) + self._send = self._sock.sendall + self._recv = self._sock.recv + except socket.error as error: + if getattr(self, "_sock", None): + self._sock.close() + try: + code, error = error + except Exception: + code = -1 + self.DEBUG("Failed to connect to remote host %s: %s (%s)" % (repr(server), error, code), "error") except Exception: - code = -1 - self.DEBUG("Failed to connect to remote host %s: %s (%s)" % (repr(server), error, code), "error") - except Exception: - pass - else: - self.DEBUG("Successfully connected to remote host %s." % repr(server), "start") - return "ok" + pass + else: + self.DEBUG("Successfully connected to remote host %s." % repr(server), "start") + return "ok" def plugout(self): """ Disconnect from the remote server and unregister self.disconnected method from the owner's dispatcher. """ - self._sock.close() + if getattr(self, "_sock", None): + self._sock.close() if hasattr(self._owner, "Connection"): del self._owner.Connection self._owner.UnregisterDisconnectHandler(self.disconnected) @@ -218,7 +220,10 @@ class TCPsocket(PlugIn): raise IOError("Disconnected!") return data - def send(self, data, timeout=0.002): + def send(self, data): + self._send_queue.append(data) + + def send_now(self, data, timeout=0.002): """ Writes raw outgoing data. Blocks until done. If supplied data is unicode string, encodes it to utf-8 before send. @@ -227,8 +232,6 @@ class TCPsocket(PlugIn): data = data.encode("utf-8") elif not isinstance(data, str): data = ustr(data).encode("utf-8") - while not select([], [self._sock], [], timeout)[1]: - pass try: self._send(data) except Exception: -- cgit v1.2.3