# Source: campus-Me / src/ai_engine/file_manager.py
# Author: Mithun-999
# Commit a0da205: Add v5.0: Material Upload & Analysis System with Auto-Cleanup
"""
File Manager - Secure file upload, processing, and automatic cleanup for HF Spaces
Handles temporary files with automatic deletion after processing
"""
import os
import shutil
import tempfile
from typing import List, Optional, Tuple, Dict, Any
from pathlib import Path
from datetime import datetime, timedelta
import logging
import atexit
import threading
logger = logging.getLogger(__name__)
class FileManager:
    """
    Manage file uploads with automatic cleanup.

    - Accept file uploads (PDFs, Word, slides, images, etc.)
    - Track uploaded files
    - Auto-delete after processing
    - Clean up on app shutdown
    - Memory efficient for HF Spaces
    """

    # Hard cap on a single upload; larger files are rejected by upload_file().
    MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB

    def __init__(self, temp_base_dir: Optional[str] = None, max_age_minutes: int = 60):
        """
        Initialize file manager.

        Args:
            temp_base_dir: Base directory for temporary files (uses system temp if None)
            max_age_minutes: Maximum age of files before auto-cleanup (default 60 minutes)
        """
        self.temp_base_dir = temp_base_dir or os.path.join(tempfile.gettempdir(), "campus_me_uploads")
        self.max_age = timedelta(minutes=max_age_minutes)
        # Maps file_id -> metadata (source, destination, original_name, size, times, status).
        self.tracked_files: Dict[str, Dict[str, Any]] = {}
        # Non-reentrant lock guarding tracked_files. Code that already holds it
        # must use _delete_file_internal(), never delete_file().
        self.lock = threading.Lock()
        # Create temp directory
        os.makedirs(self.temp_base_dir, exist_ok=True)
        # Best-effort removal of all tracked files on interpreter shutdown.
        atexit.register(self.cleanup_all)
        logger.info(f"File Manager initialized. Temp directory: {self.temp_base_dir}")

    def upload_file(self, source_path: str, original_filename: str = "") -> Tuple[bool, str]:
        """
        Upload and track a file for processing.

        Args:
            source_path: Path to source file
            original_filename: Original filename for reference

        Returns:
            Tuple of (success: bool, file_id: str or error_message: str)
        """
        try:
            source = Path(source_path)
            if not source.exists():
                return False, f"File not found: {source}"
            # Validate file size (limit to 50MB for safety)
            file_size = source.stat().st_size
            if file_size > self.MAX_FILE_SIZE:
                return False, f"File too large: {file_size / (1024*1024):.1f}MB (max: 50MB)"
            # Generate unique file ID
            file_id = self._generate_file_id(source)
            # Copy to temp directory (copy2 preserves file metadata)
            dest_path = self._get_dest_path(file_id, source.suffix)
            shutil.copy2(source, dest_path)
            # Track file
            with self.lock:
                self.tracked_files[file_id] = {
                    "source": str(source),
                    "destination": str(dest_path),
                    "original_name": original_filename or source.name,
                    "file_size": file_size,
                    "upload_time": datetime.now(),
                    "file_type": source.suffix,
                    "status": "uploaded",
                }
            logger.info(f"File uploaded: {file_id} ({file_size / (1024*1024):.1f}MB)")
            return True, file_id
        except Exception as e:
            logger.error(f"File upload error: {str(e)}")
            return False, f"Upload error: {str(e)}"

    def get_file_path(self, file_id: str) -> Optional[str]:
        """
        Get path to uploaded file.

        Args:
            file_id: File identifier

        Returns:
            Path to file, or None if untracked or already deleted from disk
        """
        with self.lock:
            info = self.tracked_files.get(file_id)
        if info:
            path = info.get("destination")
            if path and os.path.exists(path):
                return path
        return None

    def mark_processed(self, file_id: str, delete_after: int = 0) -> bool:
        """
        Mark file as processed and delete it now or schedule deletion.

        Args:
            file_id: File identifier
            delete_after: Delete file after this many seconds (0 = delete immediately)

        Returns:
            Success status (False for unknown file_id or on error)
        """
        try:
            with self.lock:
                if file_id not in self.tracked_files:
                    return False
                info = self.tracked_files[file_id]
                info["status"] = "processed"
                info["processed_time"] = datetime.now()
                if delete_after <= 0:
                    # BUG FIX: the original called self.delete_file() here while
                    # already holding self.lock. threading.Lock is not reentrant,
                    # so immediate deletion always deadlocked. Use the lock-free
                    # internal helper instead.
                    return self._delete_file_internal(file_id)
                # Schedule deletion; cleanup_expired_files() honors delete_at.
                info["delete_at"] = datetime.now() + timedelta(seconds=delete_after)
                return True
        except Exception as e:
            logger.error(f"Error marking file as processed: {str(e)}")
            return False

    def delete_file(self, file_id: str) -> bool:
        """
        Delete a tracked file (thread-safe public entry point).

        Args:
            file_id: File identifier

        Returns:
            Success status (False if file_id is not tracked)
        """
        try:
            with self.lock:
                return self._delete_file_internal(file_id)
        except Exception as e:
            logger.error(f"Error deleting file {file_id}: {str(e)}")
            return False

    def cleanup_expired_files(self) -> int:
        """
        Clean up files that have expired (aged past max_age or scheduled via delete_at).

        Returns:
            Number of files cleaned up
        """
        cleaned_count = 0
        now = datetime.now()
        with self.lock:
            # Use a set: the original appended a file that was BOTH aged out and
            # scheduled twice, causing a redundant second delete attempt.
            expired_files = set()
            for file_id, file_info in list(self.tracked_files.items()):
                # Expired by age since upload
                upload_time = file_info.get("upload_time")
                if upload_time and (now - upload_time) > self.max_age:
                    expired_files.add(file_id)
                    continue
                # Expired by explicit schedule (mark_processed with delete_after > 0)
                delete_at = file_info.get("delete_at")
                if delete_at and now >= delete_at:
                    expired_files.add(file_id)
            # Delete expired files
            for file_id in expired_files:
                if self._delete_file_internal(file_id):
                    cleaned_count += 1
        if cleaned_count > 0:
            logger.info(f"Cleaned up {cleaned_count} expired files")
        return cleaned_count

    def _delete_file_internal(self, file_id: str) -> bool:
        """Delete file and its tracking entry. Caller must already hold self.lock."""
        try:
            if file_id not in self.tracked_files:
                return False
            dest_path = self.tracked_files[file_id].get("destination")
            if dest_path and os.path.exists(dest_path):
                os.remove(dest_path)
                logger.info(f"File deleted: {file_id}")
            # Remove from tracking even if the file was already gone from disk.
            del self.tracked_files[file_id]
            return True
        except Exception as e:
            logger.error(f"Error in internal delete: {str(e)}")
            return False

    def cleanup_all(self) -> int:
        """
        Clean up all tracked files (usually on app shutdown via atexit).

        Returns:
            Number of files cleaned up
        """
        cleaned_count = 0
        with self.lock:
            for file_id in list(self.tracked_files.keys()):
                if self._delete_file_internal(file_id):
                    cleaned_count += 1
        logger.info(f"Total cleanup: {cleaned_count} files removed")
        return cleaned_count

    def get_file_info(self, file_id: str) -> Optional[Dict[str, Any]]:
        """
        Get information about a tracked file.

        Args:
            file_id: File identifier

        Returns:
            Copy of the file's metadata dictionary, or None if untracked
        """
        with self.lock:
            info = self.tracked_files.get(file_id)
            return info.copy() if info is not None else None

    def get_all_files_info(self) -> List[Dict[str, Any]]:
        """
        Get information about all tracked files.

        Returns:
            List of file metadata dictionaries (copies, safe for callers to mutate)
        """
        with self.lock:
            return [info.copy() for info in self.tracked_files.values()]

    def get_storage_usage(self) -> Dict[str, Any]:
        """
        Get current storage usage statistics.

        Returns:
            Storage information: file count, total sizes, breakdowns by type/status
        """
        with self.lock:
            total_size = sum(
                info.get("file_size", 0) for info in self.tracked_files.values()
            )
            return {
                "total_files": len(self.tracked_files),
                "total_size_mb": total_size / (1024 * 1024),
                "total_size_bytes": total_size,
                "files_by_type": self._get_files_by_type(),
                "files_by_status": self._get_files_by_status(),
            }

    def _get_files_by_type(self) -> Dict[str, int]:
        """Get count of files by extension. Caller holds self.lock."""
        by_type: Dict[str, int] = {}
        for info in self.tracked_files.values():
            file_type = info.get("file_type", "unknown")
            by_type[file_type] = by_type.get(file_type, 0) + 1
        return by_type

    def _get_files_by_status(self) -> Dict[str, int]:
        """Get count of files by status. Caller holds self.lock."""
        by_status: Dict[str, int] = {}
        for info in self.tracked_files.values():
            status = info.get("status", "unknown")
            by_status[status] = by_status.get(status, 0) + 1
        return by_status

    def _generate_file_id(self, source_path: Path) -> str:
        """Generate a unique file ID: second-resolution timestamp + random suffix."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # BUG FIX: the original used hash(path) % 100000, so re-uploading the same
        # path within one second produced a colliding ID that silently overwrote
        # the earlier tracking entry and destination file. Random bytes avoid that.
        suffix = os.urandom(4).hex()
        return f"file_{timestamp}_{suffix}"

    def _get_dest_path(self, file_id: str, file_ext: str) -> str:
        """Get destination path inside the temp directory for a file ID."""
        return os.path.join(self.temp_base_dir, f"{file_id}{file_ext}")

    def batch_upload(self, file_list: List[str]) -> Tuple[List[str], List[str]]:
        """
        Upload multiple files.

        Args:
            file_list: List of file paths

        Returns:
            Tuple of (successful_ids: List[str], failed_files: List[str])
        """
        successful_ids: List[str] = []
        failed_files: List[str] = []
        for file_path in file_list:
            success, result = self.upload_file(file_path)
            if success:
                successful_ids.append(result)
            else:
                failed_files.append(file_path)
        logger.info(f"Batch upload: {len(successful_ids)} success, {len(failed_files)} failed")
        return successful_ids, failed_files

    def batch_cleanup(self, file_ids: List[str]) -> int:
        """
        Delete multiple files.

        Args:
            file_ids: List of file IDs to delete

        Returns:
            Number of files deleted
        """
        return sum(1 for file_id in file_ids if self.delete_file(file_id))

    def validate_file_type(self, file_path: str, allowed_extensions: Optional[List[str]] = None) -> bool:
        """
        Validate if file type is allowed.

        Args:
            file_path: Path to file
            allowed_extensions: List of allowed lowercase extensions
                (default: common document/slide/image formats)

        Returns:
            True if file type is allowed
        """
        if allowed_extensions is None:
            allowed_extensions = [
                '.pdf', '.docx', '.doc', '.txt', '.md',
                '.pptx', '.ppt', '.jpg', '.jpeg', '.png', '.gif'
            ]
        # Case-insensitive: extension is lowered before membership test.
        return Path(file_path).suffix.lower() in allowed_extensions
class FileCleanupScheduler:
    """
    Background scheduler for automatic file cleanup.

    Runs file_manager.cleanup_expired_files() on a daemon thread at a fixed
    interval until stop() is called.
    """

    def __init__(self, file_manager: "FileManager", cleanup_interval_seconds: int = 300):
        """
        Initialize cleanup scheduler.

        Args:
            file_manager: FileManager instance
            cleanup_interval_seconds: How often to run cleanup (default: 5 minutes)
        """
        self.file_manager = file_manager
        self.cleanup_interval = cleanup_interval_seconds
        self.running = False
        self.thread: Optional[threading.Thread] = None
        # BUG FIX: the original loop used time.sleep(cleanup_interval), so stop()
        # could not interrupt the sleep — with the default 300s interval the
        # join(timeout=5) timed out and the thread lingered for minutes. An Event
        # gives an interruptible sleep that wakes as soon as stop() is called.
        self._stop_event = threading.Event()

    def start(self) -> None:
        """Start the background cleanup scheduler (no-op if already running)."""
        if not self.running:
            self.running = True
            self._stop_event.clear()
            self.thread = threading.Thread(daemon=True, target=self._cleanup_loop)
            self.thread.start()
            logger.info(f"File cleanup scheduler started (interval: {self.cleanup_interval}s)")

    def stop(self) -> None:
        """Stop the background cleanup scheduler and wait briefly for the thread."""
        self.running = False
        # Wake the loop immediately instead of waiting out the sleep interval.
        self._stop_event.set()
        if self.thread:
            self.thread.join(timeout=5)
        logger.info("File cleanup scheduler stopped")

    def _cleanup_loop(self) -> None:
        """Background cleanup loop; exits promptly when stop() is requested."""
        while self.running:
            try:
                self.file_manager.cleanup_expired_files()
            except Exception as e:
                # Keep the loop alive: a single cleanup failure must not kill it.
                logger.error(f"Cleanup loop error: {str(e)}")
            # Interruptible sleep: wait() returns True when the stop event is set.
            if self._stop_event.wait(self.cleanup_interval):
                break