from fastapi import FastAPI, APIRouter, HTTPException from dotenv import load_dotenv from starlette.middleware.cors import CORSMiddleware from motor.motor_asyncio import AsyncIOMotorClient import os import logging import math import time import uuid from pathlib import Path from pydantic import BaseModel, Field, ConfigDict from typing import List, Optional, Any, Dict from datetime import datetime, timezone from agents import run_pipeline, generate_social_post ROOT_DIR = Path(__file__).parent load_dotenv(ROOT_DIR / ".env") mongo_url = os.environ["MONGO_URL"] client = AsyncIOMotorClient(mongo_url) db = client[os.environ["DB_NAME"]] app = FastAPI(title="ForgeSight API") api_router = APIRouter(prefix="/api") # ------------------------- Models ------------------------- class InspectionCreate(BaseModel): image_base64: str = Field(..., description="Raw base64 (no data URI prefix)") notes: Optional[str] = "" product_spec: Optional[str] = "" source: Optional[str] = "upload" # upload | sample class InspectionSummary(BaseModel): model_config = ConfigDict(extra="ignore") id: str created_at: str verdict: str confidence: float headline: str defect_count: int priority: str source: str class JournalCreate(BaseModel): title: str body: str tags: List[str] = [] class JournalEntry(BaseModel): id: str created_at: str title: str body: str tags: List[str] x_post: str linkedin_post: str # ------------------------- Helpers ------------------------- def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _summarize(inspection: Dict[str, Any]) -> Dict[str, Any]: agents = inspection.get("transcript", {}).get("agents", []) inspector = next((a for a in agents if a["role"] == "inspector"), None) reporter = next((a for a in agents if a["role"] == "reporter"), None) action = next((a for a in agents if a["role"] == "action"), None) inspector_out = (inspector or {}).get("output", {}).get("parsed", {}) or {} reporter_out = (reporter or {}).get("output", {}).get("parsed", {}) or {} action_out = (action or {}).get("output", {}).get("parsed", {}) or {} defects = inspector_out.get("defects") or [] return { "id": inspection["id"], "created_at": inspection["created_at"], "verdict": inspector_out.get("verdict", "warn"), "confidence": float(inspector_out.get("confidence", 0.0) or 0.0), "headline": reporter_out.get("headline") or inspector_out.get("observation", "Inspection complete")[:60], "defect_count": len(defects) if isinstance(defects, list) else 0, "priority": action_out.get("priority", "P2"), "source": inspection.get("source", "upload"), } # ------------------------- Routes ------------------------- @api_router.get("/") async def root(): return {"service": "forgesight", "status": "online", "track": "AMD Hackathon — Tracks 1+2+3"} @api_router.post("/inspections") async def create_inspection(payload: InspectionCreate): # Strip potential data URI prefix img_b64 = payload.image_base64 if "," in img_b64 and img_b64.strip().startswith("data:"): img_b64 = img_b64.split(",", 1)[1] try: transcript = await run_pipeline( image_base64=img_b64, notes=payload.notes or "", product_spec=payload.product_spec or "", ) except Exception as e: logger.exception("Agent pipeline failed") raise HTTPException(status_code=500, detail=f"Agent pipeline failed: {str(e)}") inspection = { "id": str(uuid.uuid4()), "created_at": _now_iso(), "notes": payload.notes or "", "product_spec": payload.product_spec or "", "source": payload.source or "upload", "transcript": transcript, } # Do NOT persist image_base64 to keep docs small; store SHA/size if needed doc = {**inspection} await db.inspections.insert_one(doc) return {"id": inspection["id"], "created_at": inspection["created_at"], "transcript": transcript, "summary": _summarize(inspection)} @api_router.get("/inspections") async def list_inspections(limit: int = 50): cursor = db.inspections.find({}, {"_id": 0}).sort("created_at", -1).limit(limit) items = [] async for doc in cursor: items.append(_summarize(doc)) return {"items": items, "total": len(items)} @api_router.get("/inspections/{inspection_id}") async def get_inspection(inspection_id: str): doc = await db.inspections.find_one({"id": inspection_id}, {"_id": 0}) if not doc: raise HTTPException(status_code=404, detail="Inspection not found") return {**doc, "summary": _summarize(doc)} @api_router.get("/metrics") async def metrics(): cursor = db.inspections.find({}, {"_id": 0}) total = 0 verdict_counts = {"pass": 0, "warn": 0, "fail": 0} defect_type_counts: Dict[str, int] = {} confidences: List[float] = [] async for doc in cursor: total += 1 summary = _summarize(doc) v = summary["verdict"] if summary["verdict"] in verdict_counts else "warn" verdict_counts[v] += 1 confidences.append(summary["confidence"]) agents = doc.get("transcript", {}).get("agents", []) inspector = next((a for a in agents if a["role"] == "inspector"), None) defects = ((inspector or {}).get("output", {}).get("parsed", {}) or {}).get("defects") or [] if isinstance(defects, list): for d in defects: if isinstance(d, dict): t = (d.get("type") or "unknown").lower() defect_type_counts[t] = defect_type_counts.get(t, 0) + 1 avg_conf = sum(confidences) / len(confidences) if confidences else 0.0 top_defects = sorted(defect_type_counts.items(), key=lambda x: x[1], reverse=True)[:6] quality_score = 0 if total > 0: quality_score = round(100 * (verdict_counts["pass"] + 0.5 * verdict_counts["warn"]) / total) return { "total_inspections": total, "verdict_counts": verdict_counts, "avg_confidence": round(avg_conf, 3), "top_defects": [{"type": t, "count": c} for t, c in top_defects], "quality_score": quality_score, } @api_router.get("/telemetry") async def telemetry(): """Simulated MI300X telemetry. Labeled as SIMULATED in the UI.""" t = time.time() gpu_util = 62 + 30 * math.sin(t / 4.0) # 32 - 92 vram_gb_total = 192.0 # MI300X HBM3 vram_used = 88 + 20 * math.sin(t / 7.0) tokens_per_sec = 2850 + 450 * math.sin(t / 3.0) power_w = 620 + 80 * math.sin(t / 5.0) temp_c = 58 + 7 * math.sin(t / 6.0) return { "simulated": True, "device": "AMD Instinct MI300X", "gpu_util_pct": round(max(0, min(100, gpu_util)), 1), "vram_used_gb": round(max(0, vram_used), 1), "vram_total_gb": vram_gb_total, "tokens_per_sec": int(max(0, tokens_per_sec)), "power_watts": int(max(0, power_w)), "temp_c": round(max(0, temp_c), 1), "ts": _now_iso(), } @api_router.get("/blueprint") async def blueprint(): return { "stack": [ { "layer": "Hardware", "title": "AMD Instinct MI300X", "detail": "192 GB HBM3 · 5.3 TB/s memory bandwidth · 8× GPU node", "why": "Massive VRAM enables serving 70B-class Qwen-VL models without sharding.", }, { "layer": "Runtime", "title": "ROCm 6.2", "detail": "Open compute runtime · HIP · MIOpen · RCCL", "why": "PyTorch + vLLM run natively on MI300X via ROCm.", }, { "layer": "Serving", "title": "vLLM on ROCm", "detail": "PagedAttention · continuous batching · OpenAI-compatible API", "why": "High-throughput multimodal inference for the agent pipeline.", }, { "layer": "Model", "title": "Qwen2-VL-72B (fine-tuned)", "detail": "LoRA fine-tune on defect-image + work-order pairs via Optimum-AMD", "why": "Domain-specialized vision reasoning beats zero-shot generic VLMs.", }, { "layer": "Agents", "title": "Inspector → Diagnostician → Action → Reporter", "detail": "Sequential multi-agent with structured JSON hand-offs", "why": "Interpretable, auditable pipeline for industrial QC.", }, { "layer": "Product", "title": "ForgeSight Console", "detail": "React + FastAPI · live transcript · defect feed · build journal", "why": "End-to-end demonstrable app shipped for the hackathon.", }, ], "finetune_recipe": { "base_model": "Qwen/Qwen2-VL-72B-Instruct", "dataset": "ForgeSight-QC-10K (proprietary defect-image ↔ work-order pairs)", "method": "QLoRA r=64 · Optimum-AMD · bf16", "hardware": "1× MI300X node (8 GPUs)", "expected_wall_clock": "~6h for 3 epochs on 10K pairs", "serve_with": "vLLM 0.6+ on ROCm", }, } @api_router.get("/journal") async def list_journal(): cursor = db.journal.find({}, {"_id": 0}).sort("created_at", -1) items = [doc async for doc in cursor] return {"items": items, "total": len(items)} @api_router.post("/journal") async def create_journal(payload: JournalCreate): try: social = await generate_social_post(payload.title, payload.body) except Exception: logger.exception("Social gen failed; storing without drafts") social = {"x_post": "", "linkedin_post": ""} entry = { "id": str(uuid.uuid4()), "created_at": _now_iso(), "title": payload.title, "body": payload.body, "tags": payload.tags or [], "x_post": social.get("x_post", ""), "linkedin_post": social.get("linkedin_post", ""), } await db.journal.insert_one({**entry}) return entry @api_router.post("/journal/seed") async def seed_journal(): """Idempotent seed of initial build-journal entries.""" existing = await db.journal.count_documents({}) if existing > 0: return {"seeded": 0, "reason": "already seeded"} seeds = [ { "title": "Kickoff: ForgeSight on AMD Developer Cloud", "body": "Spun up an MI300X instance on AMD Developer Cloud. First impression: zero CUDA-lock-in, ROCm + PyTorch just worked. Targeting all three hackathon tracks with one agentic multimodal QC copilot.", "tags": ["kickoff", "amd", "rocm"], }, { "title": "Multi-agent pipeline wired end-to-end", "body": "Inspector → Diagnostician → Action → Reporter. Each agent produces strict JSON so hand-offs stay auditable. Running on Claude Sonnet 4.5 today, swapping to Qwen2-VL on MI300X next.", "tags": ["agents", "pipeline", "qwen"], }, { "title": "Fine-tune recipe: QLoRA on Qwen2-VL with Optimum-AMD", "body": "Drafted the LoRA fine-tune path for 10K defect-image ↔ work-order pairs. Expecting ~6h wall-clock on a single MI300X node. vLLM-ROCm will serve the result.", "tags": ["fine-tuning", "qlora", "optimum-amd"], }, ] for s in seeds: try: social = await generate_social_post(s["title"], s["body"]) except Exception: social = {"x_post": "", "linkedin_post": ""} await db.journal.insert_one({ "id": str(uuid.uuid4()), "created_at": _now_iso(), **s, "x_post": social.get("x_post", ""), "linkedin_post": social.get("linkedin_post", ""), }) return {"seeded": len(seeds)} app.include_router(api_router) app.add_middleware( CORSMiddleware, allow_credentials=True, allow_origins=os.environ.get("CORS_ORIGINS", "*").split(","), allow_methods=["*"], allow_headers=["*"], ) logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") logger = logging.getLogger("forgesight") @app.on_event("shutdown") async def shutdown_db_client(): client.close() if __name__ == "__main__": import uvicorn port = int(os.environ.get("PORT", 8001)) uvicorn.run(app, host="0.0.0.0", port=port)