From 0f95cc4678fc7c8c0188c842027739fd77aaad4d Mon Sep 17 00:00:00 2001 From: David Rousselie Date: Sun, 13 May 2007 20:33:02 +0200 Subject: Move unit tests in source folder darcs-hash:20070513183302-86b55-98a5e67621ece44958f215e98ba1c92e32c4ea51.gz --- src/jmc/model/tests/__init__.py | 14 ++ src/jmc/model/tests/account.py | 400 +++++++++++++++++++++++++++++++++ src/jmc/model/tests/email_generator.py | 66 ++++++ src/jmc/model/tests/server.py | 209 +++++++++++++++++ 4 files changed, 689 insertions(+) create mode 100644 src/jmc/model/tests/__init__.py create mode 100644 src/jmc/model/tests/account.py create mode 100644 src/jmc/model/tests/email_generator.py create mode 100644 src/jmc/model/tests/server.py (limited to 'src/jmc/model/tests') diff --git a/src/jmc/model/tests/__init__.py b/src/jmc/model/tests/__init__.py new file mode 100644 index 0000000..c0494a1 --- /dev/null +++ b/src/jmc/model/tests/__init__.py @@ -0,0 +1,14 @@ +"""JMC test module""" +__revision__ = "" + +import unittest + +from jmc.model.tests import account + +def suite(): + suite = unittest.TestSuite() + suite.addTest(account.suite()) + return suite + +if __name__ == '__main__': + unittest.main(defaultTest='suite') diff --git a/src/jmc/model/tests/account.py b/src/jmc/model/tests/account.py new file mode 100644 index 0000000..e7ca4c9 --- /dev/null +++ b/src/jmc/model/tests/account.py @@ -0,0 +1,400 @@ +# -*- coding: utf-8 -*- +## +## test_account.py +## Login : +## Started on Wed Feb 14 08:23:17 2007 David Rousselie +## $Id$ +## +## Copyright (C) 2007 David Rousselie +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, write to the Free Software +## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +## + +import unittest +import os +import thread +import sys + +from sqlobject import * +from sqlobject.dbconnection import TheURIOpener + +from jcl.model import account +from jcl.model.account import Account, PresenceAccount +from jmc.model.account import MailAccount, POP3Account, IMAPAccount + +from jcl.model.tests.account import PresenceAccount_TestCase +from jmc.model.tests import email_generator, server + +if sys.platform == "win32": + DB_PATH = "/c|/temp/test.db" +else: + DB_PATH = "/tmp/test.db" +DB_URL = DB_PATH # + "?debug=1&debugThreading=1" + +class MailAccount_TestCase(PresenceAccount_TestCase): + def setUp(self): + if os.path.exists(DB_PATH): + os.unlink(DB_PATH) + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + Account.createTable(ifNotExists = True) + PresenceAccount.createTable(ifNotExists = True) + MailAccount.createTable(ifNotExists = True) + self.account = MailAccount(user_jid = "user1@test.com", \ + name = "account1", \ + jid = "account1@jmc.test.com") + del account.hub.threadConnection + self.account_class = MailAccount + + def tearDown(self): + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + MailAccount.dropTable(ifExists = True) + PresenceAccount.dropTable(ifExists = True) + Account.dropTable(ifExists = True) + del TheURIOpener.cachedURIs['sqlite://' + DB_URL] + account.hub.threadConnection.close() + del account.hub.threadConnection + if os.path.exists(DB_PATH): + os.unlink(DB_PATH) + + def make_test(email_type, tested_func, expected_res): + def inner(self): + encoded, multipart, header = email_type + email = email_generator.generate(encoded, \ + multipart, \ + header) + part = tested_func(self, email) + self.assertEquals(part, expected_res) + return inner + + test_get_decoded_part_not_encoded = \ + make_test((False, False, False), \ + lambda self, email: self.account.get_decoded_part(email, None), \ + u"Not encoded single part") + + test_get_decoded_part_encoded = \ + make_test((True, False, False), \ + lambda self, email: self.account.get_decoded_part(email, None), \ + u"Encoded single part with 'iso-8859-15' charset (éàê)") + + test_format_message_summary_not_encoded = \ + make_test((False, False, True), \ + lambda self, email: self.account.format_message_summary(email), \ + (u"From : not encoded from\nSubject : not encoded subject\n\n", \ + u"not encoded from")) + + test_format_message_summary_encoded = \ + make_test((True, False, True), \ + lambda self, email: self.account.format_message_summary(email), \ + (u"From : encoded from (éàê)\nSubject : encoded subject " + \ + u"(éàê)\n\n", \ + u"encoded from (éàê)")) + + test_format_message_summary_partial_encoded = \ + make_test((True, False, True), \ + lambda self, email: \ + email.replace_header("Subject", \ + "\" " + str(email["Subject"]) \ + + " \" not encoded part") or \ + email.replace_header("From", \ + "\" " + str(email["From"]) \ + + " \" not encoded part") or \ + self.account.format_message_summary(email), \ + (u"From : \"encoded from (éàê)\" not encoded part\nSubject " + \ + u": \"encoded subject (éàê)\" not encoded part\n\n", \ + u"\"encoded from (éàê)\" not encoded part")) + + test_format_message_single_not_encoded = \ + make_test((False, False, True), \ + lambda self, email: self.account.format_message(email), \ + (u"From : not encoded from\nSubject : not encoded subject" + \ + u"\n\nNot encoded single part\n", \ + u"not encoded from")) + + test_format_message_single_encoded = \ + make_test((True, False, True), \ + lambda self, email: self.account.format_message(email), \ + (u"From : encoded from (éàê)\nSubject : encoded subject " + \ + u"(éàê)\n\nEncoded single part with 'iso-8859-15' charset" + \ + u" (éàê)\n", \ + u"encoded from (éàê)")) + + test_format_message_multi_not_encoded = \ + make_test((False, True, True), \ + lambda self, email: self.account.format_message(email), \ + (u"From : not encoded from\nSubject : not encoded subject" + \ + u"\n\nNot encoded multipart1\nNot encoded multipart2\n", \ + u"not encoded from")) + + test_format_message_multi_encoded = \ + make_test((True, True, True), \ + lambda self, email: self.account.format_message(email), \ + (u"From : encoded from (éàê)\nSubject : encoded subject (éà" + \ + u"ê)\n\nutf-8 multipart1 with no charset (éàê)" + \ + u"\nEncoded multipart2 with 'iso-8859-15' charset (éàê)\n" + \ + u"Encoded multipart3 with no charset (éàê)\n", \ + u"encoded from (éàê)")) + + def test_get_register_fields(self): + register_fields = MailAccount.get_register_fields() + self.assertEquals(len(register_fields), 14) + +class POP3Account_TestCase(unittest.TestCase): + def setUp(self): + if os.path.exists(DB_PATH): + os.unlink(DB_PATH) + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + Account.createTable(ifNotExists = True) + PresenceAccount.createTable(ifNotExists = True) + MailAccount.createTable(ifNotExists = True) + POP3Account.createTable(ifNotExists = True) + self.pop3_account = POP3Account(user_jid = "user1@test.com", \ + name = "account1", \ + jid = "account1@jmc.test.com", \ + login = "login") + self.pop3_account.password = "pass" + self.pop3_account.host = "localhost" + self.pop3_account.port = 1110 + self.pop3_account.ssl = False + del account.hub.threadConnection + + def tearDown(self): + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + POP3Account.dropTable(ifExists = True) + MailAccount.dropTable(ifExists = True) + PresenceAccount.dropTable(ifExists = True) + Account.dropTable(ifExists = True) + del TheURIOpener.cachedURIs['sqlite://' + DB_URL] + account.hub.threadConnection.close() + del account.hub.threadConnection + if os.path.exists(DB_PATH): + os.unlink(DB_PATH) + self.server = None + self.pop3_account = None + + def make_test(responses = None, queries = None, core = None): + def inner(self): + self.server = server.DummyServer("localhost", 1110) + thread.start_new_thread(self.server.serve, ()) + self.server.responses = ["+OK connected\r\n", \ + "+OK name is a valid mailbox\r\n", \ + "+OK pass\r\n"] + if responses: + self.server.responses += responses + self.server.queries = ["USER login\r\n", \ + "PASS pass\r\n"] + if queries: + self.server.queries += queries + self.server.queries += ["QUIT\r\n"] + self.pop3_account.connect() + self.failUnless(self.pop3_account.connection, \ + "Cannot establish connection") + if core: + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + core(self) + del account.hub.threadConnection + self.pop3_account.disconnect() + self.failUnless(self.server.verify_queries(), \ + "Sended queries does not match expected queries.") + return inner + + test_connection = make_test() + + test_get_mail_list = \ + make_test(["+OK 2 20\r\n"], \ + ["STAT\r\n"], \ + lambda self: \ + self.assertEquals(self.pop3_account.get_mail_list(), \ + ["1", "2"])) + + test_get_mail_summary = \ + make_test(["+OK 10 octets\r\n" + \ + "From: user@test.com\r\n" + \ + "Subject: subject test\r\n\r\n" + \ + "mymessage\r\n.\r\n", + "+OK\r\n"], \ + ["RETR 1\r\n", + "RSET\r\n"], \ + lambda self: \ + self.assertEquals(self.pop3_account.get_mail_summary(1), \ + (u"From : user@test.com\n" + \ + u"Subject : subject test\n\n", \ + u"user@test.com"))) + + test_get_mail = \ + make_test(["+OK 10 octets\r\n" + \ + "From: user@test.com\r\n" + \ + "Subject: subject test\r\n\r\n" + \ + "mymessage\r\n.\r\n", + "+OK\r\n"], \ + ["RETR 1\r\n", + "RSET\r\n"], \ + lambda self: \ + self.assertEquals(self.pop3_account.get_mail(1), \ + (u"From : user@test.com\n" + \ + u"Subject : subject test\n\n" + \ + u"mymessage\n", \ + u"user@test.com"))) + + test_unsupported_reset_command_get_mail_summary = \ + make_test(["+OK 10 octets\r\n" + \ + "From: user@test.com\r\n" + \ + "Subject: subject test\r\n\r\n" + \ + "mymessage\r\n.\r\n", + "-ERR unknown command\r\n"], \ + ["RETR 1\r\n", + "RSET\r\n"], \ + lambda self: \ + self.assertEquals(self.pop3_account.get_mail_summary(1), \ + (u"From : user@test.com\n" + \ + u"Subject : subject test\n\n", \ + u"user@test.com"))) + + test_unsupported_reset_command_get_mail = \ + make_test(["+OK 10 octets\r\n" + \ + "From: user@test.com\r\n" + \ + "Subject: subject test\r\n\r\n" + \ + "mymessage\r\n.\r\n", + "-ERR unknown command\r\n"], \ + ["RETR 1\r\n", + "RSET\r\n"], \ + lambda self: \ + self.assertEquals(self.pop3_account.get_mail(1), \ + (u"From : user@test.com\n" + \ + u"Subject : subject test\n\n" + \ + u"mymessage\n", \ + u"user@test.com"))) + + def test_get_register_fields(self): + register_fields = POP3Account.get_register_fields() + self.assertEquals(len(register_fields), 14) + +class IMAPAccount_TestCase(unittest.TestCase): + def setUp(self): + if os.path.exists(DB_PATH): + os.unlink(DB_PATH) + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + Account.createTable(ifNotExists = True) + PresenceAccount.createTable(ifNotExists = True) + MailAccount.createTable(ifNotExists = True) + IMAPAccount.createTable(ifNotExists = True) + self.imap_account = IMAPAccount(user_jid = "user1@test.com", \ + name = "account1", \ + jid = "account1@jmc.test.com", \ + login = "login") + self.imap_account.password = "pass" + self.imap_account.host = "localhost" + self.imap_account.port = 1143 + self.imap_account.ssl = False + del account.hub.threadConnection + + def tearDown(self): + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + IMAPAccount.dropTable(ifExists = True) + MailAccount.dropTable(ifExists = True) + PresenceAccount.dropTable(ifExists = True) + Account.dropTable(ifExists = True) + del TheURIOpener.cachedURIs['sqlite://' + DB_URL] + account.hub.threadConnection.close() + del account.hub.threadConnection + if os.path.exists(DB_PATH): + os.unlink(DB_PATH) + self.server = None + self.imap_account = None + + def make_test(responses = None, queries = None, core = None): + def inner(self): + self.server = server.DummyServer("localhost", 1143) + thread.start_new_thread(self.server.serve, ()) + self.server.responses = ["* OK [CAPABILITY IMAP4 LOGIN-REFERRALS " + \ + "AUTH=PLAIN]\n", \ + lambda data: "* CAPABILITY IMAP4 " + \ + "LOGIN-REFERRALS AUTH=PLAIN\n" + \ + data.split()[0] + \ + " OK CAPABILITY completed\n", \ + lambda data: data.split()[0] + \ + " OK LOGIN completed\n"] + if responses: + self.server.responses += responses + self.server.queries = ["^[^ ]* CAPABILITY", \ + "^[^ ]* LOGIN login \"pass\""] + if queries: + self.server.queries += queries + self.server.queries += ["^[^ ]* LOGOUT"] + self.imap_account.connect() + self.failUnless(self.imap_account.connection, \ + "Cannot establish connection") + if core: + account.hub.threadConnection = connectionForURI('sqlite://' + DB_URL) + core(self) + del account.hub.threadConnection + self.imap_account.disconnect() + self.failUnless(self.server.verify_queries()) + return inner + + test_connection = make_test() + + test_get_mail_list = make_test([lambda data: "* 42 EXISTS\n* 1 RECENT\n* OK" +\ + " [UNSEEN 9]\n* FLAGS (\Deleted \Seen\*)\n*" +\ + " OK [PERMANENTFLAGS (\Deleted \Seen\*)\n" + \ + data.split()[0] + \ + " OK [READ-WRITE] SELECT completed\n", \ + lambda data: "* SEARCH 9 10 \n" + \ + data.split()[0] + " OK SEARCH completed\n"], \ + ["^[^ ]* SELECT INBOX", \ + "^[^ ]* SEARCH RECENT"], \ + lambda self: \ + self.assertEquals(self.imap_account.get_mail_list(), ['9', '10'])) + + test_get_mail_summary = make_test([lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\ + " [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\ + " OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \ + data.split()[0] + \ + " OK [READ-WRITE] SELECT completed\r\n", \ + lambda data: "* 1 FETCH ((RFC822) {12}\r\nbody" + \ + " text\r\n)\r\n" + \ + data.split()[0] + " OK FETCH completed\r\n"], \ + ["^[^ ]* EXAMINE INBOX", \ + "^[^ ]* FETCH 1 \(RFC822\)"], \ + lambda self: self.assertEquals(self.imap_account.get_mail_summary(1), \ + (u"From : None\nSubject : None\n\n", \ + u"None"))) + + test_get_mail = make_test([lambda data: "* 42 EXISTS\r\n* 1 RECENT\r\n* OK" +\ + " [UNSEEN 9]\r\n* FLAGS (\Deleted \Seen\*)\r\n*" +\ + " OK [PERMANENTFLAGS (\Deleted \Seen\*)\r\n" + \ + data.split()[0] + \ + " OK [READ-WRITE] SELECT completed\r\n", \ + lambda data: "* 1 FETCH ((RFC822) {11}\r\nbody" + \ + " text\r\n)\r\n" + \ + data.split()[0] + " OK FETCH completed\r\n"], \ + ["^[^ ]* EXAMINE INBOX", \ + "^[^ ]* FETCH 1 \(RFC822\)",], \ + lambda self: self.assertEquals(self.imap_account.get_mail(1), \ + (u"From : None\nSubject : None\n\nbody text\r\n\n", \ + u"None"))) + + def test_get_register_fields(self): + register_fields = IMAPAccount.get_register_fields() + self.assertEquals(len(register_fields), 15) + + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(MailAccount_TestCase, 'test')) + suite.addTest(unittest.makeSuite(POP3Account_TestCase, 'test')) + suite.addTest(unittest.makeSuite(IMAPAccount_TestCase, 'test')) + return suite + +if __name__ == '__main__': + unittest.main(defaultTest='suite') diff --git a/src/jmc/model/tests/email_generator.py b/src/jmc/model/tests/email_generator.py new file mode 100644 index 0000000..0af84dc --- /dev/null +++ b/src/jmc/model/tests/email_generator.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +## email_generator.py +## Login : David Rousselie +## Started on Tue May 17 15:33:35 2005 +## $Id: email_generator.py,v 1.1 2005/07/11 20:39:31 dax Exp $ +## +## Copyright (C) 2005 +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, write to the Free Software +## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +## + +from email.Header import Header +from email.MIMEText import MIMEText +from email.MIMEMultipart import MIMEMultipart + +def _create_multipart(encoded): + msg = MIMEMultipart() + if encoded: + part1 = MIMEText("utf-8 multipart1 with no charset (éàê)", _charset = "") + msg.attach(part1) + part2 = MIMEText("Encoded multipart2 with 'iso-8859-15' charset (éàê)", \ + _charset = "iso-8859-15") + msg.attach(part2) + part3 = MIMEText("Encoded multipart3 with no charset (éàê)", \ + _charset = "") + msg.attach(part3) + else: + part1 = MIMEText("Not encoded multipart1") + msg.attach(part1) + part2 = MIMEText("Not encoded multipart2") + msg.attach(part2) + return msg + +def _create_singlepart(encoded): + if encoded: + return MIMEText("Encoded single part with 'iso-8859-15' charset (éàê)", \ + _charset = "iso-8859-15") + else: + return MIMEText("Not encoded single part") + +def generate(encoded, multipart, header): + msg = None + if multipart: + msg = _create_multipart(encoded) + else: + msg = _create_singlepart(encoded) + if header: + if encoded: + msg['Subject'] = Header("encoded subject (éàê)", "iso-8859-15") + msg['From'] = Header("encoded from (éàê)", "iso-8859-15") + else: + msg['Subject'] = Header("not encoded subject") + msg['From'] = Header("not encoded from") + return msg + diff --git a/src/jmc/model/tests/server.py b/src/jmc/model/tests/server.py new file mode 100644 index 0000000..1b939aa --- /dev/null +++ b/src/jmc/model/tests/server.py @@ -0,0 +1,209 @@ +## +## dummy_server.py +## Login : David Rousselie +## Started on Fri May 13 12:53:17 2005 +## $Id: dummy_server.py,v 1.1 2005/07/11 20:39:31 dax Exp $ +## +## Copyright (C) 2005 +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, write to the Free Software +## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +## + +import sys +import time +import traceback +import re +import os +import socket +import types +import select +import xml.dom.minidom +from pyxmpp import xmlextra + +def xmldiff(node1, node2): + if node1.nodeType == node1.TEXT_NODE: + if not node2.nodeType == node2.TEXT_NODE \ + or re.compile(node2.data + "$").match(node1.data) is None: + raise Exception("data in text node " + node1.data + " does not match " + node2.data) + elif node1.nodeType == node1.DOCUMENT_NODE: + if not node2.nodeType == node2.DOCUMENT_NODE: + raise Exception("node1 is Document but not node2 (" + node2.nodeType + ")") + elif node1.tagName != node2.tagName: + raise Exception("Different tag name : " + node1.tagName + " != " + node2.tagName) + else: + for attr in node1._get_attributes().keys(): + if not node2.hasAttribute(attr) \ + or node1.getAttribute(attr) != node2.getAttribute(attr): + raise Exception("(" + node1.tagName + ") Different attributes : " + node1.getAttribute(attr) + " != " + node2.getAttribute(attr)) + if len(node1.childNodes) != len(node2.childNodes): + raise Exception("(" + node1.tagName + ") Different children number : " + str(len(node1.childNodes)) + " != " + str(len(node2.childNodes))) + for i in range(len(node1.childNodes)): + xmldiff(node1.childNodes[i], node2.childNodes[i]) + +class DummyServer: + def __init__(self, host, port, responses = None): + for res in socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE): + af, socktype, proto, canonname, sa = res + try: + s = socket.socket(af, socktype, proto) + except socket.error, msg: + print >>sys.stderr, msg + s = None + raise socket.error + try: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(sa) + s.listen(1) + except socket.error, msg: + print >>sys.stderr, msg + s.close() + s = None + raise socket.error + break + self.socket = s + self.responses = None + self.queries = None + self.real_queries = [] + + def serve(self): + conn = None + try: + conn, addr = self.socket.accept() + rfile = conn.makefile('rb', -1) + if self.responses: + data = None + for idx in range(len(self.responses)): + # if response is a function apply it (it must return a string) + # it is given previous received data + if isinstance(self.responses[idx], types.FunctionType): + response = self.responses[idx](data) + else: + response = self.responses[idx] + if response is not None: + #print >>sys.stderr, 'Sending : ', response + conn.send(response) + data = rfile.readline() + if not data: + break + else: + self.real_queries.append(data) + #print >>sys.stderr, 'Receive : ', data + if conn is not None: + conn.close() + if self.socket is not None: + self.socket.close() + self.socket = None + except: + type, value, stack = sys.exc_info() + print >>sys.stderr, "".join(traceback.format_exception + (type, value, stack, 5)) + + def verify_queries(self): + result = True + queries_len = len(self.queries) + if queries_len == len(self.real_queries): + for idx in range(queries_len): + real_query = self.real_queries[idx] + match = (re.compile(self.queries[idx], re.M).match(real_query) is not None) + if not match: + result = False + print >>sys.stderr, "Unexpected query :\n" + \ + "Expected query : _" + self.queries[idx] + "_\n" + \ + "Receive query : _" + real_query + "_\n" + else: + result = False + print >>sys.stderr, "Expected " + str(queries_len) + \ + " queries, got " + str(len(self.real_queries)) + \ + "\t" + str(self.real_queries) + return result + +class XMLDummyServer(DummyServer): + def __init__(self, host, port, responses, stream_handler): + DummyServer.__init__(self, host, port, responses) + self._reader = xmlextra.StreamReader(stream_handler) + + def serve(self): + try: + conn, addr = self.socket.accept() + if self.responses: + data = None + for idx in range(len(self.responses)): + try: + # TODO : this approximation is not clean + # received size is based on the expected size in self.queries + data = conn.recv(1024 + len(self.queries[idx])) +# print "receive : " + data + if data: + ## TODO : without this log, test_set_register in test_component wait forever + #print "-----------RECEIVE1 " + data + r = self._reader.feed(data) + except: + type, value, stack = sys.exc_info() + print "".join (traceback.format_exception + (type, value, stack, 5)) + raise + # TODO verify got all data + if data: + self.real_queries.append(data) + # if response is a function apply it (it must return a string) + # it is given previous received data + if isinstance(self.responses[idx], types.FunctionType): + response = self.responses[idx](data) + else: + response = self.responses[idx] + if response is not None: +# print >>sys.stderr, '---------SENDING : ', response + conn.send(response) + data = conn.recv(1024) + if data: +# print "-----------RECEIVE2 " + data + r = self._reader.feed(data) + self.real_queries.append(data) + conn.close() + self.socket.close() + self.socket = None + except: + type, value, stack = sys.exc_info() + print "".join (traceback.format_exception + (type, value, stack, 5)) + raise + + def verify_queries(self): + result = True + full_real_queries = "" + full_recv_queries = "" + for idx in range(len(self.real_queries)): + full_real_queries += self.real_queries[idx].rstrip(os.linesep) + for idx in range(len(self.queries)): + full_recv_queries += self.queries[idx].rstrip(os.linesep) + # Do not receive it but add it so that xml parsing can succeed + #full_real_queries += "" + real_query = xml.dom.minidom.parseString(full_real_queries) + recv_query = xml.dom.minidom.parseString(full_recv_queries) + try: + utils.xmldiff(real_query, recv_query) + except Exception, msg: + result = False + print >>sys.stderr, msg + return result + +def test(): + server = DummyServer(("localhost", 4242)) + server.responses = ["rep1\n", "rep2\n"] + server.serve() + +if __name__ == '__main__': + test() + + -- cgit v1.2.3