# ForgeSight — backend/server.py
# commit 8d2779b (rasAli02): feat: finalize MI300X inference connection and live status update
from fastapi import FastAPI, APIRouter, HTTPException
from dotenv import load_dotenv
from starlette.middleware.cors import CORSMiddleware
from motor.motor_asyncio import AsyncIOMotorClient
import os
import logging
import math
import time
import uuid
from pathlib import Path
from pydantic import BaseModel, Field, ConfigDict
from typing import List, Optional, Any, Dict
from datetime import datetime, timezone
from agents import run_pipeline, generate_social_post
# Load .env sitting next to this file BEFORE reading any environment variables.
ROOT_DIR = Path(__file__).parent
load_dotenv(ROOT_DIR / ".env")
# Fail fast at import time if required Mongo settings are missing (KeyError).
mongo_url = os.environ["MONGO_URL"]
client = AsyncIOMotorClient(mongo_url)
db = client[os.environ["DB_NAME"]]
app = FastAPI(title="ForgeSight API")
# All routes below are mounted under the /api prefix (see include_router at the bottom).
api_router = APIRouter(prefix="/api")
# ------------------------- Models -------------------------
class InspectionCreate(BaseModel):
    """Request body for POST /api/inspections."""
    image_base64: str = Field(..., description="Raw base64 (no data URI prefix)")
    notes: Optional[str] = ""  # free-text operator notes passed to the pipeline
    product_spec: Optional[str] = ""  # product spec text passed to the pipeline
    source: Optional[str] = "upload"  # upload | sample
class InspectionSummary(BaseModel):
    """Compact per-inspection summary (shape produced by _summarize)."""
    # Ignore any extra keys so stored documents with more fields still validate.
    model_config = ConfigDict(extra="ignore")
    id: str
    created_at: str  # ISO-8601 UTC timestamp string
    verdict: str  # pass | warn | fail (defaults to "warn" when missing)
    confidence: float
    headline: str
    defect_count: int
    priority: str  # e.g. "P1"/"P2"; "P2" is the fallback
    source: str  # upload | sample
class JournalCreate(BaseModel):
    """Request body for POST /api/journal."""
    title: str
    body: str
    # Pydantic deep-copies mutable defaults per instance, so [] is safe here.
    tags: List[str] = []
class JournalEntry(BaseModel):
    """A stored build-journal entry, including generated social drafts."""
    id: str
    created_at: str  # ISO-8601 UTC timestamp string
    title: str
    body: str
    tags: List[str]
    x_post: str  # AI-drafted X/Twitter post ("" when generation failed)
    linkedin_post: str  # AI-drafted LinkedIn post ("" when generation failed)
# ------------------------- Helpers -------------------------
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _summarize(inspection: Dict[str, Any]) -> Dict[str, Any]:
agents = inspection.get("transcript", {}).get("agents", [])
inspector = next((a for a in agents if a["role"] == "inspector"), None)
reporter = next((a for a in agents if a["role"] == "reporter"), None)
action = next((a for a in agents if a["role"] == "action"), None)
inspector_out = (inspector or {}).get("output", {}).get("parsed", {}) or {}
reporter_out = (reporter or {}).get("output", {}).get("parsed", {}) or {}
action_out = (action or {}).get("output", {}).get("parsed", {}) or {}
defects = inspector_out.get("defects") or []
return {
"id": inspection["id"],
"created_at": inspection["created_at"],
"verdict": inspector_out.get("verdict", "warn"),
"confidence": float(inspector_out.get("confidence", 0.0) or 0.0),
"headline": reporter_out.get("headline") or inspector_out.get("observation", "Inspection complete")[:60],
"defect_count": len(defects) if isinstance(defects, list) else 0,
"priority": action_out.get("priority", "P2"),
"source": inspection.get("source", "upload"),
}
# ------------------------- Routes -------------------------
@api_router.get("/")
async def root():
    """Health/identity endpoint for the API root."""
    status_payload = {
        "service": "forgesight",
        "status": "online",
        "track": "AMD Hackathon — Tracks 1+2+3",
    }
    return status_payload
@api_router.post("/inspections")
async def create_inspection(payload: InspectionCreate):
    """Run the multi-agent pipeline on an uploaded image and persist the result.

    Returns the new id, timestamp, full transcript, and a derived summary.
    Raises 500 when the agent pipeline fails.
    """
    # Accept either raw base64 or a full data-URI payload.
    img_b64 = payload.image_base64
    if img_b64.strip().startswith("data:") and "," in img_b64:
        _, img_b64 = img_b64.split(",", 1)
    try:
        transcript = await run_pipeline(
            image_base64=img_b64,
            notes=payload.notes or "",
            product_spec=payload.product_spec or "",
        )
    except Exception as e:
        logger.exception("Agent pipeline failed")
        raise HTTPException(status_code=500, detail=f"Agent pipeline failed: {str(e)}")
    inspection = {
        "id": str(uuid.uuid4()),
        "created_at": _now_iso(),
        "notes": payload.notes or "",
        "product_spec": payload.product_spec or "",
        "source": payload.source or "upload",
        "transcript": transcript,
    }
    # Insert a copy so Mongo's injected _id never leaks into the response.
    # image_base64 is intentionally not persisted, to keep documents small.
    await db.inspections.insert_one(dict(inspection))
    return {
        "id": inspection["id"],
        "created_at": inspection["created_at"],
        "transcript": transcript,
        "summary": _summarize(inspection),
    }
@api_router.get("/inspections")
async def list_inspections(limit: int = 50):
    """List recent inspections, newest first, as lightweight summaries.

    Args:
        limit: Maximum number of items to return. Clamped to [1, 500] so a
            buggy or malicious caller cannot request an unbounded scan or pass
            a non-positive value straight to the cursor.
    """
    limit = max(1, min(limit, 500))
    cursor = db.inspections.find({}, {"_id": 0}).sort("created_at", -1).limit(limit)
    items = []
    async for doc in cursor:
        items.append(_summarize(doc))
    return {"items": items, "total": len(items)}
@api_router.get("/inspections/{inspection_id}")
async def get_inspection(inspection_id: str):
    """Fetch a single inspection document (plus derived summary) or 404."""
    record = await db.inspections.find_one({"id": inspection_id}, {"_id": 0})
    if not record:
        raise HTTPException(status_code=404, detail="Inspection not found")
    summary = _summarize(record)
    return {**record, "summary": summary}
@api_router.get("/metrics")
async def metrics():
    """Aggregate fleet-level quality metrics across all stored inspections.

    One pass over the collection: verdict counts, mean confidence, top
    defect types, and a 0-100 quality score.
    """
    cursor = db.inspections.find({}, {"_id": 0})
    total = 0
    verdict_counts = {"pass": 0, "warn": 0, "fail": 0}
    defect_type_counts: Dict[str, int] = {}
    confidences: List[float] = []
    async for doc in cursor:
        total += 1
        summary = _summarize(doc)
        verdict = summary["verdict"]
        if verdict not in verdict_counts:
            verdict = "warn"  # unknown verdict strings count as "warn"
        verdict_counts[verdict] += 1
        confidences.append(summary["confidence"])
        # Re-read the raw inspector output for per-defect-type tallies.
        agents = doc.get("transcript", {}).get("agents", [])
        inspector = next((a for a in agents if a["role"] == "inspector"), None)
        parsed = (inspector or {}).get("output", {}).get("parsed", {}) or {}
        defects = parsed.get("defects") or []
        if isinstance(defects, list):
            for defect in defects:
                if isinstance(defect, dict):
                    dtype = (defect.get("type") or "unknown").lower()
                    defect_type_counts[dtype] = defect_type_counts.get(dtype, 0) + 1
    avg_conf = (sum(confidences) / len(confidences)) if confidences else 0.0
    top_defects = sorted(defect_type_counts.items(), key=lambda kv: kv[1], reverse=True)[:6]
    # Score: passes count fully, warns half, fails zero.
    if total > 0:
        quality_score = round(100 * (verdict_counts["pass"] + 0.5 * verdict_counts["warn"]) / total)
    else:
        quality_score = 0
    return {
        "total_inspections": total,
        "verdict_counts": verdict_counts,
        "avg_confidence": round(avg_conf, 3),
        "top_defects": [{"type": dtype, "count": count} for dtype, count in top_defects],
        "quality_score": quality_score,
    }
@api_router.get("/telemetry")
async def telemetry():
    """Simulated MI300X telemetry. Labeled as SIMULATED in the UI."""
    now = time.time()
    # Sinusoids with different periods so the gauges drift independently.
    gpu_util = 62 + 30 * math.sin(now / 4.0)  # oscillates between 32 and 92
    vram_used = 88 + 20 * math.sin(now / 7.0)
    tokens_per_sec = 2850 + 450 * math.sin(now / 3.0)
    power_w = 620 + 80 * math.sin(now / 5.0)
    temp_c = 58 + 7 * math.sin(now / 6.0)
    return {
        "simulated": True,
        "device": "AMD Instinct MI300X",
        "gpu_util_pct": round(min(100, max(0, gpu_util)), 1),
        "vram_used_gb": round(max(0, vram_used), 1),
        "vram_total_gb": 192.0,  # MI300X HBM3 capacity
        "tokens_per_sec": int(max(0, tokens_per_sec)),
        "power_watts": int(max(0, power_w)),
        "temp_c": round(max(0, temp_c), 1),
        "ts": _now_iso(),
    }
@api_router.get("/blueprint")
async def blueprint():
    """Return the static ForgeSight architecture blueprint.

    Pure data (no DB or model calls): a hardware-to-product "stack" list for
    the UI plus the planned fine-tune recipe.
    """
    return {
        "stack": [
            {
                "layer": "Hardware",
                "title": "AMD Instinct MI300X",
                "detail": "192 GB HBM3 · 5.3 TB/s memory bandwidth · 8× GPU node",
                "why": "Massive VRAM enables serving 70B-class Qwen-VL models without sharding.",
            },
            {
                "layer": "Runtime",
                "title": "ROCm 6.2",
                "detail": "Open compute runtime · HIP · MIOpen · RCCL",
                "why": "PyTorch + vLLM run natively on MI300X via ROCm.",
            },
            {
                "layer": "Serving",
                "title": "vLLM on ROCm",
                "detail": "PagedAttention · continuous batching · OpenAI-compatible API",
                "why": "High-throughput multimodal inference for the agent pipeline.",
            },
            {
                "layer": "Model",
                "title": "Qwen2-VL-72B (fine-tuned)",
                "detail": "LoRA fine-tune on defect-image + work-order pairs via Optimum-AMD",
                "why": "Domain-specialized vision reasoning beats zero-shot generic VLMs.",
            },
            {
                "layer": "Agents",
                "title": "Inspector → Diagnostician → Action → Reporter",
                "detail": "Sequential multi-agent with structured JSON hand-offs",
                "why": "Interpretable, auditable pipeline for industrial QC.",
            },
            {
                "layer": "Product",
                "title": "ForgeSight Console",
                "detail": "React + FastAPI · live transcript · defect feed · build journal",
                "why": "End-to-end demonstrable app shipped for the hackathon.",
            },
        ],
        "finetune_recipe": {
            "base_model": "Qwen/Qwen2-VL-72B-Instruct",
            "dataset": "ForgeSight-QC-10K (proprietary defect-image ↔ work-order pairs)",
            "method": "QLoRA r=64 · Optimum-AMD · bf16",
            "hardware": "1× MI300X node (8 GPUs)",
            "expected_wall_clock": "~6h for 3 epochs on 10K pairs",
            "serve_with": "vLLM 0.6+ on ROCm",
        },
    }
@api_router.get("/journal")
async def list_journal():
    """Return every journal entry, newest first."""
    entries = []
    async for doc in db.journal.find({}, {"_id": 0}).sort("created_at", -1):
        entries.append(doc)
    return {"items": entries, "total": len(entries)}
@api_router.post("/journal")
async def create_journal(payload: JournalCreate):
    """Persist a build-journal entry, attaching AI social drafts best-effort."""
    # Social drafting must never block journaling: fall back to empty drafts.
    try:
        drafts = await generate_social_post(payload.title, payload.body)
    except Exception:
        logger.exception("Social gen failed; storing without drafts")
        drafts = {"x_post": "", "linkedin_post": ""}
    entry = {
        "id": str(uuid.uuid4()),
        "created_at": _now_iso(),
        "title": payload.title,
        "body": payload.body,
        "tags": payload.tags or [],
        "x_post": drafts.get("x_post", ""),
        "linkedin_post": drafts.get("linkedin_post", ""),
    }
    # Insert a copy so Mongo's injected _id never appears in the response.
    await db.journal.insert_one(dict(entry))
    return entry
@api_router.post("/journal/seed")
async def seed_journal():
    """Idempotent seed of initial build-journal entries."""
    # No-op when the collection already has content.
    if await db.journal.count_documents({}) > 0:
        return {"seeded": 0, "reason": "already seeded"}
    seeds = [
        {
            "title": "Kickoff: ForgeSight on AMD Developer Cloud",
            "body": "Spun up an MI300X instance on AMD Developer Cloud. First impression: zero CUDA-lock-in, ROCm + PyTorch just worked. Targeting all three hackathon tracks with one agentic multimodal QC copilot.",
            "tags": ["kickoff", "amd", "rocm"],
        },
        {
            "title": "Multi-agent pipeline wired end-to-end",
            "body": "Inspector → Diagnostician → Action → Reporter. Each agent produces strict JSON so hand-offs stay auditable. Running on Claude Sonnet 4.5 today, swapping to Qwen2-VL on MI300X next.",
            "tags": ["agents", "pipeline", "qwen"],
        },
        {
            "title": "Fine-tune recipe: QLoRA on Qwen2-VL with Optimum-AMD",
            "body": "Drafted the LoRA fine-tune path for 10K defect-image ↔ work-order pairs. Expecting ~6h wall-clock on a single MI300X node. vLLM-ROCm will serve the result.",
            "tags": ["fine-tuning", "qlora", "optimum-amd"],
        },
    ]
    for seed in seeds:
        # Best-effort social drafts: seed without them rather than failing.
        try:
            drafts = await generate_social_post(seed["title"], seed["body"])
        except Exception:
            drafts = {"x_post": "", "linkedin_post": ""}
        record = {
            "id": str(uuid.uuid4()),
            "created_at": _now_iso(),
            **seed,
            "x_post": drafts.get("x_post", ""),
            "linkedin_post": drafts.get("linkedin_post", ""),
        }
        await db.journal.insert_one(record)
    return {"seeded": len(seeds)}
app.include_router(api_router)

# CORS_ORIGINS is a comma-separated list. Strip stray whitespace so
# "https://a.com, https://b.com" still matches exact origins, and drop empty
# entries so an unset-but-blank variable doesn't yield [""].
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("CORS_ORIGINS", "*").split(",")
    if origin.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=_cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configured at import time, so `logger` exists before any request handler
# (which reference it) can run.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("forgesight")
# NOTE(review): @app.on_event is deprecated in newer FastAPI in favor of
# lifespan handlers — worth migrating when the framework is upgraded.
@app.on_event("shutdown")
async def shutdown_db_client():
    """Close the shared Mongo client when the app shuts down."""
    client.close()
# Direct execution entry point (production deployments run uvicorn externally).
if __name__ == "__main__":
    import uvicorn
    # Port comes from the PORT env var, defaulting to 8001.
    port = int(os.environ.get("PORT", 8001))
    uvicorn.run(app, host="0.0.0.0", port=port)