"""
Storage Management
Handles in-memory storage for images, videos, and SSE clients
"""
from typing import Dict, Any
import asyncio
from datetime import datetime

# In-memory storage.
# All three registries are plain module-level dicts: they live only for the
# lifetime of the process and are emptied by cleanup_old_files().
# NOTE(review): the exact schema of each record is defined by the callers that
# populate these dicts; the only key this module reads is
# video_results[...]['timestamp'].
temp_images: Dict[str, Dict[str, Any]] = {}
# Each record is expected to carry a numeric 'timestamp' that is compared
# against datetime.now().timestamp() (seconds since the epoch).
video_results: Dict[str, Dict[str, Any]] = {}
sse_clients: Dict[str, asyncio.Queue] = {}


def cleanup_old_results(max_age_hours: int = 24) -> None:
    """
    Clean up old video results

    Removes every entry of ``video_results`` whose ``'timestamp'`` is more
    than ``max_age_hours`` hours older than the current time, then prints a
    summary line if anything was removed.

    Entries that have no ``'timestamp'`` key, or whose timestamp is not a
    number, are left in place: their age cannot be determined, and a single
    malformed record must not prevent the remaining expired records from
    being evicted.

    Args:
        max_age_hours: Maximum age in hours before cleanup
    """
    current_time = datetime.now().timestamp()
    max_age_seconds = max_age_hours * 3600

    # Collect the keys first; deleting while iterating over the dict would
    # raise "dictionary changed size during iteration".
    to_remove = []
    for task_id, data in video_results.items():
        timestamp = data.get('timestamp')
        if not isinstance(timestamp, (int, float)):
            # Unknown age -> keep the record rather than crash or guess.
            continue
        if current_time - timestamp > max_age_seconds:
            to_remove.append(task_id)

    for task_id in to_remove:
        del video_results[task_id]

    if to_remove:
        print(f"🧹 Cleaned up {len(to_remove)} old video results")


def cleanup_old_files() -> None:
    """
    Clean up all temporary storage on shutdown

    Despite the name, no files are touched here: the function only empties
    the three in-memory registries (``temp_images``, ``video_results`` and
    ``sse_clients``) in place, so existing references to those dicts observe
    the cleared state.
    """
    temp_images.clear()
    video_results.clear()
    sse_clients.clear()
    print("🧹 Cleared all temporary storage")