Spaces:
Running
Running
File size: 7,279 Bytes
2d930c9 413ffe2 2d930c9 413ffe2 2d930c9 413ffe2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 | import os
import hashlib
import logging
import time
from contextlib import asynccontextmanager
import httpx
from fastapi import FastAPI, Request, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List
from motor.motor_asyncio import AsyncIOMotorClient
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Secrets / Environment variables
# ---------------------------------------------------------------------------
def _require_secret(name: str) -> str:
    """Read environment variable *name*, warning when it is missing.

    Despite the name, a missing secret is not fatal: a warning is logged and
    an empty string is returned so the module can still be imported.

    Args:
        name: Name of the environment variable to look up.

    Returns:
        The variable's value, or ``""`` when it is unset or empty.
    """
    secret = os.environ.get(name, "")
    if secret:
        return secret
    log.warning("Secret '%s' is not set — related functionality will be unavailable.", name)
    return secret
# Authentication settings. Each is optional; the corresponding check in the
# auth helpers is skipped (treated as failing) when its value is empty.
HASH_VALUE = os.getenv("HASH_VALUE", "")  # compared against a SHA-256 hex digest
CF_SECRET_KEY = os.getenv("CF_SECRET_KEY", "")  # Cloudflare Turnstile secret
ALLOWED_DOMAIN = os.getenv("ALLOWED_DOMAIN", "")  # matched against Referer/Origin

# MongoDB settings. Note the collection is read from MONGO_COLL, not
# MONGO_COLLECTION.
MONGO_PASSWORD = _require_secret("MONGO_PASSWORD")
MONGO_DB = os.getenv("MONGO_DB", "MariaPTDB")
MONGO_COLLECTION = os.getenv("MONGO_COLL", "MariaPTColl")

# An explicit MONGO_URI wins; otherwise one is assembled from the password.
# NOTE(review): the password is interpolated verbatim — presumably it is stored
# already percent-encoded; confirm, since characters such as '@' or '/' would
# otherwise corrupt the URI.
MONGO_URI = (
    os.getenv("MONGO_URI")
    or f"mongodb+srv://testuser:{MONGO_PASSWORD}@cluster0.ntz2mpi.mongodb.net/"
)
# ---------------------------------------------------------------------------
# MongoDB client (shared, created once at startup)
# ---------------------------------------------------------------------------
# Shared client; populated by ``lifespan`` at startup and reset at shutdown.
mongo_client: AsyncIOMotorClient | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the shared MongoDB client for the lifetime of the application.

    On startup the client is created and a ``ping`` command is issued as a
    connectivity probe. A failed ping is logged but does not abort startup,
    so the service can come up while the database is temporarily unreachable.

    On shutdown the client is always closed — including when an exception or
    cancellation is delivered at the ``yield`` — and the module-level
    reference is cleared so request handlers see "no client" rather than a
    closed one.

    Args:
        app: The FastAPI application (unused; required by the lifespan API).
    """
    global mongo_client
    log.info("Connecting to MongoDB…")
    mongo_client = AsyncIOMotorClient(MONGO_URI, serverSelectionTimeoutMS=10_000)
    # Quick connectivity check (best-effort: failure is logged, not raised).
    try:
        await mongo_client.admin.command("ping")
        log.info("MongoDB connection established.")
    except Exception as exc:
        log.error("MongoDB ping failed: %s", exc)
    try:
        yield
    finally:
        log.info("Closing MongoDB connection…")
        mongo_client.close()
        mongo_client = None
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
# Application object; MongoDB setup/teardown is delegated to ``lifespan``.
app = FastAPI(title="MariaPT Curriculum API", version="1.0.0", lifespan=lifespan)

# Fully permissive CORS: any origin, method and header, with credentials.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation; the auth helpers in this file
# read request headers only, so credentials may not be needed — confirm with
# the front-end before tightening.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# Auth helpers
# ---------------------------------------------------------------------------
def _check_auth_code(code: str) -> bool:
    """Check a client-supplied auth code against the configured hash.

    The code is hashed with SHA-256 and its hex digest compared with
    ``HASH_VALUE``. When ``HASH_VALUE`` is empty the check always fails.

    Args:
        code: The raw auth code taken from the request.

    Returns:
        True when the digest of *code* equals ``HASH_VALUE``, else False.
    """
    import hmac  # function-scope import keeps this change self-contained

    if not HASH_VALUE:
        return False
    digest = hashlib.sha256(code.encode()).hexdigest()
    # Constant-time comparison so response timing does not leak how much of
    # the digest matched. Bytes are compared because compare_digest rejects
    # non-ASCII str arguments, and HASH_VALUE comes from the environment.
    return hmac.compare_digest(digest.encode(), HASH_VALUE.encode())
async def _check_turnstile(token: str) -> bool:
    """Verify a Cloudflare Turnstile token via the siteverify endpoint.

    When ``CF_SECRET_KEY`` is empty the check always fails without making a
    network call. Any error while contacting the endpoint or decoding its
    response is logged and treated as a failed verification.

    Args:
        token: The Turnstile response token supplied by the client.

    Returns:
        True only when the endpoint returns a JSON object whose ``success``
        field is the boolean ``true``; False otherwise.
    """
    if not CF_SECRET_KEY:
        return False
    try:
        async with httpx.AsyncClient(timeout=8.0) as client:
            resp = await client.post(
                "https://challenges.cloudflare.com/turnstile/v0/siteverify",
                data={"secret": CF_SECRET_KEY, "response": token},
            )
            body = resp.json()
    except Exception as exc:
        log.error("Turnstile verification error: %s", exc)
        return False
    # Require a real boolean True: a truthy non-boolean value (e.g. the string
    # "false") or a non-object body must never count as authenticated.
    return isinstance(body, dict) and body.get("success") is True
def _host_matches_domain(url: str, domain: str) -> bool:
    """Return True when *url*'s hostname is *domain* or one of its subdomains."""
    from urllib.parse import urlsplit  # function-scope import: self-contained

    allowed = domain.strip()
    if not allowed:
        return False
    try:
        host = urlsplit(url).hostname or ""
        # Accept the setting as a bare host ("example.com"), host:port, or a
        # full origin ("https://example.com"); only the hostname is compared.
        if "://" not in allowed:
            allowed = "//" + allowed.lstrip("/")
        allowed_host = urlsplit(allowed).hostname or ""
    except ValueError:
        # Malformed URL (e.g. unbalanced IPv6 brackets): treat as no match.
        return False
    host = host.rstrip(".")
    allowed_host = allowed_host.strip(".")
    if not host or not allowed_host:
        return False
    return host == allowed_host or host.endswith("." + allowed_host)


async def _authenticate(request: Request) -> bool:
    """Decide whether *request* is authorized.

    Checks are tried in order and the first applicable one is decisive:

    1. ``auth_code`` header — verified with ``_check_auth_code``.
    2. ``cf-turnstile-token`` header — verified with ``_check_turnstile``.
    3. Otherwise, when ``ALLOWED_DOMAIN`` is set, the hostname of the
       ``Referer`` or ``Origin`` header must equal that domain or be a
       subdomain of it.

    The domain check compares parsed hostnames rather than searching for a
    substring, so values such as ``https://allowed.com.evil.net`` or
    ``https://evil.net/?x=allowed.com`` no longer pass.

    NOTE(review): this is stricter than the previous substring test — an
    ``ALLOWED_DOMAIN`` that is only a fragment of a hostname (e.g. "example")
    will stop matching; confirm the deployed value is a full hostname.
    Referer/Origin are client-supplied headers, so this fallback only
    constrains browsers, not arbitrary HTTP clients.

    Args:
        request: The incoming request whose headers are inspected.

    Returns:
        True when the request passes the first applicable check, else False.
    """
    auth_code = request.headers.get("auth_code")
    if auth_code:
        return _check_auth_code(auth_code)

    cf_token = request.headers.get("cf-turnstile-token")
    if cf_token:
        return await _check_turnstile(cf_token)

    # Fallback: domain / referer check
    if not ALLOWED_DOMAIN:
        return False
    return any(
        _host_matches_domain(request.headers.get(header, ""), ALLOWED_DOMAIN)
        for header in ("referer", "origin")
    )
# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------
# One curriculum entry: a topic, its content text, and the learning objectives
# attached to it. (Documented with comments rather than a class docstring so
# the generated API schema description stays unchanged.)
class CurriculumObjective(BaseModel):
    topic: str
    content: str
    learning_objectives: list[str]
# Request body for POST /curriculum. (Documented with comments rather than a
# class docstring so the generated API schema description stays unchanged.)
class CurriculumRequest(BaseModel):
    # Allow population by field name ("class_") as well as by alias ("class").
    model_config = {"populate_by_name": True}

    board: str
    # "class" is a Python keyword, so the attribute is class_ with a JSON alias.
    class_: str = Field(..., alias="class")
    subject: str
    student_name: str
    teacher_persona: str
    curriculum_objectives: list[CurriculumObjective]
# Response body for POST /curriculum: carries the generated request id.
class CurriculumResponse(BaseModel):
    request: str
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/ping", summary="Authenticated liveness check")
async def ping(raw_request: Request):
    # Liveness probe that also exercises authentication: answers "Alive" only
    # when _authenticate accepts the request, otherwise responds with 401.
    # (Plain comments are used instead of a docstring so the generated API
    # description stays unchanged.)
    is_authorized = await _authenticate(raw_request)
    if is_authorized:
        return {"message": "Alive"}
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Unauthorized: provide a valid auth_code, cf-turnstile-token, or request from an allowed domain.",
    )
@app.post(
    "/curriculum",
    response_model=CurriculumResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Store a curriculum request in MongoDB",
)
async def create_curriculum(
    payload: CurriculumRequest,
    raw_request: Request,
):
    # Authenticates the caller, stores the curriculum payload as one MongoDB
    # document, and returns the generated request id.
    #
    # Responses raised here:
    #   401 - _authenticate rejected the request
    #   503 - the shared MongoDB client has not been created
    #   502 - the insert failed
    #
    # (Plain comments are used instead of a docstring so the generated API
    # description stays unchanged.)

    # --- Auth ---
    if not await _authenticate(raw_request):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Unauthorized: provide a valid auth_code, cf-turnstile-token, or request from an allowed domain.",
        )

    # --- Fail fast when there is no database client to write to ---
    if mongo_client is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Database connection is not available.",
        )

    # --- Generate request ID from current timestamp (ms) ---
    # NOTE(review): two requests handled within the same millisecond receive
    # the same id; confirm whether consumers rely on uniqueness.
    request_id = str(int(time.time() * 1000))

    # --- Build document ---
    document = {
        "request": request_id,
        "board": payload.board,
        "class": payload.class_,
        "subject": payload.subject,
        "student_name": payload.student_name,
        "teacher_persona": payload.teacher_persona,
        "curriculum_objectives": [
            obj.model_dump() for obj in payload.curriculum_objectives
        ],
    }

    # --- Write to MongoDB ---
    try:
        collection = mongo_client[MONGO_DB][MONGO_COLLECTION]
        result = await collection.insert_one(document)
    except Exception as exc:
        log.error("MongoDB write error: %s", exc)
        # Chain the cause so the original driver error stays in the traceback.
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Failed to persist data to the database.",
        ) from exc

    log.info("Inserted document with _id=%s request_id=%s", result.inserted_id, request_id)
    return CurriculumResponse(request=request_id)
# ---------------------------------------------------------------------------
# Health check (no auth required)
# ---------------------------------------------------------------------------
@app.get("/health", include_in_schema=False)
async def health():
    """Unauthenticated health probe; always reports status "ok"."""
    response_body = {"status": "ok"}
    return response_body