# doco-talk / app.py
# rahul7star's picture
# Update app.py
# 38e4b77 verified
import json
from pathlib import Path
import time
import os
from fastapi import FastAPI, UploadFile, File, HTTPException, Request
from fastapi.responses import HTMLResponse, FileResponse
from pydantic import BaseModel
from typing import List
from services.retriever import Retriever
from services.llm import generate_answer
from services.tts import synthesize
# =====================================================
# TENANT CONFIG
# =====================================================
# Single-tenant deployment: the tenant id selects both the JSON config file
# and the per-tenant document directory.
TENANT_ID = "ohamlab"
TENANT_FILE = Path("tenants") / f"{TENANT_ID}.json"

# Fail fast at import time when the tenant configuration is missing.
if not TENANT_FILE.exists():
    raise RuntimeError(f"Tenant config not found: {TENANT_FILE}")

# Parsed tenant configuration (brand, features, limits are read further down).
TENANT = json.loads(TENANT_FILE.read_text(encoding="utf-8"))

# Uploaded documents live under documents/<tenant>; create it on first run.
DOCS_DIR = Path("documents") / TENANT_ID
DOCS_DIR.mkdir(parents=True, exist_ok=True)
# =====================================================
# RETRIEVER
# =====================================================
# Shared retriever instance used by every endpoint below.
retriever = Retriever()

# Index whatever is already on disk. A failure here is reported but does not
# stop the app from starting; documents can be indexed later via /api/reload.
try:
    retriever.load_documents(DOCS_DIR)
    print(f"✅ Documents indexed: {len(retriever.doc_text_cache)}")
except Exception as exc:
    print(f"⚠️ No documents indexed yet: {exc}")
# =====================================================
# FASTAPI APP
# =====================================================
# The API title is derived from the tenant's brand name.
app = FastAPI(
    title=f"{TENANT['brand']['name']} API",
)
# =====================================================
# API QUOTA (PER-IP)
# =====================================================
# Per-IP request budget per window; override with the API_MAX_PER_DAY
# environment variable (defaults to 10).
API_LIMIT = int(os.environ.get("API_MAX_PER_DAY", "10"))

# Length of one quota window: 24 hours, expressed in seconds.
API_WINDOW = 24 * 60 * 60

# In-memory usage table: client ip -> (request count, window start timestamp).
api_usage = {}
def enforce_quota(request: Request):
    """Count one request against the caller's quota, or reject it.

    The caller is identified by ``request.client.host``; requests without
    client information share the ``"unknown"`` bucket. Usage is tracked in
    the in-memory ``api_usage`` table as ``(count, window_start)``.

    Args:
        request: The incoming request whose client address is charged.

    Raises:
        HTTPException: 429 when the caller has already used ``API_LIMIT``
            requests in the current window. The response carries a
            ``Retry-After`` header with the seconds left in the window,
            mirroring ``reset_in_seconds`` in the detail body.
    """
    ip = request.client.host if request.client else "unknown"
    now = time.time()
    count, start = api_usage.get(ip, (0, now))

    # Start a fresh window once the previous one has fully elapsed.
    if now - start > API_WINDOW:
        count, start = 0, now

    if count >= API_LIMIT:
        reset_in = int(API_WINDOW - (now - start))
        raise HTTPException(
            status_code=429,
            detail={
                "error": "Quota exceeded",
                "message": f"Daily limit of {API_LIMIT} requests reached. Try again later.",
                "reset_in_seconds": reset_in,
            },
            # Standard hint so HTTP clients can back off without parsing the body.
            headers={"Retry-After": str(reset_in)},
        )

    api_usage[ip] = (count + 1, start)
def get_quota_status(request: Request):
    """Report the caller's quota usage without consuming a request.

    Args:
        request: The incoming request; its client address selects the bucket.

    Returns:
        A dict with the keys ``ip``, ``used``, ``remaining`` and ``limit``.
    """
    client = request.client
    ip = client.host if client else "unknown"

    now = time.time()
    used, window_start = api_usage.get(ip, (0, now))

    # An elapsed window counts as unused; the stored entry is left untouched.
    if now - window_start > API_WINDOW:
        used = 0

    return {
        "ip": ip,
        "used": used,
        "remaining": max(0, API_LIMIT - used),
        "limit": API_LIMIT,
    }
# =====================================================
# HELPERS
# =====================================================
def get_files() -> List[str]:
    """Return the names of the regular files directly inside ``DOCS_DIR``."""
    entries = DOCS_DIR.iterdir()
    return [entry.name for entry in entries if entry.is_file()]
async def log_request_response(request: Request, response: dict):
    """Print the request line, its JSON body and the response to stdout.

    Args:
        request: The request being logged; its body is read as JSON.
        response: The JSON-serializable payload returned to the client.
    """
    try:
        payload = await request.json()
    except Exception:
        # A missing or non-JSON body is logged as an empty object.
        payload = {}

    print(f"[API LOG] {request.method} {request.url}")
    print("Request body: " + json.dumps(payload))
    print("Response: " + json.dumps(response))
# =====================================================
# ROOT / HEALTH / CONFIG
# =====================================================
@app.get("/", response_class=HTMLResponse)
def root():
    """Serve a minimal HTML index that lists the API's endpoints."""
    brand_name = TENANT['brand']['name']
    return f"""
<h1>{brand_name} – Document AI API</h1>
<ul>
<li>/health</li>
<li>/api/config</li>
<li>/api/files</li>
<li>POST /api/upload</li>
<li>POST /api/reload</li>
<li>POST /api/clear-cache</li>
<li>POST /api/ask</li>
<li>GET /api/quota</li>
</ul>
"""
@app.get("/health")
def health():
    """Report service status, tenant id, index sizes and the quota limit.

    This endpoint is not charged against the caller's quota.
    """
    documents_indexed = len(retriever.doc_text_cache)
    chunks_indexed = len(retriever.chunk_texts)
    resp = {
        "status": "ok",
        "tenant": TENANT_ID,
        "documents_indexed": documents_indexed,
        "chunks_indexed": chunks_indexed,
        "daily_quota": API_LIMIT,
    }
    print(f"[API LOG] GET /health -> {resp}")
    return resp
@app.get("/api/config")
def get_config():
    """Expose the tenant's brand, features, limits and supported languages.

    ``languages`` duplicates ``features["languages"]`` at the top level of
    the response.
    """
    # Lookups happen in the same order as the response keys.
    brand = TENANT["brand"]
    features = TENANT["features"]
    limits = TENANT["limits"]
    resp = {
        "brand": brand,
        "features": features,
        "limits": limits,
        "languages": features["languages"],
    }
    print(f"[API LOG] GET /api/config -> {resp}")
    return resp
@app.get("/api/files")
def list_files(request: Request):
    """List the tenant's uploaded files with their sizes in kilobytes.

    Charged against the caller's quota.
    """
    enforce_quota(request)
    files = [
        {"name": entry.name, "size_kb": round(entry.stat().st_size / 1024, 2)}
        for entry in DOCS_DIR.iterdir()
        if entry.is_file()
    ]
    resp = {"tenant": TENANT_ID, "count": len(files), "files": files}
    print(f"[API LOG] GET /api/files -> {resp}")
    return resp
# =====================================================
# UPLOAD
# =====================================================
@app.post("/api/upload")
async def upload_doc(file: UploadFile = File(...), request: Request = None):
    """Store an uploaded document in the tenant's document directory.

    The file is only written to disk; it becomes searchable after a
    subsequent call to ``/api/reload``. Charged against the caller's quota.

    Args:
        file: The multipart upload.
        request: The incoming request, used for quota accounting.

    Returns:
        A dict with ``status``, the stored ``file`` name and a ``message``.

    Raises:
        HTTPException: 400 when the upload has no usable filename, 500 when
            writing the file fails, 429 when the quota is exhausted.
    """
    enforce_quota(request)

    # Keep only the final path component so a client-supplied name cannot
    # escape DOCS_DIR; backslashes are normalised so Windows-style paths are
    # reduced to their basename as well.
    raw_name = (file.filename or "").replace("\\", "/")
    filename = Path(raw_name).name
    if filename in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Upload must include a valid filename")

    dest = DOCS_DIR / filename
    try:
        with dest.open("wb") as f:
            # Copy in 1 MiB chunks instead of loading the whole upload into memory.
            while chunk := await file.read(1024 * 1024):
                f.write(chunk)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {"status": "ok", "file": filename, "message": "Uploaded. Call /api/reload to index."}
# =====================================================
# RELOAD / CLEAR CACHE
# =====================================================
@app.post("/api/reload")
def reload_docs(request: Request):
    """Re-index the tenant's document directory and report the index sizes.

    Charged against the caller's quota.
    """
    enforce_quota(request)
    retriever.load_documents(DOCS_DIR)

    documents_indexed = len(retriever.doc_text_cache)
    chunks_indexed = len(retriever.chunk_texts)
    resp = {
        "status": "ok",
        "documents_indexed": documents_indexed,
        "chunks_indexed": chunks_indexed,
    }
    print(f"[API LOG] POST /api/reload -> {resp}")
    return resp
@app.post("/api/clear-cache")
def clear_cache(request: Request):
    """Ask the retriever to clear its cache and confirm to the caller.

    Charged against the caller's quota.
    """
    enforce_quota(request)
    retriever.clear_cache()

    resp = dict(status="ok", message="Cache cleared")
    print(f"[API LOG] POST /api/clear-cache -> {resp}")
    return resp
# =====================================================
# AUDIO
# =====================================================
@app.get("/api/audio")
def get_audio(path: str, request: Request):
    """Stream a previously synthesized WAV file back to the client.

    Charged against the caller's quota.

    Args:
        path: Filesystem path of the audio file, as supplied by the client.
        request: The incoming request, used for quota accounting.

    Raises:
        HTTPException: 404 when ``path`` is not an existing regular ``.wav``
            file, 429 when the quota is exhausted.
    """
    enforce_quota(request)

    audio_file = Path(path)
    # SECURITY: `path` comes straight from the query string. Serving it
    # unchecked would let any caller download arbitrary files readable by the
    # server, so only regular files with a .wav extension are served (the
    # response is declared as audio/wav). Directories are rejected too.
    # NOTE(review): assumes the TTS service writes .wav files - confirm, and
    # ideally also confine `path` to the TTS output directory, which is not
    # visible from this module.
    if audio_file.suffix.lower() != ".wav" or not audio_file.is_file():
        raise HTTPException(404, "Audio not found")

    resp = f"Streaming audio: {path}"
    print(f"[API LOG] GET /api/audio -> {resp}")
    return FileResponse(path, media_type="audio/wav")
# =====================================================
# ASK ENDPOINT
# =====================================================
# Request body for POST /api/ask.
class AskRequest(BaseModel):
    # The user's question; passed to retrieval and to answer generation.
    question: str
    # Language code handed to speech synthesis when TTS is enabled.
    language: str = "en"
    # Optional list of document names to query; when empty or omitted,
    # every file in the tenant's document directory is used.
    documents: list[str] | None = None
@app.post("/api/ask")
async def ask(req: AskRequest, request: Request):
    """Answer a question from the tenant's indexed documents.

    Small document sets are passed to the model in full; larger ones go
    through retrieval and only the top matches are used as context. When the
    tenant has TTS enabled, the answer is also synthesized to audio.
    Charged against the caller's quota.

    Args:
        req: The question, language and optional document selection.
        request: The incoming request, used for quota accounting and logging.

    Returns:
        A dict with ``answer``, ``sources`` and ``audio_path``. On failure the
        same shape is returned with empty values plus an ``error`` message.

    Raises:
        HTTPException: 429 when the quota is exhausted.
    """
    enforce_quota(request)  # count this call against the caller's quota
    try:
        # NOTE(review): assumes doc_text_cache is a mapping keyed by file
        # name (it is subscripted with names from get_files()) - confirm.
        indexed = retriever.doc_text_cache

        if req.documents:
            # Explicitly requested documents must all be indexed; report the
            # missing ones by name instead of surfacing a bare KeyError.
            missing = [d for d in req.documents if d not in indexed]
            if missing:
                raise ValueError(
                    f"Documents not indexed: {', '.join(missing)}. "
                    "Upload them and call /api/reload first."
                )
            docs_to_query = req.documents
        else:
            # Files uploaded since the last /api/reload exist on disk but are
            # not in the index yet; skip them rather than failing the request.
            docs_to_query = [d for d in get_files() if d in indexed]

        total_chars = sum(len(indexed[d]) for d in docs_to_query)
        FAST_PATH_CHAR_LIMIT = 50000

        if total_chars <= FAST_PATH_CHAR_LIMIT:
            # Fast path: everything fits, so use the full plain text as context.
            context = retriever.get_plaintext_context(docs_to_query)
            sources_used = docs_to_query
        else:
            context, sources_used = retriever.search(
                query=req.question,
                documents=docs_to_query,
                top_k=5
            )

        answer = generate_answer(
            question=req.question,
            context=context,
            max_tokens=TENANT["limits"]["max_tokens"]
        )
        audio_path = synthesize(answer, req.language) if TENANT["features"].get("tts") else None

        resp = {"answer": answer, "sources": sources_used, "audio_path": audio_path}
        await log_request_response(request, resp)
        return resp
    except HTTPException:
        raise  # let HTTP errors propagate with their own status code
    except Exception as e:
        # Any other failure is reported in-band with the regular response shape.
        resp = {"answer": "", "sources": [], "audio_path": None, "error": str(e)}
        await log_request_response(request, resp)
        return resp
# =====================================================
# NEW QUOTA STATUS ENDPOINT
# =====================================================
@app.get("/api/quota")
def quota_status(request: Request):
    """Return the caller's quota usage; this call itself is not charged."""
    status = get_quota_status(request)
    return status