Welcome to mirror list, hosted at ThFree Co, Russian Federation.

plugin.py « omemo - dev.gajim.org/gajim/gajim-plugins.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: dd5d5cc14429c9435fd2ef0e99d84a3d456603f6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# Copyright (C) 2019 Philipp Hörist <philipp AT hoerist.com>
# Copyright (C) 2015 Bahtiar `kalkin-` Gadimov <bahtiar@gadimov.de>
# Copyright (C) 2015 Daniel Gultsch <daniel@cgultsch.de>
#
# This file is part of OMEMO Gajim Plugin.
#
# OMEMO Gajim Plugin is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# OMEMO Gajim Plugin is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OMEMO Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.

import logging
import binascii
import threading
from enum import IntEnum, unique
from pathlib import Path
from functools import partial

from gi.repository import GLib
from gi.repository import Gtk
from gi.repository import Gdk

from nbxmpp.namespaces import Namespace

from gajim.common import app, ged
from gajim.common.modules.contacts import GroupchatContact

from gajim.gtk.dialogs import ErrorDialog

from gajim.plugins import GajimPlugin
from gajim.plugins.plugins_i18n import _

AXOLOTL_MISSING = 'You are missing Python3-Axolotl or use an outdated version'
PROTOBUF_MISSING = "OMEMO can't import Google Protobuf, you can find help in " \
                   "the GitLab Wiki"
ERROR_MSG = ''


log = logging.getLogger('gajim.p.omemo')
if log.getEffectiveLevel() == logging.DEBUG:
    log_axolotl = logging.getLogger('axolotl')
    log_axolotl.setLevel(logging.DEBUG)
    log_axolotl.addHandler(logging.StreamHandler())
    log_axolotl.propagate = False

try:
    import google.protobuf
except Exception as error:
    log.error(error)
    ERROR_MSG = PROTOBUF_MISSING

try:
    import axolotl
except Exception as error:
    log.error(error)
    ERROR_MSG = AXOLOTL_MISSING

if not ERROR_MSG:
    try:
        from omemo.modules import omemo
        from omemo.gtk.key import KeyDialog
        from omemo.gtk.config import OMEMOConfigDialog
        from omemo.backend.aes import aes_encrypt_file
    except Exception as error:
        log.error(error)
        ERROR_MSG = 'Error: %s' % error


@unique
class UserMessages(IntEnum):
    QUERY_DEVICES = 0
    NO_FINGERPRINTS = 1
    UNDECIDED_FINGERPRINTS = 2


class OmemoPlugin(GajimPlugin):
    def init(self):
        # pylint: disable=attribute-defined-outside-init
        if ERROR_MSG:
            self.activatable = False
            self.available_text = ERROR_MSG
            self.config_dialog = None
            return
        self.encryption_name = 'OMEMO'
        self.allow_groupchat = True
        self.events_handlers = {
            'omemo-new-fingerprint': (ged.PRECORE, self._on_new_fingerprints),
            'signed-in': (ged.PRECORE, self._on_signed_in),
            'muc-disco-update': (ged.GUI1, self._on_muc_disco_update),
            'muc-added': (ged.GUI1, self._on_muc_added),
        }
        self.modules = [omemo]

        self.config_dialog = partial(OMEMOConfigDialog, self)
        self.gui_extension_points = {
            'encrypt' + self.encryption_name: (self._encrypt_message, None),
            'gc_encrypt' + self.encryption_name: (
                self._muc_encrypt_message, None),
            'send_message' + self.encryption_name: (
                self._before_sendmessage, None),
            'encryption_dialog' + self.encryption_name: (
                self._on_encryption_button_clicked, None),
            'encryption_state' + self.encryption_name: (
                self._encryption_state, None),
            'update_caps': (self._update_caps, None),
        }

        self.disabled_accounts = []
        self._windows = {}

        self.config_default_values = {
            'DISABLED_ACCOUNTS': ([], ''),
            'BLIND_TRUST': (True, ''),
            'SHOW_HELP_FINGERPRINTS': (True, ''),
        }

        for account in self.config['DISABLED_ACCOUNTS']:
            self.disabled_accounts.append(account)

        self._load_css()

    def _is_enabled_account(self, account):
        if account in self.disabled_accounts:
            return False
        if account == 'Local':
            return False
        return True

    @staticmethod
    def get_omemo(account):
        return app.get_client(account).get_module('OMEMO')

    @staticmethod
    def _load_css():
        path = Path(__file__).parent / 'gtk' / 'style.css'
        try:
            with path.open("r") as file:
                css = file.read()
        except Exception as exc:
            log.error('Error loading css: %s', exc)
            return

        try:
            provider = Gtk.CssProvider()
            provider.load_from_data(bytes(css.encode('utf-8')))
            Gtk.StyleContext.add_provider_for_screen(Gdk.Screen.get_default(),
                                                     provider, 610)
        except Exception:
            log.exception('Error loading application css')

    def activate(self):
        """
        Method called when the Plugin is activated in the PluginManager
        """
        for account in app.settings.get_active_accounts():
            if not self._is_enabled_account(account):
                continue
            self.get_omemo(account).activate()

    def deactivate(self):
        """
        Method called when the Plugin is deactivated in the PluginManager
        """
        for account in app.settings.get_active_accounts():
            if not self._is_enabled_account(account):
                continue
            self.get_omemo(account).deactivate()

    def _on_signed_in(self, event):
        if not self._is_enabled_account(event.account):
            return
        self.get_omemo(event.account).on_signed_in()

    def _on_muc_disco_update(self, event):
        if not self._is_enabled_account(event.account):
            return
        self.get_omemo(event.account).on_muc_disco_update(event)

    def _on_room_joined(self, contact, _signal_name: str):
        if not self._is_enabled_account(contact.account):
            return
        self.get_omemo(contact.account).on_room_joined(contact)

    def _update_caps(self, account, features):
        if not self._is_enabled_account(account):
            return
        features.append('%s+notify' % Namespace.OMEMO_TEMP_DL)

    @staticmethod
    def activate_encryption(chat_control):
        return True

    def _muc_encrypt_message(self, conn, obj, callback):
        account = conn.name
        if not self._is_enabled_account(account):
            return
        self.get_omemo(account).encrypt_message(conn, obj, callback, True)

    def _encrypt_message(self, conn, obj, callback):
        account = conn.name
        if not self._is_enabled_account(account):
            return
        self.get_omemo(account).encrypt_message(conn, obj, callback, False)

    def encrypt_file(self, file, _account, callback):
        thread = threading.Thread(target=self._encrypt_file_thread,
                                  args=(file, callback))
        thread.daemon = True
        thread.start()

    @staticmethod
    def _encrypt_file_thread(file, callback, *args, **kwargs):
        result = aes_encrypt_file(file.get_data())
        file.size = len(result.payload)
        fragment = binascii.hexlify(result.iv + result.key).decode()
        file.set_uri_transform_func(
            lambda uri: 'aesgcm%s#%s' % (uri[5:], fragment))
        file.set_encrypted_data(result.payload)
        GLib.idle_add(callback, file)

    @staticmethod
    def _encryption_state(_chat_control, state):
        state['visible'] = True
        state['authenticated'] = True

    def _on_encryption_button_clicked(self, chat_control):
        self._show_fingerprint_window(chat_control)

    def _before_sendmessage(self, chat_control):
        account = chat_control.account
        if not self._is_enabled_account(account):
            return
        contact = chat_control.contact
        omemo = self.get_omemo(account)
        self.new_fingerprints_available(chat_control)
        if chat_control.is_groupchat:
            room = chat_control.room_jid
            if not omemo.is_omemo_groupchat(room):
                ErrorDialog(
                    _('Bad Configuration'),
                    _('To use OMEMO in a Groupchat, the Groupchat should be'
                      ' non-anonymous and members-only.'))
                chat_control.sendmessage = False
                return

            missing = True
            for jid in omemo.backend.get_muc_members(room):
                if not omemo.are_keys_missing(jid):
                    missing = False
            if missing:
                log.info('%s => No Trusted Fingerprints for %s',
                         account, room)
                self.print_message(chat_control, UserMessages.NO_FINGERPRINTS)
                chat_control.sendmessage = False
        else:
            # check if we have devices for the contact
            if not omemo.backend.get_devices(contact.jid, without_self=True):
                omemo.request_devicelist(contact.jid)
                self.print_message(chat_control, UserMessages.QUERY_DEVICES)
                chat_control.sendmessage = False
                return
            # check if bundles are missing for some devices
            if omemo.backend.storage.hasUndecidedFingerprints(contact.jid):
                log.info('%s => Undecided Fingerprints for %s',
                         account, contact.jid)
                self.print_message(chat_control, UserMessages.UNDECIDED_FINGERPRINTS)
                chat_control.sendmessage = False
            else:
                log.debug('%s => Sending Message to %s',
                          account, contact.jid)

    def _on_new_fingerprints(self, event):
        self.new_fingerprints_available(event.chat_control)

    def new_fingerprints_available(self, chat_control):
        jid = chat_control.contact.jid
        account = chat_control.account
        omemo = self.get_omemo(account)
        if chat_control.is_groupchat:
            for jid_ in omemo.backend.get_muc_members(chat_control.room_jid,
                                                      without_self=False):
                fingerprints = omemo.backend.storage.getNewFingerprints(jid_)
                if fingerprints:
                    self._show_fingerprint_window(
                        chat_control, fingerprints)
                    break
        else:
            fingerprints = omemo.backend.storage.getNewFingerprints(jid)
            if fingerprints:
                self._show_fingerprint_window(
                    chat_control, fingerprints)

    def _show_fingerprint_window(self, chat_control, fingerprints=None):
        contact = chat_control.contact
        account = chat_control.account
        omemo = self.get_omemo(account)

        if 'dialog' not in self._windows:
            self._windows['dialog'] = \
                KeyDialog(self, contact, app.window,
                          self._windows, groupchat=chat_control.is_groupchat)
            if fingerprints:
                log.debug('%s => Showing Fingerprint Prompt for %s',
                          account, contact.jid)
                omemo.backend.storage.setShownFingerprints(fingerprints)
        else:
            self._windows['dialog'].present()
            self._windows['dialog'].update()
            if fingerprints:
                omemo.backend.storage.setShownFingerprints(fingerprints)

    @staticmethod
    def print_message(chat_control, kind):
        msg = None
        if kind == UserMessages.QUERY_DEVICES:
            msg = _('No devices found. Query in progress...')
        elif kind == UserMessages.NO_FINGERPRINTS:
            msg = _('To send an encrypted message, you have to '
                    'first trust the fingerprint of your contact!')
        elif kind == UserMessages.UNDECIDED_FINGERPRINTS:
            msg = _('You have undecided fingerprints')
        if msg is None:
            return
        chat_control.add_info_message(msg)

    def _on_muc_added(self, event):
        client = app.get_client(event.account)
        contact = client.get_module('Contacts').get_contact(event.jid)
        if not isinstance(contact, GroupchatContact):
            log.warning('%s is not a groupchat contact', contact)
            return

        # Event is triggert on every join, avoid multiple connects
        contact.disconnect_all_from_obj(self)
        contact.connect('room-joined', self._on_room_joined)