import os import base64 import time import asyncio import logging from typing import Dict, Optional from fastapi import FastAPI, HTTPException, Header, UploadFile, File, BackgroundTasks from fastapi.responses import JSONResponse, PlainTextResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import uvicorn import uuid # ================= CONFIGURATION ================= API_KEY = os.getenv("API_KEY") MAX_QUEUE_SIZE = int(os.getenv("MAX_QUEUE_SIZE", "50")) # Max images in queue IMAGE_TTL = int(os.getenv("IMAGE_TTL", "600")) # Time-to-live in seconds (10 mins) CLEANUP_INTERVAL = 60 # Cleanup every 60 seconds # ================= LOGGING ================= logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.StreamHandler()] ) logger = logging.getLogger(__name__) # ================= APP SETUP ================= app = FastAPI(title="Image Relay Server") # CORS for flexibility app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ================= STATE MANAGEMENT ================= # Using dict for O(1) lookup and FIFO order (Python 3.7+ preserves insertion order) MAILBOX: Dict[str, dict] = {} MAILBOX_LOCK = asyncio.Lock() ACTIVE_DEVICE_ID: Optional[str] = None ACTIVE_DEVICE_NAME: Optional[str] = None DEVICE_LOCK = asyncio.Lock() # ================= MODELS ================= class AcknowledgePayload(BaseModel): id: str class RegisterDevicePayload(BaseModel): device_id: str device_name: str # ================= BACKGROUND TASKS ================= async def cleanup_expired_images(): """Periodically remove images older than TTL""" while True: await asyncio.sleep(CLEANUP_INTERVAL) now = time.time() async with MAILBOX_LOCK: expired_ids = [ img_id for img_id, data in MAILBOX.items() if now - data["timestamp"] > IMAGE_TTL ] for img_id in expired_ids: del MAILBOX[img_id] logger.warning(f"[CLEANUP] Removed expired image: {img_id}") if expired_ids: logger.info(f"[CLEANUP] Purged {len(expired_ids)} expired images. Queue size: {len(MAILBOX)}") @app.on_event("startup") async def startup_event(): asyncio.create_task(cleanup_expired_images()) logger.info("Server started. Cleanup task active.") # ================= ENDPOINTS ================= @app.get("/", response_class=PlainTextResponse) async def home(): return "Image Relay Space is Running. Use /upload, /fetch, /acknowledge, /device_status, /register_device, or /stats." @app.get("/stats") async def get_stats(): """Get server statistics""" async with MAILBOX_LOCK: queue_size = len(MAILBOX) oldest_age = None if MAILBOX: oldest_ts = next(iter(MAILBOX.values()))["timestamp"] oldest_age = time.time() - oldest_ts return { "queue_size": queue_size, "max_queue_size": MAX_QUEUE_SIZE, "image_ttl_seconds": IMAGE_TTL, "oldest_image_age_seconds": round(oldest_age, 2) if oldest_age else None, "active_device": { "id": ACTIVE_DEVICE_ID, "name": ACTIVE_DEVICE_NAME } } @app.get("/device_status") async def device_status(): async with DEVICE_LOCK: if ACTIVE_DEVICE_ID: return {"status": "ACTIVE", "device_id": ACTIVE_DEVICE_ID, "device_name": ACTIVE_DEVICE_NAME} return {"status": "NONE"} @app.post("/register_device") async def register_device(payload: RegisterDevicePayload, x_api_key: str = Header(None)): global ACTIVE_DEVICE_ID, ACTIVE_DEVICE_NAME if API_KEY and x_api_key != API_KEY: raise HTTPException(status_code=401, detail="Unauthorized") async with DEVICE_LOCK: previous = ACTIVE_DEVICE_NAME ACTIVE_DEVICE_ID = payload.device_id ACTIVE_DEVICE_NAME = payload.device_name logger.info(f"[DEVICE] Active device set to: {payload.device_name} ({payload.device_id})") msg = f"Device '{payload.device_name}' is now active." if previous and previous != payload.device_name: msg += f" Evicted previous device: '{previous}'." return {"status": "REGISTERED", "message": msg} @app.post("/upload") async def upload_image(file: UploadFile = File(...), x_api_key: str = Header(None)): if API_KEY and x_api_key != API_KEY: logger.warning("[AUTH] Unauthorized upload attempt") raise HTTPException(status_code=401, detail="Unauthorized") # Check queue size async with MAILBOX_LOCK: if len(MAILBOX) >= MAX_QUEUE_SIZE: raise HTTPException( status_code=503, detail=f"Queue full ({MAX_QUEUE_SIZE} images). Try again later." ) logger.info(f"[UPLOAD] Received file: {file.filename}") try: image_bytes = await file.read() encoded_image = base64.b64encode(image_bytes).decode('utf-8') request_id = str(uuid.uuid4()) new_entry = { "image_base64": encoded_image, "id": request_id, "timestamp": time.time() } async with MAILBOX_LOCK: MAILBOX[request_id] = new_entry queue_size = len(MAILBOX) logger.info(f"[MAILBOX] Image {request_id} added. Queue size: {queue_size}") return JSONResponse(content={ "status": "SUCCESS", "message": f"Image added to buffer. Queue size: {queue_size}", "id": request_id }) except Exception as e: logger.error(f"[UPLOAD ERROR] {e}") raise HTTPException(status_code=500, detail="Upload failed") @app.get("/fetch") async def fetch_image(device_id: str = None): async with DEVICE_LOCK: if ACTIVE_DEVICE_ID and device_id != ACTIVE_DEVICE_ID: logger.warning(f"[DEVICE] Fetch rejected for device: {device_id}") return {"status": "STOPPED", "reason": "Another device is now active."} async with MAILBOX_LOCK: if MAILBOX: # Get oldest image (FIFO) entry = next(iter(MAILBOX.values())) return { "status": "FOUND", "id": entry["id"], "image_base64": entry["image_base64"], "queue_size": len(MAILBOX) } return {"status": "EMPTY"} @app.post("/acknowledge") async def acknowledge(payload: AcknowledgePayload): async with MAILBOX_LOCK: if payload.id in MAILBOX: del MAILBOX[payload.id] logger.info(f"[ACK] Received confirmation for {payload.id}. Queue size: {len(MAILBOX)}") return PlainTextResponse("YES RECEIVED") logger.warning(f"[ACK] Invalid or expired ID: {payload.id}") return JSONResponse( status_code=400, content={"status": "ERROR", "message": "Invalid or expired ID"} ) # ================= MAIN ================= if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)