"""Single-tenant document question-answering API.

Loads one tenant's configuration and documents at import time, builds a
retriever over them, and exposes FastAPI endpoints for uploading documents,
re-indexing, asking questions and fetching synthesized audio. Most endpoints
are protected by a simple in-memory, per-IP daily quota.
"""
import json
import os
import time
from pathlib import Path
from typing import List

from fastapi import FastAPI, UploadFile, File, HTTPException, Request
from fastapi.responses import HTMLResponse, FileResponse
from pydantic import BaseModel

from services.retriever import Retriever
from services.llm import generate_answer
from services.tts import synthesize

# =====================================================
# TENANT CONFIG
# =====================================================
TENANT_ID = "ohamlab"
TENANT_FILE = Path(f"tenants/{TENANT_ID}.json")

# Fail fast at import time: nothing below works without the tenant config.
if not TENANT_FILE.exists():
    raise RuntimeError(f"Tenant config not found: {TENANT_FILE}")

with TENANT_FILE.open(encoding="utf-8") as f:
    TENANT = json.load(f)

DOCS_DIR = Path(f"documents/{TENANT_ID}")
DOCS_DIR.mkdir(parents=True, exist_ok=True)

# =====================================================
# RETRIEVER
# =====================================================
retriever = Retriever()
try:
    retriever.load_documents(DOCS_DIR)
    print(f"✅ Documents indexed: {len(retriever.doc_text_cache)}")
except Exception as e:
    # Best effort: the service still starts and documents can be indexed
    # later through /api/reload.
    print(f"⚠️ No documents indexed yet: {e}")

# =====================================================
# FASTAPI APP
# =====================================================
app = FastAPI(title=f"{TENANT['brand']['name']} API")

# =====================================================
# API QUOTA (PER-IP)
# =====================================================
API_LIMIT = int(os.getenv("API_MAX_PER_DAY", "10"))  # read from env, default 10
API_WINDOW = 86400  # 24h in seconds
# NOTE: entries are never evicted, so this grows with the number of distinct
# client IPs seen since process start.
api_usage = {}  # ip -> (count, window_start)


def _client_ip(request: Request) -> str:
    """Return the caller's IP, or "unknown" when no client info is attached."""
    return request.client.host if request.client else "unknown"


def _current_usage(ip: str, now: float) -> tuple[int, float]:
    """Return (count, window_start) for *ip*, resetting an expired window."""
    count, start = api_usage.get(ip, (0, now))
    if now - start > API_WINDOW:
        return 0, now
    return count, start


def enforce_quota(request: Request):
    """Count one request against the caller's daily quota.

    Raises:
        HTTPException: 429 when the caller has already used API_LIMIT
            requests in the current 24h window. The detail includes the
            number of seconds until the window resets.
    """
    ip = _client_ip(request)
    now = time.time()
    count, start = _current_usage(ip, now)

    if count >= API_LIMIT:
        raise HTTPException(
            status_code=429,
            detail={
                "error": "Quota exceeded",
                "message": f"Daily limit of {API_LIMIT} requests reached. Try again later.",
                "reset_in_seconds": int(API_WINDOW - (now - start)),
            },
        )

    api_usage[ip] = (count + 1, start)


def get_quota_status(request: Request):
    """Return the caller's quota usage without consuming a request."""
    ip = _client_ip(request)
    count, _ = _current_usage(ip, time.time())
    remaining = max(0, API_LIMIT - count)
    return {"ip": ip, "used": count, "remaining": remaining, "limit": API_LIMIT}


# =====================================================
# HELPERS
# =====================================================
def get_files() -> List[str]:
    """Return the names of all regular files in the tenant's documents dir."""
    return [f.name for f in DOCS_DIR.iterdir() if f.is_file()]


async def log_request_response(request: Request, response: dict):
    """Print the request method/URL, its JSON body and the response payload.

    A body that cannot be parsed as JSON is logged as an empty object.
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    print(f"[API LOG] {request.method} {request.url}")
    print(f"Request body: {json.dumps(body)}")
    print(f"Response: {json.dumps(response)}")


# =====================================================
# ROOT / HEALTH / CONFIG
# =====================================================
@app.get("/", response_class=HTMLResponse)
def root():
    """Return a minimal landing page naming the tenant brand."""
    # NOTE(review): the body below contains no HTML tags as seen here --
    # confirm this is the intended markup.
    return f"""

{TENANT['brand']['name']} – Document AI API

"""


@app.get("/health")
def health():
    """Report service status and index sizes (not quota-limited)."""
    resp = {
        "status": "ok",
        "tenant": TENANT_ID,
        "documents_indexed": len(retriever.doc_text_cache),
        "chunks_indexed": len(retriever.chunk_texts),
        "daily_quota": API_LIMIT,
    }
    print(f"[API LOG] GET /health -> {resp}")
    return resp


@app.get("/api/config")
def get_config():
    """Expose the tenant's brand, features, limits and languages."""
    resp = {
        "brand": TENANT["brand"],
        "features": TENANT["features"],
        "limits": TENANT["limits"],
        "languages": TENANT["features"]["languages"],
    }
    print(f"[API LOG] GET /api/config -> {resp}")
    return resp


@app.get("/api/files")
def list_files(request: Request):
    """List the tenant's document files with their sizes in KiB."""
    enforce_quota(request)
    files = [
        {"name": f.name, "size_kb": round(f.stat().st_size / 1024, 2)}
        for f in DOCS_DIR.iterdir()
        if f.is_file()
    ]
    resp = {"tenant": TENANT_ID, "count": len(files), "files": files}
    print(f"[API LOG] GET /api/files -> {resp}")
    return resp


# =====================================================
# UPLOAD
# =====================================================
@app.post("/api/upload")
async def upload_doc(file: UploadFile = File(...), request: Request = None):
    """Store an uploaded file in the tenant's documents directory.

    The file is not indexed until /api/reload is called.

    Raises:
        HTTPException: 429 on quota exhaustion, 400 when the upload carries
            no usable filename, 500 when writing the file fails.
    """
    enforce_quota(request)

    # Keep only the final path component so a client-supplied name cannot
    # escape DOCS_DIR, and reject names that do not denote a file at all.
    filename = Path(file.filename or "").name
    if not filename or filename in {".", ".."}:
        raise HTTPException(status_code=400, detail="Invalid or missing filename")

    try:
        dest = DOCS_DIR / filename
        with dest.open("wb") as f:
            f.write(await file.read())
        return {"status": "ok", "file": filename, "message": "Uploaded. Call /api/reload to index."}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# =====================================================
# RELOAD / CLEAR CACHE
# =====================================================
@app.post("/api/reload")
def reload_docs(request: Request):
    """Re-index every document in the tenant's documents directory."""
    enforce_quota(request)
    retriever.load_documents(DOCS_DIR)
    resp = {
        "status": "ok",
        "documents_indexed": len(retriever.doc_text_cache),
        "chunks_indexed": len(retriever.chunk_texts),
    }
    print(f"[API LOG] POST /api/reload -> {resp}")
    return resp


@app.post("/api/clear-cache")
def clear_cache(request: Request):
    """Ask the retriever to drop its cache."""
    enforce_quota(request)
    retriever.clear_cache()
    resp = {"status": "ok", "message": "Cache cleared"}
    print(f"[API LOG] POST /api/clear-cache -> {resp}")
    return resp


# =====================================================
# AUDIO
# =====================================================
# Optional hardening: when AUDIO_DIR is set, /api/audio only serves files
# located under that directory. Unset keeps the unrestricted behaviour.
_AUDIO_DIR_ENV = os.getenv("AUDIO_DIR")
AUDIO_ROOT = Path(_AUDIO_DIR_ENV).resolve() if _AUDIO_DIR_ENV else None


@app.get("/api/audio")
def get_audio(path: str, request: Request):
    """Serve the audio file at *path* as audio/wav.

    Raises:
        HTTPException: 429 on quota exhaustion, 404 when *path* is not a
            regular file or lies outside AUDIO_ROOT (when configured).
    """
    enforce_quota(request)

    # NOTE(security): `path` comes straight from the query string. Without
    # AUDIO_DIR configured, any file readable by this process can be fetched.
    audio_file = Path(path)
    if not audio_file.is_file():
        raise HTTPException(404, "Audio not found")
    if AUDIO_ROOT is not None and not audio_file.resolve().is_relative_to(AUDIO_ROOT):
        raise HTTPException(404, "Audio not found")

    resp = f"Streaming audio: {path}"
    print(f"[API LOG] GET /api/audio -> {resp}")
    return FileResponse(path, media_type="audio/wav")


# =====================================================
# ASK ENDPOINT
# =====================================================
# Below this many total characters the selected documents are passed to the
# model as plain text instead of going through retriever search.
FAST_PATH_CHAR_LIMIT = 50000


class AskRequest(BaseModel):
    # The user's question.
    question: str
    # Language passed to speech synthesis.
    language: str = "en"
    # Document names to restrict the query to; None/empty means all.
    documents: list[str] | None = None


@app.post("/api/ask")
async def ask(req: AskRequest, request: Request):
    """Answer a question from the tenant's documents.

    Returns a dict with "answer", "sources" and "audio_path". On any
    non-HTTP failure the same shape is returned with empty values plus an
    "error" message, rather than an error status code.

    Raises:
        HTTPException: 429 on quota exhaustion.
    """
    enforce_quota(request)  # <<<<<< APPLY QUOTA HERE
    try:
        indexed = retriever.doc_text_cache
        if req.documents:
            docs_to_query = req.documents
            missing = [d for d in docs_to_query if d not in indexed]
            if missing:
                raise ValueError(f"Documents not indexed: {', '.join(missing)}")
        else:
            # Files uploaded since the last /api/reload are on disk but not
            # in the index yet; skip them instead of failing the request.
            docs_to_query = [d for d in get_files() if d in indexed]

        total_chars = sum(len(indexed[d]) for d in docs_to_query)

        if total_chars <= FAST_PATH_CHAR_LIMIT:
            context = retriever.get_plaintext_context(docs_to_query)
            sources_used = docs_to_query
        else:
            context, sources_used = retriever.search(
                query=req.question,
                documents=docs_to_query,
                top_k=5,
            )

        # NOTE(review): generate_answer and synthesize are called without
        # await inside an async handler -- if they block, they hold up the
        # event loop; confirm whether that is acceptable.
        answer = generate_answer(
            question=req.question,
            context=context,
            max_tokens=TENANT["limits"]["max_tokens"],
        )

        audio_path = synthesize(answer, req.language) if TENANT["features"].get("tts") else None

        resp = {"answer": answer, "sources": sources_used, "audio_path": audio_path}
        await log_request_response(request, resp)
        return resp

    except HTTPException:
        raise  # rethrow quota errors cleanly
    except Exception as e:
        resp = {"answer": "", "sources": [], "audio_path": None, "error": str(e)}
        await log_request_response(request, resp)
        return resp


# =====================================================
# NEW QUOTA STATUS ENDPOINT
# =====================================================
@app.get("/api/quota")
def quota_status(request: Request):
    """Return the caller's current quota usage (not quota-limited itself)."""
    return get_quota_status(request)