backend / app.py
SolarumAsteridion's picture
Update app.py
309ced1 verified
import os
import base64
import time
import asyncio
import logging
from typing import Dict, Optional
from fastapi import FastAPI, HTTPException, Header, UploadFile, File, BackgroundTasks
from fastapi.responses import JSONResponse, PlainTextResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
import uuid
# ================= CONFIGURATION =================
API_KEY = os.getenv("API_KEY")
MAX_QUEUE_SIZE = int(os.getenv("MAX_QUEUE_SIZE", "50")) # Max images in queue
IMAGE_TTL = int(os.getenv("IMAGE_TTL", "600")) # Time-to-live in seconds (10 mins)
CLEANUP_INTERVAL = 60 # Cleanup every 60 seconds
# ================= LOGGING =================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# ================= APP SETUP =================
app = FastAPI(title="Image Relay Server")
# CORS for flexibility
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ================= STATE MANAGEMENT =================
# Using dict for O(1) lookup and FIFO order (Python 3.7+ preserves insertion order)
MAILBOX: Dict[str, dict] = {}
MAILBOX_LOCK = asyncio.Lock()
ACTIVE_DEVICE_ID: Optional[str] = None
ACTIVE_DEVICE_NAME: Optional[str] = None
DEVICE_LOCK = asyncio.Lock()
# ================= MODELS =================
class AcknowledgePayload(BaseModel):
id: str
class RegisterDevicePayload(BaseModel):
device_id: str
device_name: str
# ================= BACKGROUND TASKS =================
async def cleanup_expired_images():
"""Periodically remove images older than TTL"""
while True:
await asyncio.sleep(CLEANUP_INTERVAL)
now = time.time()
async with MAILBOX_LOCK:
expired_ids = [
img_id for img_id, data in MAILBOX.items()
if now - data["timestamp"] > IMAGE_TTL
]
for img_id in expired_ids:
del MAILBOX[img_id]
logger.warning(f"[CLEANUP] Removed expired image: {img_id}")
if expired_ids:
logger.info(f"[CLEANUP] Purged {len(expired_ids)} expired images. Queue size: {len(MAILBOX)}")
@app.on_event("startup")
async def startup_event():
asyncio.create_task(cleanup_expired_images())
logger.info("Server started. Cleanup task active.")
# ================= ENDPOINTS =================
@app.get("/", response_class=PlainTextResponse)
async def home():
return "Image Relay Space is Running. Use /upload, /fetch, /acknowledge, /device_status, /register_device, or /stats."
@app.get("/stats")
async def get_stats():
"""Get server statistics"""
async with MAILBOX_LOCK:
queue_size = len(MAILBOX)
oldest_age = None
if MAILBOX:
oldest_ts = next(iter(MAILBOX.values()))["timestamp"]
oldest_age = time.time() - oldest_ts
return {
"queue_size": queue_size,
"max_queue_size": MAX_QUEUE_SIZE,
"image_ttl_seconds": IMAGE_TTL,
"oldest_image_age_seconds": round(oldest_age, 2) if oldest_age else None,
"active_device": {
"id": ACTIVE_DEVICE_ID,
"name": ACTIVE_DEVICE_NAME
}
}
@app.get("/device_status")
async def device_status():
async with DEVICE_LOCK:
if ACTIVE_DEVICE_ID:
return {"status": "ACTIVE", "device_id": ACTIVE_DEVICE_ID, "device_name": ACTIVE_DEVICE_NAME}
return {"status": "NONE"}
@app.post("/register_device")
async def register_device(payload: RegisterDevicePayload, x_api_key: str = Header(None)):
global ACTIVE_DEVICE_ID, ACTIVE_DEVICE_NAME
if API_KEY and x_api_key != API_KEY:
raise HTTPException(status_code=401, detail="Unauthorized")
async with DEVICE_LOCK:
previous = ACTIVE_DEVICE_NAME
ACTIVE_DEVICE_ID = payload.device_id
ACTIVE_DEVICE_NAME = payload.device_name
logger.info(f"[DEVICE] Active device set to: {payload.device_name} ({payload.device_id})")
msg = f"Device '{payload.device_name}' is now active."
if previous and previous != payload.device_name:
msg += f" Evicted previous device: '{previous}'."
return {"status": "REGISTERED", "message": msg}
@app.post("/upload")
async def upload_image(file: UploadFile = File(...), x_api_key: str = Header(None)):
if API_KEY and x_api_key != API_KEY:
logger.warning("[AUTH] Unauthorized upload attempt")
raise HTTPException(status_code=401, detail="Unauthorized")
# Check queue size
async with MAILBOX_LOCK:
if len(MAILBOX) >= MAX_QUEUE_SIZE:
raise HTTPException(
status_code=503,
detail=f"Queue full ({MAX_QUEUE_SIZE} images). Try again later."
)
logger.info(f"[UPLOAD] Received file: {file.filename}")
try:
image_bytes = await file.read()
encoded_image = base64.b64encode(image_bytes).decode('utf-8')
request_id = str(uuid.uuid4())
new_entry = {
"image_base64": encoded_image,
"id": request_id,
"timestamp": time.time()
}
async with MAILBOX_LOCK:
MAILBOX[request_id] = new_entry
queue_size = len(MAILBOX)
logger.info(f"[MAILBOX] Image {request_id} added. Queue size: {queue_size}")
return JSONResponse(content={
"status": "SUCCESS",
"message": f"Image added to buffer. Queue size: {queue_size}",
"id": request_id
})
except Exception as e:
logger.error(f"[UPLOAD ERROR] {e}")
raise HTTPException(status_code=500, detail="Upload failed")
@app.get("/fetch")
async def fetch_image(device_id: str = None):
async with DEVICE_LOCK:
if ACTIVE_DEVICE_ID and device_id != ACTIVE_DEVICE_ID:
logger.warning(f"[DEVICE] Fetch rejected for device: {device_id}")
return {"status": "STOPPED", "reason": "Another device is now active."}
async with MAILBOX_LOCK:
if MAILBOX:
# Get oldest image (FIFO)
entry = next(iter(MAILBOX.values()))
return {
"status": "FOUND",
"id": entry["id"],
"image_base64": entry["image_base64"],
"queue_size": len(MAILBOX)
}
return {"status": "EMPTY"}
@app.post("/acknowledge")
async def acknowledge(payload: AcknowledgePayload):
async with MAILBOX_LOCK:
if payload.id in MAILBOX:
del MAILBOX[payload.id]
logger.info(f"[ACK] Received confirmation for {payload.id}. Queue size: {len(MAILBOX)}")
return PlainTextResponse("YES RECEIVED")
logger.warning(f"[ACK] Invalid or expired ID: {payload.id}")
return JSONResponse(
status_code=400,
content={"status": "ERROR", "message": "Invalid or expired ID"}
)
# ================= MAIN =================
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)