Welcome to mirror list, hosted at ThFree Co, Russian Federation.

utils.py « library - github.com/mrDoctorWho/vk4xmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: a79be40fb3c7278de8ac449cf5382339356ccdac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# coding: utf-8
# This file is a part of VK4XMPP transport
# © simpleApps, 2014.

"""
Contains useful functions which are used across the modules
"""

import threading
import xmpp
import urllib
from socket import error
from writer import *

# True when int(obj) succeeds: execute() gives None only if the conversion raised
isNumber = lambda obj: execute(int, (obj,), False) is not None


def execute(handler, list=(), log=True):
	"""
	Calls handler(*list) and does not let an ordinary exception escape
	Parameters:
		handler: callable to run
		list: positional arguments for the handler
		log: whether a failure has to be written to the crashlog and the logger
	Returns:
		the handler's result,
		True if the handler raised SystemExit or xmpp.NodeProcessed,
		None if it raised any other Exception
	"""
	try:
		result = handler(*list)
	except (SystemExit, xmpp.NodeProcessed):
		result = True
	except Exception:
		result = None
		if log:
			# Builtins, classes and partials have no func_name attribute,
			# so the name lookup must not raise inside the error handler
			name = getattr(handler, "__name__", None) or repr(handler)
			crashLog(name)
			logger.error("Exception happened during executing function: %s%s" % (name, str(list)))
	return result


def runThread(func, args=(), name=None, delay=0):
	"""
	Runs func(*args) through execute() in a new thread
	Needed to reduce code
	Parameters:
		func: function you need to be running in a thread
		args: function arguments
		name: thread name (the function name is used if not set)
		delay: if set, then threading.Timer will be started, not threading.Thread
	Returns:
		the started threading.Thread or threading.Timer object
	"""
	# __name__ exists on more callables than func_name (builtins, classes);
	# repr() covers the ones which have neither (e.g. functools.partial)
	funcName = getattr(func, "__name__", None) or repr(func)
	if delay:
		logger.debug("threading: starting timer for %s%s, "
			"name:%s, delay:%s" % (funcName, str(args), name, delay))
		thr = threading.Timer(delay, execute, (func, args))
	else:
		thr = threading.Thread(target=execute, args=(func, args))
	# The current time is appended to the thread name
	# NOTE(review): time is not imported here directly, presumably it comes
	# from "from writer import *" - confirm
	thr.name = str(name or funcName) + "-" + str(time.time())
	thr.start()
	return thr


def safe(func):
	"""
	Decorator which executes func safely
	xmpp.NodeProcessed is silently dropped, any other Exception
	is written to the crashlog instead of reaching the caller
	The wrapper discards the result of func and always returns None
	"""
	# Builtins, classes and partials have no func_name attribute,
	# so look the name up once in a way which can't raise
	name = getattr(func, "__name__", None) or repr(func)

	def wrapper(*args, **kwargs):
		try:
			func(*args, **kwargs)
		except xmpp.NodeProcessed:
			pass
		except Exception:
			crashLog(name)
	wrapper.__name__ = name
	return wrapper


def cache(func):
	"""
	Decorator for methods called as method(uid, fields): keeps results in self.cache
	A cached entry is returned only if it was stored for the same fields,
	otherwise func is called and a non-empty result is stored:
	a new entry is created or the existing one is updated with it
	The comma-joined fields are saved under the "fields" key
	and the "user_id" key is removed from the result
	"""
	def wrapper(self, uid, fields=None):
		fields = fields or []
		requested = ",".join(fields)
		# Serve from the cache only when the very same fields were asked before
		if uid in self.cache and self.cache[uid]["fields"] == requested:
			return self.cache[uid]

		fetched = func(self, uid, fields)
		if not fetched:
			return fetched
		fetched["fields"] = requested
		fetched.pop("user_id", None)
		if uid not in self.cache:
			self.cache[uid] = fetched
		else:
			self.cache[uid].update(fetched)
		return fetched
	wrapper.__name__ = func.__name__
	return wrapper


def threaded(func):
	"""
	Decorator which hands every call of func over to runThread
	The wrapper takes positional arguments only and returns nothing
	Its name is the name of func prefixed with "threaded_"
	"""
	def wrapper(*args):
		runThread(func, args)
	wrapper.__name__ = "threaded_" + func.__name__
	return wrapper


def buildDataForm(form=None, type="form", fields=None, title=None, data=None):
	"""
	Provides easier method to build data forms using dict for each form object
	Parameters:
		form: xmpp.DataForm object to fill; a new one is created if not set
		type: form type (used only for a new form)
		fields: list of form objects represented as dict, e.g.
			[{"var": "cool", "type": "text-single",
			"desc": "my cool description", "value": "cool"}]
			Optional keys: "options", "payload", "label", "required"
		title: form title
		data: advanced data for a new form. e.g.
			instructions (if string in the list), look at xmpp/protocol.py:1326
	Returns:
		the form which was passed in or the newly created one
	"""
	if title and form:
		form.setTitle(title)
	# None defaults instead of shared mutable lists in the signature
	form = form or xmpp.DataForm(type, [] if data is None else data, title)
	for key in fields or ():
		field = form.setField(key["var"], key.get("value"),
					key.get("type"), key.get("desc"), key.get("options"))
		if key.get("payload"):
			field.setPayload(key["payload"])
		if key.get("label"):
			field.setLabel(key["label"])
		# "requred" is the old misspelled key, it's still accepted
		# so the existing callers keep working
		if key.get("required") or key.get("requred"):
			field.setRequired()
	return form


def buildIQError(stanza, error=xmpp.ERR_FEATURE_NOT_IMPLEMENTED, text=None):
	"""
	Builds an xmpp.Error for the given stanza
	Parameters:
		stanza: the stanza the error is built for
		error: error condition handed to xmpp.Error
		text: if set, it's stored as the "text" tag data of the "error" tag
	Returns:
		the xmpp.Error object
	"""
	# NOTE(review): the third argument is presumably the "reply" flag of xmpp.Error - confirm
	reply = xmpp.Error(stanza, error, True)
	if not text:
		return reply
	errorTag = reply.getTag("error")
	if errorTag:
		errorTag.setTagData("text", text)
	return reply


def normalizeValue(value):
	"""
	Turns a value from a dataform reply into an integer:
	anything int() accepts is converted by int(),
	the string "true" (in any letter case) gives 1,
	everything else gives 0
	"""
	if isNumber(value):
		return int(value)
	if value and value.lower() == "true":
		return 1
	return 0


def getLinkData(url, encode=True):
	"""
	Gets link data and ignores any exceptions
	Parameters:
		url: address to read the data from
		encode: base64 data encode
	Returns:
		the data which was read (base64-encoded if encode is set)
		or an empty string if anything went wrong
	"""
	# NOTE(review): urllib.urlopen also opens local paths and file:// urls;
	# check that callers never pass an untrusted url here
	try:
		opener = urllib.urlopen(url)
		try:
			data = opener.read()
		finally:
			# Release the connection even if reading fails
			opener.close()
	except (Exception, error):
		return ""
	if data and encode:
		data = data.encode("base64")
	return data


# Multipliers for the unit letters understood by TimeMachine
# ("d", "M" and "y" are the seconds in 1, 30 and 365 days)
# NOTE(review): "s": 60 and "m": 360 don't look like the length of a second
# or a minute (a minute is 60 seconds, an hour is 3600) - confirm the intended units
TIME_VALUES = {"s": 60, "m": 360, "d": 86400, "M": 2592000, "y": 31536000}


def TimeMachine(text):
	"""
	TARDIS Prototype
	Converts a string like "10m" or "1d12m" to a number
	Every <number><unit> pair adds number * TIME_VALUES[unit] to the result,
	pairs with an unknown unit are ignored
	Parameters:
		text: string to parse (an empty string or None gives 0)
	Returns:
		the sum of all the recognized pairs
	"""
	import re  # local import: re is not among the imports at the top of the module
	total = 0
	# The number may be of any length (the fixed 3-character chunks used before
	# returned 0 for "100d" or "1d12m") and may be followed by spaces
	for number, unit in re.findall(r"([+-]?\d+)\s*([A-Za-z])", text or ""):
		if unit in TIME_VALUES:
			total += int(number) * TIME_VALUES[unit]
	return total

# Yay!