import os import time import shutil import uvicorn from pathlib import Path from fastapi import FastAPI, File, UploadFile, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, FileResponse import sys sys.path.append(str(Path(__file__).resolve().parent.parent)) from core.detection import PersonDetector from core.tracker import ByteTracker from core.osint import OSINTAudit from config import OUTPUTS_DIR, GALLERY_DIR, API_HOST, API_PORT from api.routes.predictive_exit import router as predictive_exit_router from api.routes.zone_intelligence import router as zone_intelligence_router app = FastAPI( title="PhantomEye API", description="AI-powered surveillance intelligence — Person Re-ID, Behavioral Analytics, OSINT Defense", version="1.0.0", ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) app.include_router(predictive_exit_router) app.include_router(zone_intelligence_router) detector = PersonDetector() osint = OSINTAudit() import cv2 import numpy as np @app.get("/") def root(): return { "system" : "PhantomEye", "version" : "1.0.0", "status" : "online", "modules" : ["detection", "tracking", "analytics", "osint", "predictive_exit", "zone_intelligence"], "author" : "Abu-Sameer-66", } @app.get("/health") def health(): return { "status" : "healthy", "gallery_size" : len(osint.gallery), "timestamp" : time.strftime("%Y-%m-%d %H:%M:%S"), } @app.post("/detect") async def detect_persons(file: UploadFile = File(...)): if not file.filename.lower().endswith((".jpg", ".jpeg", ".png")): raise HTTPException(400, "Only JPG/PNG images supported.") data = await file.read() arr = np.frombuffer(data, np.uint8) image = cv2.imdecode(arr, cv2.IMREAD_COLOR) if image is None: raise HTTPException(400, "Cannot decode image.") detections = detector.detect(image) return JSONResponse({ "status" : "success", "filename" : file.filename, "total_persons" : len(detections), "detections" : [ { "id" : i + 1, "bbox" : list(d["bbox"]), "confidence": d["confidence"], } for i, d in enumerate(detections) ], }) @app.post("/osint/audit") async def osint_audit(file: UploadFile = File(...)): if not file.filename.lower().endswith((".jpg", ".jpeg", ".png")): raise HTTPException(400, "Only JPG/PNG images supported.") data = await file.read() arr = np.frombuffer(data, np.uint8) image = cv2.imdecode(arr, cv2.IMREAD_COLOR) if image is None: raise HTTPException(400, "Cannot decode image.") query_id = Path(file.filename).stem result = osint.audit(image, query_id=query_id) osint.save_report(result) return JSONResponse({ "status": "success", "audit" : result, }) @app.post("/osint/add-to-gallery") async def add_to_gallery( file : UploadFile = File(...), person_id: str = "unknown", ): data = await file.read() arr = np.frombuffer(data, np.uint8) image = cv2.imdecode(arr, cv2.IMREAD_COLOR) if image is None: raise HTTPException(400, "Cannot decode image.") success = osint.add_to_gallery(image, person_id) if not success: raise HTTPException(400, "No face detected in uploaded image.") return JSONResponse({ "status" : "success", "person_id" : person_id, "gallery_size": len(osint.gallery), "message" : f"Person '{person_id}' added to gallery.", }) @app.get("/osint/gallery") def get_gallery(): return JSONResponse({ "status" : "success", "gallery_size": len(osint.gallery), "persons" : list(osint.gallery.keys()), }) @app.post("/track/video") async def track_video(file: UploadFile = File(...)): if not file.filename.lower().endswith((".mp4", ".avi", ".mov")): raise HTTPException(400, "Only MP4/AVI/MOV videos supported.") tmp_path = OUTPUTS_DIR / f"tmp_{int(time.time())}_{file.filename}" OUTPUTS_DIR.mkdir(parents=True, exist_ok=True) with open(tmp_path, "wb") as f: shutil.copyfileobj(file.file, f) cap = cv2.VideoCapture(str(tmp_path)) fps = int(cap.get(cv2.CAP_PROP_FPS)) or 25 total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) cap.release() tracker = ByteTracker() cap = cv2.VideoCapture(str(tmp_path)) frame_data = [] frame_idx = 0 while frame_idx < min(total, fps * 10): ret, frame = cap.read() if not ret: break dets = detector.detect(frame) active = tracker.update(dets) frame_data.append({ "frame" : frame_idx, "active_persons" : len(active), "track_ids" : [t.track_id for t in active], }) frame_idx += 1 cap.release() tmp_path.unlink(missing_ok=True) return JSONResponse({ "status" : "success", "filename" : file.filename, "resolution" : f"{w}x{h}", "fps" : fps, "frames_analyzed" : frame_idx, "unique_persons" : tracker.next_id - 1, "frame_summary" : frame_data[:10], }) @app.get("/outputs") def list_outputs(): OUTPUTS_DIR.mkdir(parents=True, exist_ok=True) files = [ { "name": f.name, "size": f"{f.stat().st_size // 1024} KB", } for f in OUTPUTS_DIR.iterdir() if f.is_file() and not f.name.startswith("tmp_") ] return JSONResponse({ "status": "success", "total" : len(files), "files" : files, }) # ── AUTH ENDPOINTS ────────────────────────────────────────── from api.auth import generate_api_key, validate_api_key, create_access_token, get_api_key_stats from fastapi import Header @app.post("/auth/generate-key") async def api_generate_key(client_name: str, tier: str = "free"): """Generate a new PhantomEye API key.""" if tier not in ["free", "pro"]: return JSONResponse({"error": "tier must be free or pro"}, status_code=400) key = generate_api_key(client_name, tier) return JSONResponse({ "status": "success", "api_key": key, "client": client_name, "tier": tier, "rate_limit": 100 if tier == "free" else 10000, "message": "Store this key securely — it will not be shown again" }) @app.post("/auth/token") async def api_get_token(api_key: str = Header(..., alias="X-API-Key")): """Exchange API key for JWT access token.""" key_data = validate_api_key(api_key) token = create_access_token({"sub": key_data["client"], "tier": key_data["tier"]}) return JSONResponse({ "status": "success", "access_token": token, "token_type": "bearer", "expires_in": "24 hours", "tier": key_data["tier"] }) @app.get("/auth/stats") async def api_key_stats(api_key: str = Header(..., alias="X-API-Key")): """Get API key usage statistics.""" validate_api_key(api_key) return JSONResponse(get_api_key_stats()) @app.get("/auth/validate") async def api_validate_key(api_key: str = Header(..., alias="X-API-Key")): """Validate an API key and return its metadata.""" data = validate_api_key(api_key) return JSONResponse({ "status": "valid", "client": data["client"], "tier": data["tier"], "calls_today": data["calls_today"], "rate_limit": data["rate_limit"], "remaining": data["rate_limit"] - data["calls_today"] }) if __name__ == "__main__": import os port = int(os.environ.get("PORT", 8000)) uvicorn.run( "api.main:app", host=API_HOST, port=port, reload=False, )