Spaces:
Sleeping
Sleeping
| """ | |
| Storage Management | |
| Handles in-memory storage for images, videos, and SSE clients | |
| """ | |
| from typing import Dict, Any | |
| import asyncio | |
| from datetime import datetime | |
| # In-memory storage | |
| temp_images: Dict[str, Dict[str, Any]] = {} | |
| video_results: Dict[str, Dict[str, Any]] = {} | |
| sse_clients: Dict[str, asyncio.Queue] = {} | |
| def cleanup_old_results(max_age_hours: int = 24): | |
| """ | |
| Clean up old video results | |
| Args: | |
| max_age_hours: Maximum age in hours before cleanup | |
| """ | |
| current_time = datetime.now().timestamp() | |
| to_remove = [] | |
| for task_id, data in video_results.items(): | |
| if current_time - data['timestamp'] > (max_age_hours * 3600): | |
| to_remove.append(task_id) | |
| for task_id in to_remove: | |
| del video_results[task_id] | |
| if to_remove: | |
| print(f"🧹 Cleaned up {len(to_remove)} old video results") | |
| def cleanup_old_files(): | |
| """Clean up all temporary storage on shutdown""" | |
| temp_images.clear() | |
| video_results.clear() | |
| sse_clients.clear() | |
| print("🧹 Cleared all temporary storage") | |