mirror of
https://github.com/Mikaela/Limnoria.git
synced 2024-11-14 22:49:23 +01:00
httpserver: Rewrite without the cgi module
It is removed in Python 3.13
This commit is contained in:
parent
9bcb21389a
commit
0ad61f5791
@ -1,5 +1,5 @@
|
||||
###
|
||||
# Copyright (c) 2011-2021, Valentin Lorentz
|
||||
# Copyright (c) 2011-2024, Valentin Lorentz
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
@ -32,8 +32,8 @@ An embedded and centralized HTTP server for Supybot's plugins.
|
||||
"""
|
||||
|
||||
import os
|
||||
import cgi
|
||||
import socket
|
||||
import urllib.parse
|
||||
from threading import Thread
|
||||
|
||||
import supybot.log as log
|
||||
@ -164,6 +164,114 @@ def get_template(filename):
|
||||
with open(path + '.example', 'r') as fd:
|
||||
return fd.read()
|
||||
|
||||
class HttpHeader:
|
||||
__slots__ = ('name', 'value')
|
||||
|
||||
def __init__(self, name, value):
|
||||
self.name = name
|
||||
self.value = value
|
||||
|
||||
def __repr__(self):
|
||||
"""Return printable representation."""
|
||||
return "HttpHeader(%r, %r)" % (self.name, self.value)
|
||||
|
||||
class HttpHeaders:
|
||||
"""Copy of `cgi.FieldStorage
|
||||
<https://github.com/python/cpython/blob/v3.12.3/Lib/cgi.py#L512-L594>`
|
||||
before it was removed from the stdlib.
|
||||
"""
|
||||
__slots__ = ('list',)
|
||||
def __init__(self, headers):
|
||||
self.list = headers
|
||||
|
||||
def __repr__(self):
|
||||
return 'HttpHeaders(%r)' % self.list
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.keys())
|
||||
|
||||
def __getattr__(self, name):
|
||||
if name != 'value':
|
||||
raise AttributeError(name)
|
||||
if self.file:
|
||||
self.file.seek(0)
|
||||
value = self.file.read()
|
||||
self.file.seek(0)
|
||||
elif self.list is not None:
|
||||
value = self.list
|
||||
else:
|
||||
value = None
|
||||
return value
|
||||
|
||||
def __getitem__(self, key):
|
||||
"""Dictionary style indexing."""
|
||||
if self.list is None:
|
||||
raise TypeError("not indexable")
|
||||
found = []
|
||||
for item in self.list:
|
||||
if item.name == key: found.append(item)
|
||||
if not found:
|
||||
raise KeyError(key)
|
||||
if len(found) == 1:
|
||||
return found[0]
|
||||
else:
|
||||
return found
|
||||
|
||||
def getvalue(self, key, default=None):
|
||||
"""Dictionary style get() method, including 'value' lookup."""
|
||||
if key in self:
|
||||
value = self[key]
|
||||
if isinstance(value, list):
|
||||
return [x.value for x in value]
|
||||
else:
|
||||
return value.value
|
||||
else:
|
||||
return default
|
||||
|
||||
def getfirst(self, key, default=None):
|
||||
""" Return the first value received."""
|
||||
if key in self:
|
||||
value = self[key]
|
||||
if isinstance(value, list):
|
||||
return value[0].value
|
||||
else:
|
||||
return value.value
|
||||
else:
|
||||
return default
|
||||
|
||||
def getlist(self, key):
|
||||
""" Return list of received values."""
|
||||
if key in self:
|
||||
value = self[key]
|
||||
if isinstance(value, list):
|
||||
return [x.value for x in value]
|
||||
else:
|
||||
return [value.value]
|
||||
else:
|
||||
return []
|
||||
|
||||
def keys(self):
|
||||
"""Dictionary style keys() method."""
|
||||
if self.list is None:
|
||||
raise TypeError("not indexable")
|
||||
return list(set(item.name for item in self.list))
|
||||
|
||||
def __contains__(self, key):
|
||||
"""Dictionary style __contains__ method."""
|
||||
if self.list is None:
|
||||
raise TypeError("not indexable")
|
||||
return any(item.name == key for item in self.list)
|
||||
|
||||
def __len__(self):
|
||||
"""Dictionary style len(x) support."""
|
||||
return len(self.keys())
|
||||
|
||||
def __bool__(self):
|
||||
if self.list is None:
|
||||
raise TypeError("Cannot be converted to bool.")
|
||||
return bool(self.list)
|
||||
|
||||
|
||||
class SupyHTTPRequestHandler(BaseHTTPRequestHandler):
|
||||
def do_X(self, callbackMethod, *args, **kwargs):
|
||||
if self.path == '/':
|
||||
@ -199,12 +307,11 @@ class SupyHTTPRequestHandler(BaseHTTPRequestHandler):
|
||||
if 'Content-Type' not in self.headers:
|
||||
self.headers['Content-Type'] = 'application/x-www-form-urlencoded'
|
||||
if self.headers['Content-Type'] == 'application/x-www-form-urlencoded':
|
||||
form = cgi.FieldStorage(
|
||||
fp=self.rfile,
|
||||
headers=self.headers,
|
||||
environ={'REQUEST_METHOD':'POST',
|
||||
'CONTENT_TYPE':self.headers['Content-Type'],
|
||||
})
|
||||
length = min(100000, int(self.headers.get('Content-Length', '100000')))
|
||||
qs = self.rfile.read(length).decode()
|
||||
form = HttpHeaders([
|
||||
HttpHeader(k, v) for (k, v) in urllib.parse.parse_qsl(qs)
|
||||
])
|
||||
else:
|
||||
content_length = int(self.headers.get('Content-Length', '0'))
|
||||
form = self.rfile.read(content_length)
|
||||
|
Loading…
Reference in New Issue
Block a user