parentteacher / app.py
digifreely's picture
Update app.py
413ffe2 verified
import os
import hashlib
import logging
import time
from contextlib import asynccontextmanager
import httpx
from fastapi import FastAPI, Request, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List
from motor.motor_asyncio import AsyncIOMotorClient
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Secrets / Environment variables
# ---------------------------------------------------------------------------
def _require_secret(name: str) -> str:
value = os.environ.get(name, "")
if not value:
log.warning("Secret '%s' is not set — related functionality will be unavailable.", name)
return value
HASH_VALUE = os.environ.get("HASH_VALUE", "")
CF_SECRET_KEY = os.environ.get("CF_SECRET_KEY", "")
ALLOWED_DOMAIN = os.environ.get("ALLOWED_DOMAIN", "")
MONGO_PASSWORD = _require_secret("MONGO_PASSWORD")
MONGO_DB = os.environ.get("MONGO_DB", "MariaPTDB")
MONGO_COLLECTION = os.environ.get("MONGO_COLL", "MariaPTColl")
MONGO_URI = os.environ.get("MONGO_URI") or \
f"mongodb+srv://testuser:{MONGO_PASSWORD}@cluster0.ntz2mpi.mongodb.net/"
# ---------------------------------------------------------------------------
# MongoDB client (shared, created once at startup)
# ---------------------------------------------------------------------------
mongo_client: AsyncIOMotorClient | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global mongo_client
log.info("Connecting to MongoDB…")
mongo_client = AsyncIOMotorClient(MONGO_URI, serverSelectionTimeoutMS=10_000)
# Quick connectivity check
try:
await mongo_client.admin.command("ping")
log.info("MongoDB connection established.")
except Exception as exc:
log.error("MongoDB ping failed: %s", exc)
yield
log.info("Closing MongoDB connection…")
mongo_client.close()
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
app = FastAPI(
title="MariaPT Curriculum API",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# Auth helpers
# ---------------------------------------------------------------------------
def _check_auth_code(code: str) -> bool:
if not HASH_VALUE:
return False
return hashlib.sha256(code.encode()).hexdigest() == HASH_VALUE
async def _check_turnstile(token: str) -> bool:
if not CF_SECRET_KEY:
return False
try:
async with httpx.AsyncClient(timeout=8.0) as client:
resp = await client.post(
"https://challenges.cloudflare.com/turnstile/v0/siteverify",
data={"secret": CF_SECRET_KEY, "response": token},
)
return resp.json().get("success", False)
except Exception as exc:
log.error("Turnstile verification error: %s", exc)
return False
async def _authenticate(request: Request) -> bool:
auth_code = request.headers.get("auth_code")
cf_token = request.headers.get("cf-turnstile-token")
if auth_code:
return _check_auth_code(auth_code)
if cf_token:
return await _check_turnstile(cf_token)
# Fallback: domain / referer check
referer = request.headers.get("referer", "")
origin = request.headers.get("origin", "")
if ALLOWED_DOMAIN and (ALLOWED_DOMAIN in referer or ALLOWED_DOMAIN in origin):
return True
return False
# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------
class CurriculumObjective(BaseModel):
topic: str
content: str
learning_objectives: List[str]
class CurriculumRequest(BaseModel):
board: str
class_: str = Field(..., alias="class")
subject: str
student_name: str
teacher_persona: str
curriculum_objectives: List[CurriculumObjective]
model_config = {"populate_by_name": True}
class CurriculumResponse(BaseModel):
request: str
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get(
"/ping",
summary="Authenticated liveness check",
)
async def ping(raw_request: Request):
if not await _authenticate(raw_request):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Unauthorized: provide a valid auth_code, cf-turnstile-token, or request from an allowed domain.",
)
return {"message": "Alive"}
@app.post(
"/curriculum",
response_model=CurriculumResponse,
status_code=status.HTTP_201_CREATED,
summary="Store a curriculum request in MongoDB",
)
async def create_curriculum(
payload: CurriculumRequest,
raw_request: Request,
):
# --- Auth ---
if not await _authenticate(raw_request):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Unauthorized: provide a valid auth_code, cf-turnstile-token, or request from an allowed domain.",
)
# --- Generate request ID from current timestamp (ms) ---
request_id = str(int(time.time() * 1000))
# --- Build document ---
document = {
"request": request_id,
"board": payload.board,
"class": payload.class_,
"subject": payload.subject,
"student_name": payload.student_name,
"teacher_persona": payload.teacher_persona,
"curriculum_objectives": [
obj.model_dump() for obj in payload.curriculum_objectives
],
}
# --- Write to MongoDB ---
if mongo_client is None:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Database connection is not available.",
)
try:
collection = mongo_client[MONGO_DB][MONGO_COLLECTION]
result = await collection.insert_one(document)
log.info("Inserted document with _id=%s request_id=%s", result.inserted_id, request_id)
except Exception as exc:
log.error("MongoDB write error: %s", exc)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Failed to persist data to the database.",
)
return CurriculumResponse(request=request_id)
# ---------------------------------------------------------------------------
# Health check (no auth required)
# ---------------------------------------------------------------------------
@app.get("/health", include_in_schema=False)
async def health():
return {"status": "ok"}