1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
from subprocess import check_output
from typing import List, Any
from nextcloud_news_updater.api.api import Api, Feed
from nextcloud_news_updater.api.updater import Updater, UpdateThread
from nextcloud_news_updater.common.logger import Logger
from nextcloud_news_updater.config import Config
class Cli:
    """Runs external commands and hands back what they print."""

    def run(self, commands: List[str]) -> bytes:
        """Execute ``commands`` and return the captured standard output.

        ``commands`` is the argument vector handed unchanged to
        ``subprocess.check_output``; any exception raised there
        propagates to the caller.
        """
        captured = check_output(commands)
        return captured
class CliApi(Api):
    """Cli API for Nextcloud News up to v14 (API version 1.2).

    Holds the argument lists for the ``occ`` console commands used by the
    updater; nothing is executed here.
    """

    def __init__(self, config: Config) -> None:
        """Derive all command lists from ``config``.

        Reads ``config.url`` (used as the directory containing ``occ``),
        ``config.phpini`` and ``config.php``.
        """
        install_dir = config.url
        ini_path = config.phpini

        # Guarantee a trailing slash so 'occ' can simply be appended.
        if install_dir.endswith('/'):
            self.directory = install_dir
        else:
            self.directory = install_dir + '/'

        occ_call = [config.php, '-f', self.directory + 'occ']
        # Only pass '-c' when a non-blank ini path was configured.
        ini_missing = ini_path is None or ini_path.strip() == ''
        if not ini_missing:
            occ_call += ['-c', ini_path]
        self.base_command = occ_call

        # Each command is a fresh list: the shared prefix plus one
        # sub-command name.
        self.before_cleanup_command = [*self.base_command,
                                       'news:updater:before-update']
        self.all_feeds_command = [*self.base_command,
                                  'news:updater:all-feeds']
        self.update_feed_command = [*self.base_command,
                                    'news:updater:update-feed']
        self.after_cleanup_command = [*self.base_command,
                                      'news:updater:after-update']
class CliApiV2(CliApi):
    """Cli API for Nextcloud News up to v14 (API version 2)."""

    def _parse_feeds_json(self, feeds: dict, userID: str) -> List[Feed]:
        """Turn the entries under the ``'updater'`` key into Feed objects.

        Each entry supplies its own ``'feedId'`` and ``'userId'``; the
        ``userID`` argument is not used by this implementation.
        """
        parsed = []
        for entry in feeds['updater']:
            parsed.append(Feed(entry['feedId'], entry['userId']))
        return parsed
class CliApiV15(CliApi):
    """Cli API for Nextcloud News v15+."""

    def __init__(self, config: Config) -> None:
        """Set up the base commands, then swap in the v15 list commands."""
        super().__init__(config)
        occ_call = self.base_command
        # Replaces the 'news:updater:all-feeds' command set by the parent.
        self.all_feeds_command = occ_call + ['news:feed:list']
        self.users_list_command = occ_call + ['user:list', '--output',
                                              'json']

    def _parse_feeds_json(self, feeds_json: Any, userID: str) -> List[Feed]:
        """Build one Feed per entry of ``feeds_json``, owned by ``userID``.

        A falsy ``feeds_json`` (e.g. ``None`` or an empty list) yields an
        empty list.
        """
        if feeds_json:
            collected = []
            for entry in feeds_json:
                collected.append(Feed(entry['id'], userID))
            return collected
        return []
def create_cli_api(config: Config) -> CliApi:
    """Return the Cli API implementation selected by ``config.apilevel``.

    Supported levels are ``'v1-2'``, ``'v2'`` and ``'v15'``.

    Raises:
        ValueError: if ``config.apilevel`` is none of the supported
            levels. Previously the function fell through and returned
            ``None``, which only surfaced later as an unrelated
            ``AttributeError``.
    """
    level = config.apilevel
    if level == 'v1-2':
        return CliApi(config)
    if level == 'v2':
        return CliApiV2(config)
    if level == 'v15':
        return CliApiV15(config)
    raise ValueError(
        'Unsupported API level for the cli updater: %r' % (level,))
class CliUpdateThread(UpdateThread):
    """Update thread that refreshes each feed through a console command."""

    def __init__(self, feeds: List[Feed], logger: Logger, api: CliApi,
                 cli: Cli) -> None:
        """Pass ``feeds`` and ``logger`` to the base thread; keep the rest.

        ``api`` provides the command prefix and ``cli`` executes it.
        """
        super().__init__(feeds, logger)
        self.cli = cli
        self.api = api

    def update_feed(self, feed: Feed) -> None:
        """Log and run the update command for a single feed.

        The feed id (as a string) is appended first, the user id second.
        """
        # self.logger is presumably set by UpdateThread.__init__.
        command = [*self.api.update_feed_command, str(feed.feed_id),
                   feed.user_id]
        rendered = ' '.join(command)
        self.logger.info('Running update command: %s' % rendered)
        self.cli.run(command)
class CliUpdateThreadV15(CliUpdateThread):
    """Cli Updater for Nextcloud News v15+."""

    def update_feed(self, feed: Feed) -> None:
        """Log and run the update command for a single feed.

        Unlike the parent class, the user id is appended before the feed
        id.
        """
        command = [*self.api.update_feed_command, feed.user_id,
                   str(feed.feed_id)]
        rendered = ' '.join(command)
        self.logger.info('Running update command: %s' % rendered)
        self.cli.run(command)
class CliUpdater(Updater):
    """Updater that drives the feed update through console commands.

    Every command list comes from ``api``; ``cli`` is what executes it.
    Logging goes through ``self.logger``, presumably set up by
    ``Updater.__init__``.
    """

    def __init__(self, config: Config, logger: Logger, api: CliApi,
                 cli: Cli) -> None:
        """Pass ``config`` and ``logger`` to the base class; keep the rest."""
        super().__init__(config, logger)
        self.cli = cli
        self.api = api

    def before_update(self) -> None:
        """Log and run the before-update cleanup command."""
        self.logger.info('Running before update command: %s' %
                         ' '.join(self.api.before_cleanup_command))
        self.cli.run(self.api.before_cleanup_command)

    def start_update_thread(self, feeds: List[Feed]) -> CliUpdateThread:
        """Create the thread object that will update ``feeds``."""
        return CliUpdateThread(feeds, self.logger, self.api, self.cli)

    def all_feeds(self) -> List[Feed]:
        """Run the all-feeds command and return the parsed feeds.

        The command output is stripped, decoded as UTF-8 and handed to
        ``self.api.parse_feeds``.
        """
        # Announce the command before running it, so a failing command
        # still leaves a trace in the log (matches the other methods).
        self.logger.info('Running get all feeds command: %s' %
                         ' '.join(self.api.all_feeds_command))
        feeds_json_bytes = self.cli.run(self.api.all_feeds_command).strip()
        feeds_json = str(feeds_json_bytes, 'utf-8')
        self.logger.info('Received these feeds to update: %s' % feeds_json)
        return self.api.parse_feeds(feeds_json)

    def after_update(self) -> None:
        """Log and run the after-update cleanup command."""
        self.logger.info('Running after update command: %s' %
                         ' '.join(self.api.after_cleanup_command))
        self.cli.run(self.api.after_cleanup_command)
class CliUpdaterV15(CliUpdater):
    """Cli Updater for Nextcloud News v15+"""

    def start_update_thread(self, feeds: List[Feed]) -> CliUpdateThread:
        """Create the v15 thread object that will update ``feeds``."""
        return CliUpdateThreadV15(feeds, self.logger, self.api, self.cli)

    def all_feeds(self) -> List[Feed]:
        """Collect the feeds of every user, one list command per user.

        First runs the user list command and parses it with
        ``self.api.parse_users``; then, for each resulting user id, runs
        the feed list command with that id appended and parses the output
        with ``self.api.parse_feeds``.
        """
        self.logger.info('Running get user list command: %s' %
                         ' '.join(self.api.users_list_command))
        users_json_bytes = self.cli.run(self.api.users_list_command).strip()
        users_json = str(users_json_bytes, 'utf-8')
        users = self.api.parse_users(users_json)

        feeds_list = []
        for userID in users:
            cmd = self.api.all_feeds_command + [userID]
            # Log the command exactly as executed, user id included, so
            # the log line can be replayed by hand.
            self.logger.info('Running get feeds for user "%s" command: %s' %
                             (userID, ' '.join(cmd)))
            feeds_json_bytes = self.cli.run(cmd).strip()
            feeds_json = str(feeds_json_bytes, 'utf-8')
            self.logger.info('Received these feeds to update for user %s: %s' %
                             (userID, feeds_json))
            feeds_list += self.api.parse_feeds(feeds_json, userID)
        return feeds_list
|