packetcourt / app.py
DIV-45's picture
fix: align completed review rationale
fe5ddf7 verified
Raw
History Blame Contribute Delete
8.29 kB
from __future__ import annotations
import os
import sys
import json
from datetime import datetime, timezone
from pathlib import Path
from tempfile import NamedTemporaryFile
from uuid import uuid4
import gradio as gr
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
# Directory containing this file; the frontend assets and the src/ tree are
# located relative to it.
ROOT = Path(__file__).parent
# Put the sibling src/ directory at the front of sys.path before the
# packetcourt imports that follow.
sys.path.insert(0, str(ROOT / "src"))
from packetcourt import audit_packet
from packetcourt.models import AgentReview
from packetcourt.ocr import extract_text, merge_extractions
from packetcourt.remote_vision import extract_remote, is_configured
from packetcourt.remote_nemotron import is_configured as nemotron_is_configured
from packetcourt.remote_nemotron import review as nemotron_review
from packetcourt.samples import SAMPLES
from packetcourt.vlm import model_status
# Request body for POST /api/audit.
class AuditRequest(BaseModel):
    # Text of the front of the pack; forwarded to run_audit as front_text.
    front_text: str
    # Text of the back label; forwarded to run_audit as back_text.
    back_text: str
# Request body for POST /api/feedback.
class FeedbackRequest(BaseModel):
    # Reviewer verdict; the handler accepts "accurate" or "needs_correction".
    verdict: str
    # Free-text correction; optional, defaults to the empty string.
    correction: str = ""
    # The audit payload under review, accepted as an arbitrary dict.
    audit: dict
def run_audit(front_text: str, back_text: str):
    """Audit a packet and, when configured, attach a Nemotron review.

    The deterministic audit from ``audit_packet`` always runs. If the
    Nemotron reviewer is configured, a snapshot of the audit is sent to it and
    the validated answer is stored on ``agent_review``. Any failure while
    obtaining or applying the review is recorded as an ``UNAVAILABLE`` review
    instead of being raised.

    Args:
        front_text: Text taken from the front of the pack.
        back_text: Text taken from the back label.

    Returns:
        The object returned by ``audit_packet``, with ``agent_review`` set
        when the reviewer is configured.
    """
    verdict = audit_packet(front_text, back_text)
    if not nemotron_is_configured():
        return verdict

    # Only the parts of the audit listed here are shared with the reviewer.
    snapshot = dict(
        claims=[item.model_dump(mode="json") for item in verdict.claims],
        investigation=verdict.investigation.model_dump(),
        nutrition=verdict.nutrition.model_dump(),
        ingredients_found=bool(verdict.ingredients),
        expiry=verdict.expiry.model_dump(),
        limitations=verdict.limitations,
    )

    try:
        verdict.agent_review = AgentReview.model_validate(nemotron_review(snapshot))
        if not verdict.investigation.missing_evidence:
            # Nothing is outstanding, so the review is reported as complete
            # with a fixed priority and rationale and no evidence request.
            review = verdict.agent_review
            review.status = "COMPLETE"
            review.priority = "No additional claim-resolving evidence is required."
            review.evidence_request = ""
            review.rationale = (
                "Nemotron completed an independent review; PacketCourt's bounded investigation "
                "found no unresolved evidence required for the detected front claims."
            )
    except Exception as exc:
        # Best effort: a reviewer failure must not fail the audit itself.
        verdict.agent_review = AgentReview(
            status="UNAVAILABLE",
            rationale=f"Nemotron review unavailable: {type(exc).__name__}: {exc}",
            model="nvidia/Nemotron-Mini-4B-Instruct",
        )
    return verdict
def build_gradio_engine() -> gr.Blocks:
    """Build the small Gradio UI that wraps ``run_audit``.

    The UI has two text inputs (front claims, back-label evidence), a JSON
    output and an "Audit" button that runs the audit and shows its JSON dump.

    Returns:
        The assembled ``gr.Blocks`` instance.
    """
    intro = (
        "# PacketCourt Engine\n"
        "The public product interface is served at `/`. This mounted Gradio engine "
        "keeps PacketCourt compatible with the Build Small Gradio requirement."
    )
    with gr.Blocks(title="PacketCourt Engine") as engine:
        gr.Markdown(intro)
        front = gr.Textbox(label="Front claims")
        back = gr.Textbox(label="Back-label evidence")
        output = gr.JSON(label="Evidence case")
        audit_button = gr.Button("Audit")
        # The handler stays a lambda so the registered callable is unchanged.
        audit_button.click(
            lambda front_text, back_text: run_audit(front_text, back_text).model_dump(mode="json"),
            inputs=[front, back],
            outputs=output,
        )
    return engine
# FastAPI application on which the route handlers below are registered.
app = FastAPI(title="PacketCourt")
# Serve the single-page frontend at "/".
#
# Reads frontend/index.html and inlines frontend/styles.css and
# frontend/app.js by replacing the /*__PACKETCOURT_CSS__*/ and
# /*__PACKETCOURT_JS__*/ placeholders, then returns the resulting HTML.
@app.get("/", response_class=HTMLResponse)
def index() -> str:
    frontend = ROOT / "frontend"
    # Decode explicitly as UTF-8: without an encoding argument read_text()
    # uses the locale's preferred encoding, which can garble or reject
    # non-ASCII characters in the assets on hosts with a non-UTF-8 locale.
    html = (frontend / "index.html").read_text(encoding="utf-8")
    css = (frontend / "styles.css").read_text(encoding="utf-8")
    javascript = (frontend / "app.js").read_text(encoding="utf-8")
    return html.replace("/*__PACKETCOURT_CSS__*/", css).replace("/*__PACKETCOURT_JS__*/", javascript)
# GET /api/samples: return the SAMPLES object imported from
# packetcourt.samples, unchanged.
@app.get("/api/samples")
def samples() -> dict:
    bundled = SAMPLES
    return bundled
# GET /api/model: report which model components this deployment uses.
#
# Starts from model_status() and adds:
#   router            - the PACKETCOURT_ROUTER_MODEL value (or its default)
#                       when PACKETCOURT_ROUTER is "1", else a fallback label
#   nemotron_reviewer - the reviewer model name, or "not configured"
# When the remote vision extractor is configured, enabled/mode/companion are
# also set.
@app.get("/api/model")
def model() -> dict:
    status = model_status()

    if os.getenv("PACKETCOURT_ROUTER", "0") == "1":
        router = os.getenv(
            "PACKETCOURT_ROUTER_MODEL",
            "build-small-hackathon/packetcourt-evidence-router",
        )
    else:
        router = "deterministic fallback"
    status["router"] = router

    if nemotron_is_configured():
        reviewer = "nvidia/Nemotron-Mini-4B-Instruct"
    else:
        reviewer = "not configured"
    status["nemotron_reviewer"] = reviewer

    if is_configured():
        status.update(
            {
                "enabled": True,
                "mode": "OpenBMB MiniCPM-V-4.6 ZeroGPU extraction with deterministic audit",
                "companion": os.getenv("PACKETCOURT_VISION_SPACE"),
            }
        )
    return status
# POST /api/audit: run the audit on the submitted front/back text and return
# the result serialised with model_dump(mode="json").
@app.post("/api/audit")
def audit(request: AuditRequest) -> dict:
    outcome = run_audit(request.front_text, request.back_text)
    return outcome.model_dump(mode="json")
# POST /api/feedback: queue a reviewer's verdict on an audit for human review.
#
# Responses (always a dict with "status" and "message"):
#   REJECTED    - verdict is not "accurate"/"needs_correction", or a
#                 "needs_correction" verdict has a correction shorter than
#                 8 characters after stripping
#   UNAVAILABLE - PACKETCOURT_FEEDBACK_DATASET is unset, or the upload failed
#   QUEUED      - the record was uploaded; also carries "id" and "dataset"
@app.post("/api/feedback")
def feedback(request: FeedbackRequest) -> dict:
    if request.verdict not in {"accurate", "needs_correction"}:
        return {"status": "REJECTED", "message": "Choose accurate or needs correction."}
    correction = request.correction.strip()
    if request.verdict == "needs_correction" and len(correction) < 8:
        return {"status": "REJECTED", "message": "Explain the correction so it can be reviewed."}

    # Check the configuration before doing any work on the payload: without a
    # target dataset there is nowhere to store the record.
    dataset_id = os.getenv("PACKETCOURT_FEEDBACK_DATASET")
    if not dataset_id:
        return {
            "status": "UNAVAILABLE",
            "message": "The community learning queue is not configured on this deployment.",
        }

    audit_payload = request.audit
    record_id = str(uuid4())
    record = {
        "id": record_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "verdict": request.verdict,
        # Free-text fields are truncated to bound the stored record size.
        "correction": correction[:1200],
        "front_text": str(audit_payload.get("front_text", ""))[:3000],
        "back_text": str(audit_payload.get("back_text", ""))[:9000],
        "claims": audit_payload.get("claims", []),
        "investigation": audit_payload.get("investigation", {}),
        "nemotron_review": audit_payload.get("agent_review", {}),
        "proposed_router_examples": _proposed_router_examples(audit_payload),
        "review_status": "pending_human_review",
        "training_eligible": False,
        "learning_policy": "Only approved corrections enter the next evidence-router fine-tune.",
    }

    try:
        # Imported lazily; an import failure is reported like any other
        # persistence failure.
        from huggingface_hub import HfApi

        HfApi().upload_file(
            path_or_fileobj=json.dumps(record, indent=2).encode(),
            path_in_repo=f"feedback/{record_id}.json",
            repo_id=dataset_id,
            repo_type="dataset",
            commit_message=f"feedback: queue PacketCourt review {record_id[:8]}",
        )
    except Exception as exc:
        # Best effort: report the failure to the client instead of raising.
        # Only the exception type is exposed, not its message.
        return {
            "status": "UNAVAILABLE",
            "message": f"Feedback could not be persisted: {type(exc).__name__}",
        }
    return {
        "status": "QUEUED",
        "id": record_id,
        "message": "Review queued. It will become training data only after evidence review.",
        "dataset": f"https://huggingface.co/datasets/{dataset_id}",
    }


def _proposed_router_examples(audit_payload: dict) -> list[dict]:
    """Pair each claim's text with the tools used by the investigation steps.

    The payload comes straight from the client, so unexpected shapes are
    tolerated rather than raised on: a non-dict ``investigation``, a non-list
    ``claims`` or ``steps`` value, and non-dict entries are all ignored.

    Args:
        audit_payload: The client-supplied audit dict.

    Returns:
        One ``{"text", "candidate_tools"}`` dict per dict-shaped claim.
    """
    investigation = audit_payload.get("investigation", {})
    steps = investigation.get("steps", []) if isinstance(investigation, dict) else []
    if not isinstance(steps, list):
        steps = []
    # The tool list is the same for every claim, so build it once.
    candidate_tools = [step.get("tool", "") for step in steps if isinstance(step, dict)]

    claims = audit_payload.get("claims", [])
    if not isinstance(claims, list):
        claims = []
    return [
        {"text": claim.get("claim", ""), "candidate_tools": list(candidate_tools)}
        for claim in claims
        if isinstance(claim, dict)
    ]
async def _read_uploads(uploads: list[UploadFile], side: str) -> dict:
    """Extract text from up to six uploaded photos of one side of a pack.

    Each upload is written to a temporary file, which is passed by name to
    ``extract_text``. The remote extractor is supplied only when it is
    configured. The per-image results are then combined by
    ``merge_extractions``.

    Args:
        uploads: The uploaded images; only the first six are processed.
        side: Side label forwarded to ``extract_text`` and
            ``merge_extractions``.

    Returns:
        A dict with ``text``, ``status`` and ``images``. ``status`` gains a
        note when more than six uploads were supplied.
    """
    results: list[tuple[str, str]] = []
    for item in uploads[:6]:
        # Keep the original file extension, falling back to ".jpg".
        extension = Path(item.filename or "image.jpg").suffix or ".jpg"
        with NamedTemporaryFile(suffix=extension) as handle:
            payload = await item.read()
            handle.write(payload)
            # Flush so the bytes are on disk before the file is read by name.
            handle.flush()
            remote = extract_remote if is_configured() else None
            # Extraction must happen inside the with-block: the temporary
            # file is removed when the block exits.
            results.append(extract_text(handle.name, side, remote))

    text, status, images = merge_extractions(results, side)
    total = len(uploads)
    if total > 6:
        status += f" Only the first 6 of {len(uploads)} photos were processed."
    return {"text": text, "status": status, "images": images}
# POST /api/ocr: extract text from uploaded front and back photos.
#
# Each side accepts a list field (fronts/backs) and a single-file field
# (front/back); the single file, when present, is appended after the list.
# The front side is processed before the back side.
@app.post("/api/ocr")
async def ocr(
    fronts: list[UploadFile] | None = File(default=None),
    backs: list[UploadFile] | None = File(default=None),
    front: UploadFile | None = File(default=None),
    back: UploadFile | None = File(default=None),
) -> dict:
    front_uploads = list(fronts or [])
    if front:
        front_uploads.append(front)

    back_uploads = list(backs or [])
    if back:
        back_uploads.append(back)

    front_result = await _read_uploads(front_uploads, "front")
    back_result = await _read_uploads(back_uploads, "back")
    return {"front": front_result, "back": back_result}
# Mount the Gradio engine under /engine and rebind `app` to the application
# returned by mount_gradio_app.
app = gr.mount_gradio_app(app, build_gradio_engine(), path="/engine")

if __name__ == "__main__":
    # When run as a script, serve on all interfaces on port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)