From aefcd594df4acf4c6fd019773a92feb0ceba927e Mon Sep 17 00:00:00 2001 From: James Vega Date: Sat, 12 Mar 2005 18:01:47 +0000 Subject: [PATCH] Move structures to utils/ --- plugins/Google/plugin.py | 3 +- plugins/Herald/plugin.py | 2 +- plugins/Relay/plugin.py | 2 +- src/commands.py | 1 - src/irclib.py | 2 +- src/ircutils.py | 3 +- src/utils/str.py | 5 +- src/{ => utils}/structures.py | 0 test/test_structures.py | 622 ---------------------------------- test/test_utils.py | 588 +++++++++++++++++++++++++++++++- 10 files changed, 594 insertions(+), 634 deletions(-) rename src/{ => utils}/structures.py (100%) delete mode 100644 test/test_structures.py diff --git a/plugins/Google/plugin.py b/plugins/Google/plugin.py index fc3a70347..93ef42607 100644 --- a/plugins/Google/plugin.py +++ b/plugins/Google/plugin.py @@ -44,7 +44,6 @@ from supybot.commands import * import supybot.ircmsgs as ircmsgs import supybot.ircutils as ircutils import supybot.callbacks as callbacks -import supybot.structures as structures def search(log, queries, **kwargs): # We have to keep stats here, rather than in formatData or elsewhere, @@ -95,7 +94,7 @@ def search(log, queries, **kwargs): log.exception('Uncaught SOAP exception in Google.search:') raise callbacks.Error, 'Error connecting to Google.com.' -last24hours = structures.TimeoutQueue(86400) +last24hours = utils.structures.TimeoutQueue(86400) totalTime = conf.supybot.plugins.Google.state.time() searches = conf.supybot.plugins.Google.state.searches() diff --git a/plugins/Herald/plugin.py b/plugins/Herald/plugin.py index 2650f1595..477051a48 100644 --- a/plugins/Herald/plugin.py +++ b/plugins/Herald/plugin.py @@ -40,7 +40,7 @@ import supybot.ircmsgs as ircmsgs import supybot.plugins as plugins import supybot.ircutils as ircutils import supybot.callbacks as callbacks -from supybot.structures import TimeoutQueue +from supybot.utils.structures import TimeoutQueue filename = conf.supybot.directories.data.dirize('Herald.db') diff --git a/plugins/Relay/plugin.py b/plugins/Relay/plugin.py index 007a75450..173fc3b39 100644 --- a/plugins/Relay/plugin.py +++ b/plugins/Relay/plugin.py @@ -37,7 +37,7 @@ import supybot.irclib as irclib import supybot.ircmsgs as ircmsgs import supybot.ircutils as ircutils import supybot.callbacks as callbacks -from supybot.structures import MultiSet, TimeoutQueue +from supybot.utils.structures import MultiSet, TimeoutQueue class Relay(callbacks.Plugin): noIgnore = True diff --git a/src/commands.py b/src/commands.py index a822998bb..78d821bb7 100644 --- a/src/commands.py +++ b/src/commands.py @@ -45,7 +45,6 @@ import supybot.ircdb as ircdb import supybot.ircmsgs as ircmsgs import supybot.ircutils as ircutils import supybot.callbacks as callbacks -import supybot.structures as structures ### diff --git a/src/irclib.py b/src/irclib.py index 9b4511e02..1b61f9eb4 100644 --- a/src/irclib.py +++ b/src/irclib.py @@ -41,7 +41,7 @@ import supybot.ircutils as ircutils from utils.str import rsplit from utils.iter import imap, chain, cycle -from supybot.structures import queue, smallqueue, RingBuffer +from utils.structures import queue, smallqueue, RingBuffer ### # The base class for a callback to be registered with an Irc object. Shows diff --git a/src/ircutils.py b/src/ircutils.py index cd0b0e921..ce722c3fc 100644 --- a/src/ircutils.py +++ b/src/ircutils.py @@ -42,7 +42,6 @@ import textwrap from cStringIO import StringIO as sio import supybot.utils as utils -import supybot.structures as structures def debug(s, *args): """Prints a debug string. Most likely replaced by our logging debug.""" @@ -551,7 +550,7 @@ class FloodQueue(object): # calls structures.TimeoutQueue.__repr__, which calls # getTimeout.__repr__, which calls our __repr__, which calls... getTimeout = lambda : self.getTimeout() - q = structures.TimeoutQueue(getTimeout) + q = utils.structures.TimeoutQueue(getTimeout) self.queues[key] = q return q else: diff --git a/src/utils/str.py b/src/utils/str.py index 06e34f6da..bae40dfdf 100644 --- a/src/utils/str.py +++ b/src/utils/str.py @@ -37,9 +37,8 @@ import sys import string import textwrap -import supybot.structures as structures - from iter import all, any +from structures import TwoWayDictionary curry = new.instancemethod chars = string.maketrans('', '') @@ -234,7 +233,7 @@ def ellipsisify(s, n): else: return (textwrap.wrap(s, n-3)[0] + '...') -plurals = structures.TwoWayDictionary({}) +plurals = TwoWayDictionary({}) def matchCase(s1, s2): """Matches the case of s1 in s2""" if s1.isupper(): diff --git a/src/structures.py b/src/utils/structures.py similarity index 100% rename from src/structures.py rename to src/utils/structures.py diff --git a/test/test_structures.py b/test/test_structures.py deleted file mode 100644 index c14d0e09b..000000000 --- a/test/test_structures.py +++ /dev/null @@ -1,622 +0,0 @@ -### -# Copyright (c) 2002-2005, Jeremiah Fincher -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright notice, -# this list of conditions, and the following disclaimer. -# * Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions, and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# * Neither the name of the author of this software nor the name of -# contributors to this software may be used to endorse or promote products -# derived from this software without specific prior written consent. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. -### - -from supybot.test import * - -import pickle - -from supybot.structures import * - -class RingBufferTestCase(SupyTestCase): - def testInit(self): - self.assertRaises(ValueError, RingBuffer, -1) - self.assertRaises(ValueError, RingBuffer, 0) - self.assertEqual(range(10), list(RingBuffer(10, range(10)))) - - def testLen(self): - b = RingBuffer(3) - self.assertEqual(0, len(b)) - b.append(1) - self.assertEqual(1, len(b)) - b.append(2) - self.assertEqual(2, len(b)) - b.append(3) - self.assertEqual(3, len(b)) - b.append(4) - self.assertEqual(3, len(b)) - b.append(5) - self.assertEqual(3, len(b)) - - def testNonzero(self): - b = RingBuffer(3) - self.failIf(b) - b.append(1) - self.failUnless(b) - - def testAppend(self): - b = RingBuffer(3) - self.assertEqual([], list(b)) - b.append(1) - self.assertEqual([1], list(b)) - b.append(2) - self.assertEqual([1, 2], list(b)) - b.append(3) - self.assertEqual([1, 2, 3], list(b)) - b.append(4) - self.assertEqual([2, 3, 4], list(b)) - b.append(5) - self.assertEqual([3, 4, 5], list(b)) - b.append(6) - self.assertEqual([4, 5, 6], list(b)) - - def testContains(self): - b = RingBuffer(3, range(3)) - self.failUnless(0 in b) - self.failUnless(1 in b) - self.failUnless(2 in b) - self.failIf(3 in b) - - def testGetitem(self): - L = range(10) - b = RingBuffer(len(L), L) - for i in range(len(b)): - self.assertEqual(L[i], b[i]) - for i in range(len(b)): - self.assertEqual(L[-i], b[-i]) - for i in range(len(b)): - b.append(i) - for i in range(len(b)): - self.assertEqual(L[i], b[i]) - for i in range(len(b)): - self.assertEqual(list(b), list(b[:i]) + list(b[i:])) - - def testSliceGetitem(self): - L = range(10) - b = RingBuffer(len(L), L) - for i in range(len(b)): - self.assertEqual(L[:i], b[:i]) - self.assertEqual(L[i:], b[i:]) - self.assertEqual(L[i:len(b)-i], b[i:len(b)-i]) - self.assertEqual(L[:-i], b[:-i]) - self.assertEqual(L[-i:], b[-i:]) - self.assertEqual(L[i:-i], b[i:-i]) - for i in range(len(b)): - b.append(i) - for i in range(len(b)): - self.assertEqual(L[:i], b[:i]) - self.assertEqual(L[i:], b[i:]) - self.assertEqual(L[i:len(b)-i], b[i:len(b)-i]) - self.assertEqual(L[:-i], b[:-i]) - self.assertEqual(L[-i:], b[-i:]) - self.assertEqual(L[i:-i], b[i:-i]) - - def testSetitem(self): - L = range(10) - b = RingBuffer(len(L), [0]*len(L)) - for i in range(len(b)): - b[i] = i - for i in range(len(b)): - self.assertEqual(b[i], i) - for i in range(len(b)): - b.append(0) - for i in range(len(b)): - b[i] = i - for i in range(len(b)): - self.assertEqual(b[i], i) - - def testSliceSetitem(self): - L = range(10) - b = RingBuffer(len(L), [0]*len(L)) - self.assertRaises(ValueError, b.__setitem__, slice(0, 10), []) - b[2:4] = L[2:4] - self.assertEquals(b[2:4], L[2:4]) - for _ in range(len(b)): - b.append(0) - b[2:4] = L[2:4] - self.assertEquals(b[2:4], L[2:4]) - - def testExtend(self): - b = RingBuffer(3, range(3)) - self.assertEqual(list(b), range(3)) - b.extend(range(6)) - self.assertEqual(list(b), range(6)[3:]) - - def testRepr(self): - b = RingBuffer(3) - self.assertEqual(repr(b), 'RingBuffer(3, [])') - b.append(1) - self.assertEqual(repr(b), 'RingBuffer(3, [1])') - b.append(2) - self.assertEqual(repr(b), 'RingBuffer(3, [1, 2])') - b.append(3) - self.assertEqual(repr(b), 'RingBuffer(3, [1, 2, 3])') - b.append(4) - self.assertEqual(repr(b), 'RingBuffer(3, [2, 3, 4])') - b.append(5) - self.assertEqual(repr(b), 'RingBuffer(3, [3, 4, 5])') - b.append(6) - self.assertEqual(repr(b), 'RingBuffer(3, [4, 5, 6])') - - def testPickleCopy(self): - b = RingBuffer(10, range(10)) - self.assertEqual(pickle.loads(pickle.dumps(b)), b) - - def testEq(self): - b = RingBuffer(3, range(3)) - self.failIf(b == range(3)) - b1 = RingBuffer(3) - self.failIf(b == b1) - b1.append(0) - self.failIf(b == b1) - b1.append(1) - self.failIf(b == b1) - b1.append(2) - self.failUnless(b == b1) - b = RingBuffer(100, range(10)) - b1 = RingBuffer(10, range(10)) - self.failIf(b == b1) - - def testIter(self): - b = RingBuffer(3, range(3)) - L = [] - for elt in b: - L.append(elt) - self.assertEqual(L, range(3)) - for elt in range(3): - b.append(elt) - del L[:] - for elt in b: - L.append(elt) - self.assertEqual(L, range(3)) - - -class QueueTest(SupyTestCase): - def testReset(self): - q = queue() - q.enqueue(1) - self.assertEqual(len(q), 1) - q.reset() - self.assertEqual(len(q), 0) - - def testGetitem(self): - q = queue() - n = 10 - self.assertRaises(IndexError, q.__getitem__, 0) - for i in xrange(n): - q.enqueue(i) - for i in xrange(n): - self.assertEqual(q[i], i) - for i in xrange(n, 0, -1): - self.assertEqual(q[-i], n-i) - for i in xrange(len(q)): - self.assertEqual(list(q), list(q[:i]) + list(q[i:])) - self.assertRaises(IndexError, q.__getitem__, -(n+1)) - self.assertRaises(IndexError, q.__getitem__, n) - self.assertEqual(q[3:7], queue([3, 4, 5, 6])) - - def testSetitem(self): - q1 = queue() - self.assertRaises(IndexError, q1.__setitem__, 0, 0) - for i in xrange(10): - q1.enqueue(i) - q2 = eval(repr(q1)) - for (i, elt) in enumerate(q2): - q2[i] = elt*2 - self.assertEqual([x*2 for x in q1], list(q2)) - - def testNonzero(self): - q = queue() - self.failIf(q, 'queue not zero after initialization') - q.enqueue(1) - self.failUnless(q, 'queue zero after adding element') - q.dequeue() - self.failIf(q, 'queue not zero after dequeue of only element') - - def testLen(self): - q = queue() - self.assertEqual(0, len(q), 'queue len not 0 after initialization') - q.enqueue(1) - self.assertEqual(1, len(q), 'queue len not 1 after enqueue') - q.enqueue(2) - self.assertEqual(2, len(q), 'queue len not 2 after enqueue') - q.dequeue() - self.assertEqual(1, len(q), 'queue len not 1 after dequeue') - q.dequeue() - self.assertEqual(0, len(q), 'queue len not 0 after dequeue') - for i in range(10): - L = range(i) - q = queue(L) - self.assertEqual(len(q), i) - - def testEq(self): - q1 = queue() - q2 = queue() - self.failUnless(q1 == q1, 'queue not equal to itself') - self.failUnless(q2 == q2, 'queue not equal to itself') - self.failUnless(q1 == q2, 'initialized queues not equal') - q1.enqueue(1) - self.failUnless(q1 == q1, 'queue not equal to itself') - self.failUnless(q2 == q2, 'queue not equal to itself') - q2.enqueue(1) - self.failUnless(q1 == q1, 'queue not equal to itself') - self.failUnless(q2 == q2, 'queue not equal to itself') - self.failUnless(q1 == q2, 'queues not equal after identical enqueue') - q1.dequeue() - self.failUnless(q1 == q1, 'queue not equal to itself') - self.failUnless(q2 == q2, 'queue not equal to itself') - self.failIf(q1 == q2, 'queues equal after one dequeue') - q2.dequeue() - self.failUnless(q1 == q2, 'queues not equal after both are dequeued') - self.failUnless(q1 == q1, 'queue not equal to itself') - self.failUnless(q2 == q2, 'queue not equal to itself') - - def testInit(self): - self.assertEqual(len(queue()), 0, 'queue len not 0 after init') - q = queue() - q.enqueue(1) - q.enqueue(2) - q.enqueue(3) - self.assertEqual(queue((1, 2, 3)),q, 'init not equivalent to enqueues') - q = queue((1, 2, 3)) - self.assertEqual(q.dequeue(), 1, 'values not returned in proper order') - self.assertEqual(q.dequeue(), 2, 'values not returned in proper order') - self.assertEqual(q.dequeue(), 3, 'values not returned in proper order') - - def testRepr(self): - q = queue() - q.enqueue(1) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue('foo') - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue(None) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue(1.0) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue([]) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue(()) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue([1]) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue((1,)) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - - def testEnqueueDequeue(self): - q = queue() - self.assertRaises(IndexError, q.dequeue) - q.enqueue(1) - self.assertEqual(q.dequeue(), 1, - 'first dequeue didn\'t return same as first enqueue') - q.enqueue(1) - q.enqueue(2) - q.enqueue(3) - self.assertEqual(q.dequeue(), 1) - self.assertEqual(q.dequeue(), 2) - self.assertEqual(q.dequeue(), 3) - - def testPeek(self): - q = queue() - self.assertRaises(IndexError, q.peek) - q.enqueue(1) - self.assertEqual(q.peek(), 1, 'peek didn\'t return first enqueue') - q.enqueue(2) - self.assertEqual(q.peek(), 1, 'peek didn\'t return first enqueue') - q.dequeue() - self.assertEqual(q.peek(), 2, 'peek didn\'t return second enqueue') - q.dequeue() - self.assertRaises(IndexError, q.peek) - - def testContains(self): - q = queue() - self.failIf(1 in q, 'empty queue cannot have elements') - q.enqueue(1) - self.failUnless(1 in q, 'recent enqueued element not in q') - q.enqueue(2) - self.failUnless(1 in q, 'original enqueued element not in q') - self.failUnless(2 in q, 'second enqueued element not in q') - q.dequeue() - self.failIf(1 in q, 'dequeued element in q') - self.failUnless(2 in q, 'not dequeued element not in q') - q.dequeue() - self.failIf(2 in q, 'dequeued element in q') - - def testIter(self): - q1 = queue((1, 2, 3)) - q2 = queue() - for i in q1: - q2.enqueue(i) - self.assertEqual(q1, q2, 'iterate didn\'t return all elements') - for _ in queue(): - self.fail('no elements should be in empty queue') - - def testPickleCopy(self): - q = queue(range(10)) - self.assertEqual(q, pickle.loads(pickle.dumps(q))) - -queue = smallqueue - -class SmallQueueTest(SupyTestCase): - def testReset(self): - q = queue() - q.enqueue(1) - self.assertEqual(len(q), 1) - q.reset() - self.assertEqual(len(q), 0) - - def testGetitem(self): - q = queue() - n = 10 - self.assertRaises(IndexError, q.__getitem__, 0) - for i in xrange(n): - q.enqueue(i) - for i in xrange(n): - self.assertEqual(q[i], i) - for i in xrange(n, 0, -1): - self.assertEqual(q[-i], n-i) - for i in xrange(len(q)): - self.assertEqual(list(q), list(q[:i]) + list(q[i:])) - self.assertRaises(IndexError, q.__getitem__, -(n+1)) - self.assertRaises(IndexError, q.__getitem__, n) - self.assertEqual(q[3:7], queue([3, 4, 5, 6])) - - def testSetitem(self): - q1 = queue() - self.assertRaises(IndexError, q1.__setitem__, 0, 0) - for i in xrange(10): - q1.enqueue(i) - q2 = eval(repr(q1)) - for (i, elt) in enumerate(q2): - q2[i] = elt*2 - self.assertEqual([x*2 for x in q1], list(q2)) - - def testNonzero(self): - q = queue() - self.failIf(q, 'queue not zero after initialization') - q.enqueue(1) - self.failUnless(q, 'queue zero after adding element') - q.dequeue() - self.failIf(q, 'queue not zero after dequeue of only element') - - def testLen(self): - q = queue() - self.assertEqual(0, len(q), 'queue len not 0 after initialization') - q.enqueue(1) - self.assertEqual(1, len(q), 'queue len not 1 after enqueue') - q.enqueue(2) - self.assertEqual(2, len(q), 'queue len not 2 after enqueue') - q.dequeue() - self.assertEqual(1, len(q), 'queue len not 1 after dequeue') - q.dequeue() - self.assertEqual(0, len(q), 'queue len not 0 after dequeue') - for i in range(10): - L = range(i) - q = queue(L) - self.assertEqual(len(q), i) - - def testEq(self): - q1 = queue() - q2 = queue() - self.failUnless(q1 == q1, 'queue not equal to itself') - self.failUnless(q2 == q2, 'queue not equal to itself') - self.failUnless(q1 == q2, 'initialized queues not equal') - q1.enqueue(1) - self.failUnless(q1 == q1, 'queue not equal to itself') - self.failUnless(q2 == q2, 'queue not equal to itself') - q2.enqueue(1) - self.failUnless(q1 == q1, 'queue not equal to itself') - self.failUnless(q2 == q2, 'queue not equal to itself') - self.failUnless(q1 == q2, 'queues not equal after identical enqueue') - q1.dequeue() - self.failUnless(q1 == q1, 'queue not equal to itself') - self.failUnless(q2 == q2, 'queue not equal to itself') - self.failIf(q1 == q2, 'queues equal after one dequeue') - q2.dequeue() - self.failUnless(q1 == q2, 'queues not equal after both are dequeued') - self.failUnless(q1 == q1, 'queue not equal to itself') - self.failUnless(q2 == q2, 'queue not equal to itself') - - def testInit(self): - self.assertEqual(len(queue()), 0, 'queue len not 0 after init') - q = queue() - q.enqueue(1) - q.enqueue(2) - q.enqueue(3) - self.assertEqual(queue((1, 2, 3)),q, 'init not equivalent to enqueues') - q = queue((1, 2, 3)) - self.assertEqual(q.dequeue(), 1, 'values not returned in proper order') - self.assertEqual(q.dequeue(), 2, 'values not returned in proper order') - self.assertEqual(q.dequeue(), 3, 'values not returned in proper order') - - def testRepr(self): - q = queue() - q.enqueue(1) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue('foo') - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue(None) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue(1.0) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue([]) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue(()) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue([1]) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - q.enqueue((1,)) - self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') - - def testEnqueueDequeue(self): - q = queue() - self.assertRaises(IndexError, q.dequeue) - q.enqueue(1) - self.assertEqual(q.dequeue(), 1, - 'first dequeue didn\'t return same as first enqueue') - q.enqueue(1) - q.enqueue(2) - q.enqueue(3) - self.assertEqual(q.dequeue(), 1) - self.assertEqual(q.dequeue(), 2) - self.assertEqual(q.dequeue(), 3) - - def testPeek(self): - q = queue() - self.assertRaises(IndexError, q.peek) - q.enqueue(1) - self.assertEqual(q.peek(), 1, 'peek didn\'t return first enqueue') - q.enqueue(2) - self.assertEqual(q.peek(), 1, 'peek didn\'t return first enqueue') - q.dequeue() - self.assertEqual(q.peek(), 2, 'peek didn\'t return second enqueue') - q.dequeue() - self.assertRaises(IndexError, q.peek) - - def testContains(self): - q = queue() - self.failIf(1 in q, 'empty queue cannot have elements') - q.enqueue(1) - self.failUnless(1 in q, 'recent enqueued element not in q') - q.enqueue(2) - self.failUnless(1 in q, 'original enqueued element not in q') - self.failUnless(2 in q, 'second enqueued element not in q') - q.dequeue() - self.failIf(1 in q, 'dequeued element in q') - self.failUnless(2 in q, 'not dequeued element not in q') - q.dequeue() - self.failIf(2 in q, 'dequeued element in q') - - def testIter(self): - q1 = queue((1, 2, 3)) - q2 = queue() - for i in q1: - q2.enqueue(i) - self.assertEqual(q1, q2, 'iterate didn\'t return all elements') - for _ in queue(): - self.fail('no elements should be in empty queue') - - def testPickleCopy(self): - q = queue(range(10)) - self.assertEqual(q, pickle.loads(pickle.dumps(q))) - - -class MaxLengthQueueTestCase(SupyTestCase): - def testInit(self): - q = MaxLengthQueue(3, (1, 2, 3)) - self.assertEqual(list(q), [1, 2, 3]) - self.assertRaises(TypeError, MaxLengthQueue, 3, 1, 2, 3) - - def testMaxLength(self): - q = MaxLengthQueue(3) - q.enqueue(1) - self.assertEqual(len(q), 1) - q.enqueue(2) - self.assertEqual(len(q), 2) - q.enqueue(3) - self.assertEqual(len(q), 3) - q.enqueue(4) - self.assertEqual(len(q), 3) - self.assertEqual(q.peek(), 2) - q.enqueue(5) - self.assertEqual(len(q), 3) - self.assertEqual(q[0], 3) - - -class TwoWayDictionaryTestCase(SupyTestCase): - def testInit(self): - d = TwoWayDictionary(foo='bar') - self.failUnless('foo' in d) - self.failUnless('bar' in d) - - d = TwoWayDictionary({1: 2}) - self.failUnless(1 in d) - self.failUnless(2 in d) - - def testSetitem(self): - d = TwoWayDictionary() - d['foo'] = 'bar' - self.failUnless('foo' in d) - self.failUnless('bar' in d) - - def testDelitem(self): - d = TwoWayDictionary(foo='bar') - del d['foo'] - self.failIf('foo' in d) - self.failIf('bar' in d) - d = TwoWayDictionary(foo='bar') - del d['bar'] - self.failIf('bar' in d) - self.failIf('foo' in d) - - -class TestTimeoutQueue(SupyTestCase): - def test(self): - q = TimeoutQueue(1) - q.enqueue(1) - self.assertEqual(len(q), 1) - q.enqueue(2) - self.assertEqual(len(q), 2) - q.enqueue(3) - self.assertEqual(len(q), 3) - self.assertEqual(sum(q), 6) - time.sleep(1.1) - self.assertEqual(len(q), 0) - self.assertEqual(sum(q), 0) - - def testCallableTimeout(self): - q = TimeoutQueue(lambda : 1) - q.enqueue(1) - self.assertEqual(len(q), 1) - q.enqueue(2) - self.assertEqual(len(q), 2) - q.enqueue(3) - self.assertEqual(len(q), 3) - self.assertEqual(sum(q), 6) - time.sleep(1.1) - self.assertEqual(len(q), 0) - self.assertEqual(sum(q), 0) - - def testContains(self): - q = TimeoutQueue(1) - q.enqueue(1) - self.failUnless(1 in q) - self.failUnless(1 in q) # For some reason, the second one might fail. - self.failIf(2 in q) - time.sleep(1.1) - self.failIf(1 in q) - - def testReset(self): - q = TimeoutQueue(10) - q.enqueue(1) - self.failUnless(1 in q) - q.reset() - self.failIf(1 in q) - - -# vim:set shiftwidth=4 tabstop=8 expandtab textwidth=78: - diff --git a/test/test_utils.py b/test/test_utils.py index 9777827ea..31c628d0e 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -30,7 +30,9 @@ from supybot.test import * import time +import pickle import supybot.utils as utils +from supybot.utils.structures import * class UtilsTest(SupyTestCase): def testReversed(self): @@ -318,7 +320,7 @@ class StrTest(SupyTestCase): self.assertEqual(f(vars, '${f}'), 'called') self.assertEqual(f(vars, '${b a z}'), 'baz') self.assertEqual(f(vars, '$b:$i'), 'c:100') - + def testCommaAndify(self): f = utils.str.commaAndify L = ['foo'] @@ -524,5 +526,589 @@ class FormatTestCase(SupyTestCase): 'I have 3 kinds of fruit: ' 'apples, oranges, and watermelon.') +class RingBufferTestCase(SupyTestCase): + def testInit(self): + self.assertRaises(ValueError, RingBuffer, -1) + self.assertRaises(ValueError, RingBuffer, 0) + self.assertEqual(range(10), list(RingBuffer(10, range(10)))) + + def testLen(self): + b = RingBuffer(3) + self.assertEqual(0, len(b)) + b.append(1) + self.assertEqual(1, len(b)) + b.append(2) + self.assertEqual(2, len(b)) + b.append(3) + self.assertEqual(3, len(b)) + b.append(4) + self.assertEqual(3, len(b)) + b.append(5) + self.assertEqual(3, len(b)) + + def testNonzero(self): + b = RingBuffer(3) + self.failIf(b) + b.append(1) + self.failUnless(b) + + def testAppend(self): + b = RingBuffer(3) + self.assertEqual([], list(b)) + b.append(1) + self.assertEqual([1], list(b)) + b.append(2) + self.assertEqual([1, 2], list(b)) + b.append(3) + self.assertEqual([1, 2, 3], list(b)) + b.append(4) + self.assertEqual([2, 3, 4], list(b)) + b.append(5) + self.assertEqual([3, 4, 5], list(b)) + b.append(6) + self.assertEqual([4, 5, 6], list(b)) + + def testContains(self): + b = RingBuffer(3, range(3)) + self.failUnless(0 in b) + self.failUnless(1 in b) + self.failUnless(2 in b) + self.failIf(3 in b) + + def testGetitem(self): + L = range(10) + b = RingBuffer(len(L), L) + for i in range(len(b)): + self.assertEqual(L[i], b[i]) + for i in range(len(b)): + self.assertEqual(L[-i], b[-i]) + for i in range(len(b)): + b.append(i) + for i in range(len(b)): + self.assertEqual(L[i], b[i]) + for i in range(len(b)): + self.assertEqual(list(b), list(b[:i]) + list(b[i:])) + + def testSliceGetitem(self): + L = range(10) + b = RingBuffer(len(L), L) + for i in range(len(b)): + self.assertEqual(L[:i], b[:i]) + self.assertEqual(L[i:], b[i:]) + self.assertEqual(L[i:len(b)-i], b[i:len(b)-i]) + self.assertEqual(L[:-i], b[:-i]) + self.assertEqual(L[-i:], b[-i:]) + self.assertEqual(L[i:-i], b[i:-i]) + for i in range(len(b)): + b.append(i) + for i in range(len(b)): + self.assertEqual(L[:i], b[:i]) + self.assertEqual(L[i:], b[i:]) + self.assertEqual(L[i:len(b)-i], b[i:len(b)-i]) + self.assertEqual(L[:-i], b[:-i]) + self.assertEqual(L[-i:], b[-i:]) + self.assertEqual(L[i:-i], b[i:-i]) + + def testSetitem(self): + L = range(10) + b = RingBuffer(len(L), [0]*len(L)) + for i in range(len(b)): + b[i] = i + for i in range(len(b)): + self.assertEqual(b[i], i) + for i in range(len(b)): + b.append(0) + for i in range(len(b)): + b[i] = i + for i in range(len(b)): + self.assertEqual(b[i], i) + + def testSliceSetitem(self): + L = range(10) + b = RingBuffer(len(L), [0]*len(L)) + self.assertRaises(ValueError, b.__setitem__, slice(0, 10), []) + b[2:4] = L[2:4] + self.assertEquals(b[2:4], L[2:4]) + for _ in range(len(b)): + b.append(0) + b[2:4] = L[2:4] + self.assertEquals(b[2:4], L[2:4]) + + def testExtend(self): + b = RingBuffer(3, range(3)) + self.assertEqual(list(b), range(3)) + b.extend(range(6)) + self.assertEqual(list(b), range(6)[3:]) + + def testRepr(self): + b = RingBuffer(3) + self.assertEqual(repr(b), 'RingBuffer(3, [])') + b.append(1) + self.assertEqual(repr(b), 'RingBuffer(3, [1])') + b.append(2) + self.assertEqual(repr(b), 'RingBuffer(3, [1, 2])') + b.append(3) + self.assertEqual(repr(b), 'RingBuffer(3, [1, 2, 3])') + b.append(4) + self.assertEqual(repr(b), 'RingBuffer(3, [2, 3, 4])') + b.append(5) + self.assertEqual(repr(b), 'RingBuffer(3, [3, 4, 5])') + b.append(6) + self.assertEqual(repr(b), 'RingBuffer(3, [4, 5, 6])') + + def testPickleCopy(self): + b = RingBuffer(10, range(10)) + self.assertEqual(pickle.loads(pickle.dumps(b)), b) + + def testEq(self): + b = RingBuffer(3, range(3)) + self.failIf(b == range(3)) + b1 = RingBuffer(3) + self.failIf(b == b1) + b1.append(0) + self.failIf(b == b1) + b1.append(1) + self.failIf(b == b1) + b1.append(2) + self.failUnless(b == b1) + b = RingBuffer(100, range(10)) + b1 = RingBuffer(10, range(10)) + self.failIf(b == b1) + + def testIter(self): + b = RingBuffer(3, range(3)) + L = [] + for elt in b: + L.append(elt) + self.assertEqual(L, range(3)) + for elt in range(3): + b.append(elt) + del L[:] + for elt in b: + L.append(elt) + self.assertEqual(L, range(3)) + + +class QueueTest(SupyTestCase): + def testReset(self): + q = queue() + q.enqueue(1) + self.assertEqual(len(q), 1) + q.reset() + self.assertEqual(len(q), 0) + + def testGetitem(self): + q = queue() + n = 10 + self.assertRaises(IndexError, q.__getitem__, 0) + for i in xrange(n): + q.enqueue(i) + for i in xrange(n): + self.assertEqual(q[i], i) + for i in xrange(n, 0, -1): + self.assertEqual(q[-i], n-i) + for i in xrange(len(q)): + self.assertEqual(list(q), list(q[:i]) + list(q[i:])) + self.assertRaises(IndexError, q.__getitem__, -(n+1)) + self.assertRaises(IndexError, q.__getitem__, n) + self.assertEqual(q[3:7], queue([3, 4, 5, 6])) + + def testSetitem(self): + q1 = queue() + self.assertRaises(IndexError, q1.__setitem__, 0, 0) + for i in xrange(10): + q1.enqueue(i) + q2 = eval(repr(q1)) + for (i, elt) in enumerate(q2): + q2[i] = elt*2 + self.assertEqual([x*2 for x in q1], list(q2)) + + def testNonzero(self): + q = queue() + self.failIf(q, 'queue not zero after initialization') + q.enqueue(1) + self.failUnless(q, 'queue zero after adding element') + q.dequeue() + self.failIf(q, 'queue not zero after dequeue of only element') + + def testLen(self): + q = queue() + self.assertEqual(0, len(q), 'queue len not 0 after initialization') + q.enqueue(1) + self.assertEqual(1, len(q), 'queue len not 1 after enqueue') + q.enqueue(2) + self.assertEqual(2, len(q), 'queue len not 2 after enqueue') + q.dequeue() + self.assertEqual(1, len(q), 'queue len not 1 after dequeue') + q.dequeue() + self.assertEqual(0, len(q), 'queue len not 0 after dequeue') + for i in range(10): + L = range(i) + q = queue(L) + self.assertEqual(len(q), i) + + def testEq(self): + q1 = queue() + q2 = queue() + self.failUnless(q1 == q1, 'queue not equal to itself') + self.failUnless(q2 == q2, 'queue not equal to itself') + self.failUnless(q1 == q2, 'initialized queues not equal') + q1.enqueue(1) + self.failUnless(q1 == q1, 'queue not equal to itself') + self.failUnless(q2 == q2, 'queue not equal to itself') + q2.enqueue(1) + self.failUnless(q1 == q1, 'queue not equal to itself') + self.failUnless(q2 == q2, 'queue not equal to itself') + self.failUnless(q1 == q2, 'queues not equal after identical enqueue') + q1.dequeue() + self.failUnless(q1 == q1, 'queue not equal to itself') + self.failUnless(q2 == q2, 'queue not equal to itself') + self.failIf(q1 == q2, 'queues equal after one dequeue') + q2.dequeue() + self.failUnless(q1 == q2, 'queues not equal after both are dequeued') + self.failUnless(q1 == q1, 'queue not equal to itself') + self.failUnless(q2 == q2, 'queue not equal to itself') + + def testInit(self): + self.assertEqual(len(queue()), 0, 'queue len not 0 after init') + q = queue() + q.enqueue(1) + q.enqueue(2) + q.enqueue(3) + self.assertEqual(queue((1, 2, 3)),q, 'init not equivalent to enqueues') + q = queue((1, 2, 3)) + self.assertEqual(q.dequeue(), 1, 'values not returned in proper order') + self.assertEqual(q.dequeue(), 2, 'values not returned in proper order') + self.assertEqual(q.dequeue(), 3, 'values not returned in proper order') + + def testRepr(self): + q = queue() + q.enqueue(1) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue('foo') + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue(None) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue(1.0) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue([]) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue(()) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue([1]) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue((1,)) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + + def testEnqueueDequeue(self): + q = queue() + self.assertRaises(IndexError, q.dequeue) + q.enqueue(1) + self.assertEqual(q.dequeue(), 1, + 'first dequeue didn\'t return same as first enqueue') + q.enqueue(1) + q.enqueue(2) + q.enqueue(3) + self.assertEqual(q.dequeue(), 1) + self.assertEqual(q.dequeue(), 2) + self.assertEqual(q.dequeue(), 3) + + def testPeek(self): + q = queue() + self.assertRaises(IndexError, q.peek) + q.enqueue(1) + self.assertEqual(q.peek(), 1, 'peek didn\'t return first enqueue') + q.enqueue(2) + self.assertEqual(q.peek(), 1, 'peek didn\'t return first enqueue') + q.dequeue() + self.assertEqual(q.peek(), 2, 'peek didn\'t return second enqueue') + q.dequeue() + self.assertRaises(IndexError, q.peek) + + def testContains(self): + q = queue() + self.failIf(1 in q, 'empty queue cannot have elements') + q.enqueue(1) + self.failUnless(1 in q, 'recent enqueued element not in q') + q.enqueue(2) + self.failUnless(1 in q, 'original enqueued element not in q') + self.failUnless(2 in q, 'second enqueued element not in q') + q.dequeue() + self.failIf(1 in q, 'dequeued element in q') + self.failUnless(2 in q, 'not dequeued element not in q') + q.dequeue() + self.failIf(2 in q, 'dequeued element in q') + + def testIter(self): + q1 = queue((1, 2, 3)) + q2 = queue() + for i in q1: + q2.enqueue(i) + self.assertEqual(q1, q2, 'iterate didn\'t return all elements') + for _ in queue(): + self.fail('no elements should be in empty queue') + + def testPickleCopy(self): + q = queue(range(10)) + self.assertEqual(q, pickle.loads(pickle.dumps(q))) + +queue = smallqueue + +class SmallQueueTest(SupyTestCase): + def testReset(self): + q = queue() + q.enqueue(1) + self.assertEqual(len(q), 1) + q.reset() + self.assertEqual(len(q), 0) + + def testGetitem(self): + q = queue() + n = 10 + self.assertRaises(IndexError, q.__getitem__, 0) + for i in xrange(n): + q.enqueue(i) + for i in xrange(n): + self.assertEqual(q[i], i) + for i in xrange(n, 0, -1): + self.assertEqual(q[-i], n-i) + for i in xrange(len(q)): + self.assertEqual(list(q), list(q[:i]) + list(q[i:])) + self.assertRaises(IndexError, q.__getitem__, -(n+1)) + self.assertRaises(IndexError, q.__getitem__, n) + self.assertEqual(q[3:7], queue([3, 4, 5, 6])) + + def testSetitem(self): + q1 = queue() + self.assertRaises(IndexError, q1.__setitem__, 0, 0) + for i in xrange(10): + q1.enqueue(i) + q2 = eval(repr(q1)) + for (i, elt) in enumerate(q2): + q2[i] = elt*2 + self.assertEqual([x*2 for x in q1], list(q2)) + + def testNonzero(self): + q = queue() + self.failIf(q, 'queue not zero after initialization') + q.enqueue(1) + self.failUnless(q, 'queue zero after adding element') + q.dequeue() + self.failIf(q, 'queue not zero after dequeue of only element') + + def testLen(self): + q = queue() + self.assertEqual(0, len(q), 'queue len not 0 after initialization') + q.enqueue(1) + self.assertEqual(1, len(q), 'queue len not 1 after enqueue') + q.enqueue(2) + self.assertEqual(2, len(q), 'queue len not 2 after enqueue') + q.dequeue() + self.assertEqual(1, len(q), 'queue len not 1 after dequeue') + q.dequeue() + self.assertEqual(0, len(q), 'queue len not 0 after dequeue') + for i in range(10): + L = range(i) + q = queue(L) + self.assertEqual(len(q), i) + + def testEq(self): + q1 = queue() + q2 = queue() + self.failUnless(q1 == q1, 'queue not equal to itself') + self.failUnless(q2 == q2, 'queue not equal to itself') + self.failUnless(q1 == q2, 'initialized queues not equal') + q1.enqueue(1) + self.failUnless(q1 == q1, 'queue not equal to itself') + self.failUnless(q2 == q2, 'queue not equal to itself') + q2.enqueue(1) + self.failUnless(q1 == q1, 'queue not equal to itself') + self.failUnless(q2 == q2, 'queue not equal to itself') + self.failUnless(q1 == q2, 'queues not equal after identical enqueue') + q1.dequeue() + self.failUnless(q1 == q1, 'queue not equal to itself') + self.failUnless(q2 == q2, 'queue not equal to itself') + self.failIf(q1 == q2, 'queues equal after one dequeue') + q2.dequeue() + self.failUnless(q1 == q2, 'queues not equal after both are dequeued') + self.failUnless(q1 == q1, 'queue not equal to itself') + self.failUnless(q2 == q2, 'queue not equal to itself') + + def testInit(self): + self.assertEqual(len(queue()), 0, 'queue len not 0 after init') + q = queue() + q.enqueue(1) + q.enqueue(2) + q.enqueue(3) + self.assertEqual(queue((1, 2, 3)),q, 'init not equivalent to enqueues') + q = queue((1, 2, 3)) + self.assertEqual(q.dequeue(), 1, 'values not returned in proper order') + self.assertEqual(q.dequeue(), 2, 'values not returned in proper order') + self.assertEqual(q.dequeue(), 3, 'values not returned in proper order') + + def testRepr(self): + q = queue() + q.enqueue(1) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue('foo') + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue(None) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue(1.0) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue([]) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue(()) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue([1]) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + q.enqueue((1,)) + self.assertEqual(q, eval(repr(q)), 'repr doesn\'t eval to same queue') + + def testEnqueueDequeue(self): + q = queue() + self.assertRaises(IndexError, q.dequeue) + q.enqueue(1) + self.assertEqual(q.dequeue(), 1, + 'first dequeue didn\'t return same as first enqueue') + q.enqueue(1) + q.enqueue(2) + q.enqueue(3) + self.assertEqual(q.dequeue(), 1) + self.assertEqual(q.dequeue(), 2) + self.assertEqual(q.dequeue(), 3) + + def testPeek(self): + q = queue() + self.assertRaises(IndexError, q.peek) + q.enqueue(1) + self.assertEqual(q.peek(), 1, 'peek didn\'t return first enqueue') + q.enqueue(2) + self.assertEqual(q.peek(), 1, 'peek didn\'t return first enqueue') + q.dequeue() + self.assertEqual(q.peek(), 2, 'peek didn\'t return second enqueue') + q.dequeue() + self.assertRaises(IndexError, q.peek) + + def testContains(self): + q = queue() + self.failIf(1 in q, 'empty queue cannot have elements') + q.enqueue(1) + self.failUnless(1 in q, 'recent enqueued element not in q') + q.enqueue(2) + self.failUnless(1 in q, 'original enqueued element not in q') + self.failUnless(2 in q, 'second enqueued element not in q') + q.dequeue() + self.failIf(1 in q, 'dequeued element in q') + self.failUnless(2 in q, 'not dequeued element not in q') + q.dequeue() + self.failIf(2 in q, 'dequeued element in q') + + def testIter(self): + q1 = queue((1, 2, 3)) + q2 = queue() + for i in q1: + q2.enqueue(i) + self.assertEqual(q1, q2, 'iterate didn\'t return all elements') + for _ in queue(): + self.fail('no elements should be in empty queue') + + def testPickleCopy(self): + q = queue(range(10)) + self.assertEqual(q, pickle.loads(pickle.dumps(q))) + + +class MaxLengthQueueTestCase(SupyTestCase): + def testInit(self): + q = MaxLengthQueue(3, (1, 2, 3)) + self.assertEqual(list(q), [1, 2, 3]) + self.assertRaises(TypeError, MaxLengthQueue, 3, 1, 2, 3) + + def testMaxLength(self): + q = MaxLengthQueue(3) + q.enqueue(1) + self.assertEqual(len(q), 1) + q.enqueue(2) + self.assertEqual(len(q), 2) + q.enqueue(3) + self.assertEqual(len(q), 3) + q.enqueue(4) + self.assertEqual(len(q), 3) + self.assertEqual(q.peek(), 2) + q.enqueue(5) + self.assertEqual(len(q), 3) + self.assertEqual(q[0], 3) + + +class TwoWayDictionaryTestCase(SupyTestCase): + def testInit(self): + d = TwoWayDictionary(foo='bar') + self.failUnless('foo' in d) + self.failUnless('bar' in d) + + d = TwoWayDictionary({1: 2}) + self.failUnless(1 in d) + self.failUnless(2 in d) + + def testSetitem(self): + d = TwoWayDictionary() + d['foo'] = 'bar' + self.failUnless('foo' in d) + self.failUnless('bar' in d) + + def testDelitem(self): + d = TwoWayDictionary(foo='bar') + del d['foo'] + self.failIf('foo' in d) + self.failIf('bar' in d) + d = TwoWayDictionary(foo='bar') + del d['bar'] + self.failIf('bar' in d) + self.failIf('foo' in d) + + +class TestTimeoutQueue(SupyTestCase): + def test(self): + q = TimeoutQueue(1) + q.enqueue(1) + self.assertEqual(len(q), 1) + q.enqueue(2) + self.assertEqual(len(q), 2) + q.enqueue(3) + self.assertEqual(len(q), 3) + self.assertEqual(sum(q), 6) + time.sleep(1.1) + self.assertEqual(len(q), 0) + self.assertEqual(sum(q), 0) + + def testCallableTimeout(self): + q = TimeoutQueue(lambda : 1) + q.enqueue(1) + self.assertEqual(len(q), 1) + q.enqueue(2) + self.assertEqual(len(q), 2) + q.enqueue(3) + self.assertEqual(len(q), 3) + self.assertEqual(sum(q), 6) + time.sleep(1.1) + self.assertEqual(len(q), 0) + self.assertEqual(sum(q), 0) + + def testContains(self): + q = TimeoutQueue(1) + q.enqueue(1) + self.failUnless(1 in q) + self.failUnless(1 in q) # For some reason, the second one might fail. + self.failIf(2 in q) + time.sleep(1.1) + self.failIf(1 in q) + + def testReset(self): + q = TimeoutQueue(10) + q.enqueue(1) + self.failUnless(1 in q) + q.reset() + self.failIf(1 in q) + # vim:set shiftwidth=4 tabstop=8 expandtab textwidth=78: