Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/dax/jmc.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDavid Rousselie <dax@happycoders.org>2008-03-06 10:34:19 +0300
committerDavid Rousselie <dax@happycoders.org>2008-03-06 10:34:19 +0300
commit20ddfe0db3ac246b6a27c924551dab4f7b253dfd (patch)
tree020576829ac10264111ef0ba1c3bd1909b4f28ba /src
parent103f8accd8d91f2fd853696bdd26a12bbc5828fe (diff)
get-email ad-hoc command basic behavior
darcs-hash:20080306073419-86b55-b04b36cd21ba4b85897295aef07362a69402a84e.gz
Diffstat (limited to 'src')
-rw-r--r--src/jmc/jabber/command.py69
-rw-r--r--src/jmc/jabber/component.py158
-rw-r--r--src/jmc/jabber/tests/command.py592
-rw-r--r--src/jmc/jabber/tests/component.py5
-rw-r--r--src/jmc/model/account.py15
-rw-r--r--src/jmc/model/tests/account.py76
6 files changed, 474 insertions, 441 deletions
diff --git a/src/jmc/jabber/command.py b/src/jmc/jabber/command.py
index 4aa3d81..67d6003 100644
--- a/src/jmc/jabber/command.py
+++ b/src/jmc/jabber/command.py
@@ -24,11 +24,13 @@ import logging
import re
from pyxmpp.jabber.dataforms import Form
+
import jcl.model.account as account
+import jcl.jabber.command as command
+from jcl.jabber.command import JCLCommandManager
from jmc.model.account import MailAccount
-from jcl.jabber.command import JCLCommandManager
-import jcl.jabber.command as command
+from jmc.jabber.feeder import MailSender
class MailCommandManager(JCLCommandManager):
"""
@@ -46,6 +48,7 @@ class MailCommandManager(JCLCommandManager):
self.__logger = logging.getLogger("jmc.jabber.command.JMCCommandManager")
#self.commands["jmc#retrieve-attachment"] = (False, command.account_node_re)
self.commands["jmc#force-check"] = (False, re.compile(".*"))
+ self.commands["jmc#get-email"] = (False, command.account_node_re)
# Delayed to JMC 0.3.1
def execute_retrieve_attachment_1(self, info_query, session_context,
@@ -84,10 +87,11 @@ class MailCommandManager(JCLCommandManager):
self.__logger.debug("Executing command 'force-check' step 1 on root node")
self.add_actions(command_node, [command.ACTION_COMPLETE])
session_context["user_jids"] = [unicode(info_query.get_from().bare())]
- return (self.add_form_select_accounts(session_context, command_node,
- lang_class, "TODO:TITLE",
- "TODO:DESC", format_as_xml=True,
- show_user_jid=False), [])
+ return (self.add_form_select_accounts(\
+ session_context, command_node, lang_class,
+ lang_class.command_force_check,
+ lang_class.command_force_check_1_description,
+ format_as_xml=True, show_user_jid=False), [])
def execute_force_check_1(self, info_query, session_context,
command_node, lang_class):
@@ -117,3 +121,56 @@ class MailCommandManager(JCLCommandManager):
return (None, [])
execute_force_check_2 = execute_force_check_1
+
+ def execute_get_email_1(self, info_query, session_context,
+ command_node, lang_class):
+ self.__logger.debug("Executing command 'get-email' step 1")
+ self.add_actions(command_node, [command.ACTION_COMPLETE])
+ bare_from_jid = info_query.get_from().bare()
+ account_name = info_query.get_to().node
+ _account = account.get_account(bare_from_jid, account_name)
+ if _account is not None:
+ result_form = Form(\
+ xmlnode_or_type="form",
+ title=lang_class.command_get_email,
+ instructions=lang_class.command_get_email_1_description)
+ email_list = _account.get_mail_list_summary()
+ field = result_form.add_field(name="emails",
+ field_type="list-multi",
+ label=lang_class.field_email_subject)
+ for (email_index, email_subject) in email_list:
+ field.add_option(label=email_subject, values=[email_index])
+ result_form.add_field(name="fetch_more",
+ field_type="boolean",
+ label=lang_class.field_select_more_emails)
+ result_form.as_xml(command_node)
+ return (result_form, [])
+ else:
+ # TODO Error
+ return (None, [])
+
+ def execute_get_email_2(self, info_query, session_context,
+ command_node, lang_class):
+ self.__logger.debug("Executing command 'get-email' step 2")
+ result = []
+ mail_sender = MailSender(self.component)
+ bare_from_jid = info_query.get_from().bare()
+ account_name = info_query.get_to().node
+ _account = account.get_account(bare_from_jid, account_name)
+ if _account is not None:
+ for email_index in session_context["emails"]:
+ (email_body, email_from) = _account.get_mail(email_index)
+ result.append(\
+ mail_sender.create_full_email_message(\
+ email_from,
+ lang_class.mail_subject % (email_from),
+ email_body,
+ _account))
+ result_form = Form(\
+ xmlnode_or_type="form",
+ title=lang_class.command_get_email,
+ instructions=lang_class.command_get_email_2_description \
+ % (len(session_context["emails"])))
+ result_form.as_xml(command_node)
+ command_node.setProp("status", command.STATUS_COMPLETED)
+ return (None, result)
diff --git a/src/jmc/jabber/component.py b/src/jmc/jabber/component.py
index ab21381..7514582 100644
--- a/src/jmc/jabber/component.py
+++ b/src/jmc/jabber/component.py
@@ -21,22 +21,19 @@
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
-import logging
-
-from pyxmpp.jid import JID
-
from sqlobject.sqlbuilder import AND
import jcl.jabber as jabber
import jcl.model as model
from jcl.model import account
-from jcl.model.account import Account, User, PresenceAccount
+from jcl.model.account import Account, User
from jcl.jabber.disco import \
AccountTypeDiscoGetInfoHandler, AccountDiscoGetInfoHandler
-from jcl.jabber.feeder import FeederComponent, Feeder, MessageSender, \
- HeadlineSender, FeederHandler
+from jcl.jabber.feeder import FeederComponent
from jcl.jabber.command import CommandRootDiscoGetInfoHandler
from jcl.jabber.component import AccountManager
+from jmc.model.account import IMAPAccount, POP3Account, \
+ SMTPAccount
from jmc.jabber.disco import MailRootDiscoGetInfoHandler, \
IMAPAccountDiscoGetItemsHandler, MailAccountTypeDiscoGetInfoHandler, \
@@ -45,10 +42,9 @@ from jmc.jabber.message import SendMailMessageHandler, \
RootSendMailMessageHandler
from jmc.jabber.presence import MailSubscribeHandler, \
MailUnsubscribeHandler, MailPresenceHandler
-from jmc.model.account import MailAccount, IMAPAccount, POP3Account, \
- SMTPAccount
from jmc.lang import Lang
from jmc.jabber.command import MailCommandManager
+from jmc.jabber.feeder import MailFeederHandler, MailFeeder, MailSender
class MailAccountManager(AccountManager):
def account_get_register(self, info_query,
@@ -156,147 +152,3 @@ class MailComponent(FeederComponent):
if lang_class is None:
lang_class = self.lang.get_default_lang_class()
self.handler.handle(None, lang_class, accounts)
-
-class MailFeeder(Feeder):
- """Email check"""
-
- def __init__(self, component):
- """MailFeeder constructor"""
- Feeder.__init__(self, component)
- self.__logger = logging.getLogger("jmc.jabber.component.MailFeeder")
-
- def initialize_live_email(self, _account):
- """For live email checking account, mark emails received while
- offline as read.
- Return a boolean to continue mail checking or not
- (if waiting for password).
- """
- if _account.password is None:
- if not _account.waiting_password_reply:
- account_manager = self.component.account_manager
- self.component.send_stanzas(\
- account_manager.ask_password(_account,
- _account.default_lang_class))
- return False
- try:
- _account.connect()
- _account.mark_all_as_read()
- _account.disconnect()
- _account.first_check = False
- _account.error = None
- return True
- except Exception, e:
- if _account.connected:
- try:
- _account.disconnect()
- except:
- # We have done everything we could
- _account.connected = False
- self.component.send_error(_account, e)
- return False
-
- def feed(self, _account):
- """Check for new emails for given MailAccount and return a list of
- those emails or a summary.
- """
- self.__logger.debug("MailFeeder.feed")
- result = []
- if _account.first_check and _account.live_email_only:
- continue_checking = self.initialize_live_email(_account)
- if not continue_checking:
- return result
- _account.lastcheck += 1
- if _account.lastcheck == _account.interval:
- _account.lastcheck = 0
- action = _account.action
- if action != PresenceAccount.DO_NOTHING:
- try:
- if _account.password is None:
- account_manager = self.component.account_manager
- self.component.send_stanzas(\
- account_manager.ask_password(_account,
- _account.default_lang_class))
- return result
- self.__logger.debug("Checking " + _account.name)
- self.__logger.debug("\t" + _account.login \
- + "@" + _account.host)
- _account.connect()
- mail_list = _account.get_new_mail_list()
- default_lang_class = _account.default_lang_class
- if action == MailAccount.RETRIEVE:
- # TODO : use generator (yield)
- mail_index = _account.get_next_mail_index(mail_list)
- while mail_index is not None:
- (body, email_from) = _account.get_mail(mail_index)
- result.append((email_from,
- default_lang_class.new_mail_subject\
- % (email_from),
- body))
- mail_index = _account.get_next_mail_index(mail_list)
- elif action == MailAccount.DIGEST:
- body = ""
- new_mail_count = 0
- mail_index = _account.get_next_mail_index(mail_list)
- while mail_index is not None:
- (tmp_body, from_email) = \
- _account.get_mail_summary(mail_index)
- body += tmp_body
- body += "\n----------------------------------\n"
- mail_index = _account.get_next_mail_index(mail_list)
- new_mail_count += 1
- if body != "":
- result.append((None,
- default_lang_class.new_digest_subject\
- % (new_mail_count),
- body))
- else:
- raise Exception("Unkown action: " + str(action) \
- + "\nPlease reconfigure account.")
- _account.disconnect()
- _account.error = None
- self.__logger.debug("\nCHECK_MAIL ends " + _account.jid)
- except Exception, e:
- if _account.connected:
- try:
- _account.disconnect()
- except:
- # We have done everything we could
- _account.connected = False
- self.component.send_error(_account, e)
- return result
-
-class MailSender(HeadlineSender):
- """Send emails messages to jabber users"""
-
- def send(self, to_account, data):
- """Call MessageSender send method"""
- MessageSender.send(self, to_account, data)
-
- def create_message(self, to_account, data):
- """Send given emails (in data) as Jabber messages"""
- email_from, subject, body = data
- if to_account.action == MailAccount.RETRIEVE:
- message = MessageSender.create_message(self, to_account,
- (subject, body))
- msg_node = message.get_node()
- addresses_node = msg_node.newChild(None, "addresses", None)
- address_ns = addresses_node.newNs("http://jabber.org/protocol/address", None)
- addresses_node.setNs(address_ns)
- replyto_address_node = addresses_node.newChild(address_ns, "address", None)
- replyto_address_node.setProp("type", "replyto")
- replyto_jid = email_from.replace('@', '%', 1) + "@" \
- + unicode(JID(to_account.jid).domain)
- replyto_address_node.setProp("jid", replyto_jid)
- elif to_account.action == MailAccount.DIGEST:
- message = HeadlineSender.create_message(self, to_account,
- (subject, body))
- else:
- message = None
- return message
-
-class MailFeederHandler(FeederHandler):
- def filter(self, stanza, lang_class):
- """Return only email account type to check mail from
- """
- accounts = account.get_all_accounts(account_class=MailAccount)
- return accounts
diff --git a/src/jmc/jabber/tests/command.py b/src/jmc/jabber/tests/command.py
index 4064eca..1cff3b7 100644
--- a/src/jmc/jabber/tests/command.py
+++ b/src/jmc/jabber/tests/command.py
@@ -24,27 +24,31 @@ import unittest
import tempfile
from ConfigParser import ConfigParser
import os
+import sys
+import logging
from pyxmpp.iq import Iq
-from pyxmpp.jabber.dataforms import Form
+from pyxmpp.jabber.dataforms import Field
import jcl.tests
-from jcl.jabber.tests.command import JCLCommandManager_TestCase
+from jcl.tests import JCLTestCase
from jcl.jabber.feeder import Feeder
-from jcl.model.account import User
+from jcl.model.account import User, Account, PresenceAccount
+from jcl.jabber.tests.command import JCLCommandManagerTestCase
import jcl.jabber.command as command
from jmc.model.account import POP3Account, IMAPAccount, SMTPAccount, \
MailAccount
from jmc.jabber.component import MailComponent
-
+from jmc.lang import Lang
from jmc.jabber.tests.component import MockIMAPAccount
+from jmc.jabber.command import MailCommandManager
-class MailCommandManager_TestCase(JCLCommandManager_TestCase):
- def setUp(self):
- JCLCommandManager_TestCase.setUp(self, tables=[POP3Account, IMAPAccount,
- SMTPAccount, MailAccount,
- MockIMAPAccount])
+class MailCommandManagerTestCase(JCLCommandManagerTestCase):
+ def setUp(self, tables=[]):
+ tables += [POP3Account, IMAPAccount, SMTPAccount, MailAccount,
+ MockIMAPAccount, User, Account, PresenceAccount]
+ JCLTestCase.setUp(self, tables=tables)
self.config_file = tempfile.mktemp(".conf", "jmctest", jcl.tests.DB_DIR)
self.config = ConfigParser()
self.config.read(self.config_file)
@@ -55,182 +59,36 @@ class MailCommandManager_TestCase(JCLCommandManager_TestCase):
self.config,
self.config_file)
self.comp.set_admins(["admin@test.com"])
- self.command_manager = command.command_manager
-
- def tearDown(self):
- JCLCommandManager_TestCase.tearDown(self)
- if os.path.exists(self.config_file):
- os.unlink(self.config_file)
-
-# def test_execute_retrieve_attachment(self):
-# self.comp.account_manager.account_classes = (POP3Account, IMAPAccount,
-# SMTPAccount, MockIMAPAccount)
-# account1 = MockIMAPAccount(user=User(jid="test1@test.com"),
-# name="account1",
-# jid="account1@" + unicode(self.comp.jid))
-# info_query = Iq(stanza_type="set",
-# from_jid="test1@test.com",
-# to_jid="account1@" + unicode(self.comp.jid))
-# command_node = info_query.set_new_content(command.COMMAND_NS, "command")
-# command_node.setProp("node", "jmc#retrieve-attachment")
-# result = self.command_manager.apply_command_action(info_query,
-# "jmc#retrieve-attachment",
-# "execute")
-# self.assertNotEquals(result, None)
-# self.assertEquals(len(result), 1)
-# print str(result[0].xmlnode)
-# xml_command = result[0].xpath_eval("c:command",
-# {"c": "http://jabber.org/protocol/commands"})[0]
-# self.assertEquals(xml_command.prop("status"), "executing")
-# self.assertNotEquals(xml_command.prop("sessionid"), None)
-# self._check_actions(result[0], ["next"])
-# print str(result[0].xmlnode)
-# x_data = result[0].xpath_eval("c:command/data:x",
-# {"c": "http://jabber.org/protocol/commands",
-# "data": "jabber:x:data"})
-# self.assertEquals(len(x_data), 1)
-# self.assertEquals(x_data[0].prop("type"), "form")
-# options = result[0].xpath_eval("c:command/data:x/data:field[1]/data:option",
-# {"c": "http://jabber.org/protocol/commands",
-# "data": "jabber:x:data"})
-# self.assertEquals(len(options), 3)
-# self.assertEquals(options[0].prop("label"), "Next")
-# self.assertEquals(options[0].children.name, "value")
-# self.assertEquals(options[0].children.content, "-1")
-# self.assertEquals(options[1].prop("label"), "mail 1")
-# self.assertEquals(options[1].children.name, "value")
-# self.assertEquals(options[1].children.content, "1")
-# self.assertEquals(options[2].prop("label"), "mail 2")
-# self.assertEquals(options[2].children.name, "value")
-# self.assertEquals(options[2].children.content, "2")
-
-# # Delayed to JMC 0.3.1
-# return
-# # Second step: TODO
-# info_query = Iq(stanza_type="set",
-# from_jid="admin@test.com",
-# to_jid=self.comp.jid)
-# command_node = info_query.set_new_content(command.COMMAND_NS, "command")
-# command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
-# session_id = xml_command.prop("sessionid")
-# command_node.setProp("sessionid", session_id)
-# command_node.setProp("action", "next")
-# submit_form = Form(xmlnode_or_type="submit")
-# submit_form.add_field(field_type="list-single",
-# name="account_type",
-# value="Example")
-# submit_form.add_field(field_type="jid-single",
-# name="user_jid",
-# value="user2@test.com")
-# submit_form.as_xml(command_node)
-# result = self.command_manager.apply_command_action(info_query,
-# "http://jabber.org/protocol/admin#add-user",
-# "next")
-# self.assertNotEquals(result, None)
-# self.assertEquals(len(result), 1)
-# xml_command = result[0].xpath_eval("c:command",
-# {"c": "http://jabber.org/protocol/commands"})[0]
-# self.assertEquals(xml_command.prop("status"), "executing")
-# self.assertEquals(xml_command.prop("sessionid"), session_id)
-# self._check_actions(result[0], ["prev", "complete"], 1)
-# x_data = result[0].xpath_eval("c:command/data:x",
-# {"c": "http://jabber.org/protocol/commands",
-# "data": "jabber:x:data"})
-# self.assertEquals(len(x_data), 1)
-# self.assertEquals(x_data[0].prop("type"), "form")
-# fields = result[0].xpath_eval("c:command/data:x/data:field",
-# {"c": "http://jabber.org/protocol/commands",
-# "data": "jabber:x:data"})
-# self.assertEquals(len(fields), 6)
-# context_session = self.command_manager.sessions[session_id][1]
-# self.assertEquals(context_session["account_type"], ["Example"])
-# self.assertEquals(context_session["user_jid"], ["user2@test.com"])
-
-# # Third step
-# info_query = Iq(stanza_type="set",
-# from_jid="admin@test.com",
-# to_jid=self.comp.jid)
-# command_node = info_query.set_new_content(command.COMMAND_NS, "command")
-# command_node.setProp("node", "http://jabber.org/protocol/admin#add-user")
-# command_node.setProp("sessionid", session_id)
-# command_node.setProp("action", "complete")
-# submit_form = Form(xmlnode_or_type="submit")
-# submit_form.add_field(field_type="text-single",
-# name="name",
-# value="account1")
-# submit_form.add_field(field_type="text-single",
-# name="login",
-# value="login1")
-# submit_form.add_field(field_type="text-private",
-# name="password",
-# value="pass1")
-# submit_form.add_field(field_type="boolean",
-# name="store_password",
-# value="1")
-# submit_form.add_field(field_type="list-single",
-# name="test_enum",
-# value="choice2")
-# submit_form.add_field(field_type="text-single",
-# name="test_int",
-# value="42")
-# submit_form.as_xml(command_node)
-
-# result = self.command_manager.apply_command_action(info_query,
-# "http://jabber.org/protocol/admin#add-user",
-# "execute")
-# xml_command = result[0].xpath_eval("c:command",
-# {"c": "http://jabber.org/protocol/commands"})[0]
-# self.assertEquals(xml_command.prop("status"), "completed")
-# self.assertEquals(xml_command.prop("sessionid"), session_id)
-# self._check_actions(result[0])
-
-# self.assertEquals(context_session["name"], ["account1"])
-# self.assertEquals(context_session["login"], ["login1"])
-# self.assertEquals(context_session["password"], ["pass1"])
-# self.assertEquals(context_session["store_password"], ["1"])
-# self.assertEquals(context_session["test_enum"], ["choice2"])
-# self.assertEquals(context_session["test_int"], ["42"])
-
-# model.db_connect()
-# _account = account.get_account("user2@test.com",
-# "account1")
-# self.assertNotEquals(_account, None)
-# self.assertEquals(_account.user.jid, "user2@test.com")
-# self.assertEquals(_account.name, "account1")
-# self.assertEquals(_account.jid, "account1@" + unicode(self.comp.jid))
-# model.db_disconnect()
-
-# stanza_sent = result
-# self.assertEquals(len(stanza_sent), 4)
-# iq_result = stanza_sent[0]
-# self.assertTrue(isinstance(iq_result, Iq))
-# self.assertEquals(iq_result.get_node().prop("type"), "result")
-# self.assertEquals(iq_result.get_from(), self.comp.jid)
-# self.assertEquals(iq_result.get_to(), "admin@test.com")
-# presence_component = stanza_sent[1]
-# self.assertTrue(isinstance(presence_component, Presence))
-# self.assertEquals(presence_component.get_from(), self.comp.jid)
-# self.assertEquals(presence_component.get_to(), "user2@test.com")
-# self.assertEquals(presence_component.get_node().prop("type"),
-# "subscribe")
-# message = stanza_sent[2]
-# self.assertTrue(isinstance(message, Message))
-# self.assertEquals(message.get_from(), self.comp.jid)
-# self.assertEquals(message.get_to(), "user2@test.com")
-# self.assertEquals(message.get_subject(),
-# _account.get_new_message_subject(Lang.en))
-# self.assertEquals(message.get_body(),
-# _account.get_new_message_body(Lang.en))
-# presence_account = stanza_sent[3]
-# self.assertTrue(isinstance(presence_account, Presence))
-# self.assertEquals(presence_account.get_from(), "account1@" + unicode(self.comp.jid))
-# self.assertEquals(presence_account.get_to(), "user2@test.com")
-# self.assertEquals(presence_account.get_node().prop("type"),
-# "subscribe")
-
- def test_execute_force_check(self):
+ self.command_manager = MailCommandManager(self.comp,
+ self.comp.account_manager)
self.comp.account_manager.account_classes = (POP3Account, IMAPAccount,
SMTPAccount, MockIMAPAccount)
+ self.user1 = User(jid="test1@test.com")
+ self.account11 = MockIMAPAccount(user=self.user1,
+ name="account11",
+ jid="account11@" + unicode(self.comp.jid))
+ self.account12 = MockIMAPAccount(user=self.user1,
+ name="account12",
+ jid="account12@" + unicode(self.comp.jid))
+ self.user2 = User(jid="test2@test.com")
+ self.account21 = MockIMAPAccount(user=self.user2,
+ name="account21",
+ jid="account21@" + unicode(self.comp.jid))
+ self.account22 = MockIMAPAccount(user=self.user2,
+ name="account11",
+ jid="account11@" + unicode(self.comp.jid))
+ self.user3 = User(jid="test3@test.com")
+ self.account31 = MockIMAPAccount(user=self.user3,
+ name="account31",
+ jid="account31@" + unicode(self.comp.jid))
+ self.account32 = MockIMAPAccount(user=self.user3,
+ name="account32",
+ jid="account32@" + unicode(self.comp.jid))
+ self.info_query = Iq(stanza_type="set",
+ from_jid="admin@test.com",
+ to_jid=self.comp.jid)
+ self.command_node = self.info_query.set_new_content(command.COMMAND_NS,
+ "command")
class MockFeederHandler(Feeder):
def __init__(self, component):
Feeder.__init__(self, component)
@@ -242,120 +100,296 @@ class MailCommandManager_TestCase(JCLCommandManager_TestCase):
return []
self.comp.handler.feeder = MockFeederHandler(self.comp)
- user1 = User(jid="test1@test.com")
- user2 = User(jid="test2@test.com")
- account11 = MockIMAPAccount(user=user1,
- name="account11",
- jid="account11@" + unicode(self.comp.jid))
- account12 = MockIMAPAccount(user=user1,
- name="account12",
- jid="account12@" + unicode(self.comp.jid))
- account21 = MockIMAPAccount(user=user2,
- name="account21",
- jid="account21@" + unicode(self.comp.jid))
- account22 = MockIMAPAccount(user=user2,
- name="account11",
- jid="account11@" + unicode(self.comp.jid))
- info_query = Iq(stanza_type="set",
- from_jid="test1@test.com",
- to_jid="account11@" + unicode(self.comp.jid))
- command_node = info_query.set_new_content(command.COMMAND_NS, "command")
- command_node.setProp("node", "jmc#force-check")
- result = self.command_manager.apply_command_action(info_query,
- "jmc#force-check",
- "execute")
- self.assertNotEquals(result, None)
- self.assertEquals(len(result), 1)
- xml_command = result[0].xpath_eval("c:command",
- {"c": "http://jabber.org/protocol/commands"})[0]
- self.assertEquals(xml_command.prop("status"), "completed")
- self._check_actions(result[0])
+
+ def tearDown(self):
+ JCLTestCase.tearDown(self)
+ if os.path.exists(self.config_file):
+ os.unlink(self.config_file)
+
+class MailCommandManagerForceCheckCommand_TestCase(MailCommandManagerTestCase):
+ """
+ Test 'force-check' ad-hoc command
+ """
+
+ def setUp(self, tables=[]):
+ """
+ Prepare data
+ """
+ MailCommandManagerTestCase.setUp(self, tables)
+ self.command_node.setProp("node", "jmc#force-check")
+
+ def test_execute_force_check(self):
+ self.info_query.set_from("test1@test.com")
+ self.info_query.set_to("account11@" + unicode(self.comp.jid))
+ result = self.command_manager.apply_command_action(\
+ self.info_query,
+ "jmc#force-check",
+ "execute")
+ result_iq = result[0].xmlnode
+ result_iq.setNs(None)
+ self.assertTrue(jcl.tests.is_xml_equal(\
+ u"<iq from='account11@" + unicode(self.comp.jid)
+ + "' to='test1@test.com' type='result'>"
+ + "<command xmlns='http://jabber.org/protocol/commands' "
+ + "status='completed'>"
+ + "</command></iq>",
+ result_iq, True, test_sibling=False))
feeder = self.comp.handler.feeder
self.assertEquals(len(feeder.checked_accounts), 1)
- self.assertEquals(feeder.checked_accounts[0], account11)
+ self.assertEquals(feeder.checked_accounts[0], self.account11)
def test_execute_force_check_root_node(self):
- self.comp.account_manager.account_classes = (POP3Account, IMAPAccount,
- SMTPAccount, MockIMAPAccount)
- class MockFeederHandler(Feeder):
- def __init__(self, component):
- Feeder.__init__(self, component)
- self.checked_accounts = []
-
- def feed(self, _account):
- self.checked_accounts.append(_account)
- assert(_account.lastcheck == (_account.interval - 1))
- return []
-
- self.comp.handler.feeder = MockFeederHandler(self.comp)
- user1 = User(jid="test1@test.com")
- user2 = User(jid="test2@test.com")
- account11 = MockIMAPAccount(user=user1,
- name="account11",
- jid="account11@" + unicode(self.comp.jid))
- account12 = MockIMAPAccount(user=user1,
- name="account12",
- jid="account12@" + unicode(self.comp.jid))
- account21 = MockIMAPAccount(user=user2,
- name="account21",
- jid="account21@" + unicode(self.comp.jid))
- account22 = MockIMAPAccount(user=user2,
- name="account11",
- jid="account11@" + unicode(self.comp.jid))
- info_query = Iq(stanza_type="set",
- from_jid="test1@test.com",
- to_jid=self.comp.jid)
- command_node = info_query.set_new_content(command.COMMAND_NS, "command")
- command_node.setProp("node", "jmc#force-check")
- result = self.command_manager.apply_command_action(info_query,
- "jmc#force-check",
- "execute")
- self.assertNotEquals(result, None)
- self.assertEquals(len(result), 1)
- xml_command = result[0].xpath_eval("c:command",
- {"c": "http://jabber.org/protocol/commands"})[0]
- self.assertEquals(xml_command.prop("status"), "executing")
- self.assertNotEquals(xml_command.prop("sessionid"), None)
- self._check_actions(result[0], ["complete"])
- session_id = xml_command.prop("sessionid")
+ self.info_query.set_from("test1@test.com")
+ result = self.command_manager.apply_command_action(\
+ self.info_query,
+ "jmc#force-check",
+ "execute")
+ result_iq = result[0].xmlnode
+ result_iq.setNs(None)
+ self.assertTrue(jcl.tests.is_xml_equal(\
+ u"<iq from='jmc.test.com' to='test1@test.com' type='result'>"
+ + "<command xmlns='http://jabber.org/protocol/commands'"
+ + "status='executing'>"
+ + "<actions execute='complete'><complete/></actions>"
+ + "<x xmlns='jabber:x:data' type='form'>"
+ + "<title>" + Lang.en.command_force_check + "</title>"
+ + "<instructions>" + Lang.en.command_force_check_1_description
+ + "</instructions>"
+ + "<field var='account_names' type='list-multi' label='"
+ + Lang.en.field_accounts + "'>"
+ + "<option label=\"account11 (IMAP)\">"
+ + "<value>account11/test1@test.com</value></option>"
+ + "<option label=\"account12 (IMAP)\">"
+ + "<value>account12/test1@test.com</value></option>"
+ + "<option label=\"account11 (MockIMAP)\">"
+ + "<value>account11/test1@test.com</value></option>"
+ + "<option label=\"account12 (MockIMAP)\">"
+ + "<value>account12/test1@test.com</value></option>"
+ + "</field></x></command></iq>",
+ result_iq, True))
+ session_id = result_iq.children.prop("sessionid")
+ self.assertNotEquals(session_id, None)
context_session = self.command_manager.sessions[session_id][1]
self.assertEquals(context_session["user_jids"],
["test1@test.com"])
# Second step
- info_query = Iq(stanza_type="set",
- from_jid="admin@test.com",
- to_jid=self.comp.jid)
- command_node = info_query.set_new_content(command.COMMAND_NS, "command")
- command_node.setProp("node", "jmc#force-check")
- command_node.setProp("sessionid", session_id)
- command_node.setProp("action", "complete")
- submit_form = Form(xmlnode_or_type="submit")
- submit_form.add_field(field_type="list-multi",
- name="account_names",
- values=["account11/test1@test.com",
- "account12/test1@test.com"])
- submit_form.as_xml(command_node)
- result = self.command_manager.apply_command_action(info_query,
- "jmc#force-check",
- "execute")
- xml_command = result[0].xpath_eval("c:command",
- {"c": "http://jabber.org/protocol/commands"})[0]
- self.assertEquals(xml_command.prop("status"), "completed")
- self.assertEquals(xml_command.prop("sessionid"), session_id)
- self._check_actions(result[0])
+ info_query = self.prepare_submit(\
+ node="jmc#force-check",
+ session_id=session_id,
+ from_jid="test1@test.com",
+ to_jid=unicode(self.comp.jid),
+ fields=[Field(field_type="list-multi",
+ name="account_names",
+ values=["account11/test1@test.com",
+ "account12/test1@test.com"])],
+ action="complete")
+ result = self.command_manager.apply_command_action(\
+ info_query,
+ "jmc#force-check",
+ "execute")
+ result_iq = result[0].xmlnode
+ result_iq.setNs(None)
+ self.assertTrue(jcl.tests.is_xml_equal(\
+ u"<iq from='" + unicode(self.comp.jid)
+ + "' to='test1@test.com' type='result'>"
+ + "<command xmlns='http://jabber.org/protocol/commands' "
+ + "status='completed'>"
+ + "</command></iq>",
+ result_iq, True, test_sibling=False))
self.assertEquals(context_session["account_names"],
["account11/test1@test.com",
"account12/test1@test.com"])
feeder = self.comp.handler.feeder
self.assertEquals(len(feeder.checked_accounts), 2)
- self.assertEquals(feeder.checked_accounts[0], account11)
- self.assertEquals(feeder.checked_accounts[1], account12)
+ self.assertEquals(feeder.checked_accounts[0], self.account11)
+ self.assertEquals(feeder.checked_accounts[1], self.account12)
+
+class MailCommandManagerGetEmailCommand_TestCase(MailCommandManagerTestCase):
+ """
+ Test 'get-email' ad-hoc command
+ """
+
+ def setUp(self, tables=[]):
+ """
+ Prepare data
+ """
+ MailCommandManagerTestCase.setUp(self, tables)
+ self.command_node.setProp("node", "jmc#get-email")
+ def get_email(email_index):
+ """
+ Mock method for IMAPAccount.get_email
+ """
+ return ("mail body " + str(email_index),
+ "from" + str(email_index) + "@test.com")
+ self.account11.__dict__["get_mail"] = get_email
+
+ def check_step_1 (self, result):
+ """
+ Check first step result of get-email ad-hoc command
+ """
+ result_iq = result[0].xmlnode
+ result_iq.setNs(None)
+ self.assertTrue(jcl.tests.is_xml_equal(\
+ u"<iq from='account11@" + unicode(self.comp.jid)
+ + "' to='test1@test.com' type='result'>"
+ + "<command xmlns='http://jabber.org/protocol/commands'"
+ + "status='executing'>"
+ + "<actions execute='complete'><complete/></actions>"
+ + "<x xmlns='jabber:x:data' type='form'>"
+ + "<title>" + Lang.en.command_get_email + "</title>"
+ + "<instructions>" + Lang.en.command_get_email_1_description
+ + "</instructions>"
+ + "<field var='emails' type='list-multi' label='"
+ + Lang.en.field_email_subject + "'>"
+ + "<option label=\"mail 1\">"
+ + "<value>1</value></option>"
+ + "<option label=\"mail 2\">"
+ + "<value>2</value></option>"
+ + "</field><field var='fetch_more' type='boolean' label='"
+ + Lang.en.field_select_more_emails + "'>"
+ + "</field></x></command></iq>",
+ result_iq, True))
+ session_id = result_iq.children.prop("sessionid")
+ self.assertNotEquals(session_id, None)
+ return session_id
+
+ def test_execute_get_email(self):
+ """
+ Test single email retrieval
+ """
+ self.info_query.set_from("test1@test.com")
+ self.info_query.set_to("account11@" + unicode(self.comp.jid))
+ result = self.command_manager.apply_command_action(\
+ self.info_query,
+ "jmc#get-email",
+ "execute")
+ session_id = self.check_step_1(result)
+
+ # Second step
+ info_query = self.prepare_submit(\
+ node="jmc#get-email",
+ session_id=session_id,
+ from_jid="test1@test.com",
+ to_jid="account11@jmc.test.com",
+ fields=[Field(field_type="list-multi",
+ name="emails",
+ values=["1"]),
+ Field(field_type="boolean",
+ name="fetch_more",
+ value=False)],
+ action="complete")
+ result = self.command_manager.apply_command_action(\
+ info_query,
+ "jmc#get-email",
+ "execute")
+ self.assertEquals(len(result), 2)
+ result_iq = result[0].xmlnode
+ result_iq.setNs(None)
+ self.assertTrue(jcl.tests.is_xml_equal(\
+ u"<iq from='account11@" + unicode(self.comp.jid)
+ + "' to='test1@test.com' type='result'>"
+ + "<command xmlns='http://jabber.org/protocol/commands' "
+ + "status='completed'>"
+ + "<x xmlns='jabber:x:data' type='form'>"
+ + "<title>" + Lang.en.command_get_email + "</title>"
+ + "<instructions>" + Lang.en.command_get_email_2_description
+ % (1) + "</instructions>"
+ + "</x></command></iq>",
+ result_iq, True, test_sibling=False))
+ result_iq = result[1].xmlnode
+ self.assertTrue(jcl.tests.is_xml_equal(\
+ u"<message from='account11@" + unicode(self.comp.jid)
+ + "' to='test1@test.com' "
+ + "xmlns='http://pyxmpp.jabberstudio.org/xmlns/common'>"
+ + "<subject>" + Lang.en.mail_subject % ("from1@test.com")
+ + "</subject>"
+ + "<body>mail body 1</body>"
+ + "<addresses xmlns='http://jabber.org/protocol/address'>"
+ + "<address type='replyto' jid='from1%test.com@jmc.test.com'/>"
+ + "</addresses>"
+ + "</message>",
+ result_iq, True, test_sibling=False))
+
+ def test_execute_get_emails(self):
+ """
+ Test multiple emails retrieval
+ """
+ self.info_query.set_from("test1@test.com")
+ self.info_query.set_to("account11@" + unicode(self.comp.jid))
+ result = self.command_manager.apply_command_action(\
+ self.info_query,
+ "jmc#get-email",
+ "execute")
+ session_id = self.check_step_1(result)
+
+ # Second step
+ info_query = self.prepare_submit(\
+ node="jmc#get-email",
+ session_id=session_id,
+ from_jid="test1@test.com",
+ to_jid="account11@jmc.test.com",
+ fields=[Field(field_type="list-multi",
+ name="emails",
+ values=["1", "2"]),
+ Field(field_type="boolean",
+ name="fetch_more",
+ value=False)],
+ action="complete")
+ result = self.command_manager.apply_command_action(\
+ info_query,
+ "jmc#get-email",
+ "execute")
+ self.assertEquals(len(result), 3)
+ result_iq = result[0].xmlnode
+ result_iq.setNs(None)
+ self.assertTrue(jcl.tests.is_xml_equal(\
+ u"<iq from='account11@" + unicode(self.comp.jid)
+ + "' to='test1@test.com' type='result'>"
+ + "<command xmlns='http://jabber.org/protocol/commands' "
+ + "status='completed'>"
+ + "<x xmlns='jabber:x:data' type='form'>"
+ + "<title>" + Lang.en.command_get_email + "</title>"
+ + "<instructions>" + Lang.en.command_get_email_2_description
+ % (2) + "</instructions>"
+ + "</x></command></iq>",
+ result_iq, True, test_sibling=False))
+ result_iq = result[1].xmlnode
+ self.assertTrue(jcl.tests.is_xml_equal(\
+ u"<message from='account11@" + unicode(self.comp.jid)
+ + "' to='test1@test.com' "
+ + "xmlns='http://pyxmpp.jabberstudio.org/xmlns/common'>"
+ + "<subject>" + Lang.en.mail_subject % ("from1@test.com")
+ + "</subject>"
+ + "<body>mail body 1</body>"
+ + "<addresses xmlns='http://jabber.org/protocol/address'>"
+ + "<address type='replyto' jid='from1%test.com@jmc.test.com'/>"
+ + "</addresses>"
+ + "</message>",
+ result_iq, True, test_sibling=False))
+ result_iq = result[2].xmlnode
+ self.assertTrue(jcl.tests.is_xml_equal(\
+ u"<message from='account11@" + unicode(self.comp.jid)
+ + "' to='test1@test.com' "
+ + "xmlns='http://pyxmpp.jabberstudio.org/xmlns/common'>"
+ + "<subject>" + Lang.en.mail_subject % ("from2@test.com")
+ + "</subject>"
+ + "<body>mail body 2</body>"
+ + "<addresses xmlns='http://jabber.org/protocol/address'>"
+ + "<address type='replyto' jid='from2%test.com@jmc.test.com'/>"
+ + "</addresses>"
+ + "</message>",
+ result_iq, True, test_sibling=False))
def suite():
test_suite = unittest.TestSuite()
- test_suite.addTest(unittest.makeSuite(MailCommandManager_TestCase, 'test'))
+ test_suite.addTest(unittest.makeSuite(MailCommandManagerForceCheckCommand_TestCase, 'test'))
+ test_suite.addTest(unittest.makeSuite(MailCommandManagerGetEmailCommand_TestCase, 'test'))
return test_suite
if __name__ == '__main__':
+ if '-v' in sys.argv:
+ logger = logging.getLogger()
+ logger.addHandler(logging.StreamHandler())
+ logger.setLevel(logging.INFO)
unittest.main(defaultTest='suite')
diff --git a/src/jmc/jabber/tests/component.py b/src/jmc/jabber/tests/component.py
index c213382..564b75d 100644
--- a/src/jmc/jabber/tests/component.py
+++ b/src/jmc/jabber/tests/component.py
@@ -155,6 +155,11 @@ class MockIMAPAccount(MockMailAccount, IMAPAccount):
return [("1", "mail 1"),
("2", "mail 2")]
+ def get_mail_list_summary(self):
+ return [("1", "mail 1"),
+ ("2", "mail 2")]
+
+
class MockPOP3Account(MockMailAccount, POP3Account):
def _init(self, *args, **kw):
POP3Account._init(self, *args, **kw)
diff --git a/src/jmc/model/account.py b/src/jmc/model/account.py
index 1b81d1a..bdbded5 100644
--- a/src/jmc/model/account.py
+++ b/src/jmc/model/account.py
@@ -324,7 +324,7 @@ class MailAccount(PresenceAccount):
def disconnect(self):
raise NotImplementedError
- def get_mail_list_summary(self, start_index=0, end_index=20):
+ def get_mail_list_summary(self, start_index=1, end_index=20):
raise NotImplementedError
def get_new_mail_list(self):
@@ -411,6 +411,10 @@ class IMAPAccount(MailAccount):
self.connected = False
def get_mail_list_summary(self, start_index=1, end_index=20):
+ """
+ Get a list of emails starting from start_index and ending at end_index
+ of tuple (email_index, email_subject)
+ """
self.__logger.debug("Getting mail list summary")
typ, count = self.connection.select(self._get_real_mailbox(), True)
result = []
@@ -428,6 +432,9 @@ class IMAPAccount(MailAccount):
return result
def get_new_mail_list(self):
+ """
+ Get a list of new emails indexes
+ """
self.__logger.debug("Getting mail list")
typ, data = self.connection.select(self._get_real_mailbox(), True)
typ, data = self.connection.search(None, 'RECENT')
@@ -574,11 +581,13 @@ class POP3Account(MailAccount):
self.connection.quit()
self.connected = False
- def get_mail_list_summary(self, start_index=0, end_index=20):
+ def get_mail_list_summary(self, start_index=1, end_index=20):
self.__logger.debug("Getting mail list")
count, size = self.connection.stat()
result = []
- for index in xrange(1, count + 1):
+ if count < end_index:
+ end_index = count
+ for index in xrange(start_index, end_index + 1):
(ret, data, octets) = self.connection.top(index, 0)
if ret[0:3] == '+OK':
subject_header = self.get_decoded_header(email.message_from_string('\n'.join(data))["Subject"])[0]
diff --git a/src/jmc/model/tests/account.py b/src/jmc/model/tests/account.py
index 1cdef4d..d755f85 100644
--- a/src/jmc/model/tests/account.py
+++ b/src/jmc/model/tests/account.py
@@ -189,6 +189,42 @@ class POP3Account_TestCase(InheritableAccount_TestCase):
[("1", "mail subject 1"),
("2", "mail subject 2")]))
+ test_get_mail_list_summary_start_index = \
+ make_test(["+OK 3 30\r\n",
+ "+OK 10 octets\r\n" + \
+ "From: user@test.com\r\n" + \
+ "Subject: mail subject 2\r\n.\r\n",
+ "+OK 10 octets\r\n" + \
+ "From: user@test.com\r\n" + \
+ "Subject: mail subject 3\r\n.\r\n",
+ "+OK\r\n"],
+ ["STAT\r\n",
+ "TOP 2 0\r\n",
+ "TOP 3 0\r\n",
+ "RSET\r\n"],
+ lambda self: \
+ self.assertEquals(self.pop3_account.get_mail_list_summary(start_index=2),
+ [("2", "mail subject 2"),
+ ("3", "mail subject 3")]))
+
+ test_get_mail_list_summary_end_index = \
+ make_test(["+OK 3 30\r\n",
+ "+OK 10 octets\r\n" + \
+ "From: user@test.com\r\n" + \
+ "Subject: mail subject 1\r\n.\r\n",
+ "+OK 10 octets\r\n" + \
+ "From: user@test.com\r\n" + \
+ "Subject: mail subject 2\r\n.\r\n",
+ "+OK\r\n"],
+ ["STAT\r\n",
+ "TOP 1 0\r\n",
+ "TOP 2 0\r\n",
+ "RSET\r\n"],
+ lambda self: \
+ self.assertEquals(self.pop3_account.get_mail_list_summary(end_index=2),
+ [("1", "mail subject 1"),
+ ("2", "mail subject 2")]))
+
test_get_new_mail_list = \
make_test(["+OK 2 20\r\n"],
["STAT\r\n"],
@@ -324,6 +360,46 @@ class IMAPAccount_TestCase(InheritableAccount_TestCase):
('2', 'mail subject 2')]))
test_func()
+ def test_get_mail_list_summary_start_index(self):
+ test_func = self.make_test(\
+ [lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
+ " [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
+ " OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
+ data.split()[0] + \
+ " OK [READ-WRITE] SELECT completed\r\n",
+ lambda data: "* 2 FETCH ((RFC822.header) {38}\r\n" + \
+ "Subject: mail subject 2\r\n\r\nbody text\r\n)\r\n" + \
+ "* 3 FETCH ((RFC822.header) {38}\r\n" + \
+ "Subject: mail subject 3\r\n\r\nbody text\r\n)\r\n" + \
+ data.split()[0] + " OK FETCH completed\r\n"],
+ ["^[^ ]* EXAMINE INBOX",
+ "^[^ ]* FETCH 2:20 RFC822.header"],
+ lambda self: \
+ self.assertEquals(self.imap_account.get_mail_list_summary(start_index=2),
+ [('2', 'mail subject 2'),
+ ('3', 'mail subject 3')]))
+ test_func()
+
+ def test_get_mail_list_summary_end_index(self):
+ test_func = self.make_test(\
+ [lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
+ " [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
+ " OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
+ data.split()[0] + \
+ " OK [READ-WRITE] SELECT completed\r\n",
+ lambda data: "* 1 FETCH ((RFC822.header) {38}\r\n" + \
+ "Subject: mail subject 1\r\n\r\nbody text\r\n)\r\n" + \
+ "* 2 FETCH ((RFC822.header) {38}\r\n" + \
+ "Subject: mail subject 2\r\n\r\nbody text\r\n)\r\n" + \
+ data.split()[0] + " OK FETCH completed\r\n"],
+ ["^[^ ]* EXAMINE INBOX",
+ "^[^ ]* FETCH 1:2 RFC822.header"],
+ lambda self: \
+ self.assertEquals(self.imap_account.get_mail_list_summary(end_index=2),
+ [('1', 'mail subject 1'),
+ ('2', 'mail subject 2')]))
+ test_func()
+
def test_get_new_mail_list(self):
test_func = self.make_test(\
[lambda data: "* 42 EXISTS\n* 1 RECENT\n* OK" + \