Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/Jajcus/pyxmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorJacek Konieczny <jajcus@jajcus.net>2005-01-03 01:08:49 +0300
committerJacek Konieczny <jajcus@jajcus.net>2005-01-03 01:08:49 +0300
commit6bd68ceb2707890093235f149f2c9052de10cca2 (patch)
treea1fd9a4c8c057dd211cda83e0c884378b81c32ed /tests
parenta101aeccdfc673f73acf98c4d49c3f48a42235b0 (diff)
- cache for things like Service Discovery replies. Still needs some testing.
Diffstat (limited to 'tests')
-rwxr-xr-xtests/cache.py260
1 files changed, 260 insertions, 0 deletions
diff --git a/tests/cache.py b/tests/cache.py
new file mode 100755
index 0000000..5540297
--- /dev/null
+++ b/tests/cache.py
@@ -0,0 +1,260 @@
+#!/usr/bin/python
+# -*- coding: UTF-8 -*-
+
+import unittest
+import libxml2
+import threading
+
+from pyxmpp.cache import Cache,CacheSuite,CacheFetcher,CacheItem
+from datetime import timedelta,datetime
+from time import sleep
+
+class TestClass1:
+ def __init__(self,adr):
+ self.adr=adr
+ def __repr__(self):
+ return "<TestClass1 %r>" % (self.adr,)
+
+class TestClass2:
+ def __init__(self,adr):
+ self.adr=adr
+ def __repr__(self):
+ return "<TestClass2 %r>" % (self.adr,)
+
+def sec(x):
+ return timedelta(seconds=x)
+
+class TestCacheItem(unittest.TestCase):
+ def test_item(self):
+ pre = datetime.utcnow()
+ o = TestClass1("test_addr")
+ item = CacheItem("test_addr",o, sec(0.1), sec(0.2), sec(0.3))
+ post = datetime.utcnow()
+
+ self.failUnless(item.timestamp >= pre and item.timestamp <= post)
+ self.failUnless(item.value is o)
+ self.failUnless(item.address == "test_addr")
+
+ def test_item_state_transitions(self):
+ item = CacheItem("test_addr","test_value", sec(0.1), sec(0.2), sec(0.3))
+
+ # t+0
+ self.failUnlessEqual(item.state,"new")
+ state = item.update_state()
+ self.failUnlessEqual(state,item.state)
+ self.failUnlessEqual(item.state,"fresh")
+
+ sleep(0.01)
+ # t+~0.01
+ state = item.update_state()
+ self.failUnlessEqual(state,item.state)
+ self.failUnlessEqual(item.state,"fresh")
+
+ sleep(0.1)
+ # t+~0.11
+ state = item.update_state()
+ self.failUnlessEqual(state,item.state)
+ self.failUnlessEqual(item.state,"old")
+
+ sleep(0.01)
+ # t+~0.12
+ state = item.update_state()
+ self.failUnlessEqual(state,item.state)
+ self.failUnlessEqual(item.state,"old")
+
+ sleep(0.1)
+ # t+~0.22
+ state = item.update_state()
+ self.failUnlessEqual(state,item.state)
+ self.failUnlessEqual(item.state,"stale")
+
+ sleep(0.01)
+ # t+~0.23
+ state = item.update_state()
+ self.failUnlessEqual(state,item.state)
+ self.failUnlessEqual(item.state,"stale")
+
+ sleep(0.1)
+ # t+~0.33
+ state = item.update_state()
+ self.failUnlessEqual(state,item.state)
+ self.failUnlessEqual(item.state,"purged")
+
+ sleep(0.1)
+ # t+~0.43
+ state = item.update_state()
+ self.failUnlessEqual(state,item.state)
+ self.failUnlessEqual(item.state,"purged")
+
+class TestCacheFetcher(unittest.TestCase):
+ def setUp(self):
+ self.cache = Cache(100)
+ self.event = None
+
+ def make_fetcher(self):
+ return CacheFetcher(self.cache, "test_addr", sec(1), sec(2), sec(3),
+ self.object_handler, self.error_handler, self.timeout_handler,
+ sec(1))
+
+ def object_handler(self, address, value, state):
+ self.event = ("success", address, value, state)
+
+ def error_handler(self, address, error_data):
+ self.event = ("error", address, error_data)
+
+ def timeout_handler(self, address):
+ self.event = ("timeout", address)
+
+ def test_fetcher_fetch(self):
+ fetcher = self.make_fetcher()
+ self.failUnlessRaises(RuntimeError,fetcher.fetch)
+
+ def test_success(self):
+ fetcher = self.make_fetcher()
+ fetcher.got_it("test_value")
+ self.failUnlessEqual(self.event,("success", "test_addr", "test_value", "new"))
+ self.event = None
+ fetcher.got_it("test_value")
+ self.failUnlessEqual(self.event, None)
+
+ def test_success_other_state(self):
+ fetcher = self.make_fetcher()
+ fetcher.got_it("test_value","stale")
+ self.failUnlessEqual(self.event,("success", "test_addr", "test_value", "stale"))
+
+ def test_error(self):
+ fetcher = self.make_fetcher()
+ fetcher.error("test_error")
+ self.failUnlessEqual(self.event,("error", "test_addr", "test_error"))
+ self.event = None
+ fetcher.error("test_error")
+ self.failUnlessEqual(self.event, None)
+
+ def test_timeout(self):
+ fetcher = self.make_fetcher()
+ fetcher.timeout()
+ self.failUnlessEqual(self.event,("timeout", "test_addr"))
+ self.event = None
+ fetcher.timeout()
+ self.failUnlessEqual(self.event, None)
+
+class TestCache(unittest.TestCase):
+ def setUp(self):
+ self.event = None
+ class MyFetcher(CacheFetcher):
+ def fetch(self):
+ thread = threading.Thread(target=self.thread)
+ thread.start()
+ def thread(self):
+ sleep(0.2)
+ if self.address >= 0:
+ self.got_it("value%i" % (self.address,))
+ else:
+ self.error("Negative address")
+ self.Fetcher = MyFetcher
+
+ def test_add_get_item(self):
+ cache = Cache(100)
+ item1 = CacheItem("test_addr1","test_value1", sec(1), sec(2), sec(3))
+ cache.add_item(item1)
+ item2 = CacheItem("test_addr2","test_value2", sec(1), sec(2), sec(3))
+ cache.add_item(item2)
+ item3 = CacheItem("test_addr3","test_value3", sec(1), sec(2), sec(3))
+ cache.add_item(item3)
+ gitem2 = cache.get_item("test_addr2","fresh")
+ self.failUnless(gitem2 is item2, "gitem2=%r item2=%r" % (gitem2, item2))
+ gitem1 = cache.get_item("test_addr1","old")
+ self.failUnless(gitem1 is item1, "gitem2=%r item2=%r" % (gitem2, item2))
+ gitem3 = cache.get_item("test_addr3","stale")
+ self.failUnless(gitem3 is item3, "gitem2=%r item2=%r" % (gitem2, item2))
+ gitem1 = cache.get_item("test_addr1","new")
+ self.failUnless(gitem1 is None, "gitem1=%r" % (gitem1,))
+
+ def test_max_items(self):
+ cache = Cache(10)
+ self.failUnlessEqual(cache.num_items(),0,"number of items: "+`cache.num_items()`)
+ for i in range(10):
+ item = CacheItem(i,i, sec(1), sec(2), sec(3))
+ cache.add_item(item)
+ self.failUnlessEqual(cache.num_items(),10,"number of items: "+`cache.num_items()`)
+ for i in range(10,30):
+ item = CacheItem(i,i, sec(1), sec(2), sec(3))
+ cache.add_item(item)
+ self.failUnless(cache.num_items() > 7,"number of items: "+`cache.num_items()`)
+ self.failUnless(cache.num_items() <= 10,"number of items: "+`cache.num_items()`)
+ self.failUnlessEqual(cache.num_items(),len(cache._items),
+ "number of items: %r, in dict: %r" % (cache.num_items(), cache._items))
+
+ def test_item_expiration(self):
+ cache = Cache(100)
+ item = CacheItem("test_addr","test_value", sec(0.1), sec(0.2), sec(0.3))
+ cache.add_item(item)
+ gitem = cache.get_item("test_addr","fresh")
+ self.failUnlessEqual(gitem,item)
+ sleep(0.01)
+ gitem = cache.get_item("test_addr","fresh")
+ self.failUnlessEqual(gitem,item)
+ sleep(0.1)
+ gitem = cache.get_item("test_addr","fresh")
+ self.failUnlessEqual(gitem,None)
+ cache.tick()
+ gitem = cache.get_item("test_addr","old")
+ self.failUnlessEqual(gitem,item)
+ sleep(0.3)
+ cache.tick()
+ gitem = cache.get_item("test_addr","stale")
+ self.failUnlessEqual(gitem,None)
+ self.failUnlessEqual(cache.num_items(),0,"number of items: "+`cache.num_items()`)
+
+ def test_request_object(self):
+ cache = Cache(100)
+ cache.set_fetcher(self.Fetcher)
+ cache.request_object(1, "fresh", self.object_handler,
+ self.error_handler, self.timeout_handler)
+ self.failUnlessEqual(self.event,None,"Early event: %r" % (self.event,))
+ sleep(0.05)
+ self.failUnlessEqual(self.event,None,"Early event: %r" % (self.event,))
+ sleep(0.3)
+ self.failUnlessEqual(self.event,("success", 1, "value1", "new"))
+ self.event = None
+ cache.request_object(1, "fresh", self.object_handler,
+ self.error_handler, self.timeout_handler)
+ self.failUnlessEqual(self.event,("success", 1, "value1", "fresh"))
+ self.event = None
+ cache.request_object(2, "fresh", self.object_handler,
+ self.error_handler, self.timeout_handler)
+ self.failUnlessEqual(self.event,None,"Early event: %r" % (self.event,))
+ sleep(0.05)
+ self.failUnlessEqual(self.event,None,"Early event: %r" % (self.event,))
+ sleep(0.3)
+ self.failUnlessEqual(self.event,("success", 2, "value2", "new"))
+ self.event = None
+ cache.request_object(2, "fresh", self.object_handler,
+ self.error_handler, self.timeout_handler)
+ self.failUnlessEqual(self.event,("success", 2, "value2", "fresh"))
+ self.event = None
+ cache.request_object(1, "fresh", self.object_handler,
+ self.error_handler, self.timeout_handler)
+ self.failUnlessEqual(self.event,("success", 1, "value1", "fresh"))
+
+ def object_handler(self, address, value, state):
+ self.event = ("success", address, value, state)
+
+ def error_handler(self, address, error_data):
+ self.event = ("error", address, error_data)
+
+ def timeout_handler(self, address):
+ self.event = ("timeout", address)
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestCacheItem))
+ suite.addTest(unittest.makeSuite(TestCacheFetcher))
+ suite.addTest(unittest.makeSuite(TestCache))
+ return suite
+
+if __name__ == '__main__':
+ unittest.TextTestRunner(verbosity=2).run(suite())
+
+# vi: sts=4 et sw=4 encoding=utf-8