from http import HTTPStatus import msgspec from blacksheep import Application, Request, get, post from blacksheep.server.bindings import FromJSON from blacksheep.server.responses import json from loguru import logger from antispam_api.config import API_KEY from antispam_api.loader import get_spam_score app = Application() logger.info("Initializing") class RequestData(msgspec.Struct): message: str @post("/check") async def check_message(request: Request, data: FromJSON[RequestData]): logger.info("New request: {}", {"message": data.value.message}) key = request.headers.get_first(b"x-api-key") if not key: logger.warning("Tried to request without key") return json({"detail": "No API key provided"}, status=HTTPStatus.UNAUTHORIZED) if key.decode() != API_KEY: logger.warning("Incorrent API key") return json({"detail": "Incorrect API key"}, status=HTTPStatus.UNAUTHORIZED) try: output = get_spam_score(data.value.message) logger.info("Model output: {}", output) result = {"spam_score": output} logger.info("Returning: {}", result) return json(result) except Exception as e: # noqa: BLE001 logger.error("Error while checking message: {!r}", str(e)) return json({"detail": str(e)}, status=HTTPStatus.INTERNAL_SERVER_ERROR) @get("/") async def health_check(request: Request): key = request.headers.get_first(b"x-api-key") if not key: logger.warning("Tried to request without key") return json({"detail": "No API key provided"}, status=HTTPStatus.UNAUTHORIZED) if key.decode() != API_KEY: logger.warning("Incorrent API key") return json({"detail": "Incorrect API key"}, status=HTTPStatus.UNAUTHORIZED) return json({"detail": "Meow!"})