"""File manager with automatic cleanup for uploaded files"""

import logging
import os
import threading
import time
from pathlib import Path  # noqa: F401  (kept: part of the module's existing namespace)
from typing import Dict, List

logger = logging.getLogger(__name__)

# Unit conversions and age-bucket thresholds used in the statistics helpers.
_BYTES_PER_MB = 1024 * 1024
_BYTES_PER_GB = 1024 * 1024 * 1024
_HOUR_SECONDS = 3600
_DAY_SECONDS = 86400
_WEEK_SECONDS = 604800


class FileManager:
    """Manages uploaded files with automatic cleanup and tracking

    Files are tracked per session in ``session_files`` (guarded by ``lock``),
    and a daemon thread periodically removes files in ``upload_folder`` whose
    modification time is older than ``max_age`` seconds.
    """

    def __init__(
        self,
        upload_folder: str,
        max_age_seconds: int = 86400,
        *,
        cleanup_interval_seconds: float = 600,
    ):
        """
        Initialize FileManager

        Creates the upload folder if needed and starts the background
        cleanup thread.

        Args:
            upload_folder: Path to folder where files are uploaded
            max_age_seconds: Maximum age of files before cleanup (default: 24 hours)
            cleanup_interval_seconds: Seconds between background cleanup
                passes (default: 10 minutes)

        Raises:
            ValueError: If cleanup_interval_seconds is not positive
        """
        if cleanup_interval_seconds <= 0:
            # A non-positive interval would make the background loop spin.
            raise ValueError("cleanup_interval_seconds must be positive")

        self.upload_folder = upload_folder
        self.max_age = max_age_seconds
        self.cleanup_interval = cleanup_interval_seconds
        self.session_files: Dict[str, List[dict]] = {}
        self.lock = threading.Lock()
        # Set by stop() to wake the cleanup thread and make it exit.
        self._stop_event = threading.Event()

        # Create upload folder if it doesn't exist
        os.makedirs(upload_folder, exist_ok=True)

        # Start cleanup thread
        self.cleanup_thread = threading.Thread(
            target=self._cleanup_loop,
            daemon=True,
        )
        self.cleanup_thread.start()

        logger.info("FileManager initialized for %s", upload_folder)

    def stop(self, timeout: float = 5.0) -> None:
        """
        Stop the background cleanup thread

        Args:
            timeout: Maximum number of seconds to wait for the thread to exit
        """
        self._stop_event.set()
        self.cleanup_thread.join(timeout=timeout)

    @staticmethod
    def _normalize_path(path) -> str:
        """Return a canonical absolute form of *path* for equality checks"""
        return os.path.normcase(os.path.abspath(path))

    @staticmethod
    def _age_bucket(file_age: float) -> str:
        """Return the ``files_by_age`` key for a file of the given age in seconds"""
        if file_age < _HOUR_SECONDS:
            return 'recent'
        if file_age < _DAY_SECONDS:
            return 'day'
        if file_age < _WEEK_SECONDS:
            return 'week'
        return 'old'

    def register_file(self, session_id: str, filepath: str):
        """
        Track a file for a specific session

        The file size is recorded at registration time; a file that cannot
        be stat'ed (for example, it does not exist) is recorded with size 0.

        Args:
            session_id: Session that owns the file
            filepath: Full path to the file
        """
        # Stat before taking the lock so disk I/O never blocks other threads,
        # and without a separate exists() check that could race with deletion.
        try:
            size_bytes = os.path.getsize(filepath)
        except OSError:
            size_bytes = 0

        record = {
            'path': filepath,
            'created_at': time.time(),
            'size_bytes': size_bytes,
        }
        with self.lock:
            self.session_files.setdefault(session_id, []).append(record)

        logger.debug("File registered: %s for session %s", filepath, session_id)

    def cleanup_session_files(self, session_id: str) -> int:
        """
        Delete all files associated with a session

        The session's tracking records are always discarded, even for files
        that could not be deleted.

        Args:
            session_id: Session whose files to delete

        Returns:
            Number of files successfully deleted
        """
        # Detach the records under the lock, then delete without holding it.
        with self.lock:
            files = self.session_files.pop(session_id, None)
        if not files:
            return 0

        deleted_count = 0
        total_bytes = 0
        for file_data in files:
            path = file_data['path']
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone (e.g. removed by the age-based cleanup).
                continue
            except OSError as e:
                logger.error("Failed to delete %s: %s", path, e)
                continue
            deleted_count += 1
            total_bytes += file_data['size_bytes']
            logger.info("Deleted: %s", path)

        if deleted_count > 0:
            logger.info(
                "Cleaned %d files (%.2f MB) for session %s",
                deleted_count, total_bytes / _BYTES_PER_MB, session_id,
            )

        return deleted_count

    def cleanup_old_files(self) -> dict:
        """
        Remove files older than max_age

        Only regular files directly inside the upload folder are considered.
        Tracking records that point at a removed file are dropped so that
        session statistics do not report files that no longer exist.

        Returns:
            Dictionary with cleanup statistics ('deleted', 'freed_bytes')
        """
        deleted_count = 0
        deleted_bytes = 0
        removed_paths = set()
        current_time = time.time()

        try:
            # Snapshot the listing so deletions never disturb the iteration.
            with os.scandir(self.upload_folder) as scan:
                entries = list(scan)
        except FileNotFoundError:
            # Upload folder is missing: nothing to clean.
            entries = []
        except OSError as e:
            logger.error("Cleanup old files error: %s", e)
            entries = []

        for entry in entries:
            try:
                if not entry.is_file():
                    continue
                info = entry.stat()
                if current_time - info.st_mtime <= self.max_age:
                    continue
                os.remove(entry.path)
            except FileNotFoundError:
                # Removed by someone else between listing and processing.
                logger.debug("Skipped vanished file: %s", entry.path)
                continue
            except OSError as e:
                logger.error("Error processing %s: %s", entry.path, e)
                continue
            deleted_count += 1
            deleted_bytes += info.st_size
            removed_paths.add(self._normalize_path(entry.path))
            logger.info("Deleted old file: %s", entry.path)

        if removed_paths:
            self._forget_paths(removed_paths)

        if deleted_count > 0:
            logger.info(
                "Cleanup: Deleted %d old files, freed %.2f MB",
                deleted_count, deleted_bytes / _BYTES_PER_MB,
            )

        return {
            'deleted': deleted_count,
            'freed_bytes': deleted_bytes,
        }

    def _forget_paths(self, removed_paths: set) -> None:
        """Drop tracking records whose normalized path is in *removed_paths*"""
        with self.lock:
            for session_id in list(self.session_files):
                remaining = [
                    record for record in self.session_files[session_id]
                    if self._normalize_path(record['path']) not in removed_paths
                ]
                if remaining:
                    self.session_files[session_id] = remaining
                else:
                    # No files left: remove the session so the map cannot
                    # grow without bound.
                    del self.session_files[session_id]

    def _cleanup_loop(self):
        """Background cleanup thread that runs periodically

        Waits ``cleanup_interval`` seconds between passes and exits as soon
        as ``stop()`` is called.
        """
        # Event.wait returns True only once stop() has been requested.
        while not self._stop_event.wait(self.cleanup_interval):
            try:
                self.cleanup_old_files()
            except Exception as e:
                # Thread boundary: log and keep the loop alive.
                logger.exception("File cleanup loop error: %s", e)

    def get_disk_usage(self) -> dict:
        """
        Get upload folder disk usage statistics

        Only regular files directly inside the upload folder are counted.
        Files that disappear while the folder is being scanned are skipped.

        Returns:
            Dictionary with file count and size information
        """
        total_size = 0
        file_count = 0
        files_by_age = {'recent': 0, 'day': 0, 'week': 0, 'old': 0}
        current_time = time.time()

        try:
            with os.scandir(self.upload_folder) as scan:
                for entry in scan:
                    try:
                        if not entry.is_file():
                            continue
                        info = entry.stat()
                    except OSError as e:
                        # A concurrent cleanup may delete files mid-scan.
                        logger.debug("Skipped %s: %s", entry.path, e)
                        continue
                    total_size += info.st_size
                    file_count += 1
                    # Categorize by age
                    bucket = self._age_bucket(current_time - info.st_mtime)
                    files_by_age[bucket] += 1
        except OSError as e:
            logger.error("Error calculating disk usage: %s", e)

        return {
            'file_count': file_count,
            'total_bytes': total_size,
            'total_mb': total_size / _BYTES_PER_MB,
            'total_gb': total_size / _BYTES_PER_GB,
            'files_by_age': files_by_age,
            'max_age_seconds': self.max_age,
        }

    def get_session_files(self, session_id: str) -> list:
        """
        Get files associated with a session

        Args:
            session_id: Session whose tracked files to return

        Returns:
            List of copies of the session's file records (empty if the
            session is unknown); mutating it does not affect tracking
        """
        with self.lock:
            # Copy so callers never share state that is guarded by the lock.
            return [dict(record) for record in self.session_files.get(session_id, [])]

    def get_file_statistics(self) -> dict:
        """
        Get statistics about tracked files

        Returns:
            Dictionary with overall totals and a per-session breakdown
        """
        total_tracked = 0
        total_tracked_bytes = 0
        tracked_by_session = {}

        with self.lock:
            for session_id, files in self.session_files.items():
                session_bytes = sum(record['size_bytes'] for record in files)
                total_tracked += len(files)
                total_tracked_bytes += session_bytes
                tracked_by_session[session_id] = {
                    'file_count': len(files),
                    'total_bytes': session_bytes,
                    'total_mb': session_bytes / _BYTES_PER_MB,
                }

        return {
            'total_tracked_files': total_tracked,
            'total_tracked_bytes': total_tracked_bytes,
            'total_tracked_mb': total_tracked_bytes / _BYTES_PER_MB,
            'by_session': tracked_by_session,
        }