# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see .
from typing import Any
import time
import nbxmpp
from gi.repository import Gdk
from gi.repository import GLib
from gi.repository import GObject
from gi.repository import Gtk
from gi.repository import GtkSource
from gajim.common import app
from gajim.common import ged
from gajim.common.const import Direction
from gajim.common.events import AccountDisabled
from gajim.common.events import AccountEnabled
from gajim.common.events import StanzaReceived
from gajim.common.events import StanzaSent
from gajim.common.i18n import _
from gajim.common.logging_helpers import get_log_console_handler
from gajim.gtk.builder import get_builder
from gajim.gtk.const import Setting
from gajim.gtk.const import SettingKind
from gajim.gtk.const import SettingType
from gajim.gtk.dialogs import ErrorDialog
from gajim.gtk.settings import SettingsDialog
from gajim.gtk.util import at_the_end
from gajim.gtk.util import EventHelper
from gajim.gtk.util import MaxWidthComboBoxText
from gajim.gtk.util import scroll_to_end
class XMLConsoleWindow(Gtk.ApplicationWindow, EventHelper):
def __init__(self) -> None:
Gtk.ApplicationWindow.__init__(self)
EventHelper.__init__(self)
self.set_application(app.app)
self.set_position(Gtk.WindowPosition.CENTER)
self.set_default_size(800, 600)
self.set_resizable(True)
self.set_show_menubar(False)
self.set_name('XMLConsoleWindow')
self._selected_account = 'AllAccounts'
self._selected_send_account: str | None = None
self._filter_dialog: SettingsDialog | None = None
self._sent_stanzas = SentSzanzas()
self._last_selected_ts = 0
self._last_search: str = ''
self._presence = True
self._message = True
self._iq = True
self._stream = True
self._incoming = True
self._outgoing = True
self._ui = get_builder('xml_console.ui')
self.set_titlebar(self._ui.headerbar)
self._set_titlebar()
self.add(self._ui.stack)
self._ui.paned.set_position(
self._ui.paned.get_property('max-position'))
self._combo = MaxWidthComboBoxText()
self._combo.set_max_size(200)
self._combo.set_hexpand(False)
self._combo.set_halign(Gtk.Align.END)
self._combo.set_no_show_all(True)
self._combo.set_visible(False)
self._combo.connect('changed', self._on_value_change)
available_accounts = self._get_accounts()
for account, label in available_accounts:
self._combo.append(account, label)
if available_accounts:
self._combo.set_active(0)
self._ui.actionbox.pack_end(self._combo, False, False, 0)
self._ui.actionbox.reorder_child(self._combo, 1)
self._create_tags()
source_manager = GtkSource.LanguageManager.get_default()
lang = source_manager.get_language('xml')
self._ui.protocol_view.get_buffer().set_language(lang)
self._ui.input_entry.get_buffer().set_language(lang)
style_scheme_manager = GtkSource.StyleSchemeManager.get_default()
style_scheme = style_scheme_manager.get_scheme('solarized-dark')
if style_scheme is not None:
self._ui.protocol_view.get_buffer().set_style_scheme(style_scheme)
self._ui.input_entry.get_buffer().set_style_scheme(style_scheme)
self._ui.log_view.get_buffer().set_style_scheme(style_scheme)
for record in app.logging_records:
self._add_log_record(record)
log_handler = get_log_console_handler()
log_handler.set_callback(self._add_log_record)
self._ui.stack.connect('notify::visible-child-name',
self._on_stack_child_changed)
self.show_all()
self.connect('key-press-event', self._on_key_press)
self.connect('destroy', self._on_destroy)
self._ui.connect_signals(self)
self.register_events([
('stanza-received', ged.GUI1, self._on_stanza_received),
('stanza-sent', ged.GUI1, self._on_stanza_sent),
('account-enabled', ged.GUI1, self._on_account_changed),
('account-disabled', ged.GUI1, self._on_account_changed)
])
def _on_destroy(self, *args: Any) -> None:
get_log_console_handler().set_callback(None)
self._ui.popover.destroy()
app.check_finalize(self)
def _on_value_change(self, combo: Gtk.ComboBox) -> None:
self._selected_send_account = combo.get_active_id()
def _set_titlebar(self) -> None:
if self._selected_account == 'AllAccounts':
title = _('All Accounts')
elif self._selected_account == 'AccountWizard':
title = _('Account Wizard')
else:
title = app.get_jid_from_account(self._selected_account)
self._ui.headerbar.set_subtitle(title)
def _on_account_changed(self,
event: AccountEnabled | AccountDisabled
) -> None:
buf = self._ui.protocol_view.get_buffer()
if isinstance(event, AccountEnabled):
buf.create_tag(event.account)
else:
start, end = buf.get_bounds()
buf.remove_tag_by_name(event.account, start, end)
def _on_stack_child_changed(self,
_widget: Gtk.Stack,
_pspec: GObject.ParamSpec
) -> None:
name = self._ui.stack.get_visible_child_name()
self._ui.search_toggle.set_sensitive(name == 'protocol')
def _create_tags(self) -> None:
tags = [
'incoming',
'outgoing',
'presence',
'message',
'stream',
'iq'
]
accounts = app.settings.get_active_accounts()
tags.extend(accounts)
tags.append('AccountWizard')
for tag_name in tags:
self._ui.protocol_view.get_buffer().create_tag(tag_name)
def _add_log_record(self, message: str) -> None:
buf = self._ui.log_view.get_buffer()
end_iter = buf.get_end_iter()
buf.insert(end_iter, message)
def _on_key_press(self, _widget: Gtk.Widget, event: Gdk.EventKey) -> None:
if event.keyval == Gdk.KEY_Escape:
if self._ui.search_revealer.get_child_revealed():
self._ui.search_revealer.set_reveal_child(False)
return
self.destroy()
if (event.state & Gdk.ModifierType.CONTROL_MASK and
event.keyval == Gdk.KEY_Return or
event.keyval == Gdk.KEY_KP_Enter):
self._on_send()
if (event.state & Gdk.ModifierType.CONTROL_MASK and
event.keyval == Gdk.KEY_Up):
self._on_paste_previous()
if (event.state & Gdk.ModifierType.CONTROL_MASK and
event.keyval == Gdk.KEY_Down):
self._on_paste_next()
if (event.state & Gdk.ModifierType.CONTROL_MASK and
event.keyval == Gdk.KEY_f):
self._ui.search_toggle.set_active(
not self._ui.search_revealer.get_child_revealed())
if event.keyval == Gdk.KEY_F3:
self._find(Direction.NEXT)
def _on_row_activated(self,
_listbox: Gtk.ListBox,
row: Gtk.ListBoxRow
) -> None:
child = row.get_child()
assert isinstance(child, Gtk.Label)
text = child.get_text()
input_text = None
if text == 'Presence':
input_text = (
'\n'
'\n'
'\n'
'\n'
'')
elif text == 'Message':
input_text = (
'\n'
'\n'
'')
elif text == 'Iq':
input_text = (
'\n'
'\n'
'')
elif text == 'Disco Info':
input_text = (
'\n'
''
'\n')
if input_text is not None:
buffer_ = self._ui.input_entry.get_buffer()
buffer_.set_text(input_text)
self._ui.input_entry.grab_focus()
def _on_send(self, *args: Any) -> None:
if not self._selected_send_account:
return
if not app.account_is_available(self._selected_send_account):
# If offline or connecting
ErrorDialog(
_('Connection not available'),
_('Please make sure you are connected with "%s".') %
self._selected_send_account)
return
buffer_ = self._ui.input_entry.get_buffer()
begin_iter, end_iter = buffer_.get_bounds()
stanza = buffer_.get_text(begin_iter, end_iter, True)
if stanza:
try:
node = nbxmpp.Node(node=stanza)
except Exception as error:
ErrorDialog(_('Invalid Node'), str(error))
return
if node.getName() in ('message', 'presence', 'iq'):
# Parse stanza again if its a message, presence or iq and
# set jabber:client as toplevel namespace
# Use type Protocol so nbxmpp counts the stanza for
# stream management
node = nbxmpp.Protocol(node=stanza,
attrs={'xmlns': 'jabber:client'})
client = app.get_client(self._selected_send_account)
assert isinstance(node, nbxmpp.Protocol)
client.connection.send_stanza(node)
self._sent_stanzas.add(stanza)
buffer_.set_text('')
def _on_paste_previous(self, *args: Any) -> None:
buffer_ = self._ui.input_entry.get_buffer()
buffer_.set_text(self._sent_stanzas.get_previous())
self._ui.input_entry.grab_focus()
def _on_paste_next(self, *args: Any) -> None:
buffer_ = self._ui.input_entry.get_buffer()
buffer_.set_text(self._sent_stanzas.get_next())
self._ui.input_entry.grab_focus()
def _on_input(self, button: Gtk.ToggleButton) -> None:
child2 = self._ui.paned.get_child2()
assert child2 is not None
if button.get_active():
child2.show()
self._ui.send.show()
self._ui.paste.show()
self._ui.account_label.show()
self._combo.show()
self._ui.menubutton.show()
self._ui.input_entry.grab_focus()
else:
child2.hide()
self._ui.send.hide()
self._ui.paste.hide()
self._ui.account_label.hide()
self._combo.hide()
self._ui.menubutton.hide()
def _on_search_toggled(self, button: Gtk.ToggleButton) -> None:
self._ui.search_revealer.set_reveal_child(button.get_active())
self._ui.search_entry.grab_focus()
def _on_search_activate(self, _entry: Gtk.SearchEntry) -> None:
self._find(Direction.NEXT)
def _on_search_clicked(self, button: Gtk.ToolButton) -> None:
if button is self._ui.search_forward:
direction = Direction.NEXT
else:
direction = Direction.PREV
self._find(direction)
def _find(self, direction: Direction) -> None:
search_str = self._ui.search_entry.get_text()
textbuffer = self._ui.protocol_view.get_buffer()
cursor_mark = textbuffer.get_insert()
current_pos = textbuffer.get_iter_at_mark(cursor_mark)
if current_pos.get_offset() == textbuffer.get_char_count():
current_pos = textbuffer.get_start_iter()
last_pos_mark = textbuffer.get_mark('last_pos')
if last_pos_mark is not None:
current_pos = textbuffer.get_iter_at_mark(last_pos_mark)
if search_str != self._last_search:
current_pos = textbuffer.get_start_iter()
if direction == Direction.NEXT:
match = current_pos.forward_search(
search_str,
Gtk.TextSearchFlags.VISIBLE_ONLY |
Gtk.TextSearchFlags.CASE_INSENSITIVE,
None)
else:
current_pos.backward_cursor_position()
match = current_pos.backward_search(
search_str,
Gtk.TextSearchFlags.VISIBLE_ONLY |
Gtk.TextSearchFlags.CASE_INSENSITIVE,
None)
if match is not None:
match_start, match_end = match
textbuffer.select_range(match_start, match_end)
mark = textbuffer.create_mark('last_pos', match_end, True)
self._ui.protocol_view.scroll_to_mark(mark, 0, True, 0.5, 0.5)
self._last_search = search_str
@staticmethod
def _get_accounts() -> list[tuple[str | None, str]]:
accounts = app.get_accounts_sorted()
combo_accounts: list[tuple[str | None, str]] = []
for account in accounts:
label = app.get_account_label(account)
combo_accounts.append((account, label))
combo_accounts.append(('AccountWizard', 'Account Wizard'))
return combo_accounts
def _on_filter_options(self, _button: Gtk.Button) -> None:
if self._filter_dialog is not None:
self._filter_dialog.present()
return
combo_accounts = self._get_accounts()
combo_accounts.insert(0, ('AllAccounts', _('All Accounts')))
settings = [
Setting(SettingKind.COMBO,
_('Account'),
SettingType.VALUE,
self._selected_account,
callback=self._set_account,
props={'combo_items': combo_accounts}),
Setting(SettingKind.SWITCH,
'Presence',
SettingType.VALUE,
self._presence,
callback=self._on_setting,
data='presence'),
Setting(SettingKind.SWITCH,
'Message',
SettingType.VALUE,
self._message,
callback=self._on_setting,
data='message'),
Setting(SettingKind.SWITCH,
'IQ', SettingType.VALUE,
self._iq,
callback=self._on_setting,
data='iq'),
Setting(SettingKind.SWITCH,
'Stream Management',
SettingType.VALUE,
self._stream,
callback=self._on_setting,
data='stream'),
Setting(SettingKind.SWITCH,
'In',
SettingType.VALUE,
self._incoming,
callback=self._on_setting,
data='incoming'),
Setting(SettingKind.SWITCH,
'Out',
SettingType.VALUE,
self._outgoing,
callback=self._on_setting,
data='outgoing'),
]
self._filter_dialog = SettingsDialog(
self,
_('Filter'),
Gtk.DialogFlags.DESTROY_WITH_PARENT,
settings,
self._selected_account or 'AllAccounts')
self._filter_dialog.connect('destroy', self._on_filter_destroyed)
def _on_filter_destroyed(self, _widget: Gtk.Widget) -> None:
self._filter_dialog = None
def _on_clear(self, _button: Gtk.Button) -> None:
self._ui.protocol_view.get_buffer().set_text('')
def _set_account(self, value: str, _data: Any) -> None:
self._selected_account = value
self._set_titlebar()
active_accounts = app.settings.get_active_accounts()
active_accounts.append('AccountWizard')
table = self._ui.protocol_view.get_buffer().get_tag_table()
if value == 'AllAccounts':
for account in active_accounts:
tag = table.lookup(account)
if tag is not None:
tag.set_property('invisible', False)
return
for account in active_accounts:
tag = table.lookup(account)
if tag is not None:
tag.set_property('invisible', account != value)
def _on_setting(self, value: bool, data: str) -> None:
setattr(self, f'_{data}', value)
value = not value
table = self._ui.protocol_view.get_buffer().get_tag_table()
tag = table.lookup(data)
if tag is None:
return
if data in ('incoming', 'outgoing'):
if value:
tag.set_priority(table.get_size() - 1)
else:
tag.set_priority(0)
tag.set_property('invisible', value)
def _on_stanza_received(self, event: StanzaReceived):
self._print_stanza(event, 'incoming')
def _on_stanza_sent(self, event: StanzaSent):
self._print_stanza(event, 'outgoing')
def _print_stanza(self,
event: StanzaReceived | StanzaSent,
kind: str
) -> None:
if event.account == 'AccountWizard':
account_label = 'Account Wizard'
else:
account_label = app.get_account_label(event.account)
stanza = event.stanza
if not isinstance(stanza, str):
# pylint: disable=unnecessary-dunder-call
stanza = stanza.__str__(fancy=True)
if not stanza:
return
is_at_the_end = at_the_end(self._ui.scrolled)
buffer_ = self._ui.protocol_view.get_buffer()
end_iter = buffer_.get_end_iter()
type_ = kind
if stanza.startswith(' None:
self._sent_stanzas: dict[float, str] = {}
self._last_selected_ts = 0
def add(self, stanza: str) -> None:
self._sent_stanzas[time.time()] = stanza
self._last_selected_ts = 0
def get_previous(self) -> str:
return self._get(Direction.PREV)
def get_next(self) -> str:
return self._get(Direction.NEXT)
def _get(self, direction: Direction) -> str:
if not self._sent_stanzas:
return ''
if direction == Direction.PREV:
for timestamp, stanza in reversed(self._sent_stanzas.items()):
if timestamp >= self._last_selected_ts:
continue
self._last_selected_ts = timestamp
return stanza
else:
for timestamp, stanza in self._sent_stanzas.items():
if timestamp <= self._last_selected_ts:
continue
self._last_selected_ts = timestamp
return stanza
self._last_selected_ts = list(self._sent_stanzas.keys())[-1]
return self._sent_stanzas[self._last_selected_ts]