Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/dax/jmc.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Rousselie <dax@happycoders.org>2007-05-13 22:33:02 +0400
committerDavid Rousselie <dax@happycoders.org>2007-05-13 22:33:02 +0400
commit0f95cc4678fc7c8c0188c842027739fd77aaad4d (patch)
tree68efbbdc8539e7203a15717fb0fc3d9b8aa57224 /src/jmc/model/tests
parentcde3a9f16f6ef924a28e326e806ebafa0cb74e84 (diff)
Move unit tests in source folder
darcs-hash:20070513183302-86b55-98a5e67621ece44958f215e98ba1c92e32c4ea51.gz
Diffstat (limited to 'src/jmc/model/tests')
-rw-r--r--src/jmc/model/tests/__init__.py14
-rw-r--r--src/jmc/model/tests/account.py400
-rw-r--r--src/jmc/model/tests/email_generator.py66
-rw-r--r--src/jmc/model/tests/server.py209
4 files changed, 689 insertions, 0 deletions
diff --git a/src/jmc/model/tests/__init__.py b/src/jmc/model/tests/__init__.py
new file mode 100644
index 0000000..c0494a1
--- /dev/null
+++ b/src/jmc/model/tests/__init__.py
@@ -0,0 +1,14 @@
+"""JMC test module"""
+__revision__ = ""
+
+import unittest
+
+from jmc.model.tests import account
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(account.suite())
+ return suite
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='suite')
diff --git a/src/jmc/model/tests/account.py b/src/jmc/model/tests/account.py
new file mode 100644
index 0000000..e7ca4c9
--- /dev/null
+++ b/src/jmc/model/tests/account.py
@@ -0,0 +1,400 @@
+# -*- coding: utf-8 -*-
+##
+## test_account.py
+## Login : <dax@happycoders.org>
+## Started on Wed Feb 14 08:23:17 2007 David Rousselie
+## $Id$
+##
+## Copyright (C) 2007 David Rousselie
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, write to the Free Software
+## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+##
+
+import unittest
+import os
+import thread
+import sys
+
+from sqlobject import *
+from sqlobject.dbconnection import TheURIOpener
+
+from jcl.model import account
+from jcl.model.account import Account, PresenceAccount
+from jmc.model.account import MailAccount, POP3Account, IMAPAccount
+
+from jcl.model.tests.account import PresenceAccount_TestCase
+from jmc.model.tests import email_generator, server
+
+if sys.platform == "win32":
+ DB_PATH = "/c|/temp/test.db"
+else:
+ DB_PATH = "/tmp/test.db"
+DB_URL = DB_PATH # + "?debug=1&debugThreading=1"
+
+class MailAccount_TestCase(PresenceAccount_TestCase):
+ def setUp(self):
+ if os.path.exists(DB_PATH):
+ os.unlink(DB_PATH)
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ Account.createTable(ifNotExists = True)
+ PresenceAccount.createTable(ifNotExists = True)
+ MailAccount.createTable(ifNotExists = True)
+ self.account = MailAccount(user_jid = "user1@test.com", \
+ name = "account1", \
+ jid = "account1@jmc.test.com")
+ del account.hub.threadConnection
+ self.account_class = MailAccount
+
+ def tearDown(self):
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ MailAccount.dropTable(ifExists = True)
+ PresenceAccount.dropTable(ifExists = True)
+ Account.dropTable(ifExists = True)
+ del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
+ account.hub.threadConnection.close()
+ del account.hub.threadConnection
+ if os.path.exists(DB_PATH):
+ os.unlink(DB_PATH)
+
+ def make_test(email_type, tested_func, expected_res):
+ def inner(self):
+ encoded, multipart, header = email_type
+ email = email_generator.generate(encoded, \
+ multipart, \
+ header)
+ part = tested_func(self, email)
+ self.assertEquals(part, expected_res)
+ return inner
+
+ test_get_decoded_part_not_encoded = \
+ make_test((False, False, False), \
+ lambda self, email: self.account.get_decoded_part(email, None), \
+ u"Not encoded single part")
+
+ test_get_decoded_part_encoded = \
+ make_test((True, False, False), \
+ lambda self, email: self.account.get_decoded_part(email, None), \
+ u"Encoded single part with 'iso-8859-15' charset (éàê)")
+
+ test_format_message_summary_not_encoded = \
+ make_test((False, False, True), \
+ lambda self, email: self.account.format_message_summary(email), \
+ (u"From : not encoded from\nSubject : not encoded subject\n\n", \
+ u"not encoded from"))
+
+ test_format_message_summary_encoded = \
+ make_test((True, False, True), \
+ lambda self, email: self.account.format_message_summary(email), \
+ (u"From : encoded from (éàê)\nSubject : encoded subject " + \
+ u"(éàê)\n\n", \
+ u"encoded from (éàê)"))
+
+ test_format_message_summary_partial_encoded = \
+ make_test((True, False, True), \
+ lambda self, email: \
+ email.replace_header("Subject", \
+ "\" " + str(email["Subject"]) \
+ + " \" not encoded part") or \
+ email.replace_header("From", \
+ "\" " + str(email["From"]) \
+ + " \" not encoded part") or \
+ self.account.format_message_summary(email), \
+ (u"From : \"encoded from (éàê)\" not encoded part\nSubject " + \
+ u": \"encoded subject (éàê)\" not encoded part\n\n", \
+ u"\"encoded from (éàê)\" not encoded part"))
+
+ test_format_message_single_not_encoded = \
+ make_test((False, False, True), \
+ lambda self, email: self.account.format_message(email), \
+ (u"From : not encoded from\nSubject : not encoded subject" + \
+ u"\n\nNot encoded single part\n", \
+ u"not encoded from"))
+
+ test_format_message_single_encoded = \
+ make_test((True, False, True), \
+ lambda self, email: self.account.format_message(email), \
+ (u"From : encoded from (éàê)\nSubject : encoded subject " + \
+ u"(éàê)\n\nEncoded single part with 'iso-8859-15' charset" + \
+ u" (éàê)\n", \
+ u"encoded from (éàê)"))
+
+ test_format_message_multi_not_encoded = \
+ make_test((False, True, True), \
+ lambda self, email: self.account.format_message(email), \
+ (u"From : not encoded from\nSubject : not encoded subject" + \
+ u"\n\nNot encoded multipart1\nNot encoded multipart2\n", \
+ u"not encoded from"))
+
+ test_format_message_multi_encoded = \
+ make_test((True, True, True), \
+ lambda self, email: self.account.format_message(email), \
+ (u"From : encoded from (éàê)\nSubject : encoded subject (éà" + \
+ u"ê)\n\nutf-8 multipart1 with no charset (éàê)" + \
+ u"\nEncoded multipart2 with 'iso-8859-15' charset (éàê)\n" + \
+ u"Encoded multipart3 with no charset (éàê)\n", \
+ u"encoded from (éàê)"))
+
+ def test_get_register_fields(self):
+ register_fields = MailAccount.get_register_fields()
+ self.assertEquals(len(register_fields), 14)
+
+class POP3Account_TestCase(unittest.TestCase):
+ def setUp(self):
+ if os.path.exists(DB_PATH):
+ os.unlink(DB_PATH)
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ Account.createTable(ifNotExists = True)
+ PresenceAccount.createTable(ifNotExists = True)
+ MailAccount.createTable(ifNotExists = True)
+ POP3Account.createTable(ifNotExists = True)
+ self.pop3_account = POP3Account(user_jid = "user1@test.com", \
+ name = "account1", \
+ jid = "account1@jmc.test.com", \
+ login = "login")
+ self.pop3_account.password = "pass"
+ self.pop3_account.host = "localhost"
+ self.pop3_account.port = 1110
+ self.pop3_account.ssl = False
+ del account.hub.threadConnection
+
+ def tearDown(self):
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ POP3Account.dropTable(ifExists = True)
+ MailAccount.dropTable(ifExists = True)
+ PresenceAccount.dropTable(ifExists = True)
+ Account.dropTable(ifExists = True)
+ del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
+ account.hub.threadConnection.close()
+ del account.hub.threadConnection
+ if os.path.exists(DB_PATH):
+ os.unlink(DB_PATH)
+ self.server = None
+ self.pop3_account = None
+
+ def make_test(responses = None, queries = None, core = None):
+ def inner(self):
+ self.server = server.DummyServer("localhost", 1110)
+ thread.start_new_thread(self.server.serve, ())
+ self.server.responses = ["+OK connected\r\n", \
+ "+OK name is a valid mailbox\r\n", \
+ "+OK pass\r\n"]
+ if responses:
+ self.server.responses += responses
+ self.server.queries = ["USER login\r\n", \
+ "PASS pass\r\n"]
+ if queries:
+ self.server.queries += queries
+ self.server.queries += ["QUIT\r\n"]
+ self.pop3_account.connect()
+ self.failUnless(self.pop3_account.connection, \
+ "Cannot establish connection")
+ if core:
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ core(self)
+ del account.hub.threadConnection
+ self.pop3_account.disconnect()
+ self.failUnless(self.server.verify_queries(), \
+ "Sended queries does not match expected queries.")
+ return inner
+
+ test_connection = make_test()
+
+ test_get_mail_list = \
+ make_test(["+OK 2 20\r\n"], \
+ ["STAT\r\n"], \
+ lambda self: \
+ self.assertEquals(self.pop3_account.get_mail_list(), \
+ ["1", "2"]))
+
+ test_get_mail_summary = \
+ make_test(["+OK 10 octets\r\n" + \
+ "From: user@test.com\r\n" + \
+ "Subject: subject test\r\n\r\n" + \
+ "mymessage\r\n.\r\n",
+ "+OK\r\n"], \
+ ["RETR 1\r\n",
+ "RSET\r\n"], \
+ lambda self: \
+ self.assertEquals(self.pop3_account.get_mail_summary(1), \
+ (u"From : user@test.com\n" + \
+ u"Subject : subject test\n\n", \
+ u"user@test.com")))
+
+ test_get_mail = \
+ make_test(["+OK 10 octets\r\n" + \
+ "From: user@test.com\r\n" + \
+ "Subject: subject test\r\n\r\n" + \
+ "mymessage\r\n.\r\n",
+ "+OK\r\n"], \
+ ["RETR 1\r\n",
+ "RSET\r\n"], \
+ lambda self: \
+ self.assertEquals(self.pop3_account.get_mail(1), \
+ (u"From : user@test.com\n" + \
+ u"Subject : subject test\n\n" + \
+ u"mymessage\n", \
+ u"user@test.com")))
+
+ test_unsupported_reset_command_get_mail_summary = \
+ make_test(["+OK 10 octets\r\n" + \
+ "From: user@test.com\r\n" + \
+ "Subject: subject test\r\n\r\n" + \
+ "mymessage\r\n.\r\n",
+ "-ERR unknown command\r\n"], \
+ ["RETR 1\r\n",
+ "RSET\r\n"], \
+ lambda self: \
+ self.assertEquals(self.pop3_account.get_mail_summary(1), \
+ (u"From : user@test.com\n" + \
+ u"Subject : subject test\n\n", \
+ u"user@test.com")))
+
+ test_unsupported_reset_command_get_mail = \
+ make_test(["+OK 10 octets\r\n" + \
+ "From: user@test.com\r\n" + \
+ "Subject: subject test\r\n\r\n" + \
+ "mymessage\r\n.\r\n",
+ "-ERR unknown command\r\n"], \
+ ["RETR 1\r\n",
+ "RSET\r\n"], \
+ lambda self: \
+ self.assertEquals(self.pop3_account.get_mail(1), \
+ (u"From : user@test.com\n" + \
+ u"Subject : subject test\n\n" + \
+ u"mymessage\n", \
+ u"user@test.com")))
+
+ def test_get_register_fields(self):
+ register_fields = POP3Account.get_register_fields()
+ self.assertEquals(len(register_fields), 14)
+
+class IMAPAccount_TestCase(unittest.TestCase):
+ def setUp(self):
+ if os.path.exists(DB_PATH):
+ os.unlink(DB_PATH)
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ Account.createTable(ifNotExists = True)
+ PresenceAccount.createTable(ifNotExists = True)
+ MailAccount.createTable(ifNotExists = True)
+ IMAPAccount.createTable(ifNotExists = True)
+ self.imap_account = IMAPAccount(user_jid = "user1@test.com", \
+ name = "account1", \
+ jid = "account1@jmc.test.com", \
+ login = "login")
+ self.imap_account.password = "pass"
+ self.imap_account.host = "localhost"
+ self.imap_account.port = 1143
+ self.imap_account.ssl = False
+ del account.hub.threadConnection
+
+ def tearDown(self):
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ IMAPAccount.dropTable(ifExists = True)
+ MailAccount.dropTable(ifExists = True)
+ PresenceAccount.dropTable(ifExists = True)
+ Account.dropTable(ifExists = True)
+ del TheURIOpener.cachedURIs['sqlite://' + DB_URL]
+ account.hub.threadConnection.close()
+ del account.hub.threadConnection
+ if os.path.exists(DB_PATH):
+ os.unlink(DB_PATH)
+ self.server = None
+ self.imap_account = None
+
+ def make_test(responses = None, queries = None, core = None):
+ def inner(self):
+ self.server = server.DummyServer("localhost", 1143)
+ thread.start_new_thread(self.server.serve, ())
+ self.server.responses = ["* OK [CAPABILITY IMAP4 LOGIN-REFERRALS " + \
+ "AUTH=PLAIN]\n", \
+ lambda data: "* CAPABILITY IMAP4 " + \
+ "LOGIN-REFERRALS AUTH=PLAIN\n" + \
+ data.split()[0] + \
+ " OK CAPABILITY completed\n", \
+ lambda data: data.split()[0] + \
+ " OK LOGIN completed\n"]
+ if responses:
+ self.server.responses += responses
+ self.server.queries = ["^[^ ]* CAPABILITY", \
+ "^[^ ]* LOGIN login \"pass\""]
+ if queries:
+ self.server.queries += queries
+ self.server.queries += ["^[^ ]* LOGOUT"]
+ self.imap_account.connect()
+ self.failUnless(self.imap_account.connection, \
+ "Cannot establish connection")
+ if core:
+ account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL)
+ core(self)
+ del account.hub.threadConnection
+ self.imap_account.disconnect()
+ self.failUnless(self.server.verify_queries())
+ return inner
+
+ test_connection = make_test()
+
+ test_get_mail_list = make_test([lambda data: "* 42 EXISTS\n* 1 RECENT\n* OK" +\
+ " [UNSEEN 9]\n* FLAGS (\Deleted \Seen\*)\n*" +\
+ " OK [PERMANENTFLAGS (\Deleted \Seen\*)\n" + \
+ data.split()[0] + \
+ " OK [READ-WRITE] SELECT completed\n", \
+ lambda data: "* SEARCH 9 10 \n" + \
+ data.split()[0] + " OK SEARCH completed\n"], \
+ ["^[^ ]* SELECT INBOX", \
+ "^[^ ]* SEARCH RECENT"], \
+ lambda self: \
+ self.assertEquals(self.imap_account.get_mail_list(), ['9', '10']))
+
+ test_get_mail_summary = make_test([lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
+ " [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
+ " OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
+ data.split()[0] + \
+ " OK [READ-WRITE] SELECT completed\r\n", \
+ lambda data: "* 1 FETCH ((RFC822) {12}\r\nbody" + \
+ " text\r\n)\r\n" + \
+ data.split()[0] + " OK FETCH completed\r\n"], \
+ ["^[^ ]* EXAMINE INBOX", \
+ "^[^ ]* FETCH 1 \(RFC822\)"], \
+ lambda self: self.assertEquals(self.imap_account.get_mail_summary(1), \
+ (u"From : None\nSubject : None\n\n", \
+ u"None")))
+
+ test_get_mail = make_test([lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\
+ " [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\
+ " OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \
+ data.split()[0] + \
+ " OK [READ-WRITE] SELECT completed\r\n", \
+ lambda data: "* 1 FETCH ((RFC822) {11}\r\nbody" + \
+ " text\r\n)\r\n" + \
+ data.split()[0] + " OK FETCH completed\r\n"], \
+ ["^[^ ]* EXAMINE INBOX", \
+ "^[^ ]* FETCH 1 \(RFC822\)",], \
+ lambda self: self.assertEquals(self.imap_account.get_mail(1), \
+ (u"From : None\nSubject : None\n\nbody text\r\n\n", \
+ u"None")))
+
+ def test_get_register_fields(self):
+ register_fields = IMAPAccount.get_register_fields()
+ self.assertEquals(len(register_fields), 15)
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(MailAccount_TestCase, 'test'))
+ suite.addTest(unittest.makeSuite(POP3Account_TestCase, 'test'))
+ suite.addTest(unittest.makeSuite(IMAPAccount_TestCase, 'test'))
+ return suite
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='suite')
diff --git a/src/jmc/model/tests/email_generator.py b/src/jmc/model/tests/email_generator.py
new file mode 100644
index 0000000..0af84dc
--- /dev/null
+++ b/src/jmc/model/tests/email_generator.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+## email_generator.py
+## Login : David Rousselie <david.rousselie@happycoders.org>
+## Started on Tue May 17 15:33:35 2005
+## $Id: email_generator.py,v 1.1 2005/07/11 20:39:31 dax Exp $
+##
+## Copyright (C) 2005
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, write to the Free Software
+## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+##
+
+from email.Header import Header
+from email.MIMEText import MIMEText
+from email.MIMEMultipart import MIMEMultipart
+
+def _create_multipart(encoded):
+ msg = MIMEMultipart()
+ if encoded:
+ part1 = MIMEText("utf-8 multipart1 with no charset (éàê)", _charset = "")
+ msg.attach(part1)
+ part2 = MIMEText("Encoded multipart2 with 'iso-8859-15' charset (éàê)", \
+ _charset = "iso-8859-15")
+ msg.attach(part2)
+ part3 = MIMEText("Encoded multipart3 with no charset (éàê)", \
+ _charset = "")
+ msg.attach(part3)
+ else:
+ part1 = MIMEText("Not encoded multipart1")
+ msg.attach(part1)
+ part2 = MIMEText("Not encoded multipart2")
+ msg.attach(part2)
+ return msg
+
+def _create_singlepart(encoded):
+ if encoded:
+ return MIMEText("Encoded single part with 'iso-8859-15' charset (éàê)", \
+ _charset = "iso-8859-15")
+ else:
+ return MIMEText("Not encoded single part")
+
+def generate(encoded, multipart, header):
+ msg = None
+ if multipart:
+ msg = _create_multipart(encoded)
+ else:
+ msg = _create_singlepart(encoded)
+ if header:
+ if encoded:
+ msg['Subject'] = Header("encoded subject (éàê)", "iso-8859-15")
+ msg['From'] = Header("encoded from (éàê)", "iso-8859-15")
+ else:
+ msg['Subject'] = Header("not encoded subject")
+ msg['From'] = Header("not encoded from")
+ return msg
+
diff --git a/src/jmc/model/tests/server.py b/src/jmc/model/tests/server.py
new file mode 100644
index 0000000..1b939aa
--- /dev/null
+++ b/src/jmc/model/tests/server.py
@@ -0,0 +1,209 @@
+##
+## dummy_server.py
+## Login : David Rousselie <david.rousselie@happycoders.org>
+## Started on Fri May 13 12:53:17 2005
+## $Id: dummy_server.py,v 1.1 2005/07/11 20:39:31 dax Exp $
+##
+## Copyright (C) 2005
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, write to the Free Software
+## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+##
+
+import sys
+import time
+import traceback
+import re
+import os
+import socket
+import types
+import select
+import xml.dom.minidom
+from pyxmpp import xmlextra
+
+def xmldiff(node1, node2):
+ if node1.nodeType == node1.TEXT_NODE:
+ if not node2.nodeType == node2.TEXT_NODE \
+ or re.compile(node2.data + "$").match(node1.data) is None:
+ raise Exception("data in text node " + node1.data + " does not match " + node2.data)
+ elif node1.nodeType == node1.DOCUMENT_NODE:
+ if not node2.nodeType == node2.DOCUMENT_NODE:
+ raise Exception("node1 is Document but not node2 (" + node2.nodeType + ")")
+ elif node1.tagName != node2.tagName:
+ raise Exception("Different tag name : " + node1.tagName + " != " + node2.tagName)
+ else:
+ for attr in node1._get_attributes().keys():
+ if not node2.hasAttribute(attr) \
+ or node1.getAttribute(attr) != node2.getAttribute(attr):
+ raise Exception("(" + node1.tagName + ") Different attributes : " + node1.getAttribute(attr) + " != " + node2.getAttribute(attr))
+ if len(node1.childNodes) != len(node2.childNodes):
+ raise Exception("(" + node1.tagName + ") Different children number : " + str(len(node1.childNodes)) + " != " + str(len(node2.childNodes)))
+ for i in range(len(node1.childNodes)):
+ xmldiff(node1.childNodes[i], node2.childNodes[i])
+
+class DummyServer:
+ def __init__(self, host, port, responses = None):
+ for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
+ af, socktype, proto, canonname, sa = res
+ try:
+ s = socket.socket(af, socktype, proto)
+ except socket.error, msg:
+ print >>sys.stderr, msg
+ s = None
+ raise socket.error
+ try:
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ s.bind(sa)
+ s.listen(1)
+ except socket.error, msg:
+ print >>sys.stderr, msg
+ s.close()
+ s = None
+ raise socket.error
+ break
+ self.socket = s
+ self.responses = None
+ self.queries = None
+ self.real_queries = []
+
+ def serve(self):
+ conn = None
+ try:
+ conn, addr = self.socket.accept()
+ rfile = conn.makefile('rb', -1)
+ if self.responses:
+ data = None
+ for idx in range(len(self.responses)):
+ # if response is a function apply it (it must return a string)
+ # it is given previous received data
+ if isinstance(self.responses[idx], types.FunctionType):
+ response = self.responses[idx](data)
+ else:
+ response = self.responses[idx]
+ if response is not None:
+ #print >>sys.stderr, 'Sending : ', response
+ conn.send(response)
+ data = rfile.readline()
+ if not data:
+ break
+ else:
+ self.real_queries.append(data)
+ #print >>sys.stderr, 'Receive : ', data
+ if conn is not None:
+ conn.close()
+ if self.socket is not None:
+ self.socket.close()
+ self.socket = None
+ except:
+ type, value, stack = sys.exc_info()
+ print >>sys.stderr, "".join(traceback.format_exception
+ (type, value, stack, 5))
+
+ def verify_queries(self):
+ result = True
+ queries_len = len(self.queries)
+ if queries_len == len(self.real_queries):
+ for idx in range(queries_len):
+ real_query = self.real_queries[idx]
+ match = (re.compile(self.queries[idx], re.M).match(real_query) is not None)
+ if not match:
+ result = False
+ print >>sys.stderr, "Unexpected query :\n" + \
+ "Expected query : _" + self.queries[idx] + "_\n" + \
+ "Receive query : _" + real_query + "_\n"
+ else:
+ result = False
+ print >>sys.stderr, "Expected " + str(queries_len) + \
+ " queries, got " + str(len(self.real_queries)) + \
+ "\t" + str(self.real_queries)
+ return result
+
+class XMLDummyServer(DummyServer):
+ def __init__(self, host, port, responses, stream_handler):
+ DummyServer.__init__(self, host, port, responses)
+ self._reader = xmlextra.StreamReader(stream_handler)
+
+ def serve(self):
+ try:
+ conn, addr = self.socket.accept()
+ if self.responses:
+ data = None
+ for idx in range(len(self.responses)):
+ try:
+ # TODO : this approximation is not clean
+ # received size is based on the expected size in self.queries
+ data = conn.recv(1024 + len(self.queries[idx]))
+# print "receive : " + data
+ if data:
+ ## TODO : without this log, test_set_register in test_component wait forever
+ #print "-----------RECEIVE1 " + data
+ r = self._reader.feed(data)
+ except:
+ type, value, stack = sys.exc_info()
+ print "".join (traceback.format_exception
+ (type, value, stack, 5))
+ raise
+ # TODO verify got all data </stream>
+ if data:
+ self.real_queries.append(data)
+ # if response is a function apply it (it must return a string)
+ # it is given previous received data
+ if isinstance(self.responses[idx], types.FunctionType):
+ response = self.responses[idx](data)
+ else:
+ response = self.responses[idx]
+ if response is not None:
+# print >>sys.stderr, '---------SENDING : ', response
+ conn.send(response)
+ data = conn.recv(1024)
+ if data:
+# print "-----------RECEIVE2 " + data
+ r = self._reader.feed(data)
+ self.real_queries.append(data)
+ conn.close()
+ self.socket.close()
+ self.socket = None
+ except:
+ type, value, stack = sys.exc_info()
+ print "".join (traceback.format_exception
+ (type, value, stack, 5))
+ raise
+
+ def verify_queries(self):
+ result = True
+ full_real_queries = ""
+ full_recv_queries = ""
+ for idx in range(len(self.real_queries)):
+ full_real_queries += self.real_queries[idx].rstrip(os.linesep)
+ for idx in range(len(self.queries)):
+ full_recv_queries += self.queries[idx].rstrip(os.linesep)
+ # Do not receive it but add it so that xml parsing can succeed
+ #full_real_queries += "</stream:stream>"
+ real_query = xml.dom.minidom.parseString(full_real_queries)
+ recv_query = xml.dom.minidom.parseString(full_recv_queries)
+ try:
+ utils.xmldiff(real_query, recv_query)
+ except Exception, msg:
+ result = False
+ print >>sys.stderr, msg
+ return result
+
+def test():
+ server = DummyServer(("localhost", 4242))
+ server.responses = ["rep1\n", "rep2\n"]
+ server.serve()
+
+if __name__ == '__main__':
+ test()
+
+