from fastapi import FastAPI from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from ultralytics import YOLO from typing import Dict # from core.defect_detection import * from core.defect_detection_image import * from utils import * import uvicorn, asyncio # ============================================================ # APP SETUP # ============================================================ app = FastAPI( title="AI Engine Dummy", version="1.0.0", description=""" ## 🧠 AI Engine Dummy API API simulasi integrasi **AI Engine** untuk deteksi defect pada sistem monitoring. --- ### 🔹 Endpoint Utama - `/start-detection` → Memulai simulasi deteksi untuk beberapa kamera. ### 🔹 Webhook - Gunakan https://webhook.site/ untuk menerima hasil deteksi. - Pastikan mengisi `webhook_url` pada payload request. ### 🔹 Simulasi - Tiap kamera akan melakukan deteksi selama max 5 (Sesuai Waktu Timeout) detik. - Jika ditemukan defect, hasil langsung dikirim ke webhook dan semua kamera berhenti. - Jika semua kamera tidak menemukan defect setelah 5 (Sesuai Waktu Timeout) detik → status "OK" dikirim satu kali. """, ) # ============================================================ # CORS CONFIG (Hanya port 8899) # ============================================================ allowed_origins = [ "*", # "http://localhost:8899", # "http://127.0.0.1:8899", # "http://0.0.0.0:8899", ] app.add_middleware( CORSMiddleware, allow_origins=allowed_origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ============================================================ # ROUTES # ============================================================ @app.get("/") def read_root(): return {"message": "Defect Detection API is running."} @app.post("/start-detection") async def start_detection(data: Dict): station_id = data.get("station_id") parts = data.get("parts") webhook_url = data.get("webhook_url") cameras = data.get("cameras", []) # ------------------------------- # BASIC VALIDATION # ------------------------------- if not station_id or not parts or not webhook_url or not cameras: return JSONResponse( status_code=400, content={ "status": "error", "station_id": station_id, "camera_count": len(cameras), "message": "Missing required fields" } ) # ------------------------------- # VALIDATION BEFORE EXECUTION # ------------------------------- required_parts_fields = ["id", "pin_api", "name", "sku"] validation_errors = validate_input(required_parts_fields, station_id, cameras, parts, webhook_url) if validation_errors: logger.error("[VALIDATION FAILED] Input data is invalid.") for err in validation_errors: logger.error(f" - {err}") return JSONResponse( status_code=400, content={ "status": "error", "station_id": station_id, "camera_count": len(cameras), "message": " | ".join(validation_errors) } ) logger.info(f"[INFO] Get metadata parts") model_path = model_by_id_metadata(parts['id']) logger.info(f"[INFO] Checking model_path") model = model_path # ===================================================== # BASE64 IMAGE DETECTION (NOT VIDEO STREAM) # ===================================================== logger.info(f"[START] Station {station_id} → {len(cameras)} camera(s) with base64 images") # Jalankan detection di background asyncio.create_task( run_detection_group(station_id, cameras, webhook_url, model, parts) ) return JSONResponse( status_code=200, content={ "status": "started", "station_id": station_id, "camera_count": len(cameras), "message": "Base64 image detection is running in background." } ) # @app.post("/start-detection") # live stream # async def start_detection(data: Dict): # station_id = data.get("station_id") # parts = data.get("parts") # webhook_url = data.get("webhook_url") # cameras = data.get("cameras", []) # if not station_id or not parts or not webhook_url or not cameras: # return JSONResponse( # status_code=400, # content={ # "status": "error", # "station_id": station_id, # "camera_count": len(cameras), # "message": "Missing required fields" # } # ) # # ------------------------------- # # VALIDATION BEFORE EXECUTION # # ------------------------------- # required_parts_fields = [ # "id", # "pin_api", # "name", # "sku" # ] # validation_errors = validate_input(required_parts_fields, station_id, cameras, parts, webhook_url) # if validation_errors: # logger.error("[VALIDATION FAILED] Input data is invalid.") # for err in validation_errors: # logger.error(f" - {err}") # return JSONResponse( # status_code=400, # content={ # "status": "error", # "station_id": station_id, # "camera_count": len(cameras), # "message": " | ".join(validation_errors) # } # ) # logger.info(f"[INFO] Get metadata parts") # model_path = model_by_id_metadata(parts['id']) # logger.info(f"[INFO] Checking model_path") # if isinstance(model_path, str): # if not os.path.exists(model_path): # logger.info(f"[INFO] Model file not found") # return {"status": "error", "message": f"Model file not found: {model_path}"} # model = YOLO(model_path) # else: # model = model_path # logger.info(f"[START] Station {station_id} → {len(cameras)} kamera diproses") # # running background # asyncio.create_task(run_detection_group(station_id, cameras, webhook_url, model, parts)) # return JSONResponse( # status_code=200, # content={ # "status": "started", # "station_id": station_id, # "camera_count": len(cameras), # "message": "Detection is running in background." # } # ) # ============================================================ # ENTRY POINT # ============================================================ if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)