import socket from ure import compile class MicroWebSrvRoute: def __init__(self, route, method, func, routeArgNames, routeRegex): self.route = route self.method = method self.func = func self.routeArgNames = routeArgNames self.routeRegex = routeRegex class MicroWebSrv: _html_escape_chars = { "&": "&", '"': """, "'": "'", ">": ">", "<": "<" } _decoratedRouteHandlers = [] @classmethod def route(cls, url, method='GET'): def route_decorator(func): item = (url, method, func) cls._decoratedRouteHandlers.append(item) return func return route_decorator @staticmethod def HTMLEscape(s): return ''.join(MicroWebSrv._html_escape_chars.get(c, c) for c in s) @staticmethod def _tryAllocByteArray(size): for x in range(10): try: return bytearray(size) except: pass return None @staticmethod def _unquote(s): r = s.split('%') for i in range(1, len(r)): s = r[i] try: r[i] = chr(int(s[:2], 16)) + s[2:] except: r[i] = '%' + s return ''.join(r) @staticmethod def _unquote_plus(s): return MicroWebSrv._unquote(s.replace('+', ' ')) def __init__(self, routeHandlers=[], port=80, bindIP='0.0.0.0'): self._srvAddr = (bindIP, port) self._started = False self._routeHandlers = [] routeHandlers += self._decoratedRouteHandlers for route, method, func in routeHandlers: routeParts = route.split('/') routeArgNames = [] routeRegex = '' for s in routeParts: if s.startswith('<') and s.endswith('>'): routeArgNames.append(s[1:-1]) routeRegex += '/(\\w*)' elif s: routeRegex += '/' + s routeRegex += '$' routeRegex = compile(routeRegex) self._routeHandlers.append(MicroWebSrvRoute(route, method, func, routeArgNames, routeRegex)) def _serverProcess(self): self._started = True while True: try: client, cliAddr = self._server.accepted() if client == None: continue except Exception as e: print(e) break self._client(self, client, cliAddr) self._started = False def Start(self): if self._started: return self._server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) self._server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._server.bind(self._srvAddr) self._server.listen(1) self._server.settimeout(0.5) self._serverProcess() def Stop(self): if not self._started: return self._server.close() def GetRouteHandler(self, resUrl, method): if not self._routeHandlers: return (None, None) if resUrl.endswith('/'): resUrl = resUrl[:-1] method = method.upper() for rh in self._routeHandlers: if rh.method == method: m = rh.routeRegex.match(resUrl) if m and rh.routeArgNames: routeArgs = {} for i, name in enumerate(rh.routeArgNames): value = m.group(i + 1) try: value = int(value) except: pass routeArgs[name] = value return (rh.func, routeArgs) elif m: return (rh.func, None) return (None, None) class _client: def __init__(self, microWebSrv, socket, addr): socket.settimeout(2) self._microWebSrv = microWebSrv self._socket = socket self._addr = addr self._method = None self._path = None self._httpVer = None self._resPath = "/" self._queryString = "" self._queryParams = {} self._headers = {} self._contentType = None self._contentLength = 0 self._processRequest() def _processRequest(self): try: response = MicroWebSrv._response(self) if self._parseFirstLine(response) and self._parseHeader(response): routeHandler, routeArgs = self._microWebSrv.GetRouteHandler(self._resPath, self._method) if routeHandler and routeArgs is not None: routeHandler(self, response, routeArgs) elif routeHandler: routeHandler(self, response) else: response.WriteResponseNotFound() except: response.WriteResponseInternalServerError() try: self._socket.close() except: pass def _parseFirstLine(self, response): try: elements = self._socket.readline().decode().strip().split() if len(elements) != 3: return False self._method = elements[0].upper() self._path = elements[1] self._httpVer = elements[2].upper() elements = self._path.split('?', 1) if len(elements) < 1: return True self._resPath = MicroWebSrv._unquote_plus(elements[0]) if len(elements) < 2: return True self._queryString = elements[1] elements = self._queryString.split('&') for s in elements: param = s.split('=', 1) if len(param) > 0: value = MicroWebSrv._unquote(param[1]) if len(param) > 1 else '' self._queryParams[MicroWebSrv._unquote(param[0])] = value return True except: pass return False def _parseHeader(self, response): while True: elements = self._socket.readline().decode().strip().split(':', 1) if len(elements) == 2: self._headers[elements[0].strip()] = elements[1].strip() elif len(elements) == 1 and len(elements[0]) == 0: if self._method == 'POST': self._contentType = self._headers.get("Content-Type", None) self._contentLength = int(self._headers.get("Content-Length", 0)) return True else: return False def ReadRequestContent(self, size=None): self._socket.setblocking(False) b = None try: if not size: b = self._socket.read(self._contentLength) elif size > 0: b = self._socket.read(size) except: pass self._socket.setblocking(True) return b if b else b'' def ReadRequestPostedFormData(self): res = {} data = self.ReadRequestContent() if len(data) < 1: return res elements = data.decode().split('&') for s in elements: param = s.split('=', 1) if len(param) < 1: next value = MicroWebSrv._unquote(param[1]) if len(param) > 1 else '' res[MicroWebSrv._unquote(param[0])] = value return res class _response: def __init__(self, client): self._client = client def _write(self, data): if type(data) == str: data = data.encode() return self._client._socket.write(data) def _writeFirstLine(self, code): reason = self._responseCodes.get(code, ('Unknown reason',))[0] self._write("HTTP/1.1 %s %s\r\n" % (code, reason)) def _writeHeader(self, name, value): self._write("%s: %s\r\n" % (name, value)) def _writeContentTypeHeader(self, contentType, charset=None): if contentType: ct = contentType \ + (("; charset=%s" % charset) if charset else "") else: ct = "application/octet-stream" self._writeHeader("Content-Type", ct) def _writeServerHeader(self): self._writeHeader("Server", "MicroWebSrv lite") def _writeEndHeader(self): self._write("\r\n") def _writeBeforeContent(self, code, headers, contentType, contentCharset, contentLength): self._writeFirstLine(code) if isinstance(headers, dict): for header in headers: self._writeHeader(header, headers[header]) if contentLength > 0: self._writeContentTypeHeader(contentType, contentCharset) self._writeHeader("Content-Length", contentLength) self._writeServerHeader() self._writeHeader("Connection", "close") self._writeEndHeader() def WriteResponse(self, code, headers, contentType, contentCharset, content): try: contentLength = len(content) if content else 0 self._writeBeforeContent(code, headers, contentType, contentCharset, contentLength) if contentLength > 0: self._write(content) return True except: return False def WriteResponseOk(self, headers=None, contentType=None, contentCharset=None, content=None): return self.WriteResponse(200, headers, contentType, contentCharset, content) def WriteResponseRedirect(self, location): headers = {"Location": location} return self.WriteResponse(302, headers, None, None, None) def WriteResponseError(self, code): responseCode = self._responseCodes.get(code, ('Unknown reason', '')) return self.WriteResponse(code, None, "text/html", "UTF-8", '{"code":"%s","reason":"%s","message":"%s"}' % ( code, responseCode[0], responseCode[1])) def WriteResponseBadRequest(self): return self.WriteResponseError(400) def WriteResponseForbidden(self): return self.WriteResponseError(403) def WriteResponseNotFound(self): return self.WriteResponseError(404) def WriteResponseInternalServerError(self): return self.WriteResponseError(500) _responseCodes = { 100: ('Continue', 'Request received, please continue'), 101: ('Switching Protocols', 'Switching to new protocol; obey Upgrade header'), 200: ('OK', 'Request fulfilled, document follows'), 201: ('Created', 'Document created, URL follows'), 202: ('Accepted', 'Request accepted, processing continues off-line'), 204: ('No Content', 'Request fulfilled, nothing follows'), 301: ('Moved Permanently', 'Object moved permanently -- see URI list'), 302: ('Found', 'Object moved temporarily -- see URI list'), 404: ('Not Found', 'Nothing matches the given URI'), 500: ('Internal Server Error', 'Server got itself in trouble'), 501: ('Not Implemented', 'Server does not support this operation'), }