1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
import sys
import threading
import time
import traceback
class Updater:
    """
    Base class for implementing your own updater type. Takes care of logging,
    threading and the general workflow.

    Subclasses provide the four hooks ``before_update``, ``all_feeds``,
    ``start_update_thread`` and ``after_update``; ``run`` drives them.

    The ``config`` object is read for the attributes ``mode``, ``threads``
    and ``interval``; the ``logger`` object is used through its ``info`` and
    ``error`` methods.
    """

    # Seconds to wait before retrying after a failed update run
    RETRY_DELAY_SECONDS = 30

    def __init__(self, config, logger):
        self.logger = logger
        self.config = config

    def run(self):
        """
        Runs the update workflow.

        If ``config.mode`` equals ``'singlerun'`` exactly one update run is
        executed and the method returns afterwards, regardless of whether the
        run succeeded. Otherwise update runs are repeated forever: after a
        successful run the method sleeps for the remainder of
        ``config.interval`` seconds, after a failed run it sleeps for
        ``RETRY_DELAY_SECONDS`` seconds.

        Exceptions raised during an update run are logged, their traceback is
        printed to stderr and they are not propagated to the caller.
        """
        single_run = self.config.mode == 'singlerun'
        if single_run:
            self.logger.info('Running update once with %d threads' %
                             self.config.threads)
        else:
            self.logger.info(('Running update in an interval of %d seconds '
                              'using %d threads') % (self.config.interval,
                                                     self.config.threads))

        while True:
            # monotonic clock: the measured duration must not be affected by
            # wall-clock adjustments happening while the update runs
            start_time = time.monotonic()
            try:
                self.before_update()
                feeds = self.all_feeds()

                # every thread receives the same feeds object
                threads = []
                for _ in range(self.config.threads):
                    thread = self.start_update_thread(feeds)
                    thread.start()
                    threads.append(thread)
                for thread in threads:
                    thread.join()

                self.after_update()

                if single_run:
                    return

                # wait until the interval finished to run again and subtract
                # the update run time from the interval
                update_duration_seconds = int(time.monotonic() - start_time)
                timeout = self.config.interval - update_duration_seconds
                if timeout > 0:
                    self.logger.info(('Finished updating in %d seconds, '
                                      'next update in %d seconds') %
                                     (update_duration_seconds, timeout))
                    time.sleep(timeout)
            except Exception as e:
                if single_run:
                    # there is no retry in single run mode, so do not
                    # announce one
                    self.logger.error('%s: Update failed' % e)
                    traceback.print_exc(file=sys.stderr)
                    return
                self.logger.error('%s: Trying again in %d seconds' %
                                  (e, self.RETRY_DELAY_SECONDS))
                traceback.print_exc(file=sys.stderr)
                time.sleep(self.RETRY_DELAY_SECONDS)

    def before_update(self):
        """
        Hook called at the beginning of every update run, before the feeds
        are requested. Must be implemented by subclasses.
        """
        raise NotImplementedError

    def start_update_thread(self, feeds):
        """
        Hook called once per configured thread. Must be implemented by
        subclasses and return a thread object that has not been started yet;
        ``run`` calls ``start()`` and later ``join()`` on it.
        feeds: the object returned by ``all_feeds``
        """
        raise NotImplementedError

    def all_feeds(self):
        """
        Hook called once per update run. Must be implemented by subclasses;
        the returned object is handed to every ``start_update_thread`` call.
        """
        raise NotImplementedError

    def after_update(self):
        """
        Hook called at the end of every update run, after all update threads
        have been joined. Must be implemented by subclasses.
        """
        raise NotImplementedError
class UpdateThread(threading.Thread):
    """
    Base class for the worker thread that processes feed updates in parallel.

    Each worker repeatedly takes one feed off the ``feeds`` object it was
    given and passes it to ``update_feed`` until no feeds are left.
    """

    # class level lock, guards taking a feed off the feeds object
    lock = threading.Lock()

    def __init__(self, feeds, logger):
        super().__init__()
        self.logger = logger
        self.feeds = feeds

    def run(self):
        """
        Processes feeds until the feeds object is empty. An exception raised
        while handling a feed is logged, its traceback is printed to stderr
        and the worker continues with the next feed.
        """
        while True:
            # only taking the next feed happens under the lock, the update
            # itself runs without holding it
            with UpdateThread.lock:
                if len(self.feeds) == 0:
                    return
                feed = self.feeds.pop()

            try:
                message = ('Updating feed with id %s and user %s' %
                           (feed.feed_id, feed.user_id))
                self.logger.info(message)
                self.update_feed(feed)
            except Exception as error:
                self.logger.error(error)
                traceback.print_exc(file=sys.stderr)

    def update_feed(self, feed):
        """
        Updates a single feed. Must be implemented by subclasses.
        feed: the feed object containing the feed_id and user_id
        """
        raise NotImplementedError
|