Spaces:
Running
Running
| import os | |
| import base64 | |
| import time | |
| import asyncio | |
| import logging | |
| from typing import Dict, Optional | |
| from fastapi import FastAPI, HTTPException, Header, UploadFile, File, BackgroundTasks | |
| from fastapi.responses import JSONResponse, PlainTextResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| import uvicorn | |
| import uuid | |
# ================= CONFIGURATION =================
# All limits are read once at import time from the environment.

# Shared secret compared against the `x_api_key` header by the handlers that
# check it; when unset (or empty) those handlers skip the check.
API_KEY = os.getenv("API_KEY")

# Max images in queue
MAX_QUEUE_SIZE = int(os.getenv("MAX_QUEUE_SIZE", "50"))

# Time-to-live in seconds (default: 10 minutes)
IMAGE_TTL = int(os.getenv("IMAGE_TTL", "600"))

# Seconds between cleanup passes (default: 60). Overridable through the
# environment like the other limits instead of being hard-coded.
CLEANUP_INTERVAL = int(os.getenv("CLEANUP_INTERVAL", "60"))
# ================= LOGGING =================
# Configure the root logger: INFO and above, timestamped, via a stream handler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Module-level logger used by every handler and background task in this file.
logger = logging.getLogger(__name__)
# ================= APP SETUP =================
app = FastAPI(title="Image Relay Server")

# CORS for flexibility: any origin, method and header is accepted.
# Credentials stay disabled: a wildcard origin must not be combined with
# allow_credentials=True, and the handlers in this file authenticate with an
# API-key header rather than cookies.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ================= STATE MANAGEMENT =================
# Pending images keyed by request id. A plain dict gives O(1) lookup and,
# because insertion order is preserved (Python 3.7+), the first value is the
# oldest entry, which is what makes FIFO delivery possible.
MAILBOX: Dict[str, dict] = {}
# Held by the handlers and the cleanup task while they touch MAILBOX.
MAILBOX_LOCK = asyncio.Lock()

# The one device currently allowed to fetch; both stay None until a device
# registers.
ACTIVE_DEVICE_NAME: Optional[str] = None
ACTIVE_DEVICE_ID: Optional[str] = None
# Held while the active-device pair is replaced or checked.
DEVICE_LOCK = asyncio.Lock()
# ================= MODELS =================
# Request body for acknowledging receipt of one image.
class AcknowledgePayload(BaseModel):
    # Identifier of the mailbox entry being confirmed.
    id: str
# Request body for making a device the active one.
class RegisterDevicePayload(BaseModel):
    # Identifier later compared against the `device_id` passed when fetching.
    device_id: str
    # Human-readable label, echoed in log lines and response messages.
    device_name: str
# ================= BACKGROUND TASKS =================
async def cleanup_expired_images():
    """Evict mailbox entries older than IMAGE_TTL, forever.

    Each pass sleeps CLEANUP_INTERVAL seconds, then removes every entry whose
    age exceeds the TTL while holding MAILBOX_LOCK. The coroutine never
    returns on its own.
    """
    while True:
        await asyncio.sleep(CLEANUP_INTERVAL)
        now = time.time()

        async with MAILBOX_LOCK:
            # Collect first, delete afterwards: a dict must not change size
            # while it is being iterated.
            stale_ids = []
            for key, entry in MAILBOX.items():
                if now - entry["timestamp"] > IMAGE_TTL:
                    stale_ids.append(key)

            for key in stale_ids:
                MAILBOX.pop(key)
                logger.warning(f"[CLEANUP] Removed expired image: {key}")

            if not stale_ids:
                continue
            logger.info(f"[CLEANUP] Purged {len(stale_ids)} expired images. Queue size: {len(MAILBOX)}")
# Strong reference to the running cleanup task. The event loop keeps only a
# weak reference to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_CLEANUP_TASK = None


# NOTE(review): no registration decorator is visible in this view; confirm
# this hook is actually attached to the app's startup.
async def startup_event():
    """Start the periodic image cleanup as a background task.

    The task is stored in the module-level ``_CLEANUP_TASK`` so it stays
    alive for the lifetime of the process.
    """
    global _CLEANUP_TASK
    _CLEANUP_TASK = asyncio.create_task(cleanup_expired_images())
    logger.info("Server started. Cleanup task active.")
# ================= ENDPOINTS =================
# NOTE(review): route decorators are not visible in this view; paths and HTTP
# methods of the handlers below should be confirmed against the deployed file.
async def home():
    """Return a short status banner that lists the service's endpoints."""
    banner = (
        "Image Relay Space is Running. "
        "Use /upload, /fetch, /acknowledge, /device_status, /register_device, or /stats."
    )
    return banner
async def get_stats():
    """Get server statistics.

    Returns:
        A dict with the current queue size, the configured queue limit and
        TTL, the age in seconds of the oldest queued image (``None`` when the
        queue is empty) and the id/name of the active device.
    """
    async with MAILBOX_LOCK:
        queue_size = len(MAILBOX)
        # Dict order is insertion order, so the first value is the oldest.
        oldest_entry = next(iter(MAILBOX.values()), None)
        oldest_age = None
        if oldest_entry is not None:
            oldest_age = time.time() - oldest_entry["timestamp"]

    return {
        "queue_size": queue_size,
        "max_queue_size": MAX_QUEUE_SIZE,
        "image_ttl_seconds": IMAGE_TTL,
        # Test for presence, not truthiness: an age of exactly 0.0 is a valid
        # reading and must not be reported as "no image".
        "oldest_image_age_seconds": round(oldest_age, 2) if oldest_age is not None else None,
        "active_device": {
            "id": ACTIVE_DEVICE_ID,
            "name": ACTIVE_DEVICE_NAME,
        },
    }
async def device_status():
    """Report which device, if any, is currently registered as active.

    Returns:
        ``{"status": "NONE"}`` when no device id is set, otherwise a dict
        with status ``"ACTIVE"`` plus the active device's id and name.
    """
    async with DEVICE_LOCK:
        if not ACTIVE_DEVICE_ID:
            return {"status": "NONE"}
        return {
            "status": "ACTIVE",
            "device_id": ACTIVE_DEVICE_ID,
            "device_name": ACTIVE_DEVICE_NAME,
        }
async def register_device(payload: RegisterDevicePayload, x_api_key: str = Header(None)):
    """Make the given device the active one, replacing any previous device.

    Args:
        payload: Id and display name of the device to activate.
        x_api_key: API key header; only checked when API_KEY is configured.

    Returns:
        A dict with status ``"REGISTERED"`` and a message that also names the
        previously active device when a different device was replaced.

    Raises:
        HTTPException: 401 when API_KEY is set and the supplied key differs.
    """
    global ACTIVE_DEVICE_ID, ACTIVE_DEVICE_NAME
    import hmac  # function scope keeps this handler self-contained

    if API_KEY:
        # Constant-time comparison so response timing does not leak how much
        # of the key matched. A missing header is treated as an empty key.
        supplied_key = (x_api_key or "").encode("utf-8")
        if not hmac.compare_digest(supplied_key, API_KEY.encode("utf-8")):
            raise HTTPException(status_code=401, detail="Unauthorized")

    async with DEVICE_LOCK:
        previous_id = ACTIVE_DEVICE_ID
        previous_name = ACTIVE_DEVICE_NAME
        ACTIVE_DEVICE_ID = payload.device_id
        ACTIVE_DEVICE_NAME = payload.device_name
        logger.info(f"[DEVICE] Active device set to: {payload.device_name} ({payload.device_id})")

    msg = f"Device '{payload.device_name}' is now active."
    # Identity is the device id (the value fetch requests are checked
    # against), so a device re-registering under a new name is not an
    # eviction, while a different device reusing a name is.
    if previous_id and previous_id != payload.device_id:
        msg += f" Evicted previous device: '{previous_name}'."
    return {"status": "REGISTERED", "message": msg}
async def upload_image(file: UploadFile = File(...), x_api_key: str = Header(None)):
    """Store an uploaded image in the mailbox as base64 and return its id.

    Args:
        file: The uploaded image.
        x_api_key: API key header; only checked when API_KEY is configured.

    Returns:
        JSONResponse with status ``"SUCCESS"``, a message carrying the new
        queue size, and the generated request id.

    Raises:
        HTTPException: 401 on a bad API key, 503 when the queue is full,
            500 when the upload cannot be read or encoded.
    """
    import hmac  # function scope keeps this handler self-contained

    if API_KEY:
        # Constant-time comparison; a missing header counts as an empty key.
        supplied_key = (x_api_key or "").encode("utf-8")
        if not hmac.compare_digest(supplied_key, API_KEY.encode("utf-8")):
            logger.warning("[AUTH] Unauthorized upload attempt")
            raise HTTPException(status_code=401, detail="Unauthorized")

    queue_full_detail = f"Queue full ({MAX_QUEUE_SIZE} images). Try again later."

    # Cheap early rejection, before the request body is read.
    async with MAILBOX_LOCK:
        if len(MAILBOX) >= MAX_QUEUE_SIZE:
            raise HTTPException(status_code=503, detail=queue_full_detail)

    logger.info(f"[UPLOAD] Received file: {file.filename}")

    # Only reading and encoding are expected to fail; keeping the try narrow
    # stops the 503 below from being rewritten into a 500.
    try:
        image_bytes = await file.read()
        encoded_image = base64.b64encode(image_bytes).decode("utf-8")
    except Exception as e:
        logger.exception(f"[UPLOAD ERROR] {e}")
        raise HTTPException(status_code=500, detail="Upload failed") from e

    request_id = str(uuid.uuid4())
    new_entry = {
        "image_base64": encoded_image,
        "id": request_id,
        "timestamp": time.time(),
    }

    async with MAILBOX_LOCK:
        # Re-check under the same lock that performs the insert: other
        # uploads may have filled the queue while this one was suspended
        # between the early check and here.
        if len(MAILBOX) >= MAX_QUEUE_SIZE:
            raise HTTPException(status_code=503, detail=queue_full_detail)
        MAILBOX[request_id] = new_entry
        queue_size = len(MAILBOX)

    logger.info(f"[MAILBOX] Image {request_id} added. Queue size: {queue_size}")
    return JSONResponse(content={
        "status": "SUCCESS",
        "message": f"Image added to buffer. Queue size: {queue_size}",
        "id": request_id,
    })
async def fetch_image(device_id: str = None):
    """Hand the oldest queued image to the active device without removing it.

    Args:
        device_id: Id of the requesting device; compared with the active
            device id when one is registered.

    Returns:
        ``{"status": "STOPPED", ...}`` when another device is active,
        ``{"status": "EMPTY"}`` when nothing is queued, otherwise a
        ``"FOUND"`` dict with the entry's id, base64 payload and the current
        queue size. The entry stays queued; this handler never deletes it.
    """
    async with DEVICE_LOCK:
        wrong_device = bool(ACTIVE_DEVICE_ID) and device_id != ACTIVE_DEVICE_ID
        if wrong_device:
            logger.warning(f"[DEVICE] Fetch rejected for device: {device_id}")
            return {"status": "STOPPED", "reason": "Another device is now active."}

    async with MAILBOX_LOCK:
        if not MAILBOX:
            return {"status": "EMPTY"}

        # Insertion order is preserved, so the first value is the oldest (FIFO).
        oldest = next(iter(MAILBOX.values()))
        return {
            "status": "FOUND",
            "id": oldest["id"],
            "image_base64": oldest["image_base64"],
            "queue_size": len(MAILBOX),
        }
async def acknowledge(payload: AcknowledgePayload):
    """Remove an image from the mailbox once its receipt is confirmed.

    Args:
        payload: Carries the id of the entry to drop.

    Returns:
        Plain-text ``YES RECEIVED`` when the id was queued, otherwise a 400
        JSON error response.
    """
    async with MAILBOX_LOCK:
        # Entries are always dicts, so None reliably means "id not queued".
        removed = MAILBOX.pop(payload.id, None)
        remaining = len(MAILBOX)

    if removed is None:
        logger.warning(f"[ACK] Invalid or expired ID: {payload.id}")
        return JSONResponse(
            content={"status": "ERROR", "message": "Invalid or expired ID"},
            status_code=400,
        )

    logger.info(f"[ACK] Received confirmation for {payload.id}. Queue size: {remaining}")
    return PlainTextResponse("YES RECEIVED")
# ================= MAIN =================
# Direct execution only: serve the app on all interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(
        app,
        port=7860,
        host="0.0.0.0",
    )