Spaces:
Sleeping
Sleeping
File size: 8,288 Bytes
e52b262 7865b91 e52b262 2441bab e52b262 a78f66b 2441bab e52b262 fce4455 a78f66b e3356c8 a78f66b e52b262 0745894 516ee53 fbaf5c4 a3d1427 e52b262 59446fc e52b262 a78f66b 2441bab a3d1427 0745894 65db32f 2baea4b fe5ddf7 a3d1427 0745894 a3d1427 a78f66b a3d1427 a78f66b e52b262 a78f66b e3356c8 a78f66b 59446fc fbaf5c4 644a42b a3d1427 fbaf5c4 59446fc a78f66b a3d1427 a78f66b 2441bab 516ee53 a78f66b 516ee53 a78f66b fce4455 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 | from __future__ import annotations
import os
import sys
import json
from datetime import datetime, timezone
from pathlib import Path
from tempfile import NamedTemporaryFile
from uuid import uuid4
import gradio as gr
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
ROOT = Path(__file__).parent
sys.path.insert(0, str(ROOT / "src"))
from packetcourt import audit_packet
from packetcourt.models import AgentReview
from packetcourt.ocr import extract_text, merge_extractions
from packetcourt.remote_vision import extract_remote, is_configured
from packetcourt.remote_nemotron import is_configured as nemotron_is_configured
from packetcourt.remote_nemotron import review as nemotron_review
from packetcourt.samples import SAMPLES
from packetcourt.vlm import model_status
class AuditRequest(BaseModel):
front_text: str
back_text: str
class FeedbackRequest(BaseModel):
verdict: str
correction: str = ""
audit: dict
def run_audit(front_text: str, back_text: str):
result = audit_packet(front_text, back_text)
if not nemotron_is_configured():
return result
snapshot = {
"claims": [claim.model_dump(mode="json") for claim in result.claims],
"investigation": result.investigation.model_dump(),
"nutrition": result.nutrition.model_dump(),
"ingredients_found": bool(result.ingredients),
"expiry": result.expiry.model_dump(),
"limitations": result.limitations,
}
try:
result.agent_review = AgentReview.model_validate(nemotron_review(snapshot))
if not result.investigation.missing_evidence:
result.agent_review.status = "COMPLETE"
result.agent_review.priority = "No additional claim-resolving evidence is required."
result.agent_review.evidence_request = ""
result.agent_review.rationale = (
"Nemotron completed an independent review; PacketCourt's bounded investigation "
"found no unresolved evidence required for the detected front claims."
)
except Exception as exc:
result.agent_review = AgentReview(
status="UNAVAILABLE",
rationale=f"Nemotron review unavailable: {type(exc).__name__}: {exc}",
model="nvidia/Nemotron-Mini-4B-Instruct",
)
return result
def build_gradio_engine() -> gr.Blocks:
with gr.Blocks(title="PacketCourt Engine") as engine:
gr.Markdown(
"# PacketCourt Engine\n"
"The public product interface is served at `/`. This mounted Gradio engine "
"keeps PacketCourt compatible with the Build Small Gradio requirement."
)
front = gr.Textbox(label="Front claims")
back = gr.Textbox(label="Back-label evidence")
output = gr.JSON(label="Evidence case")
gr.Button("Audit").click(
lambda front_text, back_text: run_audit(front_text, back_text).model_dump(mode="json"),
[front, back],
output,
)
return engine
app = FastAPI(title="PacketCourt")
@app.get("/", response_class=HTMLResponse)
def index() -> str:
html = (ROOT / "frontend" / "index.html").read_text()
css = (ROOT / "frontend" / "styles.css").read_text()
javascript = (ROOT / "frontend" / "app.js").read_text()
return html.replace("/*__PACKETCOURT_CSS__*/", css).replace("/*__PACKETCOURT_JS__*/", javascript)
@app.get("/api/samples")
def samples() -> dict:
return SAMPLES
@app.get("/api/model")
def model() -> dict:
status = model_status()
status["router"] = (
os.getenv("PACKETCOURT_ROUTER_MODEL", "build-small-hackathon/packetcourt-evidence-router")
if os.getenv("PACKETCOURT_ROUTER", "0") == "1"
else "deterministic fallback"
)
status["nemotron_reviewer"] = (
"nvidia/Nemotron-Mini-4B-Instruct"
if nemotron_is_configured()
else "not configured"
)
if is_configured():
status.update(
enabled=True,
mode="OpenBMB MiniCPM-V-4.6 ZeroGPU extraction with deterministic audit",
companion=os.getenv("PACKETCOURT_VISION_SPACE"),
)
return status
@app.post("/api/audit")
def audit(request: AuditRequest) -> dict:
return run_audit(request.front_text, request.back_text).model_dump(mode="json")
@app.post("/api/feedback")
def feedback(request: FeedbackRequest) -> dict:
if request.verdict not in {"accurate", "needs_correction"}:
return {"status": "REJECTED", "message": "Choose accurate or needs correction."}
if request.verdict == "needs_correction" and len(request.correction.strip()) < 8:
return {"status": "REJECTED", "message": "Explain the correction so it can be reviewed."}
record_id = str(uuid4())
record = {
"id": record_id,
"created_at": datetime.now(timezone.utc).isoformat(),
"verdict": request.verdict,
"correction": request.correction.strip()[:1200],
"front_text": str(request.audit.get("front_text", ""))[:3000],
"back_text": str(request.audit.get("back_text", ""))[:9000],
"claims": request.audit.get("claims", []),
"investigation": request.audit.get("investigation", {}),
"nemotron_review": request.audit.get("agent_review", {}),
"proposed_router_examples": [
{
"text": claim.get("claim", ""),
"candidate_tools": [
step.get("tool", "")
for step in request.audit.get("investigation", {}).get("steps", [])
],
}
for claim in request.audit.get("claims", [])
],
"review_status": "pending_human_review",
"training_eligible": False,
"learning_policy": "Only approved corrections enter the next evidence-router fine-tune.",
}
dataset_id = os.getenv("PACKETCOURT_FEEDBACK_DATASET")
if not dataset_id:
return {
"status": "UNAVAILABLE",
"message": "The community learning queue is not configured on this deployment.",
}
try:
from huggingface_hub import HfApi
HfApi().upload_file(
path_or_fileobj=json.dumps(record, indent=2).encode(),
path_in_repo=f"feedback/{record_id}.json",
repo_id=dataset_id,
repo_type="dataset",
commit_message=f"feedback: queue PacketCourt review {record_id[:8]}",
)
except Exception as exc:
return {
"status": "UNAVAILABLE",
"message": f"Feedback could not be persisted: {type(exc).__name__}",
}
return {
"status": "QUEUED",
"id": record_id,
"message": "Review queued. It will become training data only after evidence review.",
"dataset": f"https://huggingface.co/datasets/{dataset_id}",
}
async def _read_uploads(uploads: list[UploadFile], side: str) -> dict:
extracted: list[tuple[str, str]] = []
for upload in uploads[:6]:
suffix = Path(upload.filename or "image.jpg").suffix or ".jpg"
with NamedTemporaryFile(suffix=suffix) as temp:
temp.write(await upload.read())
temp.flush()
extracted.append(extract_text(temp.name, side, extract_remote if is_configured() else None))
text, status, images = merge_extractions(extracted, side)
if len(uploads) > 6:
status += f" Only the first 6 of {len(uploads)} photos were processed."
return {"text": text, "status": status, "images": images}
@app.post("/api/ocr")
async def ocr(
fronts: list[UploadFile] | None = File(default=None),
backs: list[UploadFile] | None = File(default=None),
front: UploadFile | None = File(default=None),
back: UploadFile | None = File(default=None),
) -> dict:
front_uploads = list(fronts or []) + ([front] if front else [])
back_uploads = list(backs or []) + ([back] if back else [])
return {
"front": await _read_uploads(front_uploads, "front"),
"back": await _read_uploads(back_uploads, "back"),
}
app = gr.mount_gradio_app(app, build_gradio_engine(), path="/engine")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)
|