# Copyright (C) 2011-2017 Yann Leboulanger # Copyright (C) 2022 Philipp Hörist # # This file is part of Gajim. # # Gajim is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published # by the Free Software Foundation; version 3 only. # # Gajim is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Gajim. If not, see . from __future__ import annotations from typing import Any from typing import Callable from typing import cast from typing import Union import logging from functools import partial from nbxmpp.protocol import JID from gajim.common import app from gajim.common import ged from gajim.common.const import PROPAGATE_EVENT from gajim.common.const import STOP_EVENT from gajim.common.events import Notification from gajim.common.events import GcMessageReceived from gajim.common.events import MessageReceived from gajim.common.events import PresenceReceived from gajim.common.helpers import exec_command from gajim.common.helpers import play_sound_file from gajim.plugins import GajimPlugin from gajim.plugins.plugins_i18n import _ from triggers.gtk.config import ConfigDialog from triggers.util import log_result from triggers.util import RuleResult log = logging.getLogger('gajim.p.triggers') MessageEventsT = Union[GcMessageReceived, MessageReceived] ProcessableEventsT = Union[MessageEventsT, Notification, PresenceReceived] RuleT = dict[str, Any] class Triggers(GajimPlugin): def init(self) -> None: self.description = _( 'Configure Gajim’s behaviour with triggers for each contact') self.config_dialog = partial(ConfigDialog, self) self.config_default_values = {} self.events_handlers = { 'notification': (ged.PREGUI, self._on_notification), 'message-received': (ged.PREGUI2, self._on_message_received), 'gc-message-received': (ged.PREGUI2, self._on_message_received), # 'presence-received': (ged.PREGUI, self._on_presence_received), } def _on_notification(self, event: Notification) -> bool: log.info('Process %s, %s', event.name, event.type) result = self._check_all(event, self._check_rule_apply_notification, self._apply_rule) log.info('Result: %s', result) return self._excecute_notification_rules(result, event) def _on_message_received(self, event: MessageEventsT) -> bool: log.info('Process %s', event.name) result = self._check_all(event, self._check_rule_apply_msg_received, self._apply_rule) log.info('Result: %s', result) return self._excecute_message_rules(result) def _on_presence_received(self, event: PresenceReceived) -> None: # TODO return if event.old_show < 2 and event.new_show > 1: check_func = self._check_rule_apply_connected elif event.old_show > 1 and event.new_show < 2: check_func = self._check_rule_apply_disconnected else: check_func = self._check_rule_apply_status_changed self._check_all(event, check_func, self._apply_rule) def _check_all(self, event: ProcessableEventsT, check_func: Callable[..., bool], apply_func: Callable[..., Any] ) -> RuleResult: result = RuleResult() rules_num = [int(item) for item in self.config.keys()] rules_num.sort() to_remove: list[int] = [] for num in rules_num: rule = cast(RuleT, self.config[str(num)]) if check_func(event, rule): apply_func(result, rule) if 'one_shot' in rule and rule['one_shot']: to_remove.append(num) decal = 0 num = 0 while str(num) in self.config: if num + decal in to_remove: num2 = num while str(num2 + 1) in self.config: copy = self.config[str(num2 + 1)].copy() # type: ignore self.config[str(num2)] = copy num2 += 1 del self.config[str(num2)] decal += 1 else: num += 1 return result @log_result def _check_rule_apply_msg_received(self, event: MessageEventsT, rule: RuleT ) -> bool: return self._check_rule_all('message_received', event, rule) @log_result def _check_rule_apply_connected(self, event: PresenceReceived, rule: RuleT ) -> bool: return self._check_rule_all('contact_connected', event, rule) @log_result def _check_rule_apply_disconnected(self, event: PresenceReceived, rule: RuleT ) -> bool: return self._check_rule_all('contact_disconnected', event, rule) @log_result def _check_rule_apply_status_changed(self, event: PresenceReceived, rule: RuleT ) -> bool: return self._check_rule_all('contact_status_change', event, rule) @log_result def _check_rule_apply_notification(self, event: Notification, rule: RuleT ) -> bool: # Check notification type notif_type = '' if event.type == 'incoming-message': notif_type = 'message_received' # if event.type == 'pres': # # TODO: # if (event.base_event.old_show < 2 and # event.base_event.new_show > 1): # notif_type = 'contact_connected' # elif (event.base_event.old_show > 1 and # event.base_event.new_show < 2): # notif_type = 'contact_disconnected' # else: # notif_type = 'contact_status_change' return self._check_rule_all(notif_type, event, rule) def _check_rule_all(self, notif_type: str, event: ProcessableEventsT, rule: RuleT ) -> bool: # Check notification type if rule['event'] != notif_type: return False # notification type is ok. Now check recipient if not self._check_rule_recipients(event, rule): return False # recipient is ok. Now check our status if not self._check_rule_status(event, rule): return False # our_status is ok. Now check opened chat window if not self._check_rule_tab_opened(event, rule): return False # tab_opened is ok. Now check opened chat window if not self._check_rule_has_focus(event, rule): return False # All is ok return True @log_result def _check_rule_recipients(self, event: ProcessableEventsT, rule: RuleT ) -> bool: rule_recipients = [t.strip() for t in rule['recipients'].split(',')] if rule['recipient_type'] == 'groupchat': if event.jid in rule_recipients: return True return False if (rule['recipient_type'] == 'contact' and event.jid not in rule_recipients): return False client = app.get_client(event.account) contact = client.get_module('Contacts').get_contact(event.jid) if contact.is_groupchat or not contact.is_in_roster: return False group_found = False for group in contact.groups: if group in rule_recipients: group_found = True break if rule['recipient_type'] == 'group' and not group_found: return False return True @log_result def _check_rule_status(self, event: ProcessableEventsT, rule: RuleT ) -> bool: rule_statuses = rule['status'].split() client = app.get_client(event.account) if rule['status'] != 'all' and client.status not in rule_statuses: return False return True @log_result def _check_rule_tab_opened(self, event: ProcessableEventsT, rule: RuleT ) -> bool: if rule['tab_opened'] == 'both': return True tab_opened = False assert isinstance(event.jid, JID) if app.window.chat_exists(event.account, event.jid): tab_opened = True if tab_opened and rule['tab_opened'] == 'no': return False elif not tab_opened and rule['tab_opened'] == 'yes': return False return True @log_result def _check_rule_has_focus(self, event: ProcessableEventsT, rule: RuleT ) -> bool: if rule['has_focus'] == 'both': return True if rule['tab_opened'] == 'no': # Does not apply in this case return True assert isinstance(event.jid, JID) chat_active = app.window.is_chat_active(event.account, event.jid) if chat_active and rule['has_focus'] == 'no': return False elif not chat_active and rule['has_focus'] == 'yes': return False return True def _apply_rule(self, result: RuleResult, rule: RuleT) -> None: if rule['sound'] == 'no': result.sound = False result.sound_file = None elif rule['sound'] == 'yes': result.sound = False result.sound_file = rule['sound_file'] if rule['run_command']: result.command = rule['command'] if rule['popup'] == 'no': result.show_notification = False elif rule['popup'] == 'yes': result.show_notification = True def _excecute_notification_rules(self, result: RuleResult, event: Notification ) -> bool: if result.sound is False: event.sound = None if result.sound_file is not None: play_sound_file(result.sound_file) if result.show_notification is False: return STOP_EVENT return PROPAGATE_EVENT def _excecute_message_rules(self, result: RuleResult) -> bool: if result.command is not None: try: exec_command(result.command, use_shell=True) except Exception: pass return PROPAGATE_EVENT