"""FastAPI backend for the Document Capture demo.

Exposes a healthcheck, an extraction endpoint that forwards an uploaded
document to ``extract_fields_from_document`` and stores the outcome, and a
history endpoint that lists recent extraction records. If a built frontend
directory is present it is served as static files from ``/``.
"""
import logging
import os
import time
from typing import Dict, Iterator, List

from fastapi import Depends, FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from sqlalchemy.orm import Session

from .db import Base, SessionLocal, engine
from .models import ExtractionRecord
from .openrouter_client import extract_fields_from_document
from .schemas import ExtractionRecordBase, ExtractionStage

logger = logging.getLogger(__name__)

# Ensure data dir exists for SQLite
os.makedirs("data", exist_ok=True)

# Create tables
Base.metadata.create_all(bind=engine)

app = FastAPI(title="Document Capture Demo – Backend")

# CORS: everything is allowed for now; tighten before real deployment.
# NOTE(review): wildcard origins together with allow_credentials=True is
# discouraged by the FastAPI CORS documentation - list explicit origins (or
# drop credentials) once the frontend origin is known.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def get_db() -> Iterator[Session]:
    """Yield a database session and close it once the request is done."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.get("/ping")
def ping():
    """Healthcheck."""
    return {"status": "ok", "message": "backend alive"}


def make_stages(total_ms: int, status: str) -> Dict[str, ExtractionStage]:
    """
    Build synthetic stage timing data for the History UI.

    The total is split into four stages with fixed shares (15% / 55% /
    20% / 10%). A non-positive ``total_ms`` is replaced by 1000 ms.
    "uploading" is always reported as completed; when ``status`` is not
    "completed", "aiAnalysis" is reported as failed and the two later
    stages as skipped.
    """
    if total_ms <= 0:
        total_ms = 1000

    succeeded = status == "completed"
    later_stage_status = "completed" if succeeded else "skipped"

    return {
        "uploading": ExtractionStage(
            time=int(total_ms * 0.15),
            status="completed",
            variation="normal",
        ),
        "aiAnalysis": ExtractionStage(
            time=int(total_ms * 0.55),
            status="completed" if succeeded else "failed",
            variation="normal",
        ),
        "dataExtraction": ExtractionStage(
            time=int(total_ms * 0.2),
            status=later_stage_status,
            variation="fast",
        ),
        "outputRendering": ExtractionStage(
            time=int(total_ms * 0.1),
            status=later_stage_status,
            variation="normal",
        ),
    }


@app.post("/api/extract")
async def extract_document(
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
):
    """
    Main extraction endpoint used by the Dashboard.

    1) Read the uploaded file
    2) Call OpenRouter + Qwen3-VL
    3) Store a record in SQLite
    4) Return extraction result + metadata

    Any exception raised while extracting is logged and recorded as a
    "failed" record instead of being propagated, so the response always
    has the same shape; the failure text is returned in ``errorMessage``.
    """
    # perf_counter is monotonic, so the measured duration cannot go
    # negative or jump if the system clock is adjusted mid-request.
    start = time.perf_counter()

    content = await file.read()
    content_type = file.content_type or "application/octet-stream"
    size_mb = len(content) / 1024 / 1024
    size_str = f"{size_mb:.2f} MB"

    try:
        extracted = await extract_fields_from_document(
            content, content_type, file.filename
        )
        confidence = float(extracted.get("confidence", 90))
        fields = extracted.get("fields", {})
        fields_extracted = len(fields) if isinstance(fields, dict) else 0
        status = "completed"
        error_message = None
    except Exception as e:
        # Deliberately broad: every failure becomes a stored "failed" record.
        # Log the traceback so the cause is not reduced to str(e) only.
        logger.exception("Extraction failed for %r", file.filename)
        confidence = 0.0
        fields = {}
        fields_extracted = 0
        status = "failed"
        error_message = str(e)

    total_ms = int((time.perf_counter() - start) * 1000)

    # Save record to DB
    # NOTE(review): raw_output stores the Python repr of ``fields``, not
    # JSON - confirm no consumer expects to parse it as JSON.
    rec = ExtractionRecord(
        file_name=file.filename,
        file_type=content_type,
        file_size=size_str,
        status=status,
        confidence=confidence,
        fields_extracted=fields_extracted,
        total_time_ms=total_ms,
        raw_output=str(fields),
        error_message=error_message,
    )
    db.add(rec)
    db.commit()
    db.refresh(rec)

    stages = make_stages(total_ms, status)

    # Response shape that frontend will consume
    # NOTE(review): ``.dict()` is the Pydantic v1 spelling - confirm the
    # pinned Pydantic version before switching to ``model_dump()``.
    return {
        "id": rec.id,
        "fileName": rec.file_name,
        "fileType": rec.file_type,
        "fileSize": rec.file_size,
        "status": status,
        "confidence": confidence,
        "fieldsExtracted": fields_extracted,
        "totalTime": total_ms,
        "fields": fields,
        "stages": {k: v.dict() for k, v in stages.items()},
        "errorMessage": error_message,
    }


@app.get("/api/history", response_model=List[ExtractionRecordBase])
def get_history(db: Session = Depends(get_db)):
    """
    Used by the History page.

    Returns last 100 records, with synthetic stage data. Missing column
    values fall back to empty strings / zeros, and a missing status is
    treated as "completed".
    """
    recs = (
        db.query(ExtractionRecord)
        .order_by(ExtractionRecord.created_at.desc())
        .limit(100)
        .all()
    )

    return [
        ExtractionRecordBase(
            id=r.id,
            fileName=r.file_name,
            fileType=r.file_type or "",
            fileSize=r.file_size or "",
            extractedAt=r.created_at,
            status=r.status or "completed",
            confidence=r.confidence or 0.0,
            fieldsExtracted=r.fields_extracted or 0,
            totalTime=r.total_time_ms or 0,
            stages=make_stages(r.total_time_ms or 1000, r.status or "completed"),
            errorMessage=r.error_message,
        )
        for r in recs
    ]


# Static frontend mounting (used after we build React)
# Dockerfile copies the Vite build into backend/frontend_dist
frontend_dir = os.path.join(
    os.path.dirname(os.path.dirname(__file__)), "frontend_dist"
)
if os.path.isdir(frontend_dir):
    # Mounted last so the API routes registered above take precedence
    # over the catch-all "/" static mount.
    app.mount(
        "/",
        StaticFiles(directory=frontend_dir, html=True),
        name="frontend",
    )