# /// zerodep # version = "0.1.0" # deps = [] # tier = "subsystem" # category = "network" # note = "Install/update via `zerodep add httpserver`" # /// """Zero-dependency async HTTP server with decorator-based routing. Part of zerodep: https://github.com/Oaklight/zerodep Copyright (c) 2026 Peng Ding. MIT License. Async HTTP/1.1 server built on ``asyncio.start_server()``. Supports decorator-based routing, JSON request/response, static file serving, streaming responses (SSE), and graceful shutdown. Usage:: from httpserver import App, JSONResponse app = App() @app.route("/status") async def status(request): return JSONResponse({"state": "idle"}) @app.route("/echo", methods=["POST"]) def echo(request): return JSONResponse(request.json()) app.run(host="127.0.0.1", port=8000) """ # ── Imports ────────────────────────────────────────────────────────────────── from __future__ import annotations import asyncio import dataclasses import inspect import json as _json import logging import mimetypes import os import re import signal import sys from collections.abc import AsyncIterator, Callable from email.utils import formatdate from pathlib import Path from typing import Any from urllib.parse import parse_qs, unquote, urlparse __all__ = [ # Application "App", # Request / Response "Request", "Response", "JSONResponse", "StreamingResponse", "FileResponse", # Exceptions "HTTPException", # Utilities "abort", ] logger = logging.getLogger(__name__) # ── Constants ──────────────────────────────────────────────────────────────── DEFAULT_HOST = "127.0.0.1" DEFAULT_PORT = 8000 DEFAULT_MAX_BODY_SIZE = 1_048_576 # 1 MB DEFAULT_READ_TIMEOUT = 30.0 # seconds _STATUS_REASONS: dict[int, str] = { 100: "Continue", 101: "Switching Protocols", 200: "OK", 201: "Created", 202: "Accepted", 204: "No Content", 301: "Moved Permanently", 302: "Found", 303: "See Other", 304: "Not Modified", 307: "Temporary Redirect", 308: "Permanent Redirect", 400: "Bad Request", 401: "Unauthorized", 403: "Forbidden", 404: "Not Found", 405: "Method Not Allowed", 406: "Not Acceptable", 408: "Request Timeout", 409: "Conflict", 413: "Content Too Large", 415: "Unsupported Media Type", 422: "Unprocessable Entity", 429: "Too Many Requests", 500: "Internal Server Error", 502: "Bad Gateway", 503: "Service Unavailable", 504: "Gateway Timeout", } # Type converters for path parameters _PARAM_CONVERTERS: dict[str, tuple[str, Callable[[str], Any]]] = { "str": (r"([^/]+)", str), "int": (r"(-?\d+)", int), "float": (r"(-?[0-9]+(?:\.[0-9]+)?)", float), "path": (r"(.+)", str), } # Regex to find or segments in route patterns _PARAM_RE = re.compile(r"<(?:(\w+):)?(\w+)>") _SENTINEL = object() # ── Exceptions ─────────────────────────────────────────────────────────────── class HTTPException(Exception): """HTTP error that maps to a specific status code. Raise directly or via :func:`abort` to short-circuit request handling. Args: status_code: HTTP status code (e.g. 404, 500). message: Optional human-readable error message. """ __slots__ = ("status_code", "message") def __init__(self, status_code: int, message: str | None = None): self.status_code = status_code self.message = message or _STATUS_REASONS.get(status_code, "Error") super().__init__(self.message) class _BadRequest(Exception): """Internal: malformed HTTP request from client.""" def abort(status_code: int, message: str | None = None) -> None: """Raise an :class:`HTTPException` with the given status code. Args: status_code: HTTP status code. message: Optional error message. """ raise HTTPException(status_code, message) # ── Request ────────────────────────────────────────────────────────────────── class Request: """Parsed HTTP request. Attributes: method: Uppercase HTTP method (GET, POST, ...). path: URL path, percent-decoded. query_string: Raw query string (without leading ``?``). query_params: Parsed query string as ``{key: [values]}``. headers: Case-insensitive header dict (keys stored lowercase). body: Raw request body bytes. path_params: Parameters extracted from the route pattern. client_addr: Client ``(host, port)`` tuple. app: Reference to the :class:`App` instance handling this request. """ __slots__ = ( "method", "path", "query_string", "query_params", "headers", "body", "path_params", "client_addr", "app", "_json", ) def __init__( self, method: str, path: str, query_string: str, headers: dict[str, str], body: bytes, client_addr: tuple[str, int], app: App | None = None, ): self.method = method self.path = path self.query_string = query_string self.query_params: dict[str, list[str]] = parse_qs(query_string) self.headers = headers self.body = body self.path_params: dict[str, Any] = {} self.client_addr = client_addr self.app = app self._json: Any = _SENTINEL def json(self) -> Any: """Parse body as JSON (cached).""" if self._json is _SENTINEL: self._json = _json.loads(self.body) return self._json def text(self) -> str: """Decode body as UTF-8.""" return self.body.decode("utf-8") def form(self) -> dict[str, list[str]]: """Parse URL-encoded form body.""" return parse_qs(self.body.decode("utf-8")) # ── Response Classes ───────────────────────────────────────────────────────── class Response: """HTTP response with a fixed body. Args: body: Response body (bytes or str). status_code: HTTP status code. headers: Extra response headers. content_type: Shorthand for ``Content-Type`` header. """ __slots__ = ("status_code", "headers", "body") def __init__( self, body: bytes | str = b"", status_code: int = 200, headers: dict[str, str] | None = None, content_type: str | None = None, ): self.status_code = status_code self.headers: dict[str, str] = headers.copy() if headers else {} if isinstance(body, str): self.body = body.encode("utf-8") else: self.body = body if content_type is not None: self.headers["Content-Type"] = content_type async def _write(self, writer: asyncio.StreamWriter) -> None: """Serialize and write the full HTTP response.""" reason = _STATUS_REASONS.get(self.status_code, "Unknown") self.headers.setdefault("Content-Length", str(len(self.body))) self.headers.setdefault("Content-Type", "text/plain; charset=utf-8") self.headers.setdefault("Date", _http_date()) self.headers.setdefault("Connection", "close") buf = bytearray() buf.extend(f"HTTP/1.1 {self.status_code} {reason}\r\n".encode("latin-1")) for k, v in self.headers.items(): buf.extend(f"{k}: {v}\r\n".encode("latin-1")) buf.extend(b"\r\n") buf.extend(self.body) writer.write(bytes(buf)) await writer.drain() class JSONResponse(Response): """Response serialized as JSON. Args: data: Python object to serialize. status_code: HTTP status code. headers: Extra response headers. """ def __init__( self, data: Any, status_code: int = 200, headers: dict[str, str] | None = None, ): body = _json.dumps(data, ensure_ascii=False).encode("utf-8") super().__init__( body=body, status_code=status_code, headers=headers, content_type="application/json; charset=utf-8", ) class StreamingResponse: """HTTP response streamed from an async generator. Writes chunks using ``Transfer-Encoding: chunked`` unless ``content_type`` is ``text/event-stream`` (SSE), in which case raw bytes are flushed directly for maximum compatibility with SSE clients. Args: generator: Async iterator yielding ``bytes`` or ``str`` chunks. status_code: HTTP status code. headers: Extra response headers. content_type: MIME type (default ``application/octet-stream``). """ __slots__ = ("_generator", "status_code", "headers", "content_type") def __init__( self, generator: AsyncIterator[bytes | str], status_code: int = 200, headers: dict[str, str] | None = None, content_type: str = "application/octet-stream", ): self._generator = generator self.status_code = status_code self.headers: dict[str, str] = headers.copy() if headers else {} self.content_type = content_type async def _write(self, writer: asyncio.StreamWriter) -> None: """Write status line, headers, then stream the body.""" reason = _STATUS_REASONS.get(self.status_code, "Unknown") is_sse = self.content_type.startswith("text/event-stream") self.headers["Content-Type"] = self.content_type if not is_sse: self.headers.setdefault("Transfer-Encoding", "chunked") else: self.headers.setdefault("Cache-Control", "no-cache") self.headers.setdefault("Date", _http_date()) self.headers.setdefault("Connection", "close") buf = bytearray() buf.extend(f"HTTP/1.1 {self.status_code} {reason}\r\n".encode("latin-1")) for k, v in self.headers.items(): buf.extend(f"{k}: {v}\r\n".encode("latin-1")) buf.extend(b"\r\n") writer.write(bytes(buf)) await writer.drain() try: async for chunk in self._generator: if isinstance(chunk, str): chunk = chunk.encode("utf-8") if is_sse: writer.write(chunk) else: writer.write(f"{len(chunk):x}\r\n".encode("latin-1")) writer.write(chunk) writer.write(b"\r\n") await writer.drain() if not is_sse: writer.write(b"0\r\n\r\n") await writer.drain() except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError): logger.debug("Client disconnected during streaming") finally: aclose = getattr(self._generator, "aclose", None) if aclose is not None: await aclose() class FileResponse(Response): """Response serving a file from disk. Args: path: Path to the file. content_type: MIME type (auto-detected from extension if ``None``). status_code: HTTP status code. """ def __init__( self, path: str | Path, content_type: str | None = None, status_code: int = 200, ): file_path = Path(path) body = file_path.read_bytes() if content_type is None: guessed, _ = mimetypes.guess_type(str(file_path)) content_type = guessed or "application/octet-stream" headers = { "Last-Modified": _http_date(os.path.getmtime(file_path)), } super().__init__( body=body, status_code=status_code, headers=headers, content_type=content_type, ) # ── Routing ────────────────────────────────────────────────────────────────── @dataclasses.dataclass(frozen=True, slots=True) class _Route: """Internal route entry.""" methods: list[str] # uppercase, e.g. ["GET", "POST"], or ["*"] pattern: re.Pattern[str] handler: Callable[..., Any] is_async: bool param_names: list[str] param_converters: list[Callable[[str], Any]] def _compile_route( path: str, ) -> tuple[re.Pattern[str], list[str], list[Callable[[str], Any]]]: """Compile a path pattern into a regex and parameter metadata. Supports ````, ````, ````, ````. Args: path: URL path pattern (e.g. ``/users/``). Returns: Tuple of (compiled regex, param names, param converters). """ param_names: list[str] = [] param_converters: list[Callable[[str], Any]] = [] regex_parts: list[str] = [] last_end = 0 for m in _PARAM_RE.finditer(path): regex_parts.append(re.escape(path[last_end : m.start()])) type_name = m.group(1) or "str" name = m.group(2) if type_name not in _PARAM_CONVERTERS: raise ValueError(f"Unknown path parameter type: {type_name!r}") regex_fragment, converter = _PARAM_CONVERTERS[type_name] regex_parts.append(regex_fragment) param_names.append(name) param_converters.append(converter) last_end = m.end() regex_parts.append(re.escape(path[last_end:])) full_regex = "^" + "".join(regex_parts) + "$" return re.compile(full_regex), param_names, param_converters # ── HTTP Parsing ───────────────────────────────────────────────────────────── async def _read_request( reader: asyncio.StreamReader, timeout: float, max_body_size: int, ) -> tuple[str, str, str, dict[str, str], bytes]: """Read and parse an HTTP request from a stream. Returns: Tuple of (method, path, query_string, headers, body). Raises: _BadRequest: If the request is malformed. asyncio.TimeoutError: If reading times out. """ # -- Request line -- raw_line = await asyncio.wait_for(reader.readline(), timeout=timeout) if not raw_line: raise _BadRequest("Empty request") request_line = raw_line.decode("latin-1").rstrip("\r\n") parts = request_line.split(" ", 2) if len(parts) != 3: raise _BadRequest(f"Malformed request line: {request_line!r}") method, raw_url, _version = parts # -- URL -- parsed = urlparse(raw_url) path = unquote(parsed.path) query_string = parsed.query # -- Headers -- headers: dict[str, str] = {} while True: line = await asyncio.wait_for(reader.readline(), timeout=timeout) decoded = line.decode("latin-1").rstrip("\r\n") if not decoded: break if ":" in decoded: k, v = decoded.split(":", 1) headers[k.strip().lower()] = v.strip() # -- Body -- body = b"" cl = headers.get("content-length") if cl is not None: length = int(cl) if length > max_body_size: raise HTTPException(413, f"Request body too large ({length} bytes)") if length > 0: body = await asyncio.wait_for(reader.readexactly(length), timeout=timeout) elif headers.get("transfer-encoding", "").lower() == "chunked": body = await _read_chunked_body(reader, timeout, max_body_size) return method.upper(), path, query_string, headers, body async def _read_chunked_body( reader: asyncio.StreamReader, timeout: float, max_body_size: int, ) -> bytes: """Read a chunked transfer-encoded body.""" parts: list[bytes] = [] total = 0 while True: size_line = await asyncio.wait_for(reader.readline(), timeout=timeout) size_str = size_line.decode("latin-1").split(";")[0].strip() if not size_str: break chunk_size = int(size_str, 16) if chunk_size == 0: await asyncio.wait_for(reader.readline(), timeout=timeout) break total += chunk_size if total > max_body_size: raise HTTPException(413, "Request body too large") data = await asyncio.wait_for(reader.readexactly(chunk_size), timeout=timeout) await asyncio.wait_for(reader.readline(), timeout=timeout) parts.append(data) return b"".join(parts) # ── Utilities ──────────────────────────────────────────────────────────────── def _http_date(timestamp: float | None = None) -> str: """Format a timestamp as an HTTP-date (RFC 7231).""" return formatdate(timeval=timestamp, localtime=False, usegmt=True) def _coerce_response(result: Any) -> Response | StreamingResponse: """Convert a handler return value into a Response object.""" if isinstance(result, (Response, StreamingResponse)): return result if result is None: return Response(status_code=204) if isinstance(result, dict): return JSONResponse(result) if isinstance(result, tuple): if len(result) == 2: body, status = result resp = _coerce_response(body) resp.status_code = status return resp if len(result) == 3: body, status, extra_headers = result resp = _coerce_response(body) resp.status_code = status resp.headers.update(extra_headers) return resp raise ValueError(f"Unsupported tuple length: {len(result)}") if isinstance(result, bytes): return Response(body=result, content_type="application/octet-stream") if isinstance(result, str): return Response(body=result, content_type="text/plain; charset=utf-8") raise TypeError( f"Cannot coerce handler return type {type(result).__name__} to Response" ) # ── Static File Resolution ─────────────────────────────────────────────────── def _resolve_static_file( request_path: str, url_prefix: str, directory: str, ) -> FileResponse | None: """Try to resolve a static file request. Returns a FileResponse if the file exists and the path is safe, otherwise None. """ if not request_path.startswith(url_prefix): return None relative = request_path[len(url_prefix) :].lstrip("/") if not relative: return None base = Path(directory).resolve() target = (base / relative).resolve() # Prevent directory traversal if not str(target).startswith(str(base)): return None if not target.is_file(): return None return FileResponse(target) # ── App ────────────────────────────────────────────────────────────────────── class App: """Async HTTP server application. Args: max_body_size: Maximum request body size in bytes. read_timeout: Timeout for reading a single request (seconds). Example:: app = App() @app.get("/hello") async def hello(request): return {"message": "Hello, world!"} app.run() """ def __init__( self, *, max_body_size: int = DEFAULT_MAX_BODY_SIZE, read_timeout: float = DEFAULT_READ_TIMEOUT, ): self._routes: list[_Route] = [] self._static_routes: list[tuple[str, str]] = [] self._before_request_handlers: list[Callable[..., Any]] = [] self._after_request_handlers: list[Callable[..., Any]] = [] self._error_handlers: dict[int | type, Callable[..., Any]] = {} self._server: asyncio.Server | None = None self._shutdown_event: asyncio.Event | None = None self.max_body_size = max_body_size self.read_timeout = read_timeout self.port: int | None = None self.host: str | None = None # ── Route Registration ─────────────────────────────────────────────── def route( self, url_pattern: str, methods: list[str] | None = None ) -> Callable[..., Any]: """Register a route handler. Args: url_pattern: URL pattern with optional parameters. methods: HTTP methods to handle (e.g. ``["GET", "POST"]``). Defaults to ``["GET"]``. Example:: @app.route("/users/", methods=["GET", "POST"]) async def user(request, id): return {"id": id} """ if methods is None: methods = ["GET"] def decorator(handler: Callable[..., Any]) -> Callable[..., Any]: pattern, param_names, converters = _compile_route(url_pattern) is_async = inspect.iscoroutinefunction(handler) self._routes.append( _Route( methods=[m.upper() for m in methods], pattern=pattern, handler=handler, is_async=is_async, param_names=param_names, param_converters=converters, ) ) return handler return decorator def get(self, path: str) -> Callable[..., Any]: """Shorthand for ``@app.route(path, methods=["GET"])``.""" return self.route(path, methods=["GET"]) def post(self, path: str) -> Callable[..., Any]: """Shorthand for ``@app.route(path, methods=["POST"])``.""" return self.route(path, methods=["POST"]) def put(self, path: str) -> Callable[..., Any]: """Shorthand for ``@app.route(path, methods=["PUT"])``.""" return self.route(path, methods=["PUT"]) def delete(self, path: str) -> Callable[..., Any]: """Shorthand for ``@app.route(path, methods=["DELETE"])``.""" return self.route(path, methods=["DELETE"]) def patch(self, path: str) -> Callable[..., Any]: """Shorthand for ``@app.route(path, methods=["PATCH"])``.""" return self.route(path, methods=["PATCH"]) def static(self, url_prefix: str, directory: str) -> None: """Register a directory for static file serving. Args: url_prefix: URL prefix (e.g. ``"/static"``). directory: Filesystem path to the directory. Example:: app.static("/assets", "./public") """ url_prefix = url_prefix.rstrip("/") abs_dir = os.path.abspath(directory) self._static_routes.append((url_prefix, abs_dir)) # ── Middleware ──────────────────────────────────────────────────────── def before_request(self, handler: Callable[..., Any]) -> Callable[..., Any]: """Register a before-request hook. The hook receives ``(request)`` and may return a ``Response`` to short-circuit the route handler. """ self._before_request_handlers.append(handler) return handler def after_request(self, handler: Callable[..., Any]) -> Callable[..., Any]: """Register an after-request hook. The hook receives ``(request, response)`` and may return a new or modified ``Response``. """ self._after_request_handlers.append(handler) return handler def errorhandler(self, code_or_exc: int | type) -> Callable[..., Any]: """Register an error handler. Args: code_or_exc: HTTP status code (int) or exception class. The handler receives ``(request, exception)`` and must return a ``Response``. """ def decorator(handler: Callable[..., Any]) -> Callable[..., Any]: self._error_handlers[code_or_exc] = handler return handler return decorator # ── Request Dispatch ───────────────────────────────────────────────── def _match_route( self, request: Request ) -> tuple[_Route | None, re.Match[str] | None, bool]: """Find the first route matching the request path and method. Returns: (matched_route, regex_match, path_existed). *path_existed* is True when a route matched the path but not the HTTP method. """ path_existed = False for route in self._routes: m = route.pattern.match(request.path) if m is None: continue path_existed = True if "*" not in route.methods and request.method not in route.methods: continue return route, m, True return None, None, path_existed async def _invoke_route( self, route: _Route, match: re.Match[str], request: Request ) -> Response | StreamingResponse: """Extract path params, call the handler, return a response.""" for name, converter, value in zip( route.param_names, route.param_converters, match.groups() ): request.path_params[name] = converter(value) try: result = await _invoke(route.handler, request, **request.path_params) return _coerce_response(result) except HTTPException as exc: return await self._handle_error(request, exc) except Exception as exc: logger.exception( "Unhandled exception in handler %s", getattr(route.handler, "__name__", repr(route.handler)), ) return await self._handle_error(request, exc) async def _run_after_hooks( self, request: Request, response: Response | StreamingResponse ) -> Response | StreamingResponse: """Run after-request hooks, allowing them to replace the response.""" for hook in self._after_request_handlers: hook_result = await _invoke(hook, request, response) if hook_result is not None: response = _coerce_response(hook_result) return response async def _dispatch(self, request: Request) -> Response | StreamingResponse: """Match a request to a route and invoke the handler.""" # -- before_request hooks -- for hook in self._before_request_handlers: result = await _invoke(hook, request) if result is not None: return _coerce_response(result) # -- Static file check -- for prefix, directory in self._static_routes: file_resp = _resolve_static_file(request.path, prefix, directory) if file_resp is not None: return file_resp # -- Route matching -- route, match, path_existed = self._match_route(request) if route is not None and match is not None: response = await self._invoke_route(route, match, request) return await self._run_after_hooks(request, response) # -- No route matched -- if path_existed: allowed = sorted( { m for r in self._routes if r.pattern.match(request.path) for m in r.methods if m != "*" } ) exc = HTTPException(405, "Method Not Allowed") response = await self._handle_error(request, exc) if isinstance(response, Response): response.headers["Allow"] = ", ".join(allowed) return response exc = HTTPException(404, "Not Found") return await self._handle_error(request, exc) async def _handle_error( self, request: Request, exc: Exception ) -> Response | StreamingResponse: """Resolve an error into a response, consulting registered handlers.""" if isinstance(exc, HTTPException): handler = self._error_handlers.get(exc.status_code) if handler is not None: result = await _invoke(handler, request, exc) return _coerce_response(result) for exc_cls, handler in self._error_handlers.items(): if isinstance(exc_cls, type) and isinstance(exc, exc_cls): result = await _invoke(handler, request, exc) return _coerce_response(result) if isinstance(exc, HTTPException): return JSONResponse( {"error": exc.message}, status_code=exc.status_code, ) return JSONResponse( {"error": "Internal Server Error"}, status_code=500, ) # ── Connection Handling ─────────────────────────────────────────────── async def _handle_connection( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, ) -> None: """Handle a single client connection.""" peer = writer.get_extra_info("peername") client_addr = (peer[0], peer[1]) if peer else ("unknown", 0) try: method, path, qs, headers, body = await _read_request( reader, self.read_timeout, self.max_body_size ) except _BadRequest as exc: logger.debug("Bad request from %s: %s", client_addr, exc) response = Response( body=str(exc), status_code=400, content_type="text/plain; charset=utf-8", ) try: await response._write(writer) except (BrokenPipeError, ConnectionResetError): pass writer.close() return except HTTPException as exc: response = JSONResponse({"error": exc.message}, status_code=exc.status_code) try: await response._write(writer) except (BrokenPipeError, ConnectionResetError): pass writer.close() return except (asyncio.TimeoutError, asyncio.IncompleteReadError): writer.close() return except Exception: writer.close() return request = Request( method=method, path=path, query_string=qs, headers=headers, body=body, client_addr=client_addr, app=self, ) logger.debug("%s %s from %s", method, path, client_addr) try: response = await self._dispatch(request) await response._write(writer) except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError): logger.debug("Connection reset by %s during response", client_addr) except Exception: logger.exception("Error writing response to %s", client_addr) finally: try: writer.close() await writer.wait_closed() except Exception: pass # ── Server Lifecycle ───────────────────────────────────────────────── def run( self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT, ) -> None: """Start the server (blocking). Args: host: Bind address. port: Bind port. Use ``0`` for OS-assigned port. """ try: asyncio.run(self._serve(host, port)) except KeyboardInterrupt: pass async def _serve(self, host: str, port: int) -> None: """Internal async server loop.""" self._shutdown_event = asyncio.Event() server = await asyncio.start_server( self._handle_connection, host, port, ) self._server = server addrs = server.sockets[0].getsockname() if server.sockets else (host, port) self.host = addrs[0] self.port = addrs[1] logger.info("Serving on %s:%d", self.host, self.port) loop = asyncio.get_running_loop() if sys.platform != "win32": for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, self._shutdown_event.set) async with server: await self._shutdown_event.wait() logger.info("Shutting down server") def shutdown(self) -> None: """Request a graceful server shutdown. Safe to call from a request handler. """ if self._shutdown_event is not None: self._shutdown_event.set() # ── Handler Invocation ─────────────────────────────────────────────────────── async def _invoke(handler: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: """Invoke a handler, wrapping sync functions with ``asyncio.to_thread``.""" if inspect.iscoroutinefunction(handler): return await handler(*args, **kwargs) return await asyncio.to_thread(handler, *args, **kwargs)