mirror of
https://github.com/Mikaela/Limnoria.git
synced 2024-11-16 07:29:23 +01:00
636 lines
24 KiB
Python
636 lines
24 KiB
Python
###
|
||
# Copyright (c) 2002-2005, Jeremiah Fincher
|
||
# Copyright (c) 2011, James McCoy
|
||
# All rights reserved.
|
||
#
|
||
# Redistribution and use in source and binary forms, with or without
|
||
# modification, are permitted provided that the following conditions are met:
|
||
#
|
||
# * Redistributions of source code must retain the above copyright notice,
|
||
# this list of conditions, and the following disclaimer.
|
||
# * Redistributions in binary form must reproduce the above copyright notice,
|
||
# this list of conditions, and the following disclaimer in the
|
||
# documentation and/or other materials provided with the distribution.
|
||
# * Neither the name of the author of this software nor the name of
|
||
# contributors to this software may be used to endorse or promote products
|
||
# derived from this software without specific prior written consent.
|
||
#
|
||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||
# POSSIBILITY OF SUCH DAMAGE.
|
||
###
|
||
|
||
import gc
|
||
import os
|
||
import re
|
||
import sys
|
||
import time
|
||
import shutil
|
||
import urllib
|
||
import httplib
|
||
import unittest
|
||
import threading
|
||
import StringIO
|
||
|
||
import supybot.log as log
|
||
import supybot.i18n as i18n
|
||
import supybot.conf as conf
|
||
import supybot.utils as utils
|
||
import supybot.ircdb as ircdb
|
||
import supybot.world as world
|
||
import supybot.irclib as irclib
|
||
import supybot.plugin as plugin
|
||
import supybot.drivers as drivers
|
||
import supybot.ircmsgs as ircmsgs
|
||
import supybot.registry as registry
|
||
import supybot.ircutils as ircutils
|
||
import supybot.callbacks as callbacks
|
||
import supybot.httpserver as httpserver
|
||
|
||
i18n.import_conf()
|
||
network = True
|
||
|
||
# This is the global list of suites that are to be run.
|
||
suites = []
|
||
|
||
timeout = 10
|
||
|
||
originalCallbacksGetHelp = callbacks.getHelp
|
||
lastGetHelp = 'x' * 1000
|
||
def cachingGetHelp(method, name=None, doc=None):
|
||
global lastGetHelp
|
||
lastGetHelp = originalCallbacksGetHelp(method, name, doc)
|
||
return lastGetHelp
|
||
callbacks.getHelp = cachingGetHelp
|
||
|
||
def getTestIrc():
|
||
irc = irclib.Irc('test')
|
||
# Gotta clear the connect messages (USER, NICK, etc.)
|
||
while irc.takeMsg():
|
||
pass
|
||
return irc
|
||
|
||
class TimeoutError(AssertionError):
|
||
def __str__(self):
|
||
return '%r timed out' % self.args[0]
|
||
|
||
class TestPlugin(callbacks.Plugin):
|
||
def eval(self, irc, msg, args):
|
||
"""<text>
|
||
|
||
This is the help for eval. Since Owner doesn't have an eval command
|
||
anymore, we needed to add this so as not to invalidate any of the tests
|
||
that depended on that eval command.
|
||
"""
|
||
try:
|
||
irc.reply(repr(eval(' '.join(args))))
|
||
except callbacks.ArgumentError:
|
||
raise
|
||
except Exception, e:
|
||
irc.reply(utils.exnToString(e))
|
||
# Since we know we don't now need the Irc object, we just give None. This
|
||
# might break if callbacks.Privmsg ever *requires* the Irc object.
|
||
TestInstance = TestPlugin(None)
|
||
conf.registerPlugin('TestPlugin', True, public=False)
|
||
|
||
class SupyTestCase(unittest.TestCase):
|
||
"""This class exists simply for extra logging. It's come in useful in the
|
||
past."""
|
||
def setUp(self):
|
||
log.critical('Beginning test case %s', self.id())
|
||
threads = [t.getName() for t in threading.enumerate()]
|
||
log.critical('Threads: %L', threads)
|
||
unittest.TestCase.setUp(self)
|
||
|
||
def tearDown(self):
|
||
for irc in world.ircs[:]:
|
||
irc._reallyDie()
|
||
|
||
if sys.version_info < (2, 7, 0):
|
||
def assertIn(self, member, container, msg=None):
|
||
"""Just like self.assertTrue(a in b), but with a nicer default message."""
|
||
if member not in container:
|
||
standardMsg = '%s not found in %s' % (safe_repr(member),
|
||
safe_repr(container))
|
||
self.fail(self._formatMessage(msg, standardMsg))
|
||
|
||
def assertNotIn(self, member, container, msg=None):
|
||
"""Just like self.assertTrue(a not in b), but with a nicer default message."""
|
||
if member in container:
|
||
standardMsg = '%s unexpectedly found in %s' % (safe_repr(member),
|
||
safe_repr(container))
|
||
self.fail(self._formatMessage(msg, standardMsg))
|
||
|
||
def assertIs(self, expr1, expr2, msg=None):
|
||
"""Just like self.assertTrue(a is b), but with a nicer default message."""
|
||
if expr1 is not expr2:
|
||
standardMsg = '%s is not %s' % (safe_repr(expr1),
|
||
safe_repr(expr2))
|
||
self.fail(self._formatMessage(msg, standardMsg))
|
||
|
||
def assertIsNot(self, expr1, expr2, msg=None):
|
||
"""Just like self.assertTrue(a is not b), but with a nicer default message."""
|
||
if expr1 is expr2:
|
||
standardMsg = 'unexpectedly identical: %s' % (safe_repr(expr1),)
|
||
self.fail(self._formatMessage(msg, standardMsg))
|
||
|
||
|
||
class PluginTestCase(SupyTestCase):
|
||
"""Subclass this to write a test case for a plugin. See
|
||
plugins/Plugin/test.py for an example.
|
||
"""
|
||
plugins = None
|
||
cleanConfDir = True
|
||
cleanDataDir = True
|
||
config = {}
|
||
def __init__(self, methodName='runTest'):
|
||
self.timeout = timeout
|
||
originalRunTest = getattr(self, methodName)
|
||
def runTest(self):
|
||
run = True
|
||
if hasattr(self, 'irc') and self.irc:
|
||
for cb in self.irc.callbacks:
|
||
cbModule = sys.modules[cb.__class__.__module__]
|
||
if hasattr(cbModule, 'deprecated') and cbModule.deprecated:
|
||
print
|
||
print 'Ignored, %s is deprecated.' % cb.name()
|
||
run = False
|
||
if run:
|
||
originalRunTest()
|
||
runTest = utils.python.changeFunctionName(runTest, methodName)
|
||
setattr(self.__class__, methodName, runTest)
|
||
SupyTestCase.__init__(self, methodName=methodName)
|
||
self.originals = {}
|
||
|
||
def setUp(self, nick='test', forceSetup=False):
|
||
if not forceSetup and \
|
||
self.__class__ in (PluginTestCase, ChannelPluginTestCase):
|
||
# Necessary because there's a test in here that shouldn\'t run.
|
||
return
|
||
SupyTestCase.setUp(self)
|
||
# Just in case, let's do this. Too many people forget to call their
|
||
# super methods.
|
||
for irc in world.ircs[:]:
|
||
irc._reallyDie()
|
||
# Set conf variables appropriately.
|
||
conf.supybot.reply.whenAddressedBy.chars.setValue('@')
|
||
conf.supybot.reply.error.detailed.setValue(True)
|
||
conf.supybot.reply.whenNotCommand.setValue(True)
|
||
self.myVerbose = world.myVerbose
|
||
def rmFiles(dir):
|
||
for filename in os.listdir(dir):
|
||
file = os.path.join(dir, filename)
|
||
if os.path.isfile(file):
|
||
os.remove(file)
|
||
else:
|
||
shutil.rmtree(file)
|
||
if self.cleanConfDir:
|
||
rmFiles(conf.supybot.directories.conf())
|
||
if self.cleanDataDir:
|
||
rmFiles(conf.supybot.directories.data())
|
||
ircdb.users.reload()
|
||
ircdb.ignores.reload()
|
||
ircdb.channels.reload()
|
||
if self.plugins is None:
|
||
raise ValueError, 'PluginTestCase must have a "plugins" attribute.'
|
||
self.nick = nick
|
||
self.prefix = ircutils.joinHostmask(nick, 'user', 'host.domain.tld')
|
||
self.irc = getTestIrc()
|
||
MiscModule = plugin.loadPluginModule('Misc')
|
||
OwnerModule = plugin.loadPluginModule('Owner')
|
||
ConfigModule = plugin.loadPluginModule('Config')
|
||
_ = plugin.loadPluginClass(self.irc, MiscModule)
|
||
_ = plugin.loadPluginClass(self.irc, OwnerModule)
|
||
_ = plugin.loadPluginClass(self.irc, ConfigModule)
|
||
if isinstance(self.plugins, str):
|
||
self.plugins = [self.plugins]
|
||
else:
|
||
for name in self.plugins:
|
||
if name not in ('Owner', 'Misc', 'Config'):
|
||
module = plugin.loadPluginModule(name,
|
||
ignoreDeprecation=True)
|
||
cb = plugin.loadPluginClass(self.irc, module)
|
||
self.irc.addCallback(TestInstance)
|
||
for (name, value) in self.config.iteritems():
|
||
group = conf.supybot
|
||
parts = registry.split(name)
|
||
if parts[0] == 'supybot':
|
||
parts.pop(0)
|
||
for part in parts:
|
||
group = group.get(part)
|
||
self.originals[group] = group()
|
||
group.setValue(value)
|
||
|
||
def tearDown(self):
|
||
if self.__class__ in (PluginTestCase, ChannelPluginTestCase):
|
||
# Necessary because there's a test in here that shouldn\'t run.
|
||
return
|
||
for (group, original) in self.originals.iteritems():
|
||
group.setValue(original)
|
||
ircdb.users.close()
|
||
ircdb.ignores.close()
|
||
ircdb.channels.close()
|
||
SupyTestCase.tearDown(self)
|
||
self.irc = None
|
||
gc.collect()
|
||
|
||
def _feedMsg(self, query, timeout=None, to=None, frm=None,
|
||
usePrefixChar=True):
|
||
if to is None:
|
||
to = self.irc.nick
|
||
if frm is None:
|
||
frm = self.prefix
|
||
if timeout is None:
|
||
timeout = self.timeout
|
||
if self.myVerbose:
|
||
print # Extra newline, so it's pretty.
|
||
prefixChars = conf.supybot.reply.whenAddressedBy.chars()
|
||
if not usePrefixChar and query[0] in prefixChars:
|
||
query = query[1:]
|
||
if sys.version_info[0] < 3:
|
||
query = query.encode('utf8') # unicode->str
|
||
msg = ircmsgs.privmsg(to, query, prefix=frm)
|
||
if self.myVerbose:
|
||
print 'Feeding: %r' % msg
|
||
self.irc.feedMsg(msg)
|
||
fed = time.time()
|
||
response = self.irc.takeMsg()
|
||
while response is None and time.time() - fed < timeout:
|
||
time.sleep(0.1) # So it doesn't suck up 100% cpu.
|
||
drivers.run()
|
||
response = self.irc.takeMsg()
|
||
if self.myVerbose:
|
||
print 'Response: %r' % response
|
||
return response
|
||
|
||
def getMsg(self, query, **kwargs):
|
||
return self._feedMsg(query, **kwargs)
|
||
|
||
def feedMsg(self, query, to=None, frm=None):
|
||
"""Just feeds it a message, that's all."""
|
||
if to is None:
|
||
to = self.irc.nick
|
||
if frm is None:
|
||
frm = self.prefix
|
||
self.irc.feedMsg(ircmsgs.privmsg(to, query, prefix=frm))
|
||
|
||
# These assertError/assertNoError are somewhat fragile. The proper way to
|
||
# do them would be to use a proxy for the irc object and intercept .error.
|
||
# But that would be hard, so I don't bother. When this breaks, it'll get
|
||
# fixed, but not until then.
|
||
def assertError(self, query, **kwargs):
|
||
m = self._feedMsg(query, **kwargs)
|
||
if m is None:
|
||
raise TimeoutError, query
|
||
if lastGetHelp not in m.args[1]:
|
||
self.failUnless(m.args[1].startswith('Error:'),
|
||
'%r did not error: %s' % (query, m.args[1]))
|
||
return m
|
||
|
||
def assertSnarfError(self, query, **kwargs):
|
||
return self.assertError(query, usePrefixChar=False, **kwargs)
|
||
|
||
def assertNotError(self, query, **kwargs):
|
||
m = self._feedMsg(query, **kwargs)
|
||
if m is None:
|
||
raise TimeoutError, query
|
||
self.failIf(m.args[1].startswith('Error:'),
|
||
'%r errored: %s' % (query, m.args[1]))
|
||
self.failIf(lastGetHelp in m.args[1],
|
||
'%r returned the help string.' % query)
|
||
return m
|
||
|
||
def assertSnarfNotError(self, query, **kwargs):
|
||
return self.assertNotError(query, usePrefixChar=False, **kwargs)
|
||
|
||
def assertHelp(self, query, **kwargs):
|
||
m = self._feedMsg(query, **kwargs)
|
||
if m is None:
|
||
raise TimeoutError, query
|
||
msg = m.args[1]
|
||
if 'more message' in msg:
|
||
msg = msg[0:-27] # Strip (XXX more messages)
|
||
self.failUnless(msg in lastGetHelp,
|
||
'%s is not the help (%s)' % (m.args[1], lastGetHelp))
|
||
return m
|
||
|
||
def assertNoResponse(self, query, timeout=0, **kwargs):
|
||
m = self._feedMsg(query, timeout=timeout, **kwargs)
|
||
self.failIf(m, 'Unexpected response: %r' % m)
|
||
return m
|
||
|
||
def assertSnarfNoResponse(self, query, timeout=0, **kwargs):
|
||
return self.assertNoResponse(query, timeout=timeout,
|
||
usePrefixChar=False, **kwargs)
|
||
|
||
def assertResponse(self, query, expectedResponse, **kwargs):
|
||
m = self._feedMsg(query, **kwargs)
|
||
if m is None:
|
||
raise TimeoutError, query
|
||
self.assertEqual(m.args[1], expectedResponse,
|
||
'%r != %r' % (expectedResponse, m.args[1]))
|
||
return m
|
||
|
||
def assertSnarfResponse(self, query, expectedResponse, **kwargs):
|
||
return self.assertResponse(query, expectedResponse,
|
||
usePrefixChar=False, **kwargs)
|
||
|
||
def assertRegexp(self, query, regexp, flags=re.I, **kwargs):
|
||
m = self._feedMsg(query, **kwargs)
|
||
if m is None:
|
||
raise TimeoutError, query
|
||
self.failUnless(re.search(regexp, m.args[1], flags),
|
||
'%r does not match %r' % (m.args[1], regexp))
|
||
return m
|
||
|
||
def assertSnarfRegexp(self, query, regexp, flags=re.I, **kwargs):
|
||
return self.assertRegexp(query, regexp, flags=re.I,
|
||
usePrefixChar=False, **kwargs)
|
||
|
||
def assertNotRegexp(self, query, regexp, flags=re.I, **kwargs):
|
||
m = self._feedMsg(query, **kwargs)
|
||
if m is None:
|
||
raise TimeoutError, query
|
||
self.failUnless(re.search(regexp, m.args[1], flags) is None,
|
||
'%r matched %r' % (m.args[1], regexp))
|
||
return m
|
||
|
||
def assertSnarfNotRegexp(self, query, regexp, flags=re.I, **kwargs):
|
||
return self.assertNotRegexp(query, regexp, flags=re.I,
|
||
usePrefixChar=False, **kwargs)
|
||
|
||
def assertAction(self, query, expectedResponse=None, **kwargs):
|
||
m = self._feedMsg(query, **kwargs)
|
||
if m is None:
|
||
raise TimeoutError, query
|
||
self.failUnless(ircmsgs.isAction(m), '%r is not an action.' % m)
|
||
if expectedResponse is not None:
|
||
s = ircmsgs.unAction(m)
|
||
self.assertEqual(s, expectedResponse,
|
||
'%r != %r' % (s, expectedResponse))
|
||
return m
|
||
|
||
def assertSnarfAction(self, query, expectedResponse=None, **kwargs):
|
||
return self.assertAction(query, expectedResponse=None,
|
||
usePrefixChar=False, **kwargs)
|
||
|
||
def assertActionRegexp(self, query, regexp, flags=re.I, **kwargs):
|
||
m = self._feedMsg(query, **kwargs)
|
||
if m is None:
|
||
raise TimeoutError, query
|
||
self.failUnless(ircmsgs.isAction(m))
|
||
s = ircmsgs.unAction(m)
|
||
self.failUnless(re.search(regexp, s, flags),
|
||
'%r does not match %r' % (s, regexp))
|
||
|
||
def assertSnarfActionRegexp(self, query, regexp, flags=re.I, **kwargs):
|
||
return self.assertActionRegexp(query, regexp, flags=re.I,
|
||
usePrefixChar=False, **kwargs)
|
||
|
||
_noTestDoc = ('Admin', 'Channel', 'Config',
|
||
'Misc', 'Owner', 'User', 'TestPlugin')
|
||
def TestDocumentation(self):
|
||
if self.__class__ in (PluginTestCase, ChannelPluginTestCase):
|
||
return
|
||
for cb in self.irc.callbacks:
|
||
name = cb.name()
|
||
if ((name in self._noTestDoc) and \
|
||
not name.lower() in self.__class__.__name__.lower()):
|
||
continue
|
||
self.failUnless(sys.modules[cb.__class__.__name__].__doc__,
|
||
'%s has no module documentation.' % name)
|
||
if hasattr(cb, 'isCommandMethod'):
|
||
for attr in dir(cb):
|
||
if cb.isCommandMethod(attr) and \
|
||
attr == callbacks.canonicalName(attr):
|
||
self.failUnless(getattr(cb, attr, None).__doc__,
|
||
'%s.%s has no help.' % (name, attr))
|
||
|
||
|
||
|
||
class ChannelPluginTestCase(PluginTestCase):
|
||
channel = '#test'
|
||
def setUp(self, nick='test', forceSetup=False):
|
||
if not forceSetup and \
|
||
self.__class__ in (PluginTestCase, ChannelPluginTestCase):
|
||
return
|
||
PluginTestCase.setUp(self)
|
||
self.irc.feedMsg(ircmsgs.join(self.channel, prefix=self.prefix))
|
||
m = self.irc.takeMsg()
|
||
self.failIf(m is None, 'No message back from joining channel.')
|
||
self.assertEqual(m.command, 'MODE')
|
||
m = self.irc.takeMsg()
|
||
self.failIf(m is None, 'No message back from joining channel.')
|
||
self.assertEqual(m.command, 'MODE')
|
||
m = self.irc.takeMsg()
|
||
self.failIf(m is None, 'No message back from joining channel.')
|
||
self.assertEqual(m.command, 'WHO')
|
||
|
||
def _feedMsg(self, query, timeout=None, to=None, frm=None, private=False,
|
||
usePrefixChar=True):
|
||
if to is None:
|
||
if private:
|
||
to = self.irc.nick
|
||
else:
|
||
to = self.channel
|
||
if frm is None:
|
||
frm = self.prefix
|
||
if timeout is None:
|
||
timeout = self.timeout
|
||
if self.myVerbose:
|
||
print # Newline, just like PluginTestCase.
|
||
prefixChars = conf.supybot.reply.whenAddressedBy.chars()
|
||
if query[0] not in prefixChars and usePrefixChar:
|
||
query = prefixChars[0] + query
|
||
if sys.version_info[0] < 3 and isinstance(query, unicode):
|
||
query = query.encode('utf8') # unicode->str
|
||
msg = ircmsgs.privmsg(to, query, prefix=frm)
|
||
if self.myVerbose:
|
||
print 'Feeding: %r' % msg
|
||
self.irc.feedMsg(msg)
|
||
fed = time.time()
|
||
response = self.irc.takeMsg()
|
||
while response is None and time.time() - fed < timeout:
|
||
time.sleep(0.1)
|
||
drivers.run()
|
||
response = self.irc.takeMsg()
|
||
if response is not None:
|
||
if response.command == 'PRIVMSG':
|
||
args = list(response.args)
|
||
# Strip off nick: at beginning of response.
|
||
if args[1].startswith(self.nick) or \
|
||
args[1].startswith(ircutils.nickFromHostmask(self.prefix)):
|
||
try:
|
||
args[1] = args[1].split(' ', 1)[1]
|
||
except IndexError:
|
||
# Odd. We'll skip this.
|
||
pass
|
||
ret = ircmsgs.privmsg(*args)
|
||
else:
|
||
ret = response
|
||
else:
|
||
ret = None
|
||
if self.myVerbose:
|
||
print 'Returning: %r' % ret
|
||
return ret
|
||
|
||
def feedMsg(self, query, to=None, frm=None, private=False):
|
||
"""Just feeds it a message, that's all."""
|
||
if to is None:
|
||
if private:
|
||
to = self.irc.nick
|
||
else:
|
||
to = self.channel
|
||
if frm is None:
|
||
frm = self.prefix
|
||
self.irc.feedMsg(ircmsgs.privmsg(to, query, prefix=frm))
|
||
|
||
class TestSupyHTTPServer(httpserver.SupyHTTPServer):
|
||
def __init__(self, *args, **kwargs):
|
||
pass
|
||
def serve_forever(self, *args, **kwargs):
|
||
pass
|
||
def shutdown(self, *args, **kwargs):
|
||
pass
|
||
|
||
class TestRequestHandler(httpserver.SupyHTTPRequestHandler):
|
||
def __init__(self, rfile, wfile, *args, **kwargs):
|
||
self._headers_mode = True
|
||
self.rfile = rfile
|
||
self.wfile = wfile
|
||
self.handle_one_request()
|
||
|
||
def send_response(self, code):
|
||
assert self._headers_mode
|
||
self._response = code
|
||
def send_headers(self, name, value):
|
||
assert self._headers_mode
|
||
self._headers[name] = value
|
||
def end_headers(self):
|
||
assert self._headers_mode
|
||
self._headers_mode = False
|
||
|
||
def do_X(self, *args, **kwargs):
|
||
assert httpserver.http_servers, \
|
||
'The HTTP server is not started.'
|
||
self.server = httpserver.http_servers[0]
|
||
httpserver.SupyHTTPRequestHandler.do_X(self, *args, **kwargs)
|
||
|
||
# Partially stolen from the standart Python library :)
|
||
def open_http(url, data=None):
|
||
"""Use HTTP protocol."""
|
||
import httplib
|
||
user_passwd = None
|
||
proxy_passwd= None
|
||
if isinstance(url, str):
|
||
host, selector = urllib.splithost(url)
|
||
if host:
|
||
user_passwd, host = urllib.splituser(host)
|
||
host = urllib.unquote(host)
|
||
realhost = host
|
||
else:
|
||
host, selector = url
|
||
# check whether the proxy contains authorization information
|
||
proxy_passwd, host = urllib.splituser(host)
|
||
# now we proceed with the url we want to obtain
|
||
urltype, rest = urllib.splittype(selector)
|
||
url = rest
|
||
user_passwd = None
|
||
if urltype.lower() != 'http':
|
||
realhost = None
|
||
else:
|
||
realhost, rest = urllib.splithost(rest)
|
||
if realhost:
|
||
user_passwd, realhost = urllib.splituser(realhost)
|
||
if user_passwd:
|
||
selector = "%s://%s%s" % (urltype, realhost, rest)
|
||
if urllib.proxy_bypass(realhost):
|
||
host = realhost
|
||
|
||
#print "proxy via http:", host, selector
|
||
if not host: raise IOError, ('http error', 'no host given')
|
||
|
||
if proxy_passwd:
|
||
import base64
|
||
proxy_auth = base64.b64encode(proxy_passwd).strip()
|
||
else:
|
||
proxy_auth = None
|
||
|
||
if user_passwd:
|
||
import base64
|
||
auth = base64.b64encode(user_passwd).strip()
|
||
else:
|
||
auth = None
|
||
c = FakeHTTPConnection(host)
|
||
if data is not None:
|
||
c.putrequest('POST', selector)
|
||
c.putheader('Content-Type', 'application/x-www-form-urlencoded')
|
||
c.putheader('Content-Length', '%d' % len(data))
|
||
else:
|
||
c.putrequest('GET', selector)
|
||
if proxy_auth: c.putheader('Proxy-Authorization', 'Basic %s' % proxy_auth)
|
||
if auth: c.putheader('Authorization', 'Basic %s' % auth)
|
||
if realhost: c.putheader('Host', realhost)
|
||
for args in urllib.URLopener().addheaders: c.putheader(*args)
|
||
c.endheaders()
|
||
return c
|
||
|
||
class FakeHTTPConnection(httplib.HTTPConnection):
|
||
_data = ''
|
||
_headers = {}
|
||
def __init__(self, rfile, wfile):
|
||
httplib.HTTPConnection.__init__(self, 'localhost')
|
||
self.rfile = rfile
|
||
self.wfile = wfile
|
||
self.connect()
|
||
def send(self, data):
|
||
self.wfile.write(data)
|
||
#def putheader(self, name, value):
|
||
# self._headers[name] = value
|
||
#def connect(self, *args, **kwargs):
|
||
# self.sock = self.wfile
|
||
#def getresponse(self, *args, **kwargs):
|
||
# pass
|
||
|
||
class HTTPPluginTestCase(PluginTestCase):
|
||
def setUp(self):
|
||
PluginTestCase.setUp(self, forceSetup=True)
|
||
|
||
def request(self, url, method='GET', read=True, data={}):
|
||
assert url.startswith('/')
|
||
wfile = StringIO.StringIO()
|
||
rfile = StringIO.StringIO()
|
||
connection = FakeHTTPConnection(wfile, rfile)
|
||
connection.putrequest(method, url)
|
||
connection.endheaders()
|
||
rfile.seek(0)
|
||
wfile.seek(0)
|
||
handler = TestRequestHandler(rfile, wfile)
|
||
if read:
|
||
return (handler._response, wfile.read())
|
||
else:
|
||
return handler._response
|
||
|
||
def assertHTTPResponse(self, uri, expectedResponse, **kwargs):
|
||
response = self.request(uri, read=False, **kwargs)
|
||
self.assertEqual(response, expectedResponse)
|
||
|
||
def assertNotHTTPResponse(self, uri, expectedResponse, **kwargs):
|
||
response = self.request(uri, read=False, **kwargs)
|
||
self.assertNotEqual(response, expectedResponse)
|
||
|
||
class ChannelHTTPPluginTestCase(ChannelPluginTestCase, HTTPPluginTestCase):
|
||
def setUp(self):
|
||
ChannelPluginTestCase.setUp(self, forceSetup=True)
|
||
|
||
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:
|
||
|