"""Detection ingest service: stores a detection row and publishes an event."""

import json
import os
from contextlib import closing
from datetime import datetime
from typing import Optional

import psycopg2
import redis
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# Populate os.environ from a local .env file before the settings are read.
load_dotenv()

DATABASE_URL = os.getenv("DATABASE_URL")
REDIS_URL = os.getenv("REDIS_URL")

app = FastAPI()
r = redis.from_url(REDIS_URL)


def get_conn():
    """Open and return a new psycopg2 connection to ``DATABASE_URL``."""
    return psycopg2.connect(DATABASE_URL)


class Detection(BaseModel):
    """Request body accepted by ``POST /detect``."""

    namespace: str
    type: str
    # Optional: when missing or empty, ``detect`` substitutes the current time.
    timestamp: Optional[str] = None
    location: dict
    severity: int = 1
    images: list = []
    provenance: dict = {}


@app.post("/detect")
def detect(d: Detection):
    """Insert a detection into ``objects`` and publish a creation event.

    Args:
        d: The validated detection payload.

    Returns:
        A dict ``{"id": <new row id as str>}``.

    Raises:
        HTTPException: Status 500 with the underlying error message as
            ``detail`` if the insert or the publish fails.
    """
    # Falls back to a naive UTC ISO-8601 string when no timestamp is supplied.
    ts = d.timestamp or datetime.utcnow().isoformat()
    try:
        # ``closing`` guarantees the connection is closed even when the
        # insert fails; closing without a commit discards the open
        # transaction. The cursor's own context manager closes it first.
        with closing(get_conn()) as conn, conn.cursor() as cur:
            cur.execute(
                "INSERT INTO objects (namespace,type,timestamp,location,severity,images,provenance) VALUES (%s,%s,%s,%s,%s,%s,%s) RETURNING id",
                (
                    d.namespace,
                    d.type,
                    ts,
                    json.dumps(d.location),
                    d.severity,
                    json.dumps(d.images),
                    json.dumps(d.provenance),
                ),
            )
            new_id = cur.fetchone()[0]
            conn.commit()

        # publish event
        ev = {
            "id": str(new_id),
            "namespace": d.namespace,
            "type": d.type,
            "location": d.location,
            "severity": d.severity,
        }
        r.publish("infra:faults:created", json.dumps(ev))
        return {"id": str(new_id)}
    except Exception as e:
        # Boundary handler: surface any failure as a 500, keeping the cause
        # chained for server-side tracebacks.
        raise HTTPException(status_code=500, detail=str(e)) from e