mirror of
https://github.com/Mikaela/Limnoria.git
synced 2025-01-11 12:42:34 +01:00
Removed assertRegexps and assertResponses, added RFE #836316 (target keyword) and frm keyword.
This commit is contained in:
parent
bddac5c382
commit
e3661e3ba5
@ -146,12 +146,16 @@ class PluginTestCase(unittest.TestCase):
|
||||
self.irc.die()
|
||||
gc.collect()
|
||||
|
||||
def _feedMsg(self, query, timeout=None):
|
||||
def _feedMsg(self, query, timeout=None, to=None, frm=None):
|
||||
if to is None:
|
||||
to = self.irc.nick
|
||||
if frm is None:
|
||||
frm = self.prefix
|
||||
if timeout is None:
|
||||
timeout = self.timeout
|
||||
if self.myVerbose:
|
||||
print # Extra newline, so it's pretty.
|
||||
msg = ircmsgs.privmsg(self.irc.nick, query, prefix=self.prefix)
|
||||
msg = ircmsgs.privmsg(to, query, prefix=frm)
|
||||
if self.myVerbose:
|
||||
print 'Feeding: %r' % msg
|
||||
self.irc.feedMsg(msg)
|
||||
@ -168,99 +172,85 @@ class PluginTestCase(unittest.TestCase):
|
||||
def getMsg(self, query, timeout=None):
|
||||
return self._feedMsg(query, timeout=timeout)
|
||||
|
||||
def feedMsg(self, query):
|
||||
def feedMsg(self, query, to=None, frm=None):
|
||||
"""Just feeds it a message, that's all."""
|
||||
self.irc.feedMsg(ircmsgs.privmsg(self.irc.nick, query,
|
||||
prefix=self.prefix))
|
||||
if to is None:
|
||||
to = self.irc.nick
|
||||
if frm is None:
|
||||
frm = self.prefix
|
||||
self.irc.feedMsg(ircmsgs.privmsg(to, query, prefix=frm))
|
||||
|
||||
# These assertError/assertNoError are somewhat fragile. The proper way to
|
||||
# do them would be to use a proxy for the irc object and intercept .error.
|
||||
# But that would be hard, so I don't bother. When this breaks, it'll get
|
||||
# fixed, but not until then.
|
||||
def assertError(self, query):
|
||||
m = self._feedMsg(query)
|
||||
def assertError(self, query, to=None, frm=None):
|
||||
m = self._feedMsg(query, to=to, frm=frm)
|
||||
if m is None:
|
||||
raise TimeoutError, query
|
||||
if lastGetHelp not in m.args[1]:
|
||||
self.failUnless(m.args[1].startswith('Error:'),
|
||||
'%r did not error: %s' % (query, m.args[1]))
|
||||
return m
|
||||
|
||||
def assertNotError(self, query):
|
||||
m = self._feedMsg(query)
|
||||
def assertNotError(self, query, to=None, frm=None):
|
||||
m = self._feedMsg(query, to=to, frm=frm)
|
||||
if m is None:
|
||||
raise TimeoutError, query
|
||||
self.failIf(m.args[1].startswith('Error:'),
|
||||
'%r errored: %s' % (query, m.args[1]))
|
||||
self.failIf(lastGetHelp in m.args[1],
|
||||
'%r returned the help string.' % query)
|
||||
return m
|
||||
|
||||
def assertHelp(self, query):
|
||||
m = self._feedMsg(query)
|
||||
def assertHelp(self, query, to=None, frm=None):
|
||||
m = self._feedMsg(query, to=to, frm=frm)
|
||||
if m is None:
|
||||
raise TimeoutError, query
|
||||
self.failUnless(lastGetHelp in m.args[1],
|
||||
'%s is not the help (%s)' % (m.args[1], lastGetHelp))
|
||||
return m
|
||||
|
||||
def assertNoResponse(self, query, timeout=None):
|
||||
m = self._feedMsg(query, timeout)
|
||||
def assertNoResponse(self, query, timeout=None, to=None, frm=None):
|
||||
m = self._feedMsg(query, timeout=timeout, to=to, frm=frm)
|
||||
self.failIf(m, 'Unexpected response: %r' % m)
|
||||
return m
|
||||
|
||||
def assertResponse(self, query, expectedResponse):
|
||||
m = self._feedMsg(query)
|
||||
def assertResponse(self, query, expectedResponse, to=None, frm=None):
|
||||
m = self._feedMsg(query, to=to, frm=frm)
|
||||
if m is None:
|
||||
raise TimeoutError, query
|
||||
self.assertEqual(m.args[1], expectedResponse,
|
||||
'%r != %r' % (expectedResponse, m.args[1]))
|
||||
return m
|
||||
|
||||
def assertRegexp(self, query, regexp, flags=re.I):
|
||||
m = self._feedMsg(query)
|
||||
def assertRegexp(self, query, regexp, flags=re.I, to=None, frm=None):
|
||||
m = self._feedMsg(query, to=to, frm=frm)
|
||||
if m is None:
|
||||
raise TimeoutError, query
|
||||
self.failUnless(re.search(regexp, m.args[1], flags),
|
||||
'%r does not match %r' % (m.args[1], regexp))
|
||||
return m
|
||||
|
||||
def assertNotRegexp(self, query, regexp, flags=re.I):
|
||||
m = self._feedMsg(query)
|
||||
def assertNotRegexp(self, query, regexp, flags=re.I, to=None, frm=None):
|
||||
m = self._feedMsg(query, to=to, frm=frm)
|
||||
if m is None:
|
||||
raise TimeoutError, query
|
||||
self.failUnless(re.search(regexp, m.args[1], flags) is None,
|
||||
'%r matched %r' % (m.args[1], regexp))
|
||||
return m
|
||||
|
||||
def assertRegexps(self, query, regexps):
|
||||
started = time.time()
|
||||
total = len(regexps)*self.timeout
|
||||
while regexps and time.time() - started < total:
|
||||
m = self._feedMsg(query)
|
||||
if m is None:
|
||||
raise TimeoutError, query
|
||||
regexp = regexps.pop(0)
|
||||
self.failUnless(re.search(regexp, m.args[1]),
|
||||
'%r does not match %r' % (m.args[1], regexp))
|
||||
self.failIf(time.time() - started > total)
|
||||
|
||||
def assertResponses(self, query, expectedResponses):
|
||||
responses = []
|
||||
started = time.time()
|
||||
while len(responses) < len(expectedResponses) and \
|
||||
time.time() - started > len(expectedResponses)*self.timeout:
|
||||
m = self._feedMsg(query)
|
||||
if m is None:
|
||||
raise TimeoutError, query
|
||||
responses.append(m)
|
||||
self.assertEqual(len(expectedResponses), len(responses))
|
||||
for (m, expected) in zip(responses, expectedResponses):
|
||||
self.assertEqual(m.args[1], expected)
|
||||
|
||||
def assertAction(self, query, expectedResponse=None):
|
||||
m = self._feedMsg(query)
|
||||
def assertAction(self, query, expectedResponse=None, to=None, frm=None):
|
||||
m = self._feedMsg(query, to=to, frm=frm)
|
||||
if m is None:
|
||||
raise TimeoutError, query
|
||||
self.failUnless(ircmsgs.isAction(m))
|
||||
if expectedResponse is not None:
|
||||
self.assertEqual(ircmsgs.unAction(m), expectedResponse)
|
||||
return m
|
||||
|
||||
def assertActionRegexp(self, query, regexp, flags=re.I):
|
||||
m = self._feedMsg(query)
|
||||
def assertActionRegexp(self, query, regexp, flags=re.I, to=None, frm=None):
|
||||
m = self._feedMsg(query, to=to, frm=frm)
|
||||
if m is None:
|
||||
raise TimeoutError, query
|
||||
self.failUnless(ircmsgs.isAction(m))
|
||||
@ -294,14 +284,18 @@ class ChannelPluginTestCase(PluginTestCase):
|
||||
PluginTestCase.setUp(self)
|
||||
self.irc.feedMsg(ircmsgs.join(self.channel, prefix=self.prefix))
|
||||
|
||||
def _feedMsg(self, query, timeout=None):
|
||||
def _feedMsg(self, query, timeout=None, to=None, frm=None):
|
||||
if to is None:
|
||||
to = self.channel
|
||||
if frm is None:
|
||||
frm = self.prefix
|
||||
if timeout is None:
|
||||
timeout = self.timeout
|
||||
if self.myVerbose:
|
||||
print # Newline, just like PluginTestCase.
|
||||
if query[0] not in conf.prefixChars:
|
||||
query = conf.prefixChars[0] + query
|
||||
msg = ircmsgs.privmsg(self.channel, query, prefix=self.prefix)
|
||||
msg = ircmsgs.privmsg(to, query, prefix=frm)
|
||||
if self.myVerbose:
|
||||
print 'Feeding: %r' % msg
|
||||
self.irc.feedMsg(msg)
|
||||
|
Loading…
Reference in New Issue
Block a user