Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/dax/jmc.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Rousselie <dax@happycoders.org>2007-10-30 20:27:31 +0300
committerDavid Rousselie <dax@happycoders.org>2007-10-30 20:27:31 +0300
commit4065cba5b37f252689d064cd6f044dda30afbc42 (patch)
tree3814e5833d67f7ee0a2c08b352f48e4206f0edc4
parent16b78d407b0991ee16d32a760405c6375719a5c9 (diff)
Force check emails ad-hoc command implementation and add forgotten command.py file
darcs-hash:20071030172731-86b55-97be8bfd051a2d489b9e24d47c8ddb67ad953b5c.gz
-rw-r--r--src/jmc/jabber/command.py102
-rw-r--r--src/jmc/jabber/component.py5
-rw-r--r--src/jmc/jabber/tests/command.py317
3 files changed, 424 insertions, 0 deletions
diff --git a/src/jmc/jabber/command.py b/src/jmc/jabber/command.py
new file mode 100644
index 0000000..8b497b4
--- /dev/null
+++ b/src/jmc/jabber/command.py
@@ -0,0 +1,102 @@
+##
+## command.py
+## Login : David Rousselie <dax@happycoders.org>
+## Started on Tue Oct 23 18:45:19 2007 David Rousselie
+## $Id$
+##
+## Copyright (C) 2007 David Rousselie
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, write to the Free Software
+## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+##
+
+import logging
+
+from pyxmpp.jabber.dataforms import Form
+import jcl.model.account as account
+
+from jmc.model.account import MailAccount
+from jcl.jabber.command import JCLCommandManager
+import jcl.jabber.command as command
+
+class MailCommandManager(JCLCommandManager):
+ """
+ Implement ad-hoc commands specific to JMC:
+ - retrieve email attachments
+ - force email checking
+ """
+
+ def __init__(self, component=None, account_manager=None):
+ """
+ JMCCommandManager constructor.
+ Register JMC ad-hoc commands
+ """
+ JCLCommandManager.__init__(self, component, account_manager)
+ self.__logger = logging.getLogger("jmc.jabber.command.JMCCommandManager")
+ #self.commands["jmc#retrieve-attachment"] = (False, command.account_node_re)
+ self.commands["jmc#force-check"] = (False, command.account_node_re)
+
+ # Delayed for JMC 0.3.1
+ def execute_retrieve_attachment_1(self, info_query, session_context,
+ command_node, lang_class):
+ # TODO : translate
+ self.__logger.debug("Executing command 'retrieve-attachment' step 1")
+ self.add_actions(command_node, [command.ACTION_NEXT])
+ bare_from_jid = info_query.get_from().bare()
+ account_name = info_query.get_to().node
+ print str(bare_from_jid)
+ print str(account_name)
+ _account = account.get_account(bare_from_jid, account_name,
+ MailAccount)
+ if _account is not None:
+ result_form = Form(xmlnode_or_type="form",
+ title="TODO:TITLE",
+ instructions="TODO:INS")
+ field = result_form.add_field(name="attachments",
+ field_type="list-multi",
+ label="select attachments")
+ field.add_option(label="Next",
+ values=["-1"])
+ for (mail_id, mail_title) in _account.get_mail_with_attachment_list():
+ field.add_option(label=mail_title,
+ values=[mail_id])
+ result_form.as_xml(command_node)
+ return (result_form, [])
+ else:
+ # TODO Error
+ return (None, [])
+
+ # TODO: retrieve step2: Delayed to JMC 0.3.1
+
+ def execute_force_check_1(self, info_query, session_context,
+ command_node, lang_class):
+ self.__logger.debug("Executing command 'force-check' step 1")
+ self.add_actions(command_node, [command.ACTION_COMPLETE])
+ session_context["user_jids"] = [unicode(info_query.get_from().bare())]
+ return (self.add_form_select_accounts(session_context, command_node,
+ lang_class, "TODO:TITLE",
+ "TODO:DESC", format_as_xml=True,
+ show_user_jid=False), [])
+
+ def execute_force_check_2(self, info_query, session_context,
+ command_node, lang_class):
+ self.__logger.debug("Executing command 'force-check' step 2")
+ command_node.setProp("status", command.STATUS_COMPLETED)
+ accounts = []
+ for account_name in session_context["account_names"]:
+ name, user_jid = self.get_name_and_jid(account_name)
+ _account = account.get_account(user_jid, name)
+ _account.lastcheck = _account.interval - 1
+ accounts.append(_account)
+ self.component.check_email_accounts(accounts, lang_class)
+ return (None, [])
diff --git a/src/jmc/jabber/component.py b/src/jmc/jabber/component.py
index b0be13f..0f2c703 100644
--- a/src/jmc/jabber/component.py
+++ b/src/jmc/jabber/component.py
@@ -146,6 +146,11 @@ class MailComponent(FeederComponent):
AccountDiscoGetInfoHandler,
IMAPAccountDiscoGetInfoHandler(self))
+ def check_email_accounts(self, accounts, lang_class=None):
+ if lang_class is None:
+ lang_class = self.lang.get_default_lang_class()
+ self.handler.handle(None, lang_class, accounts)
+
class MailFeeder(Feeder):
"""Email check"""
diff --git a/src/jmc/jabber/tests/command.py b/src/jmc/jabber/tests/command.py
new file mode 100644
index 0000000..6a2d47e
--- /dev/null
+++ b/src/jmc/jabber/tests/command.py
@@ -0,0 +1,317 @@
+##
+## command.py
+## Login : David Rousselie <dax@happycoders.org>
+## Started on Tue Oct 23 18:53:28 2007 David Rousselie
+## $Id$
+##
+## Copyright (C) 2007 David Rousselie
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, write to the Free Software
+## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+##
+
+import unittest
+import tempfile
+from ConfigParser import ConfigParser
+import os
+
+from pyxmpp.iq import Iq
+from pyxmpp.jabber.dataforms import Form
+
+import jcl.tests
+from jcl.jabber.tests.command import JCLCommandManager_TestCase
+from jcl.jabber.feeder import Feeder
+from jcl.model.account import Account, PresenceAccount, LegacyJID, \
+ User
+import jcl.jabber.command as command
+
+from jmc.model.account import POP3Account, IMAPAccount, SMTPAccount, \
+ MailAccount
+from jmc.jabber.component import MailComponent
+from jmc.jabber.command import MailCommandManager
+
+from jmc.jabber.tests.component import MockIMAPAccount
+
+class MailCommandManager_TestCase(JCLCommandManager_TestCase):
+ def setUp(self):
+ JCLCommandManager_TestCase.setUp(self, tables=[POP3Account, IMAPAccount,
+ SMTPAccount, MailAccount,
+ MockIMAPAccount])
+ self.config_file = tempfile.mktemp(".conf", "jmctest", jcl.tests.DB_DIR)
+ self.config = ConfigParser()
+ self.config.read(self.config_file)
+ self.comp = MailComponent("jmc.test.com",
+ "password",
+ "localhost",
+ "5347",
+ self.config,
+ self.config_file)
+ self.comp.set_admins(["admin@test.com"])
+ self.command_manager = command.command_manager
+
+ def tearDown(self):
+ JCLCommandManager_TestCase.tearDown(self)
+ if os.path.exists(self.config_file):
+ os.unlink(self.config_file)
+
+# def test_execute_retrieve_attachment(self):
+# self.comp.account_manager.account_classes = (POP3Account, IMAPAccount,
+# SMTPAccount, MockIMAPAccount)
+# account1 = MockIMAPAccount(user=User(jid="test1@test.com"),
+# name="account1",
+# jid="account1@" + unicode(self.comp.jid))
+# info_query = Iq(stanza_type="set",
+# from_jid="test1@test.com",
+# to_jid="account1@" + unicode(self.comp.jid))
+# command_node = info_query.set_new_content(command.COMMAND_NS, "command")
+# command_node.setProp("node", "jmc#retrieve-attachment")
+# result = self.command_manager.apply_command_action(info_query,
+# "jmc#retrieve-attachment",
+# "execute")
+# self.assertNotEquals(result, None)
+# self.assertEquals(len(result), 1)
+# print str(result[0].xmlnode)
+# xml_command = result[0].xpath_eval("c:command",
+# {"c": "http://jabber.org/protocol/commands"})[0]
+# self.assertEquals(xml_command.prop("status"), "executing")
+# self.assertNotEquals(xml_command.prop("sessionid"), None)
+# self._check_actions(result[0], ["next"])
+# print str(result[0].xmlnode)
+# x_data = result[0].xpath_eval("c:command/data:x",
+# {"c": "http://jabber.org/protocol/commands",
+# "data": "jabber:x:data"})
+# self.assertEquals(len(x_data), 1)
+# self.assertEquals(x_data[0].prop("type"), "form")
+# options = result[0].xpath_eval("c:command/data:x/data:field[1]/data:option",
+# {"c": "http://jabber.org/protocol/commands",
+# "data": "jabber:x:data"})
+# self.assertEquals(len(options), 3)
+# self.assertEquals(options[0].prop("label"), "Next")
+# self.assertEquals(options[0].children.name, "value")
+# self.assertEquals(options[0].children.content, "-1")
+# self.assertEquals(options[1].prop("label"), "mail 1")
+# self.assertEquals(options[1].children.name, "value")
+# self.assertEquals(options[1].children.content, "1")
+# self.assertEquals(options[2].prop("label"), "mail 2")
+# self.assertEquals(options[2].children.name, "value")
+# self.assertEquals(options[2].children.content, "2")
+
+# # Delayed to JMC 0.3.1
+# return
+# # Second step: TODO
+# info_query = Iq(stanza_type="set",
+# from_jid="admin@test.com",
+# to_jid=self.comp.jid)
+# command_node = info_query.set_new_content(command.COMMAND_NS, "command")
+# command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
+# session_id = xml_command.prop("sessionid")
+# command_node.setProp("sessionid", session_id)
+# command_node.setProp("action", "next")
+# submit_form = Form(xmlnode_or_type="submit")
+# submit_form.add_field(field_type="list-single",
+# name="account_type",
+# value="Example")
+# submit_form.add_field(field_type="jid-single",
+# name="user_jid",
+# value="user2@test.com")
+# submit_form.as_xml(command_node)
+# result = self.command_manager.apply_command_action(info_query,
+# "http://jabber.org/protocol/admin#add-user",
+# "next")
+# self.assertNotEquals(result, None)
+# self.assertEquals(len(result), 1)
+# xml_command = result[0].xpath_eval("c:command",
+# {"c": "http://jabber.org/protocol/commands"})[0]
+# self.assertEquals(xml_command.prop("status"), "executing")
+# self.assertEquals(xml_command.prop("sessionid"), session_id)
+# self._check_actions(result[0], ["prev", "complete"], 1)
+# x_data = result[0].xpath_eval("c:command/data:x",
+# {"c": "http://jabber.org/protocol/commands",
+# "data": "jabber:x:data"})
+# self.assertEquals(len(x_data), 1)
+# self.assertEquals(x_data[0].prop("type"), "form")
+# fields = result[0].xpath_eval("c:command/data:x/data:field",
+# {"c": "http://jabber.org/protocol/commands",
+# "data": "jabber:x:data"})
+# self.assertEquals(len(fields), 6)
+# context_session = self.command_manager.sessions[session_id][1]
+# self.assertEquals(context_session["account_type"], ["Example"])
+# self.assertEquals(context_session["user_jid"], ["user2@test.com"])
+
+# # Third step
+# info_query = Iq(stanza_type="set",
+# from_jid="admin@test.com",
+# to_jid=self.comp.jid)
+# command_node = info_query.set_new_content(command.COMMAND_NS, "command")
+# command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
+# command_node.setProp("sessionid", session_id)
+# command_node.setProp("action", "complete")
+# submit_form = Form(xmlnode_or_type="submit")
+# submit_form.add_field(field_type="text-single",
+# name="name",
+# value="account1")
+# submit_form.add_field(field_type="text-single",
+# name="login",
+# value="login1")
+# submit_form.add_field(field_type="text-private",
+# name="password",
+# value="pass1")
+# submit_form.add_field(field_type="boolean",
+# name="store_password",
+# value="1")
+# submit_form.add_field(field_type="list-single",
+# name="test_enum",
+# value="choice2")
+# submit_form.add_field(field_type="text-single",
+# name="test_int",
+# value="42")
+# submit_form.as_xml(command_node)
+
+# result = self.command_manager.apply_command_action(info_query,
+# "http://jabber.org/protocol/admin#add-user",
+# "execute")
+# xml_command = result[0].xpath_eval("c:command",
+# {"c": "http://jabber.org/protocol/commands"})[0]
+# self.assertEquals(xml_command.prop("status"), "completed")
+# self.assertEquals(xml_command.prop("sessionid"), session_id)
+# self._check_actions(result[0])
+
+# self.assertEquals(context_session["name"], ["account1"])
+# self.assertEquals(context_session["login"], ["login1"])
+# self.assertEquals(context_session["password"], ["pass1"])
+# self.assertEquals(context_session["store_password"], ["1"])
+# self.assertEquals(context_session["test_enum"], ["choice2"])
+# self.assertEquals(context_session["test_int"], ["42"])
+
+# model.db_connect()
+# _account = account.get_account("user2@test.com",
+# "account1")
+# self.assertNotEquals(_account, None)
+# self.assertEquals(_account.user.jid, "user2@test.com")
+# self.assertEquals(_account.name, "account1")
+# self.assertEquals(_account.jid, "account1@" + unicode(self.comp.jid))
+# model.db_disconnect()
+
+# stanza_sent = result
+# self.assertEquals(len(stanza_sent), 4)
+# iq_result = stanza_sent[0]
+# self.assertTrue(isinstance(iq_result, Iq))
+# self.assertEquals(iq_result.get_node().prop("type"), "result")
+# self.assertEquals(iq_result.get_from(), self.comp.jid)
+# self.assertEquals(iq_result.get_to(), "admin@test.com")
+# presence_component = stanza_sent[1]
+# self.assertTrue(isinstance(presence_component, Presence))
+# self.assertEquals(presence_component.get_from(), self.comp.jid)
+# self.assertEquals(presence_component.get_to(), "user2@test.com")
+# self.assertEquals(presence_component.get_node().prop("type"),
+# "subscribe")
+# message = stanza_sent[2]
+# self.assertTrue(isinstance(message, Message))
+# self.assertEquals(message.get_from(), self.comp.jid)
+# self.assertEquals(message.get_to(), "user2@test.com")
+# self.assertEquals(message.get_subject(),
+# _account.get_new_message_subject(Lang.en))
+# self.assertEquals(message.get_body(),
+# _account.get_new_message_body(Lang.en))
+# presence_account = stanza_sent[3]
+# self.assertTrue(isinstance(presence_account, Presence))
+# self.assertEquals(presence_account.get_from(), "account1@" + unicode(self.comp.jid))
+# self.assertEquals(presence_account.get_to(), "user2@test.com")
+# self.assertEquals(presence_account.get_node().prop("type"),
+# "subscribe")
+
+ def test_execute_force_check(self):
+ self.comp.account_manager.account_classes = (POP3Account, IMAPAccount,
+ SMTPAccount, MockIMAPAccount)
+ class MockFeederHandler(Feeder):
+ def __init__(self, component):
+ Feeder.__init__(self, component)
+ self.checked_accounts = []
+
+ def feed(self, _account):
+ self.checked_accounts.append(_account)
+ assert(_account.lastcheck == (_account.interval - 1))
+ return []
+
+ self.comp.handler.feeder = MockFeederHandler(self.comp)
+ user1 = User(jid="test1@test.com")
+ user2 = User(jid="test2@test.com")
+ account11 = MockIMAPAccount(user=user1,
+ name="account11",
+ jid="account11@" + unicode(self.comp.jid))
+ account12 = MockIMAPAccount(user=user1,
+ name="account12",
+ jid="account12@" + unicode(self.comp.jid))
+ account21 = MockIMAPAccount(user=user2,
+ name="account21",
+ jid="account21@" + unicode(self.comp.jid))
+ account22 = MockIMAPAccount(user=user2,
+ name="account11",
+ jid="account11@" + unicode(self.comp.jid))
+ info_query = Iq(stanza_type="set",
+ from_jid="test1@test.com",
+ to_jid=self.comp.jid)
+ command_node = info_query.set_new_content(command.COMMAND_NS, "command")
+ command_node.setProp("node", "jmc#force-check")
+ result = self.command_manager.apply_command_action(info_query,
+ "jmc#force-check",
+ "execute")
+ self.assertNotEquals(result, None)
+ self.assertEquals(len(result), 1)
+ xml_command = result[0].xpath_eval("c:command",
+ {"c": "http://jabber.org/protocol/commands"})[0]
+ self.assertEquals(xml_command.prop("status"), "executing")
+ self.assertNotEquals(xml_command.prop("sessionid"), None)
+ self._check_actions(result[0], ["complete"])
+ session_id = xml_command.prop("sessionid")
+ context_session = self.command_manager.sessions[session_id][1]
+ self.assertEquals(context_session["user_jids"],
+ ["test1@test.com"])
+
+ # Second step
+ info_query = Iq(stanza_type="set",
+ from_jid="admin@test.com",
+ to_jid=self.comp.jid)
+ command_node = info_query.set_new_content(command.COMMAND_NS, "command")
+ command_node.setProp("node", "jmc#force-check")
+ command_node.setProp("sessionid", session_id)
+ command_node.setProp("action", "complete")
+ submit_form = Form(xmlnode_or_type="submit")
+ submit_form.add_field(field_type="list-multi",
+ name="account_names",
+ values=["account11/test1@test.com",
+ "account12/test1@test.com"])
+ submit_form.as_xml(command_node)
+ result = self.command_manager.apply_command_action(info_query,
+ "jmc#force-check",
+ "execute")
+ xml_command = result[0].xpath_eval("c:command",
+ {"c": "http://jabber.org/protocol/commands"})[0]
+ self.assertEquals(xml_command.prop("status"), "completed")
+ self.assertEquals(xml_command.prop("sessionid"), session_id)
+ self._check_actions(result[0])
+ self.assertEquals(context_session["account_names"],
+ ["account11/test1@test.com",
+ "account12/test1@test.com"])
+ feeder = self.comp.handler.feeder
+ self.assertEquals(len(feeder.checked_accounts), 2)
+ self.assertEquals(feeder.checked_accounts[0], account11)
+ self.assertEquals(feeder.checked_accounts[1], account12)
+
+def suite():
+ test_suite = unittest.TestSuite()
+ test_suite.addTest(unittest.makeSuite(MailCommandManager_TestCase, 'test'))
+ return test_suite
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='suite')