| from __future__ import annotations |
|
|
| import os |
| import sys |
| import json |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from tempfile import NamedTemporaryFile |
| from uuid import uuid4 |
|
|
| import gradio as gr |
| import uvicorn |
| from fastapi import FastAPI, File, UploadFile |
| from fastapi.responses import HTMLResponse |
| from pydantic import BaseModel |
|
|
| ROOT = Path(__file__).parent |
| sys.path.insert(0, str(ROOT / "src")) |
|
|
| from packetcourt import audit_packet |
| from packetcourt.models import AgentReview |
| from packetcourt.ocr import extract_text, merge_extractions |
| from packetcourt.remote_vision import extract_remote, is_configured |
| from packetcourt.remote_nemotron import is_configured as nemotron_is_configured |
| from packetcourt.remote_nemotron import review as nemotron_review |
| from packetcourt.samples import SAMPLES |
| from packetcourt.vlm import model_status |
|
|
|
|
| class AuditRequest(BaseModel): |
| front_text: str |
| back_text: str |
|
|
|
|
| class FeedbackRequest(BaseModel): |
| verdict: str |
| correction: str = "" |
| audit: dict |
|
|
|
|
| def run_audit(front_text: str, back_text: str): |
| result = audit_packet(front_text, back_text) |
| if not nemotron_is_configured(): |
| return result |
| snapshot = { |
| "claims": [claim.model_dump(mode="json") for claim in result.claims], |
| "investigation": result.investigation.model_dump(), |
| "nutrition": result.nutrition.model_dump(), |
| "ingredients_found": bool(result.ingredients), |
| "expiry": result.expiry.model_dump(), |
| "limitations": result.limitations, |
| } |
| try: |
| result.agent_review = AgentReview.model_validate(nemotron_review(snapshot)) |
| if not result.investigation.missing_evidence: |
| result.agent_review.status = "COMPLETE" |
| result.agent_review.priority = "No additional claim-resolving evidence is required." |
| result.agent_review.evidence_request = "" |
| result.agent_review.rationale = ( |
| "Nemotron completed an independent review; PacketCourt's bounded investigation " |
| "found no unresolved evidence required for the detected front claims." |
| ) |
| except Exception as exc: |
| result.agent_review = AgentReview( |
| status="UNAVAILABLE", |
| rationale=f"Nemotron review unavailable: {type(exc).__name__}: {exc}", |
| model="nvidia/Nemotron-Mini-4B-Instruct", |
| ) |
| return result |
|
|
|
|
| def build_gradio_engine() -> gr.Blocks: |
| with gr.Blocks(title="PacketCourt Engine") as engine: |
| gr.Markdown( |
| "# PacketCourt Engine\n" |
| "The public product interface is served at `/`. This mounted Gradio engine " |
| "keeps PacketCourt compatible with the Build Small Gradio requirement." |
| ) |
| front = gr.Textbox(label="Front claims") |
| back = gr.Textbox(label="Back-label evidence") |
| output = gr.JSON(label="Evidence case") |
| gr.Button("Audit").click( |
| lambda front_text, back_text: run_audit(front_text, back_text).model_dump(mode="json"), |
| [front, back], |
| output, |
| ) |
| return engine |
|
|
|
|
| app = FastAPI(title="PacketCourt") |
|
|
|
|
| @app.get("/", response_class=HTMLResponse) |
| def index() -> str: |
| html = (ROOT / "frontend" / "index.html").read_text() |
| css = (ROOT / "frontend" / "styles.css").read_text() |
| javascript = (ROOT / "frontend" / "app.js").read_text() |
| return html.replace("/*__PACKETCOURT_CSS__*/", css).replace("/*__PACKETCOURT_JS__*/", javascript) |
|
|
|
|
| @app.get("/api/samples") |
| def samples() -> dict: |
| return SAMPLES |
|
|
|
|
| @app.get("/api/model") |
| def model() -> dict: |
| status = model_status() |
| status["router"] = ( |
| os.getenv("PACKETCOURT_ROUTER_MODEL", "build-small-hackathon/packetcourt-evidence-router") |
| if os.getenv("PACKETCOURT_ROUTER", "0") == "1" |
| else "deterministic fallback" |
| ) |
| status["nemotron_reviewer"] = ( |
| "nvidia/Nemotron-Mini-4B-Instruct" |
| if nemotron_is_configured() |
| else "not configured" |
| ) |
| if is_configured(): |
| status.update( |
| enabled=True, |
| mode="OpenBMB MiniCPM-V-4.6 ZeroGPU extraction with deterministic audit", |
| companion=os.getenv("PACKETCOURT_VISION_SPACE"), |
| ) |
| return status |
|
|
|
|
| @app.post("/api/audit") |
| def audit(request: AuditRequest) -> dict: |
| return run_audit(request.front_text, request.back_text).model_dump(mode="json") |
|
|
|
|
| @app.post("/api/feedback") |
| def feedback(request: FeedbackRequest) -> dict: |
| if request.verdict not in {"accurate", "needs_correction"}: |
| return {"status": "REJECTED", "message": "Choose accurate or needs correction."} |
| if request.verdict == "needs_correction" and len(request.correction.strip()) < 8: |
| return {"status": "REJECTED", "message": "Explain the correction so it can be reviewed."} |
|
|
| record_id = str(uuid4()) |
| record = { |
| "id": record_id, |
| "created_at": datetime.now(timezone.utc).isoformat(), |
| "verdict": request.verdict, |
| "correction": request.correction.strip()[:1200], |
| "front_text": str(request.audit.get("front_text", ""))[:3000], |
| "back_text": str(request.audit.get("back_text", ""))[:9000], |
| "claims": request.audit.get("claims", []), |
| "investigation": request.audit.get("investigation", {}), |
| "nemotron_review": request.audit.get("agent_review", {}), |
| "proposed_router_examples": [ |
| { |
| "text": claim.get("claim", ""), |
| "candidate_tools": [ |
| step.get("tool", "") |
| for step in request.audit.get("investigation", {}).get("steps", []) |
| ], |
| } |
| for claim in request.audit.get("claims", []) |
| ], |
| "review_status": "pending_human_review", |
| "training_eligible": False, |
| "learning_policy": "Only approved corrections enter the next evidence-router fine-tune.", |
| } |
| dataset_id = os.getenv("PACKETCOURT_FEEDBACK_DATASET") |
| if not dataset_id: |
| return { |
| "status": "UNAVAILABLE", |
| "message": "The community learning queue is not configured on this deployment.", |
| } |
| try: |
| from huggingface_hub import HfApi |
|
|
| HfApi().upload_file( |
| path_or_fileobj=json.dumps(record, indent=2).encode(), |
| path_in_repo=f"feedback/{record_id}.json", |
| repo_id=dataset_id, |
| repo_type="dataset", |
| commit_message=f"feedback: queue PacketCourt review {record_id[:8]}", |
| ) |
| except Exception as exc: |
| return { |
| "status": "UNAVAILABLE", |
| "message": f"Feedback could not be persisted: {type(exc).__name__}", |
| } |
| return { |
| "status": "QUEUED", |
| "id": record_id, |
| "message": "Review queued. It will become training data only after evidence review.", |
| "dataset": f"https://huggingface.co/datasets/{dataset_id}", |
| } |
|
|
|
|
| async def _read_uploads(uploads: list[UploadFile], side: str) -> dict: |
| extracted: list[tuple[str, str]] = [] |
| for upload in uploads[:6]: |
| suffix = Path(upload.filename or "image.jpg").suffix or ".jpg" |
| with NamedTemporaryFile(suffix=suffix) as temp: |
| temp.write(await upload.read()) |
| temp.flush() |
| extracted.append(extract_text(temp.name, side, extract_remote if is_configured() else None)) |
| text, status, images = merge_extractions(extracted, side) |
| if len(uploads) > 6: |
| status += f" Only the first 6 of {len(uploads)} photos were processed." |
| return {"text": text, "status": status, "images": images} |
|
|
|
|
| @app.post("/api/ocr") |
| async def ocr( |
| fronts: list[UploadFile] | None = File(default=None), |
| backs: list[UploadFile] | None = File(default=None), |
| front: UploadFile | None = File(default=None), |
| back: UploadFile | None = File(default=None), |
| ) -> dict: |
| front_uploads = list(fronts or []) + ([front] if front else []) |
| back_uploads = list(backs or []) + ([back] if back else []) |
| return { |
| "front": await _read_uploads(front_uploads, "front"), |
| "back": await _read_uploads(back_uploads, "back"), |
| } |
|
|
|
|
| app = gr.mount_gradio_app(app, build_gradio_engine(), path="/engine") |
|
|
|
|
| if __name__ == "__main__": |
| uvicorn.run(app, host="0.0.0.0", port=7860) |
|
|