# Hosting-page residue, not part of the program: "Spaces: Running / Running"
import hashlib
import hmac
import logging
import os
import time
from contextlib import asynccontextmanager
from typing import List
from urllib.parse import quote_plus, urlsplit

import httpx
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel, Field
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Root logging configuration: INFO and above, one timestamped line per record.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the functions in this file.
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Secrets / Environment variables
# ---------------------------------------------------------------------------
def _require_secret(name: str) -> str:
    """Read environment variable *name*, warning when it is unset or empty.

    Despite the name, a missing secret is not fatal: a warning is logged and
    an empty string is returned.

    Args:
        name: Name of the environment variable to look up.

    Returns:
        The variable's value, or ``""`` when it is unset or empty.
    """
    secret = os.environ.get(name, "")
    if secret:
        return secret
    log.warning("Secret '%s' is not set — related functionality will be unavailable.", name)
    return secret
# Value that the SHA-256 hex digest of a client's auth_code is compared to.
HASH_VALUE = os.environ.get("HASH_VALUE", "")
# Secret sent to Cloudflare Turnstile's siteverify endpoint.
CF_SECRET_KEY = os.environ.get("CF_SECRET_KEY", "")
# Domain accepted by the Referer/Origin fallback check.
ALLOWED_DOMAIN = os.environ.get("ALLOWED_DOMAIN", "")
MONGO_PASSWORD = _require_secret("MONGO_PASSWORD")
MONGO_DB = os.environ.get("MONGO_DB", "MariaPTDB")
# Note: the collection name is read from MONGO_COLL, not MONGO_COLLECTION.
MONGO_COLLECTION = os.environ.get("MONGO_COLL", "MariaPTColl")
# An explicit MONGO_URI wins; otherwise the URI is built from the password.
# The password is percent-encoded because reserved characters such as "@",
# ":" or "/" would otherwise corrupt the connection string. "%" and "+" are
# left untouched so a password that was already percent-encoded by the
# operator is passed through unchanged.
MONGO_URI = os.environ.get("MONGO_URI") or (
    f"mongodb+srv://testuser:{quote_plus(MONGO_PASSWORD, safe='%+')}"
    "@cluster0.ntz2mpi.mongodb.net/"
)
# ---------------------------------------------------------------------------
# MongoDB client (shared, created once at startup)
# ---------------------------------------------------------------------------
# Shared Motor client: assigned by lifespan() at startup, None otherwise.
mongo_client: AsyncIOMotorClient | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the shared MongoDB client at startup and close it at shutdown.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        Nothing; control returns to the application while it is serving.
    """
    global mongo_client
    log.info("Connecting to MongoDB…")
    mongo_client = AsyncIOMotorClient(MONGO_URI, serverSelectionTimeoutMS=10_000)
    # Quick connectivity check. A failed ping is logged but deliberately not
    # fatal, so the application still starts when the database is unreachable.
    try:
        await mongo_client.admin.command("ping")
    except Exception as exc:
        log.error("MongoDB ping failed: %s", exc)
    else:
        log.info("MongoDB connection established.")
    try:
        yield
    finally:
        # Always release the client, even if the application exits with an error.
        log.info("Closing MongoDB connection…")
        mongo_client.close()
        mongo_client = None
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
# Application object; `lifespan` manages the MongoDB client around serving.
app = FastAPI(
    title="MariaPT Curriculum API",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS: any origin, method and header is accepted. Credentials are disabled
# because wildcard origins must not be combined with credentialed requests,
# and this API reads its auth material from custom request headers
# ("auth_code", "cf-turnstile-token") rather than from cookies.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# Auth helpers
# ---------------------------------------------------------------------------
def _check_auth_code(code: str) -> bool:
    """Check a client-supplied auth code against the configured SHA-256 digest.

    Args:
        code: The raw auth code taken from the request.

    Returns:
        True when the SHA-256 hex digest of *code* equals ``HASH_VALUE``
        (compared case-insensitively, ignoring surrounding whitespace);
        False otherwise, including when ``HASH_VALUE`` is not configured.
    """
    if not HASH_VALUE:
        return False
    supplied = hashlib.sha256(code.encode()).hexdigest()
    # hexdigest() is lower-case; normalise the configured value to match.
    expected = HASH_VALUE.strip().lower()
    # Constant-time comparison so response timing does not leak digest prefixes.
    return hmac.compare_digest(supplied.encode(), expected.encode())
async def _check_turnstile(token: str) -> bool:
    """Verify a Cloudflare Turnstile token against the siteverify endpoint.

    Args:
        token: The Turnstile token supplied by the client.

    Returns:
        The ``success`` field of the JSON reply (False when absent). False is
        also returned when ``CF_SECRET_KEY`` is empty or when the request or
        the decoding of its reply raises; such errors are logged.
    """
    if CF_SECRET_KEY:
        form = {"secret": CF_SECRET_KEY, "response": token}
        try:
            async with httpx.AsyncClient(timeout=8.0) as http:
                verdict = await http.post(
                    "https://challenges.cloudflare.com/turnstile/v0/siteverify",
                    data=form,
                )
                return verdict.json().get("success", False)
        except Exception as exc:
            log.error("Turnstile verification error: %s", exc)
    return False
def _host_matches_domain(url: str, domain: str) -> bool:
    """Return True when *url*'s hostname is *domain* or one of its subdomains."""
    if not url or not domain:
        return False
    domain = domain.strip().lower()
    try:
        # Accept the domain written either as a bare host or as a full URL.
        allowed_host = urlsplit(domain if "://" in domain else f"//{domain}").hostname
        host = urlsplit(url).hostname
    except ValueError:
        # Malformed URL (e.g. a broken IPv6 literal): treat as not allowed.
        return False
    allowed_host = (allowed_host or "").lstrip(".")
    if not host or not allowed_host:
        return False
    return host == allowed_host or host.endswith("." + allowed_host)


async def _authenticate(request: Request) -> bool:
    """Decide whether *request* is authorised.

    Mechanisms are tried in order and the first one present decides the
    outcome: the ``auth_code`` header, then the ``cf-turnstile-token``
    header, then the Origin/Referer domain fallback.

    Args:
        request: The incoming request whose headers are inspected.

    Returns:
        True when the selected mechanism accepts the request, else False.
    """
    auth_code = request.headers.get("auth_code")
    if auth_code:
        return _check_auth_code(auth_code)

    cf_token = request.headers.get("cf-turnstile-token")
    if cf_token:
        return await _check_turnstile(cf_token)

    # Fallback: domain check on Origin / Referer. The hostname is parsed and
    # compared exactly (or as a subdomain) instead of using a substring test,
    # which would accept e.g. "https://evil.test/?x=<allowed domain>".
    # NOTE(review): both headers are client-controlled, so this is only a weak
    # check — confirm it is acceptable as an authentication path.
    origin = request.headers.get("origin", "")
    referer = request.headers.get("referer", "")
    return _host_matches_domain(origin, ALLOWED_DOMAIN) or _host_matches_domain(
        referer, ALLOWED_DOMAIN
    )
# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------
# Sub-model of a curriculum request: `topic`, `content`, and a list of
# `learning_objectives` strings.
class CurriculumObjective(BaseModel):
    topic: str
    content: str
    learning_objectives: list[str]
# Request body accepted by create_curriculum.
class CurriculumRequest(BaseModel):
    # Allows `class_` to be populated by field name as well as by its alias.
    model_config = {"populate_by_name": True}

    board: str
    # "class" is a Python keyword, so the attribute is `class_` and the
    # external name is supplied through the alias.
    class_: str = Field(alias="class")
    subject: str
    student_name: str
    teacher_persona: str
    curriculum_objectives: list[CurriculumObjective]
# Response body returned by create_curriculum; `request` holds the generated
# request id.
class CurriculumResponse(BaseModel):
    request: str
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
# Authenticated liveness probe: answers {"message": "Alive"} for an authorised
# request and raises a 401 HTTPException otherwise.
# NOTE(review): no route decorator is visible on this handler in this view —
# confirm it is registered with `app`.
async def ping(raw_request: Request):
    authorized = await _authenticate(raw_request)
    if authorized:
        return {"message": "Alive"}
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Unauthorized: provide a valid auth_code, cf-turnstile-token, or request from an allowed domain.",
    )
# NOTE(review): no route decorator is visible on this handler in this view —
# confirm it is registered with `app`.
async def create_curriculum(
    payload: CurriculumRequest,
    raw_request: Request,
):
    """Store a curriculum request in MongoDB and return its generated request id."""
    # Raises HTTPException: 401 when authentication fails, 503 when no MongoDB
    # client exists, 502 when the insert fails.

    # --- Auth ---
    if not await _authenticate(raw_request):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Unauthorized: provide a valid auth_code, cf-turnstile-token, or request from an allowed domain.",
        )

    # --- Fail fast when there is no database client to write to ---
    if mongo_client is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Database connection is not available.",
        )

    # --- Generate request ID from current timestamp (ms) ---
    # NOTE(review): two requests handled within the same millisecond receive
    # the same id — confirm that is acceptable for consumers of this value.
    request_id = str(int(time.time() * 1000))

    # --- Build document ---
    document = {
        "request": request_id,
        "board": payload.board,
        "class": payload.class_,
        "subject": payload.subject,
        "student_name": payload.student_name,
        "teacher_persona": payload.teacher_persona,
        "curriculum_objectives": [
            obj.model_dump() for obj in payload.curriculum_objectives
        ],
    }

    # --- Write to MongoDB ---
    try:
        collection = mongo_client[MONGO_DB][MONGO_COLLECTION]
        result = await collection.insert_one(document)
    except Exception as exc:
        # Log with traceback and keep the original error as the cause.
        log.exception("MongoDB write error: %s", exc)
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Failed to persist data to the database.",
        ) from exc
    else:
        log.info("Inserted document with _id=%s request_id=%s", result.inserted_id, request_id)

    return CurriculumResponse(request=request_id)
# ---------------------------------------------------------------------------
# Health check (no auth required)
# ---------------------------------------------------------------------------
# Unauthenticated health probe: always reports {"status": "ok"}.
# NOTE(review): no route decorator is visible on this handler in this view —
# confirm it is registered with `app`.
async def health():
    return dict(status="ok")