sushilideaclan01's picture
first push
91d209c
"""
Storage Management
Handles in-memory storage for images, videos, and SSE clients
"""
from typing import Dict, Any
import asyncio
from datetime import datetime
# In-memory storage
temp_images: Dict[str, Dict[str, Any]] = {}
video_results: Dict[str, Dict[str, Any]] = {}
sse_clients: Dict[str, asyncio.Queue] = {}
def cleanup_old_results(max_age_hours: int = 24):
"""
Clean up old video results
Args:
max_age_hours: Maximum age in hours before cleanup
"""
current_time = datetime.now().timestamp()
to_remove = []
for task_id, data in video_results.items():
if current_time - data['timestamp'] > (max_age_hours * 3600):
to_remove.append(task_id)
for task_id in to_remove:
del video_results[task_id]
if to_remove:
print(f"🧹 Cleaned up {len(to_remove)} old video results")
def cleanup_old_files():
"""Clean up all temporary storage on shutdown"""
temp_images.clear()
video_results.clear()
sse_clients.clear()
print("🧹 Cleared all temporary storage")