Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import shutil | |
| import uvicorn | |
| from pathlib import Path | |
| from fastapi import FastAPI, File, UploadFile, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, FileResponse | |
| import sys | |
| sys.path.append(str(Path(__file__).resolve().parent.parent)) | |
| from core.detection import PersonDetector | |
| from core.tracker import ByteTracker | |
| from core.osint import OSINTAudit | |
| from config import OUTPUTS_DIR, GALLERY_DIR, API_HOST, API_PORT | |
| from api.routes.predictive_exit import router as predictive_exit_router | |
| from api.routes.zone_intelligence import router as zone_intelligence_router | |
# Application object. The title, description and version given here are the
# metadata FastAPI is constructed with.
app = FastAPI(
    title="PhantomEye API",
    # The dash below was mis-encoded in the source text; restored to an em dash.
    description="AI-powered surveillance intelligence — Person Re-ID, Behavioral Analytics, OSINT Defense",
    version="1.0.0",
)

# CORS is fully open: any origin, any method, any header.
# NOTE(review): confirm a wildcard policy is intended for deployed instances.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Routers imported from api.routes are mounted on the main application.
app.include_router(predictive_exit_router)
app.include_router(zone_intelligence_router)

# Module-level instances reused by the request handlers in this module.
detector = PersonDetector()
osint = OSINTAudit()
| import cv2 | |
| import numpy as np | |
def root():
    """Return static identification metadata for the service.

    Returns:
        dict: the system name, version string, status, the list of module
        names and the author handle. All values are constants.
    """
    module_names = [
        "detection",
        "tracking",
        "analytics",
        "osint",
        "predictive_exit",
        "zone_intelligence",
    ]
    return dict(
        system="PhantomEye",
        version="1.0.0",
        status="online",
        modules=module_names,
        author="Abu-Sameer-66",
    )
def health():
    """Report liveness together with the current gallery size.

    Returns:
        dict: ``status`` (always ``"healthy"``), ``gallery_size`` (the length
        of ``osint.gallery``) and ``timestamp`` (the current time formatted
        as ``YYYY-MM-DD HH:MM:SS``).
    """
    gallery_size = len(osint.gallery)
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    report = {
        "status": "healthy",
        "gallery_size": gallery_size,
        "timestamp": now,
    }
    return report
async def detect_persons(file: UploadFile = File(...)):
    """Run person detection on an uploaded JPG/PNG image.

    Args:
        file: Uploaded image. Its filename must end in ``.jpg``, ``.jpeg`` or
            ``.png`` (case-insensitive).

    Returns:
        JSONResponse containing the filename, the number of detections and,
        per detection, a 1-based ``id``, the ``bbox`` converted to a list and
        the ``confidence`` value as returned by ``detector.detect``.

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension, or when the payload is empty or cannot be decoded.
    """
    # The upload's filename may be absent; treat that as an unsupported type
    # instead of failing on ``None.lower()``.
    filename = file.filename or ""
    if not filename.lower().endswith((".jpg", ".jpeg", ".png")):
        raise HTTPException(400, "Only JPG/PNG images supported.")

    data = await file.read()
    # cv2.imdecode raises on an empty buffer rather than returning None, so an
    # empty upload is rejected here with the same 400 as an undecodable one.
    if not data:
        raise HTTPException(400, "Cannot decode image.")

    image = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        raise HTTPException(400, "Cannot decode image.")

    detections = detector.detect(image)
    return JSONResponse({
        "status": "success",
        "filename": filename,
        "total_persons": len(detections),
        "detections": [
            {
                "id": index,
                "bbox": list(det["bbox"]),
                "confidence": det["confidence"],
            }
            for index, det in enumerate(detections, start=1)
        ],
    })
async def osint_audit(file: UploadFile = File(...)):
    """Run an OSINT audit on an uploaded JPG/PNG image and save the report.

    The decoded image is passed to ``osint.audit`` with a query id taken from
    the stem of the uploaded filename; the result is handed to
    ``osint.save_report`` and echoed back to the caller.

    Args:
        file: Uploaded image. Its filename must end in ``.jpg``, ``.jpeg`` or
            ``.png`` (case-insensitive).

    Returns:
        JSONResponse with ``status`` and the audit result under ``audit``.

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension, or when the payload is empty or cannot be decoded.
    """
    # The upload's filename may be absent; treat that as an unsupported type
    # instead of failing on ``None.lower()``.
    filename = file.filename or ""
    if not filename.lower().endswith((".jpg", ".jpeg", ".png")):
        raise HTTPException(400, "Only JPG/PNG images supported.")

    data = await file.read()
    # cv2.imdecode raises on an empty buffer rather than returning None, so an
    # empty upload is rejected here with the same 400 as an undecodable one.
    if not data:
        raise HTTPException(400, "Cannot decode image.")

    image = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        raise HTTPException(400, "Cannot decode image.")

    query_id = Path(filename).stem
    result = osint.audit(image, query_id=query_id)
    osint.save_report(result)
    return JSONResponse({
        "status": "success",
        "audit": result,
    })
async def add_to_gallery(
    file: UploadFile = File(...),
    person_id: str = "unknown",
):
    """Register an uploaded image in the OSINT gallery under ``person_id``.

    Args:
        file: Uploaded image. No extension check is applied here; the payload
            only has to be decodable by OpenCV.
        person_id: Identifier to store the image under. Defaults to
            ``"unknown"``.

    Returns:
        JSONResponse with the person id, the gallery size after insertion and
        a confirmation message.

    Raises:
        HTTPException: 400 when the payload is empty or cannot be decoded, or
            when ``osint.add_to_gallery`` returns a falsy value (reported to
            the client as "no face detected").
    """
    data = await file.read()
    # cv2.imdecode raises on an empty buffer rather than returning None, so an
    # empty upload is rejected here with the same 400 as an undecodable one.
    if not data:
        raise HTTPException(400, "Cannot decode image.")

    image = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        raise HTTPException(400, "Cannot decode image.")

    if not osint.add_to_gallery(image, person_id):
        raise HTTPException(400, "No face detected in uploaded image.")

    return JSONResponse({
        "status": "success",
        "person_id": person_id,
        "gallery_size": len(osint.gallery),
        "message": f"Person '{person_id}' added to gallery.",
    })
def get_gallery():
    """List the identifiers currently stored in the OSINT gallery.

    Returns:
        JSONResponse with ``gallery_size`` (length of ``osint.gallery``) and
        ``persons`` (its keys as a list).
    """
    gallery = osint.gallery
    payload = {
        "status": "success",
        "gallery_size": len(gallery),
        "persons": list(gallery.keys()),
    }
    return JSONResponse(payload)
async def track_video(file: UploadFile = File(...)):
    """Track persons through the opening seconds of an uploaded video.

    The upload is written to a temporary file under ``OUTPUTS_DIR``. Frames
    are read from it and passed through ``detector.detect`` and a fresh
    ``ByteTracker`` until ten seconds' worth of frames (at the reported FPS)
    have been processed or the video ends. The temporary file is always
    removed afterwards, including when processing fails.

    NOTE(review): detection runs synchronously inside this coroutine, so the
    event loop is occupied while frames are processed.

    Args:
        file: Uploaded video. Its filename must end in ``.mp4``, ``.avi`` or
            ``.mov`` (case-insensitive).

    Returns:
        JSONResponse with the filename, resolution, FPS, number of frames
        analysed, ``unique_persons`` (``tracker.next_id - 1``; assumes track
        ids are handed out sequentially from 1 - TODO confirm against
        ByteTracker) and ``frame_summary`` (the first ten per-frame entries).

    Raises:
        HTTPException: 400 when the filename is missing or has an unsupported
            extension, or when OpenCV cannot open the uploaded file.
    """
    max_seconds = 10  # only this many seconds of footage are analysed

    # The upload's filename may be absent; treat that as an unsupported type
    # instead of failing on ``None.lower()``.
    filename = file.filename or ""
    if not filename.lower().endswith((".mp4", ".avi", ".mov")):
        raise HTTPException(400, "Only MP4/AVI/MOV videos supported.")

    # Only the final path component of the client-supplied name is used, so
    # directory parts in it cannot redirect the write. The nanosecond stamp
    # keeps uploads that arrive within the same second from sharing a path.
    safe_name = Path(filename).name
    OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)
    tmp_path = OUTPUTS_DIR / f"tmp_{time.time_ns()}_{safe_name}"

    tracker = ByteTracker()
    frame_data = []
    frame_idx = 0
    cap = None
    try:
        with open(tmp_path, "wb") as out:
            shutil.copyfileobj(file.file, out)

        cap = cv2.VideoCapture(str(tmp_path))
        if not cap.isOpened():
            raise HTTPException(400, "Cannot open video.")

        # Reading properties does not consume frames, so a single capture
        # serves both the metadata and the frame loop.
        fps = int(cap.get(cv2.CAP_PROP_FPS)) or 25
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # If no usable frame count is reported, rely on the time cap and on
        # read() failing at end of stream instead of analysing nothing.
        frame_limit = fps * max_seconds
        if total > 0:
            frame_limit = min(total, frame_limit)

        while frame_idx < frame_limit:
            ret, frame = cap.read()
            if not ret:
                break
            dets = detector.detect(frame)
            active = tracker.update(dets)
            frame_data.append({
                "frame": frame_idx,
                "active_persons": len(active),
                "track_ids": [t.track_id for t in active],
            })
            frame_idx += 1
    finally:
        # Release the capture before deleting the file it has open.
        if cap is not None:
            cap.release()
        tmp_path.unlink(missing_ok=True)

    return JSONResponse({
        "status": "success",
        "filename": filename,
        "resolution": f"{w}x{h}",
        "fps": fps,
        "frames_analyzed": frame_idx,
        "unique_persons": tracker.next_id - 1,
        "frame_summary": frame_data[:10],
    })
def list_outputs():
    """List the regular files in ``OUTPUTS_DIR``, skipping temporary ones.

    The directory is created if it does not exist. Entries whose name starts
    with ``tmp_`` and entries that are not regular files are left out.

    Returns:
        JSONResponse with ``total`` and ``files``; each file entry carries its
        ``name`` and its ``size`` as whole kibibytes followed by ``" KB"``.
    """
    OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

    listing = []
    for entry in OUTPUTS_DIR.iterdir():
        if not entry.is_file():
            continue
        if entry.name.startswith("tmp_"):
            continue
        entry_name = entry.name
        size_kb = entry.stat().st_size // 1024
        listing.append({"name": entry_name, "size": f"{size_kb} KB"})

    payload = {
        "status": "success",
        "total": len(listing),
        "files": listing,
    }
    return JSONResponse(payload)
# ── AUTH ENDPOINTS ──────────────────────────────────────────
| from api.auth import generate_api_key, validate_api_key, create_access_token, get_api_key_stats | |
| from fastapi import Header | |
async def api_generate_key(client_name: str, tier: str = "free"):
    """Generate a new PhantomEye API key.

    NOTE(review): no authentication check is visible in this handler, so any
    caller can request a key for either tier - confirm this is intended.

    Args:
        client_name: Name passed through to ``generate_api_key``.
        tier: ``"free"`` or ``"pro"``. Defaults to ``"free"``.

    Returns:
        JSONResponse with the generated key, client name, tier and the rate
        limit reported for that tier (100 for free, 10000 for pro). For any
        other tier value, a 400 JSONResponse with an ``error`` field.
    """
    # Single source of truth for both the accepted tiers and their limits.
    rate_limits = {"free": 100, "pro": 10000}
    if tier not in rate_limits:
        return JSONResponse({"error": "tier must be free or pro"}, status_code=400)

    key = generate_api_key(client_name, tier)
    return JSONResponse({
        "status": "success",
        "api_key": key,
        "client": client_name,
        "tier": tier,
        "rate_limit": rate_limits[tier],
        # The dash below was mis-encoded in the source text; restored.
        "message": "Store this key securely — it will not be shown again",
    })
async def api_get_token(api_key: str = Header(..., alias="X-API-Key")):
    """Exchange an API key for an access token.

    The key from the ``X-API-Key`` header is passed to ``validate_api_key``;
    the client and tier from its result become the ``sub`` and ``tier``
    fields handed to ``create_access_token``.

    Args:
        api_key: Value of the ``X-API-Key`` request header.

    Returns:
        JSONResponse with the token, ``token_type`` ``"bearer"``, the tier and
        ``expires_in``, which is the fixed string ``"24 hours"`` (not derived
        from the token here).
    """
    key_data = validate_api_key(api_key)
    client = key_data["client"]
    tier = key_data["tier"]

    claims = {"sub": client, "tier": tier}
    access_token = create_access_token(claims)

    body = {
        "status": "success",
        "access_token": access_token,
        "token_type": "bearer",
        "expires_in": "24 hours",
        "tier": tier,
    }
    return JSONResponse(body)
async def api_key_stats(api_key: str = Header(..., alias="X-API-Key")):
    """Return API key usage statistics.

    Args:
        api_key: Value of the ``X-API-Key`` request header.

    Returns:
        JSONResponse wrapping whatever ``get_api_key_stats()`` returns.
    """
    # The return value is discarded; the call serves only as a gate.
    # Presumably it raises for an invalid key - verify in api.auth.
    validate_api_key(api_key)
    stats = get_api_key_stats()
    return JSONResponse(stats)
async def api_validate_key(api_key: str = Header(..., alias="X-API-Key")):
    """Validate an API key and return its metadata.

    Args:
        api_key: Value of the ``X-API-Key`` request header.

    Returns:
        JSONResponse with the client, tier, calls made today, the rate limit
        and ``remaining`` (rate limit minus calls made today).
    """
    data = validate_api_key(api_key)
    client = data["client"]
    tier = data["tier"]
    calls_today = data["calls_today"]
    rate_limit = data["rate_limit"]

    body = {
        "status": "valid",
        "client": client,
        "tier": tier,
        "calls_today": calls_today,
        "rate_limit": rate_limit,
        "remaining": rate_limit - calls_today,
    }
    return JSONResponse(body)
if __name__ == "__main__":
    # The PORT environment variable takes precedence. Without it, fall back
    # to API_PORT from config, which this module imports but otherwise never
    # uses, instead of a separate hard-coded number.
    # NOTE(review): the fallback was previously the literal 8000 - confirm
    # API_PORT is the intended default.
    port = int(os.environ.get("PORT", API_PORT))
    uvicorn.run(
        "api.main:app",
        host=API_HOST,
        port=port,
        reload=False,
    )