Add unit tests to the HTTP server.

This commit is contained in:
Valentin Lorentz 2011-07-03 16:16:19 +02:00
parent a729a0a4f3
commit f4b81659af
3 changed files with 170 additions and 30 deletions

View File

@ -36,6 +36,8 @@ from threading import Event, Thread
from cStringIO import StringIO
from SocketServer import ThreadingMixIn
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
# For testing purposes
from SocketServer import StreamRequestHandler
import supybot.log as log
import supybot.conf as conf
@ -45,29 +47,10 @@ _ = PluginInternationalization()
configGroup = conf.supybot.servers.http
if world.testing:
class TestHTTPServer(HTTPServer):
"""A fake HTTP server for testing purpose."""
def __init__(self, address, handler):
self.server_address = address
self.RequestHandlerClass = handler
self.socket = StringIO()
self._notServing = Event()
self._notServing.set()
class RequestNotHandled(Exception):
pass
def fileno(self):
return hash(self)
def serve_forever(self, poll_interval=None):
self._notServing.clear()
self._notServing.wait()
def shutdown(self):
self._notServing.set()
HTTPServer = TestHTTPServer
class SupyHTTPServer(HTTPServer):
class RealSupyHTTPServer(HTTPServer):
# TODO: make this configurable
timeout = 0.5
callbacks = {}
@ -82,6 +65,19 @@ class SupyHTTPServer(HTTPServer):
callback.doUnhook(self)
return callback
class TestSupyHTTPServer(RealSupyHTTPServer):
def __init__(self, *args, **kwargs):
pass
def serve_forever(self, *args, **kwargs):
pass
def shutdown(self, *args, **kwargs):
pass
if world.testing:
SupyHTTPServer = TestSupyHTTPServer
else:
SupyHTTPServer = RealSupyHTTPServer
class SupyHTTPRequestHandler(BaseHTTPRequestHandler):
def do_X(self, callbackMethod, *args, **kwargs):
if self.path == '/':
@ -106,6 +102,8 @@ class SupyHTTPRequestHandler(BaseHTTPRequestHandler):
self.do_X('doGet')
def do_POST(self):
if 'Content-Type' not in self.headers:
self.headers['Content-Type'] = 'application/x-www-form-urlencoded'
form = cgi.FieldStorage(
fp=self.rfile,
headers=self.headers,
@ -122,7 +120,6 @@ class SupyHTTPRequestHandler(BaseHTTPRequestHandler):
log.info('HTTP request: %s - %s' %
(self.address_string(), format % args))
class SupyHTTPServerCallback:
"""This is a base class that should be overriden by any plugin that want
to have a Web interface."""

View File

@ -33,8 +33,11 @@ import re
import sys
import time
import shutil
import urllib
import httplib
import unittest
import threading
import StringIO
import supybot.log as log
import supybot.i18n as i18n
@ -49,7 +52,9 @@ import supybot.ircmsgs as ircmsgs
import supybot.registry as registry
import supybot.ircutils as ircutils
import supybot.callbacks as callbacks
import supybot.httpserver as httpserver
i18n.import_conf()
network = True
# This is the global list of suites that are to be run.
@ -134,8 +139,9 @@ class PluginTestCase(SupyTestCase):
SupyTestCase.__init__(self, methodName=methodName)
self.originals = {}
def setUp(self, nick='test'):
if self.__class__ in (PluginTestCase, ChannelPluginTestCase):
def setUp(self, nick='test', forceSetup=False):
if not forceSetup and \
self.__class__ in (PluginTestCase, ChannelPluginTestCase):
# Necessary because there's a test in here that shouldn\'t run.
return
SupyTestCase.setUp(self)
@ -355,7 +361,7 @@ class PluginTestCase(SupyTestCase):
_noTestDoc = ('Admin', 'Channel', 'Config',
'Misc', 'Owner', 'User', 'TestPlugin')
def testDocumentation(self):
def TestDocumentation(self):
if self.__class__ in (PluginTestCase, ChannelPluginTestCase):
return
for cb in self.irc.callbacks:
@ -371,13 +377,14 @@ class PluginTestCase(SupyTestCase):
attr == callbacks.canonicalName(attr):
self.failUnless(getattr(cb, attr, None).__doc__,
'%s.%s has no help.' % (name, attr))
class ChannelPluginTestCase(PluginTestCase):
channel = '#test'
def setUp(self):
if self.__class__ in (PluginTestCase, ChannelPluginTestCase):
def setUp(self, nick='test', forceSetup=False):
if not forceSetup and \
self.__class__ in (PluginTestCase, ChannelPluginTestCase):
return
PluginTestCase.setUp(self)
self.irc.feedMsg(ircmsgs.join(self.channel, prefix=self.prefix))
@ -445,6 +452,142 @@ class ChannelPluginTestCase(PluginTestCase):
frm = self.prefix
self.irc.feedMsg(ircmsgs.privmsg(to, query, prefix=frm))
class TestSupyHTTPServer(httpserver.SupyHTTPServer):
def __init__(self, *args, **kwargs):
pass
def serve_forever(self, *args, **kwargs):
pass
def shutdown(self, *args, **kwargs):
pass
class TestRequestHandler(httpserver.SupyHTTPRequestHandler):
def __init__(self, rfile, wfile, *args, **kwargs):
self._headers_mode = True
self.rfile = rfile
self.wfile = wfile
self.handle_one_request()
def send_response(self, code):
assert self._headers_mode
self._response = code
def send_headers(self, name, value):
assert self._headers_mode
self._headers[name] = value
def end_headers(self):
assert self._headers_mode
self._headers_mode = False
def do_X(self, *args, **kwargs):
assert httpserver.httpServer is not None, \
'The HTTP server is not started.'
self.server = httpserver.httpServer
httpserver.SupyHTTPRequestHandler.do_X(self, *args, **kwargs)
# Partially stolen from the standart Python library :)
def open_http(url, data=None):
"""Use HTTP protocol."""
import httplib
user_passwd = None
proxy_passwd= None
if isinstance(url, str):
host, selector = urllib.splithost(url)
if host:
user_passwd, host = urllib.splituser(host)
host = urllib.unquote(host)
realhost = host
else:
host, selector = url
# check whether the proxy contains authorization information
proxy_passwd, host = urllib.splituser(host)
# now we proceed with the url we want to obtain
urltype, rest = splittype(selector)
url = rest
user_passwd = None
if urltype.lower() != 'http':
realhost = None
else:
realhost, rest = splithost(rest)
if realhost:
user_passwd, realhost = urllib.splituser(realhost)
if user_passwd:
selector = "%s://%s%s" % (urltype, realhost, rest)
if proxy_bypass(realhost):
host = realhost
#print "proxy via http:", host, selector
if not host: raise IOError, ('http error', 'no host given')
if proxy_passwd:
import base64
proxy_auth = base64.b64encode(proxy_passwd).strip()
else:
proxy_auth = None
if user_passwd:
import base64
auth = base64.b64encode(user_passwd).strip()
else:
auth = None
h = HTTP(host)
if data is not None:
h.putrequest('POST', selector)
h.putheader('Content-Type', 'application/x-www-form-urlencoded')
h.putheader('Content-Length', '%d' % len(data))
else:
h.putrequest('GET', selector)
if proxy_auth: h.putheader('Proxy-Authorization', 'Basic %s' % proxy_auth)
if auth: h.putheader('Authorization', 'Basic %s' % auth)
if realhost: h.putheader('Host', realhost)
for args in urllib.URLopener().addheaders: h.putheader(*args)
h.endheaders()
return h
class FakeHTTPConnection(httplib.HTTPConnection):
_data = ''
_headers = {}
def __init__(self, rfile, wfile):
httplib.HTTPConnection.__init__(self, 'localhost')
self.rfile = rfile
self.wfile = wfile
self.connect()
def send(self, data):
self.wfile.write(data)
#def putheader(self, name, value):
# self._headers[name] = value
#def connect(self, *args, **kwargs):
# self.sock = self.wfile
#def getresponse(self, *args, **kwargs):
# pass
class HTTP(httplib.HTTP):
_connection_class = FakeHTTPConnection
class HTTPPluginTestCase(PluginTestCase):
def setUp(self):
PluginTestCase.setUp(self, forceSetup=True)
def request(self, url, method='GET', read=True, data={}):
assert url.startswith('/')
wfile = StringIO.StringIO()
rfile = StringIO.StringIO()
connection = FakeHTTPConnection(wfile, rfile)
connection.putrequest(method, url)
connection.endheaders()
rfile.seek(0)
wfile.seek(0)
handler = TestRequestHandler(rfile, wfile)
if read:
return (handler._response, wfile.read())
else:
return handler._response
def assertHTTPResponse(self, uri, expectedResponse, **kwargs):
response = self.request(uri, read=False, **kwargs)
self.assertEqual(response, expectedResponse)
class ChannelHTTPPluginTestCase(ChannelPluginTestCase, HTTPPluginTestCase):
def setUp(self):
ChannelPluginTestCase.setUp(self, forceSetup=True)
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:

View File

@ -1,3 +1,3 @@
"""stick the various versioning attributes in here, so we only have to change
them once."""
version = '0.83.4.1+limnoria (2011-07-03T10:46:48+0200)'
version = '0.83.4.1+limnoria (2011-07-03T16:16:19+0200)'