"""MariaPT Curriculum API.

A small FastAPI service that authenticates callers (shared auth code,
Cloudflare Turnstile token, or an allowed Referer/Origin host) and stores
curriculum requests in a MongoDB collection.
"""

import hashlib
import hmac
import logging
import os
import time
from contextlib import asynccontextmanager
from typing import List
from urllib.parse import quote_plus, urlparse

import httpx
from fastapi import FastAPI, Request, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel, Field

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Secrets / Environment variables
# ---------------------------------------------------------------------------
def _require_secret(name: str) -> str:
    """Return the environment variable *name*, warning when it is unset.

    Despite the name this never raises: a missing or empty variable is logged
    as a warning and an empty string is returned, so the application can still
    start with the related functionality disabled.
    """
    value = os.environ.get(name, "")
    if not value:
        log.warning("Secret '%s' is not set — related functionality will be unavailable.", name)
    return value


HASH_VALUE = os.environ.get("HASH_VALUE", "")
CF_SECRET_KEY = os.environ.get("CF_SECRET_KEY", "")
ALLOWED_DOMAIN = os.environ.get("ALLOWED_DOMAIN", "")
MONGO_PASSWORD = _require_secret("MONGO_PASSWORD")
MONGO_DB = os.environ.get("MONGO_DB", "MariaPTDB")
MONGO_COLLECTION = os.environ.get("MONGO_COLL", "MariaPTColl")
# MONGO_PASSWORD is expected to hold the *raw* password. It is percent-escaped
# here because characters such as '@', ':', '/' or '%' would otherwise corrupt
# the connection string. Supply a complete MONGO_URI to bypass this entirely.
MONGO_URI = os.environ.get("MONGO_URI") or (
    f"mongodb+srv://testuser:{quote_plus(MONGO_PASSWORD)}@cluster0.ntz2mpi.mongodb.net/"
)

# ---------------------------------------------------------------------------
# MongoDB client (shared, created once at startup)
# ---------------------------------------------------------------------------
mongo_client: AsyncIOMotorClient | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared MongoDB client at startup and close it at shutdown.

    A failed connectivity ping is logged but does not prevent startup, so the
    unauthenticated health endpoint stays reachable while the database is down.
    """
    global mongo_client
    log.info("Connecting to MongoDB…")
    mongo_client = AsyncIOMotorClient(MONGO_URI, serverSelectionTimeoutMS=10_000)

    # Quick connectivity check (best-effort: failure is logged, not fatal).
    try:
        await mongo_client.admin.command("ping")
        log.info("MongoDB connection established.")
    except Exception as exc:
        log.error("MongoDB ping failed: %s", exc)

    try:
        yield
    finally:
        # Runs even when shutdown is triggered by an exception, so the
        # client's connections are always released.
        log.info("Closing MongoDB connection…")
        mongo_client.close()


# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
app = FastAPI(
    title="MariaPT Curriculum API",
    version="1.0.0",
    lifespan=lifespan,
)

# NOTE(review): wildcard origins combined with allow_credentials=True is
# advised against by the FastAPI CORS documentation. Left unchanged because
# tightening it needs the list of real front-end origins — TODO confirm.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Detail message shared by every endpoint that rejects a caller.
_UNAUTHORIZED_DETAIL = (
    "Unauthorized: provide a valid auth_code, cf-turnstile-token, "
    "or request from an allowed domain."
)


# ---------------------------------------------------------------------------
# Auth helpers
# ---------------------------------------------------------------------------
def _check_auth_code(code: str) -> bool:
    """Return True when the SHA-256 hex digest of *code* equals HASH_VALUE.

    The comparison is constant-time so response timing does not leak how much
    of the digest matched. HASH_VALUE is stripped and lower-cased first because
    ``hexdigest()`` is always lower-case. An empty HASH_VALUE disables this
    authentication method.
    """
    expected = HASH_VALUE.strip().lower()
    if not expected:
        return False
    digest = hashlib.sha256(code.encode()).hexdigest()
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(digest.encode(), expected.encode())


async def _check_turnstile(token: str) -> bool:
    """Verify a Cloudflare Turnstile *token* against the siteverify endpoint.

    Returns True only when the response JSON carries ``"success": true``.
    Any transport, decoding or shape error is logged and treated as a failed
    verification (fail closed). An empty CF_SECRET_KEY disables this method.
    """
    if not CF_SECRET_KEY:
        return False
    try:
        async with httpx.AsyncClient(timeout=8.0) as client:
            resp = await client.post(
                "https://challenges.cloudflare.com/turnstile/v0/siteverify",
                data={"secret": CF_SECRET_KEY, "response": token},
            )
        # Require a literal JSON true so a truthy non-boolean cannot pass.
        return resp.json().get("success", False) is True
    except Exception as exc:
        # Deliberately broad: this is an auth boundary and must fail closed.
        log.error("Turnstile verification error: %s", exc)
        return False


def _hostname_of(value: str) -> str:
    """Return the lower-cased hostname of a URL or bare host, or '' if none."""
    value = value.strip()
    if not value:
        return ""
    # urlparse only recognises the host when a '//' authority marker is present.
    if "://" not in value:
        value = "//" + value
    try:
        host = urlparse(value).hostname
    except ValueError:
        return ""
    return (host or "").lower().rstrip(".")


def _host_is_allowed(host: str, allowed: str) -> bool:
    """Return True when *host* is *allowed* itself or one of its subdomains."""
    return bool(host) and (host == allowed or host.endswith("." + allowed))


async def _authenticate(request: Request) -> bool:
    """Decide whether *request* is authorised.

    Methods are tried in order and the first one present decides the outcome:

    1. ``auth_code`` header: checked against HASH_VALUE.
    2. ``cf-turnstile-token`` header: verified with Cloudflare.
    3. Fallback: the ``referer`` or ``origin`` header's hostname must be
       ALLOWED_DOMAIN or a subdomain of it.

    The fallback compares parsed hostnames rather than substrings, so values
    such as ``https://evil.test/?x=<allowed>`` or ``<allowed>.evil.test`` are
    rejected. Both headers are client-controlled, so this is only a weak
    check against casual cross-site use.
    """
    auth_code = request.headers.get("auth_code")
    cf_token = request.headers.get("cf-turnstile-token")

    if auth_code:
        return _check_auth_code(auth_code)
    if cf_token:
        return await _check_turnstile(cf_token)

    # Fallback: domain / referer check
    allowed = _hostname_of(ALLOWED_DOMAIN)
    if not allowed:
        return False
    return any(
        _host_is_allowed(_hostname_of(request.headers.get(header, "")), allowed)
        for header in ("referer", "origin")
    )


async def _require_auth(request: Request) -> None:
    """Raise a 401 HTTPException unless *request* passes `_authenticate`."""
    if not await _authenticate(request):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=_UNAUTHORIZED_DETAIL,
        )


# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------
class CurriculumObjective(BaseModel):
    """One curriculum topic with its content and learning objectives."""

    topic: str
    content: str
    learning_objectives: List[str]


class CurriculumRequest(BaseModel):
    """Request body for POST /curriculum."""

    board: str
    # "class" is a Python keyword, so the field is exposed through an alias.
    class_: str = Field(..., alias="class")
    subject: str
    student_name: str
    teacher_persona: str
    curriculum_objectives: List[CurriculumObjective]

    # Accept both the alias ("class") and the field name ("class_") on input.
    model_config = {"populate_by_name": True}


class CurriculumResponse(BaseModel):
    """Response body for POST /curriculum: the generated request id."""

    request: str


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get(
    "/ping",
    summary="Authenticated liveness check",
)
async def ping(raw_request: Request):
    """Return ``{"message": "Alive"}`` for authenticated callers, else 401."""
    await _require_auth(raw_request)
    return {"message": "Alive"}


@app.post(
    "/curriculum",
    response_model=CurriculumResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Store a curriculum request in MongoDB",
)
async def create_curriculum(
    payload: CurriculumRequest,
    raw_request: Request,
):
    """Persist *payload* to MongoDB and return its generated request id.

    Raises:
        HTTPException 401: the caller is not authenticated.
        HTTPException 503: the MongoDB client has not been initialised.
        HTTPException 502: the insert failed.
    """
    # --- Auth ---
    await _require_auth(raw_request)

    # --- Generate request ID from current timestamp (ms) ---
    # NOTE(review): two requests within the same millisecond receive the same
    # id. Kept as-is because consumers may rely on this format — TODO confirm.
    request_id = str(int(time.time() * 1000))

    # --- Build document ---
    document = {
        "request": request_id,
        "board": payload.board,
        "class": payload.class_,
        "subject": payload.subject,
        "student_name": payload.student_name,
        "teacher_persona": payload.teacher_persona,
        "curriculum_objectives": [
            obj.model_dump() for obj in payload.curriculum_objectives
        ],
    }

    # --- Write to MongoDB ---
    if mongo_client is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Database connection is not available.",
        )

    try:
        collection = mongo_client[MONGO_DB][MONGO_COLLECTION]
        result = await collection.insert_one(document)
    except Exception as exc:
        log.error("MongoDB write error: %s", exc)
        # Chain the cause so the original traceback is kept for debugging.
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Failed to persist data to the database.",
        ) from exc

    log.info("Inserted document with _id=%s request_id=%s", result.inserted_id, request_id)
    return CurriculumResponse(request=request_id)


# ---------------------------------------------------------------------------
# Health check (no auth required)
# ---------------------------------------------------------------------------
@app.get("/health", include_in_schema=False)
async def health():
    """Unauthenticated liveness probe; does not touch the database."""
    return {"status": "ok"}