Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mrDoctorWho/vk4xmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Smith <mrdoctorwho@gmail.com>2016-12-25 19:39:37 +0300
committerJohn Smith <mrdoctorwho@gmail.com>2016-12-25 19:39:37 +0300
commit77e8b33ce1360bfe78cd1fa2d67c8010c0256ce2 (patch)
treeed76c8e9fbfb68c1c87322644d3c8bce88020213 /library
parent1642e16f72404756c1dfc93cee42b25b03660b21 (diff)
Add socket expiry checker to longpoll (fixes #74)
Diffstat (limited to 'library')
-rw-r--r--library/longpoll.py181
-rw-r--r--library/utils.py6
-rw-r--r--library/vkapi.py1
3 files changed, 160 insertions, 28 deletions
diff --git a/library/longpoll.py b/library/longpoll.py
index 1106852..2be0bca 100644
--- a/library/longpoll.py
+++ b/library/longpoll.py
@@ -2,26 +2,143 @@
# © simpleApps, 2014 — 2016.
__authors__ = ("Al Korgun <alkorgun@gmail.com>", "John Smith <mrdoctorwho@gmail.com>")
-__version__ = "2.2.2"
+__version__ = "2.3"
__license__ = "MIT"
"""
Implements a single-threaded longpoll client
"""
+import select
+import socket
+import ssl
+import json
+import httplib
import threading
import time
import vkapi as api
-import select
-import socket
+
import utils
-from __main__ import Users, logger, ALIVE, DEBUG_POLL, crashLog
+# from __main__ import Users, logger, ALIVE, DEBUG_POLL, crashLog, \
+# USER_CAPS_HASH, sendMessage, sendPresence, vk2xmpp
+
+from __main__ import *
+SOCKET_CHECK_TIMEOUT = 10
LONGPOLL_RETRY_COUNT = 10
LONGPOLL_RETRY_TIMEOUT = 10
+SELECT_WAIT = 25
+OPENER_LIFETIME = 60
+
+
+CODE_SKIP = -1
+CODE_FINE = 0
+CODE_ERROR = 1
+
+TYPE_MSG = 4
+TYPE_PRS_IN = 8
+TYPE_PRS_OUT = 9
+TYPE_TYPING = 61
+
+FLAG_OUT = 2
+FLAG_CHAT = 16
+
+MIN_CHAT_UID = 2000000000
+
+
+def debug(message, *args):
+ if DEBUG_POLL:
+ logger.debug(message, *args)
+
+
+def read(opener, source):
+ """
+ Read a socket ignoring errors
+ Args:
+ opener: a socket to read
+ source: the user's jid
+ Returns:
+ JSON data or an empty string
+ """
+ try:
+ data = opener.read()
+ except (httplib.BadStatusLine, socket.error, socket.timeout) as e:
+ data = ""
+ logger.warning("longpoll: got error `%s` (jid: %s)", e.__class__.__name__, source)
+ return data
+
+
+def processPollResult(user, data):
+ """
+ Processes a poll result
+ Decides whether to send a chat/groupchat message or presence or just pass the iteration
+ Args:
+ user: the User object
+ data: a valid json with poll result
+ Returns:
+ CODE_SKIP: just skip iteration, not adding the user to poll again
+ CODE_FINE: add user for the next iteration
+ CODE_ERROR: user should be added to the init buffer
+ """
+ debug("longpoll: processing result (jid: %s)", user.source)
+
+ retcode = CODE_FINE
+ try:
+ data = json.loads(data)
+ except ValueError:
+ logger.error("longpoll: no data. Gonna request again (jid: %s)",
+ user.source)
+ retcode = CODE_ERROR
+
+ if "failed" in data:
+ logger.debug("longpoll: failed. Searching for a new server (jid: %s)",
+ user.source)
+ retcode = CODE_ERROR
+ else:
+ user.vk.pollConfig["ts"] = data["ts"]
+ for evt in data.get("updates", ()):
+ typ = evt.pop(0)
+
+ debug("longpoll: got updates, processing event %s with arguments %s (jid: %s)", typ,
+ str(evt), user.source)
+ if typ == TYPE_MSG: # new message
+ if len(evt) == 7:
+ message = None
+ mid, flags, uid, date, subject, body, attachments = evt
+ out = flags & FLAG_OUT
+ chat = (flags & FLAG_CHAT) or (
+ uid > MIN_CHAT_UID) # a groupchat always has uid > 2000000000
+ if not out:
+ if not attachments and not chat:
+ message = [1,
+ {"out": 0, "uid": uid, "mid": mid, "date": date,
+ "body": body}]
+ utils.runThread(user.sendMessages, (None, message),
+ "sendMessages-%s" % user.source)
+ else:
+ logger.warning(
+ "longpoll: incorrect events number while trying to "
+ "process arguments %s (jid: %s)",
+ str(evt), user.source)
-# TODO: make it an abstract, to reuse in Steampunk
+ elif typ == TYPE_PRS_IN: # user has joined
+ uid = abs(evt[0])
+ sendPresence(user.source, vk2xmpp(uid), hash=USER_CAPS_HASH)
+
+ elif typ == TYPE_PRS_OUT: # user has left
+ uid = abs(evt[0])
+ sendPresence(user.source, vk2xmpp(uid), "unavailable")
+
+ elif typ == TYPE_TYPING: # user is typing
+ if evt[0] not in user.typing:
+ sendMessage(user.source, vk2xmpp(evt[0]), typ="composing")
+ user.typing[evt[0]] = time.time()
+ retcode = CODE_FINE
+ return retcode
+
+
+# TODO: make it abstract, to reuse in Steampunk
class Poll(object):
"""
Class used to handle longpoll
@@ -34,34 +151,33 @@ class Poll(object):
@classmethod
def __add(cls, user):
"""
- Issues readable socket to use it in select()
- Adds user in buffer on error occurred
- Adds user in self.__list if no errors
+ Issues a readable socket to use it in select()
+ Adds user in buffer if a error occurred
+ Adds user in cls.__list if no errors
"""
if user.source in Users:
# in case the new instance was created
user = Users[user.source]
opener = user.vk.makePoll()
- if DEBUG_POLL:
- logger.debug("longpoll: user has been added to poll (jid: %s)", user.source)
+ debug("longpoll: user has been added to poll (jid: %s)", user.source)
if opener:
cls.__list[opener.sock] = (user, opener)
return opener
logger.warning("longpoll: got null opener! (jid: %s)", user.source)
cls.__addToBuffer(user)
- return None
@classmethod
def add(cls, some_user):
"""
Adds the User class object to poll
"""
- if DEBUG_POLL:
- logger.debug("longpoll: adding user to poll (jid: %s)", some_user.source)
+ debug("longpoll: adding user to poll (jid: %s)", some_user.source)
with cls.__lock:
+ if not cls.__list:
+ cls.checkIfSocketsAlive()
if some_user in cls.__buff:
return None
- # check if someone tries to add an already existing user
+ # check if someone is trying to add an already existing user
for sock, (user, opener) in cls.__list.iteritems():
if some_user == user:
break
@@ -115,25 +231,26 @@ class Poll(object):
Args:
user: the user object
"""
- for i in xrange(LONGPOLL_RETRY_COUNT):
+ for _ in xrange(LONGPOLL_RETRY_COUNT):
if user.source in Users:
user = Users[user.source] # we might have a new instance here
if user.vk.initPoll():
with cls.__lock:
- logger.debug("longpoll: successfully initialized longpoll (jid: %s)", user.source)
+ logger.debug("longpoll: successfully initialized longpoll (jid: %s)",
+ user.source)
cls.__add(user)
cls.__removeFromBuffer(user)
break
else:
logger.debug("longpoll: while we were wasting our time"
- ", the user has left (jid: %s)", user.source)
+ ", the user has left (jid: %s)", user.source)
cls.removeFromBuffer(user)
return None
time.sleep(LONGPOLL_RETRY_TIMEOUT)
else:
cls.removeFromBuffer(user)
logger.error("longpoll: failed to add user to poll in 10 retries"
- " (jid: %s)", user.source)
+ " (jid: %s)", user.source)
@classmethod
def process(cls):
@@ -149,7 +266,7 @@ class Poll(object):
continue
# TODO: epoll()?
try:
- ready, error = select.select(socks, [], socks, 2)[::2]
+ ready, error = select.select(socks, [], socks, SELECT_WAIT)[::2]
except (select.error, socket.error, socket.timeout) as e:
logger.error("longpoll: %s", e.message)
continue
@@ -173,35 +290,45 @@ class Poll(object):
# Update the user instance
user = Users.get(user.source)
if user:
- utils.runThread(cls.processResult, (user, opener),
- "poll.processResult-%s" % user.source)
+ cls.processResult(user, opener)
with cls.__lock:
- for sock, (user, opener) in cls.__list.items():
+ for sock, (user, opener) in cls.__list.iteritems():
if hasattr(user, "vk") and not user.vk.online:
logger.debug("longpoll: user is not online, so removing them from poll"
- " (jid: %s)", user.source)
+ " (jid: %s)", user.source)
try:
del cls.__list[sock]
except KeyError:
pass
@classmethod
+ @utils.threaded
def processResult(cls, user, opener):
"""
Processes the select result (see above)
Handles answers from user.processPollResult()
Decides if need to add user to poll or not
"""
- result = utils.execute(user.processPollResult, (opener,))
- if DEBUG_POLL:
- logger.debug("longpoll: result=%s (jid: %s)", result, user.source)
+ data = read(opener, user.source)
+ result = utils.execute(processPollResult, (user, data,))
+ debug("longpoll: result=%s (jid: %s)", result, user.source)
if result == -1:
return None
# if we'll set user.vk.pollInitialized to False
# then makePoll() will raise an exception
- # by doing that, we force user's poll reinitialization
+ # by doing that, we force the user's poll reinitialization
if not result:
user.vk.pollInitialized = False
cls.add(user)
+ @classmethod
+ @utils.threaded
+ def checkIfSocketsAlive(cls):
+ while cls.__list:
+ for sock, (user, opener) in cls.__list.iteritems():
+ if (time.time() - opener.created) > OPENER_LIFETIME:
+ with cls.__lock:
+ del cls.__list[sock]
+ cls.processPollResult(user, opener)
+ time.sleep(SOCKET_CHECK_TIMEOUT) \ No newline at end of file
diff --git a/library/utils.py b/library/utils.py
index 42d0711..e22847e 100644
--- a/library/utils.py
+++ b/library/utils.py
@@ -202,6 +202,10 @@ def TimeMachine(text):
class ExpiringObject(object):
+ """
+ Object that acts the same as the one it keeps
+ But also has a limited lifetime
+ """
def __init__(self, obj, lifetime):
self.obj = obj
self.created = time.time()
@@ -232,7 +236,7 @@ class ExpiringObject(object):
result = ""
for num, i in enumerate(self.obj):
result += str(i)
- if num < (len(self.obj) -1):
+ if num < (len(self.obj) - 1):
result += ", "
return result
diff --git a/library/vkapi.py b/library/vkapi.py
index 3546684..36f364d 100644
--- a/library/vkapi.py
+++ b/library/vkapi.py
@@ -105,6 +105,7 @@ class AsyncHTTPRequest(httplib.HTTPSConnection):
self.url = url
self.data = data
self.headers = headers or {}
+ self.created = time.time()
@attemptTo(REQUEST_RETRIES, None, *ERRORS)
def open(self):