""" File Manager - Secure file upload, processing, and automatic cleanup for HF Spaces Handles temporary files with automatic deletion after processing """ import os import shutil import tempfile from typing import List, Optional, Tuple, Dict, Any from pathlib import Path from datetime import datetime, timedelta import logging import atexit import threading logger = logging.getLogger(__name__) class FileManager: """ Manage file uploads with automatic cleanup. - Accept file uploads (PDFs, Word, slides, images, etc.) - Track uploaded files - Auto-delete after processing - Clean up on app shutdown - Memory efficient for HF Spaces """ def __init__(self, temp_base_dir: Optional[str] = None, max_age_minutes: int = 60): """ Initialize file manager. Args: temp_base_dir: Base directory for temporary files (uses system temp if None) max_age_minutes: Maximum age of files before auto-cleanup (default 60 minutes) """ self.temp_base_dir = temp_base_dir or os.path.join(tempfile.gettempdir(), "campus_me_uploads") self.max_age = timedelta(minutes=max_age_minutes) self.tracked_files: Dict[str, Dict[str, Any]] = {} self.lock = threading.Lock() # Create temp directory os.makedirs(self.temp_base_dir, exist_ok=True) # Register cleanup on exit atexit.register(self.cleanup_all) logger.info(f"File Manager initialized. Temp directory: {self.temp_base_dir}") def upload_file(self, source_path: str, original_filename: str = "") -> Tuple[bool, str]: """ Upload and track a file for processing. Args: source_path: Path to source file original_filename: Original filename for reference Returns: Tuple of (success: bool, file_id: str or error_message: str) """ try: source_path = Path(source_path) if not source_path.exists(): return False, f"File not found: {source_path}" # Validate file size (limit to 50MB for safety) file_size = source_path.stat().st_size max_size = 50 * 1024 * 1024 # 50MB if file_size > max_size: return False, f"File too large: {file_size / (1024*1024):.1f}MB (max: 50MB)" # Generate unique file ID file_id = self._generate_file_id(source_path) # Copy to temp directory dest_path = self._get_dest_path(file_id, source_path.suffix) shutil.copy2(source_path, dest_path) # Track file with self.lock: self.tracked_files[file_id] = { "source": str(source_path), "destination": str(dest_path), "original_name": original_filename or source_path.name, "file_size": file_size, "upload_time": datetime.now(), "file_type": source_path.suffix, "status": "uploaded", } logger.info(f"File uploaded: {file_id} ({file_size / (1024*1024):.1f}MB)") return True, file_id except Exception as e: logger.error(f"File upload error: {str(e)}") return False, f"Upload error: {str(e)}" def get_file_path(self, file_id: str) -> Optional[str]: """ Get path to uploaded file. Args: file_id: File identifier Returns: Path to file or None if not found """ with self.lock: if file_id in self.tracked_files: file_info = self.tracked_files[file_id] path = file_info.get("destination") if path and os.path.exists(path): return path return None def mark_processed(self, file_id: str, delete_after: int = 0) -> bool: """ Mark file as processed. Args: file_id: File identifier delete_after: Delete file after this many seconds (0 = delete immediately) Returns: Success status """ try: with self.lock: if file_id in self.tracked_files: self.tracked_files[file_id]["status"] = "processed" self.tracked_files[file_id]["processed_time"] = datetime.now() if delete_after <= 0: # Delete immediately return self.delete_file(file_id) else: # Schedule deletion self.tracked_files[file_id]["delete_at"] = ( datetime.now() + timedelta(seconds=delete_after) ) return True return False except Exception as e: logger.error(f"Error marking file as processed: {str(e)}") return False def delete_file(self, file_id: str) -> bool: """ Delete a tracked file. Args: file_id: File identifier Returns: Success status """ try: with self.lock: if file_id in self.tracked_files: file_info = self.tracked_files[file_id] dest_path = file_info.get("destination") if dest_path and os.path.exists(dest_path): os.remove(dest_path) logger.info(f"File deleted: {file_id}") # Remove from tracking del self.tracked_files[file_id] return True return False except Exception as e: logger.error(f"Error deleting file {file_id}: {str(e)}") return False def cleanup_expired_files(self) -> int: """ Clean up files that have expired. Returns: Number of files cleaned up """ cleaned_count = 0 now = datetime.now() with self.lock: expired_files = [] for file_id, file_info in list(self.tracked_files.items()): # Check if file has expired based on age upload_time = file_info.get("upload_time") if upload_time and (now - upload_time) > self.max_age: expired_files.append(file_id) # Check if scheduled for deletion delete_at = file_info.get("delete_at") if delete_at and now >= delete_at: expired_files.append(file_id) # Delete expired files for file_id in expired_files: if self._delete_file_internal(file_id): cleaned_count += 1 if cleaned_count > 0: logger.info(f"Cleaned up {cleaned_count} expired files") return cleaned_count def _delete_file_internal(self, file_id: str) -> bool: """Internal file deletion (without lock).""" try: if file_id in self.tracked_files: file_info = self.tracked_files[file_id] dest_path = file_info.get("destination") if dest_path and os.path.exists(dest_path): os.remove(dest_path) del self.tracked_files[file_id] return True return False except Exception as e: logger.error(f"Error in internal delete: {str(e)}") return False def cleanup_all(self) -> int: """ Clean up all tracked files (usually on app shutdown). Returns: Number of files cleaned up """ cleaned_count = 0 with self.lock: file_ids = list(self.tracked_files.keys()) for file_id in file_ids: if self._delete_file_internal(file_id): cleaned_count += 1 logger.info(f"Total cleanup: {cleaned_count} files removed") return cleaned_count def get_file_info(self, file_id: str) -> Optional[Dict[str, Any]]: """ Get information about a tracked file. Args: file_id: File identifier Returns: File information dictionary or None """ with self.lock: if file_id in self.tracked_files: return self.tracked_files[file_id].copy() return None def get_all_files_info(self) -> List[Dict[str, Any]]: """ Get information about all tracked files. Returns: List of file information dictionaries """ with self.lock: return [info.copy() for info in self.tracked_files.values()] def get_storage_usage(self) -> Dict[str, Any]: """ Get current storage usage statistics. Returns: Storage information """ with self.lock: total_size = sum( info.get("file_size", 0) for info in self.tracked_files.values() ) return { "total_files": len(self.tracked_files), "total_size_mb": total_size / (1024 * 1024), "total_size_bytes": total_size, "files_by_type": self._get_files_by_type(), "files_by_status": self._get_files_by_status(), } def _get_files_by_type(self) -> Dict[str, int]: """Get count of files by type.""" by_type = {} for info in self.tracked_files.values(): file_type = info.get("file_type", "unknown") by_type[file_type] = by_type.get(file_type, 0) + 1 return by_type def _get_files_by_status(self) -> Dict[str, int]: """Get count of files by status.""" by_status = {} for info in self.tracked_files.values(): status = info.get("status", "unknown") by_status[status] = by_status.get(status, 0) + 1 return by_status def _generate_file_id(self, source_path: Path) -> str: """Generate unique file ID.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") name_hash = hash(str(source_path)) % 100000 return f"file_{timestamp}_{name_hash:05d}" def _get_dest_path(self, file_id: str, file_ext: str) -> str: """Get destination path for file.""" return os.path.join(self.temp_base_dir, f"{file_id}{file_ext}") def batch_upload(self, file_list: List[str]) -> Tuple[List[str], List[str]]: """ Upload multiple files. Args: file_list: List of file paths Returns: Tuple of (successful_ids: List[str], failed_files: List[str]) """ successful_ids = [] failed_files = [] for file_path in file_list: success, result = self.upload_file(file_path) if success: successful_ids.append(result) else: failed_files.append(file_path) logger.info(f"Batch upload: {len(successful_ids)} success, {len(failed_files)} failed") return successful_ids, failed_files def batch_cleanup(self, file_ids: List[str]) -> int: """ Delete multiple files. Args: file_ids: List of file IDs to delete Returns: Number of files deleted """ deleted_count = 0 for file_id in file_ids: if self.delete_file(file_id): deleted_count += 1 return deleted_count def validate_file_type(self, file_path: str, allowed_extensions: List[str] = None) -> bool: """ Validate if file type is allowed. Args: file_path: Path to file allowed_extensions: List of allowed extensions (default: all supported) Returns: True if file type is allowed """ if allowed_extensions is None: allowed_extensions = [ '.pdf', '.docx', '.doc', '.txt', '.md', '.pptx', '.ppt', '.jpg', '.jpeg', '.png', '.gif' ] file_ext = Path(file_path).suffix.lower() return file_ext in allowed_extensions class FileCleanupScheduler: """ Background scheduler for automatic file cleanup. """ def __init__(self, file_manager: FileManager, cleanup_interval_seconds: int = 300): """ Initialize cleanup scheduler. Args: file_manager: FileManager instance cleanup_interval_seconds: How often to run cleanup (default: 5 minutes) """ self.file_manager = file_manager self.cleanup_interval = cleanup_interval_seconds self.running = False self.thread = None def start(self) -> None: """Start the background cleanup scheduler.""" if not self.running: self.running = True self.thread = threading.Thread(daemon=True, target=self._cleanup_loop) self.thread.start() logger.info(f"File cleanup scheduler started (interval: {self.cleanup_interval}s)") def stop(self) -> None: """Stop the background cleanup scheduler.""" self.running = False if self.thread: self.thread.join(timeout=5) logger.info("File cleanup scheduler stopped") def _cleanup_loop(self) -> None: """Background cleanup loop.""" import time while self.running: try: self.file_manager.cleanup_expired_files() except Exception as e: logger.error(f"Cleanup loop error: {str(e)}") time.sleep(self.cleanup_interval)