320 lines
12 KiB
Python
320 lines
12 KiB
Python
import socket
|
|
from ure import compile
|
|
|
|
class MicroWebSrvRoute:
    """Plain record describing one registered URL route.

    Attributes:
        route: the route template exactly as it was registered.
        method: the HTTP method this route answers to.
        func: the callable invoked when the route matches.
        routeArgNames: names of the ``<name>`` placeholders, in order.
        routeRegex: compiled pattern used to match request paths.
    """

    def __init__(self, route, method, func, routeArgNames, routeRegex):
        self.route = route
        self.method = method
        self.func = func
        self.routeArgNames = routeArgNames
        self.routeRegex = routeRegex
|
|
|
|
class MicroWebSrv:
    """Small single-connection HTTP server with URL routing.

    Routes are supplied either as ``(url, method, func)`` tuples passed to the
    constructor or through the ``@MicroWebSrv.route(url, method)`` decorator.
    A URL segment written as ``<name>`` becomes a captured route argument.

    ``Start()`` binds the listening socket and then runs the accept loop in
    the calling thread; each client is parsed and answered synchronously
    before the next one is accepted.
    """

    # Characters replaced by HTMLEscape() and the entity each one becomes.
    _html_escape_chars = {
        "&": "&amp;",
        '"': "&quot;",
        "'": "&apos;",
        ">": "&gt;",
        "<": "&lt;"
    }

    # (url, method, func) tuples registered with the @route decorator.
    # Class-level on purpose: the decorator runs before any instance exists.
    _decoratedRouteHandlers = []

    @classmethod
    def route(cls, url, method='GET'):
        """Return a decorator registering ``func`` for ``url`` and ``method``.

        The decorated function is returned unchanged; the registration is
        picked up by instances created afterwards.
        """
        def route_decorator(func):
            item = (url, method, func)
            cls._decoratedRouteHandlers.append(item)
            return func
        return route_decorator

    @staticmethod
    def HTMLEscape(s):
        """Return ``s`` with ``& " ' > <`` replaced by their HTML entities."""
        return ''.join(MicroWebSrv._html_escape_chars.get(c, c) for c in s)

    @staticmethod
    def _tryAllocByteArray(size):
        """Try up to 10 times to allocate ``bytearray(size)``.

        Returns the buffer, or None when every attempt failed.
        """
        for _ in range(10):
            try:
                return bytearray(size)
            except Exception:
                # Best effort: retry instead of failing on the first error.
                pass
        return None

    @staticmethod
    def _unquote(s):
        """Decode ``%XX`` escapes in ``s``.

        Each escape is turned into the single character with that code
        point; an escape that is not valid hexadecimal is left as-is.
        """
        parts = s.split('%')
        # parts[0] precedes the first '%'; every later chunk followed one.
        for i in range(1, len(parts)):
            chunk = parts[i]
            try:
                parts[i] = chr(int(chunk[:2], 16)) + chunk[2:]
            except ValueError:
                parts[i] = '%' + chunk
        return ''.join(parts)

    @staticmethod
    def _unquote_plus(s):
        """Like ``_unquote`` but first turns every ``+`` into a space."""
        return MicroWebSrv._unquote(s.replace('+', ' '))

    def __init__(self,
                 routeHandlers=None,
                 port=80,
                 bindIP='0.0.0.0'):
        """Prepare the server; no socket is opened until ``Start()``.

        Args:
            routeHandlers: optional iterable of ``(url, method, func)``
                tuples. It is copied, never modified.
            port: TCP port to bind.
            bindIP: local address to bind.
        """
        self._srvAddr = (bindIP, port)
        self._started = False

        self._routeHandlers = []
        # Build a fresh list: extending the argument in place would leak the
        # decorated routes into the caller's list (or a shared default).
        allHandlers = list(routeHandlers or []) + self._decoratedRouteHandlers
        for route, method, func in allHandlers:
            routeArgNames = []
            routeRegex = ''
            for s in route.split('/'):
                if s.startswith('<') and s.endswith('>'):
                    # "<name>" placeholder: remember the name, capture a word.
                    routeArgNames.append(s[1:-1])
                    routeRegex += '/(\\w*)'
                elif s:
                    routeRegex += '/' + s
            routeRegex += '$'
            routeRegex = compile(routeRegex)
            self._routeHandlers.append(MicroWebSrvRoute(route, method, func, routeArgNames, routeRegex))

    def _serverProcess(self):
        """Accept and serve clients until the listening socket raises."""
        self._started = True
        while True:
            try:
                # NOTE(review): accepted() is not part of the standard socket
                # API; presumably a port-specific non-blocking accept that
                # yields (None, ...) when nobody is waiting -- confirm on the
                # target firmware before changing it.
                client, cliAddr = self._server.accepted()
                if client is None:
                    continue
            except Exception as e:
                print(e)
                break
            # The _client constructor handles the whole request.
            self._client(self, client, cliAddr)
        self._started = False

    def Start(self):
        """Open the listening socket and run the accept loop (blocking).

        Does nothing when the server is already running.
        """
        if self._started:
            return
        self._server = socket.socket(socket.AF_INET,
                                     socket.SOCK_STREAM,
                                     socket.IPPROTO_TCP)
        self._server.setsockopt(socket.SOL_SOCKET,
                                socket.SO_REUSEADDR,
                                1)
        self._server.bind(self._srvAddr)
        self._server.listen(1)
        self._server.settimeout(0.5)
        self._serverProcess()

    def Stop(self):
        """Close the listening socket if the server is running."""
        if not self._started:
            return
        self._server.close()

    def GetRouteHandler(self, resUrl, method):
        """Find the handler registered for ``resUrl`` and ``method``.

        One trailing ``/`` on ``resUrl`` is ignored and ``method`` is
        compared upper-cased.

        Returns:
            ``(func, args)`` where ``args`` is a dict of route arguments
            (values converted to int when they parse as one), ``(func, None)``
            for a route without arguments, or ``(None, None)`` when nothing
            matches.
        """
        if not self._routeHandlers:
            return (None, None)
        if resUrl.endswith('/'):
            resUrl = resUrl[:-1]
        method = method.upper()
        for rh in self._routeHandlers:
            if rh.method != method:
                continue
            m = rh.routeRegex.match(resUrl)
            if not m:
                continue
            if not rh.routeArgNames:
                return (rh.func, None)
            routeArgs = {}
            for i, name in enumerate(rh.routeArgNames):
                value = m.group(i + 1)
                try:
                    value = int(value)
                except (ValueError, TypeError):
                    # Not numeric: keep the captured text.
                    pass
                routeArgs[name] = value
            return (rh.func, routeArgs)
        return (None, None)

    class _client:
        """One accepted connection: parses the request and dispatches it.

        Constructing the object processes the request and closes the socket.
        NOTE(review): the socket is used through readline()/read()/write(),
        which MicroPython sockets provide -- confirm for other runtimes.
        """

        def __init__(self, microWebSrv, socket, addr):
            socket.settimeout(2)
            self._microWebSrv = microWebSrv
            self._socket = socket
            self._addr = addr
            self._method = None
            self._path = None
            self._httpVer = None
            self._resPath = "/"
            self._queryString = ""
            self._queryParams = {}
            self._headers = {}
            self._contentType = None
            self._contentLength = 0

            self._processRequest()

        def _processRequest(self):
            """Parse the request, call the matching handler, close the socket.

            A handler is called as ``func(client, response, args)`` when the
            route has arguments and ``func(client, response)`` otherwise.
            Unmatched requests get a 404; an exception while parsing headers
            or inside the handler gets a 500. A request whose parsing fails
            without raising is closed without a reply.
            """
            # Created before the try so the except branch can always use it.
            response = MicroWebSrv._response(self)
            try:
                if self._parseFirstLine(response) and self._parseHeader(response):
                    routeHandler, routeArgs = self._microWebSrv.GetRouteHandler(self._resPath, self._method)
                    if routeHandler and routeArgs is not None:
                        routeHandler(self, response, routeArgs)
                    elif routeHandler:
                        routeHandler(self, response)
                    else:
                        response.WriteResponseNotFound()
            except Exception:
                response.WriteResponseInternalServerError()
            finally:
                try:
                    self._socket.close()
                except Exception:
                    # Best effort: the peer may already have gone away.
                    pass

        def _parseFirstLine(self, response):
            """Read and parse the request line.

            Fills in the method, raw path, HTTP version, decoded resource
            path, query string and query parameters.

            Returns:
                True on success; False when the line does not have exactly
                three parts or reading/decoding it raised.
            """
            try:
                elements = self._socket.readline().decode().strip().split()
                if len(elements) != 3:
                    return False
                self._method = elements[0].upper()
                self._path = elements[1]
                self._httpVer = elements[2].upper()
                elements = self._path.split('?', 1)
                self._resPath = MicroWebSrv._unquote_plus(elements[0])
                if len(elements) < 2:
                    return True
                self._queryString = elements[1]
                for s in self._queryString.split('&'):
                    if not s:
                        # Skip empty segments ("a=1&&b=2", trailing "&").
                        continue
                    param = s.split('=', 1)
                    value = MicroWebSrv._unquote(param[1]) if len(param) > 1 else ''
                    self._queryParams[MicroWebSrv._unquote(param[0])] = value
                return True
            except Exception:
                return False

        def _parseHeader(self, response):
            """Read header lines up to the blank line that ends them.

            For POST requests the Content-Type and Content-Length headers are
            copied into the client state.

            Returns:
                True once the blank line is reached; False on a non-empty
                line that has no ``:`` separator.
            """
            while True:
                elements = self._socket.readline().decode().strip().split(':', 1)
                if len(elements) == 2:
                    self._headers[elements[0].strip()] = elements[1].strip()
                elif len(elements) == 1 and len(elements[0]) == 0:
                    if self._method == 'POST':
                        self._contentType = self._headers.get("Content-Type", None)
                        self._contentLength = int(self._headers.get("Content-Length", 0))
                    return True
                else:
                    return False

        def ReadRequestContent(self, size=None):
            """Read the request body without blocking.

            Args:
                size: number of bytes to read; when falsy, the value of the
                    Content-Length header is used. A negative size reads
                    nothing.

            Returns:
                The bytes read, or ``b''`` when nothing was read or the read
                failed.
            """
            self._socket.setblocking(False)
            b = None
            try:
                if not size:
                    b = self._socket.read(self._contentLength)
                elif size > 0:
                    b = self._socket.read(size)
            except Exception:
                # Best effort: a failed read is reported as an empty body.
                pass
            finally:
                self._socket.setblocking(True)
            return b if b else b''

        def ReadRequestPostedFormData(self):
            """Read the body and parse it as ``key=value&...`` pairs.

            Returns:
                Dict of decoded keys to decoded values; a key without ``=``
                maps to ``''``. Empty dict when the body is empty.
            """
            res = {}
            data = self.ReadRequestContent()
            if len(data) < 1:
                return res
            for s in data.decode().split('&'):
                if not s:
                    # Skip empty segments ("a=1&&b=2", trailing "&").
                    continue
                param = s.split('=', 1)
                value = MicroWebSrv._unquote(param[1]) if len(param) > 1 else ''
                res[MicroWebSrv._unquote(param[0])] = value
            return res

    class _response:
        """Writes an HTTP/1.1 response onto the socket of a ``_client``."""

        def __init__(self, client):
            self._client = client

        def _write(self, data):
            """Send ``data``; text is encoded with the default codec first."""
            if isinstance(data, str):
                data = data.encode()
            return self._client._socket.write(data)

        def _writeFirstLine(self, code):
            """Write the status line for ``code``."""
            reason = self._responseCodes.get(code, ('Unknown reason',))[0]
            self._write("HTTP/1.1 %s %s\r\n" % (code, reason))

        def _writeHeader(self, name, value):
            """Write a single ``name: value`` header line."""
            self._write("%s: %s\r\n" % (name, value))

        def _writeContentTypeHeader(self, contentType, charset=None):
            """Write Content-Type, defaulting to application/octet-stream."""
            if contentType:
                ct = contentType \
                     + (("; charset=%s" % charset) if charset else "")
            else:
                ct = "application/octet-stream"
            self._writeHeader("Content-Type", ct)

        def _writeServerHeader(self):
            """Write the Server header."""
            self._writeHeader("Server", "MicroWebSrv lite")

        def _writeEndHeader(self):
            """Write the blank line that ends the header section."""
            self._write("\r\n")

        def _writeBeforeContent(self, code, headers, contentType, contentCharset, contentLength):
            """Write the status line and all headers.

            Content-Type and Content-Length are only sent when
            ``contentLength`` is positive. ``headers`` is ignored unless it
            is a dict.
            """
            self._writeFirstLine(code)
            if isinstance(headers, dict):
                for header in headers:
                    self._writeHeader(header, headers[header])
            if contentLength > 0:
                self._writeContentTypeHeader(contentType, contentCharset)
                self._writeHeader("Content-Length", contentLength)
            self._writeServerHeader()
            self._writeHeader("Connection", "close")
            self._writeEndHeader()

        def WriteResponse(self, code, headers, contentType, contentCharset, content):
            """Write a complete response.

            Args:
                code: HTTP status code.
                headers: dict of extra headers, or None.
                contentType: MIME type of ``content``.
                contentCharset: charset name appended to the Content-Type.
                content: body as str or bytes-like, or None/empty for no body.

            Returns:
                True when everything was written, False when writing raised.
            """
            try:
                if isinstance(content, str):
                    # Encode before measuring: Content-Length counts bytes,
                    # not characters.
                    content = content.encode()
                contentLength = len(content) if content else 0
                self._writeBeforeContent(code, headers, contentType, contentCharset, contentLength)
                if contentLength > 0:
                    self._write(content)
                return True
            except Exception:
                return False

        def WriteResponseOk(self, headers=None, contentType=None, contentCharset=None, content=None):
            """Write a 200 response; see ``WriteResponse`` for the arguments."""
            return self.WriteResponse(200, headers, contentType, contentCharset, content)

        def WriteResponseRedirect(self, location):
            """Write a 302 response pointing at ``location``."""
            headers = {"Location": location}
            return self.WriteResponse(302, headers, None, None, None)

        def WriteResponseError(self, code):
            """Write an error response whose body is a small JSON object.

            The body carries the code plus the reason and message found in
            ``_responseCodes`` (or 'Unknown reason' and an empty message).
            """
            responseCode = self._responseCodes.get(code, ('Unknown reason', ''))
            return self.WriteResponse(code,
                                      None,
                                      "application/json",
                                      "UTF-8",
                                      '{"code":"%s","reason":"%s","message":"%s"}' % (
                                          code, responseCode[0], responseCode[1]))

        def WriteResponseBadRequest(self):
            """Write a 400 error response."""
            return self.WriteResponseError(400)

        def WriteResponseForbidden(self):
            """Write a 403 error response."""
            return self.WriteResponseError(403)

        def WriteResponseNotFound(self):
            """Write a 404 error response."""
            return self.WriteResponseError(404)

        def WriteResponseInternalServerError(self):
            """Write a 500 error response."""
            return self.WriteResponseError(500)

        # Status code -> (reason phrase, longer message).
        _responseCodes = {
            100: ('Continue', 'Request received, please continue'),
            101: ('Switching Protocols',
                  'Switching to new protocol; obey Upgrade header'),
            200: ('OK', 'Request fulfilled, document follows'),
            201: ('Created', 'Document created, URL follows'),
            202: ('Accepted',
                  'Request accepted, processing continues off-line'),
            204: ('No Content', 'Request fulfilled, nothing follows'),
            301: ('Moved Permanently', 'Object moved permanently -- see URI list'),
            302: ('Found', 'Object moved temporarily -- see URI list'),
            400: ('Bad Request', 'Bad request syntax or unsupported method'),
            403: ('Forbidden', 'Request forbidden -- authorization will not help'),
            404: ('Not Found', 'Nothing matches the given URI'),
            500: ('Internal Server Error', 'Server got itself in trouble'),
            501: ('Not Implemented',
                  'Server does not support this operation'),
        }
|