# cognichat/utils/file_manager.py
# HYPERXD
# feat: add comprehensive memory leak fixes and upgrade documentation
# ae279de
"""File manager with automatic cleanup for uploaded files"""
import os
import time
import threading
from pathlib import Path
from typing import Dict, List
import logging
logger = logging.getLogger(__name__)


class FileManager:
    """Manages uploaded files with automatic cleanup and tracking.

    Files are tracked per session in ``session_files`` (guarded by ``lock``).
    A daemon thread periodically deletes files in ``upload_folder`` whose
    modification time is older than ``max_age`` seconds.
    """

    def __init__(self, upload_folder: str, max_age_seconds: int = 86400,
                 cleanup_interval_seconds: int = 600):
        """
        Initialize FileManager

        Args:
            upload_folder: Path to folder where files are uploaded
            max_age_seconds: Maximum age of files before cleanup (default: 24 hours)
            cleanup_interval_seconds: Seconds between background cleanup passes
                (default: 10 minutes)

        Raises:
            ValueError: If cleanup_interval_seconds is not positive
        """
        if cleanup_interval_seconds <= 0:
            # A non-positive interval would turn the background loop into a busy loop.
            raise ValueError("cleanup_interval_seconds must be positive")

        self.upload_folder = upload_folder
        self.max_age = max_age_seconds
        self.cleanup_interval = cleanup_interval_seconds
        self.session_files: Dict[str, List[dict]] = {}
        self.lock = threading.Lock()
        # Set by stop(); doubles as an interruptible sleep for the cleanup loop.
        self._stop_event = threading.Event()

        # Create upload folder if it doesn't exist
        os.makedirs(upload_folder, exist_ok=True)

        # Start cleanup thread
        self.cleanup_thread = threading.Thread(
            target=self._cleanup_loop,
            daemon=True,
        )
        self.cleanup_thread.start()
        logger.info("FileManager initialized for %s", upload_folder)

    def stop(self, timeout: float = 5.0) -> None:
        """
        Ask the background cleanup thread to exit and wait for it

        Args:
            timeout: Maximum number of seconds to wait for the thread to finish
        """
        self._stop_event.set()
        # Joining the current thread would raise RuntimeError.
        if self.cleanup_thread is not threading.current_thread():
            self.cleanup_thread.join(timeout)

    def register_file(self, session_id: str, filepath: str):
        """
        Track a file for a specific session

        Args:
            session_id: Session that owns the file
            filepath: Full path to the file
        """
        # EAFP: the file may disappear between an exists() check and getsize().
        try:
            size_bytes = os.path.getsize(filepath)
        except OSError:
            size_bytes = 0

        entry = {
            'path': filepath,
            'created_at': time.time(),
            'size_bytes': size_bytes,
        }
        with self.lock:
            self.session_files.setdefault(session_id, []).append(entry)
        logger.debug("File registered: %s for session %s", filepath, session_id)

    def cleanup_session_files(self, session_id: str) -> int:
        """
        Delete all files associated with a session

        Args:
            session_id: Session whose files to delete

        Returns:
            Number of files successfully deleted
        """
        # Detach the session's entries under the lock, then do the slow
        # filesystem work without blocking other sessions.
        with self.lock:
            files = self.session_files.pop(session_id, None)
        if files is None:
            return 0

        deleted_count = 0
        total_bytes = 0
        for file_data in files:
            path = file_data['path']
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone (e.g. removed by the age-based cleanup).
                continue
            except Exception as e:
                logger.error("Failed to delete %s: %s", path, e)
                continue
            # Only count bytes for files that were actually removed.
            deleted_count += 1
            total_bytes += file_data['size_bytes']
            logger.info("Deleted: %s", path)

        if deleted_count > 0:
            logger.info(
                "Cleaned %d files (%.2f MB) for session %s",
                deleted_count, total_bytes / 1024 / 1024, session_id,
            )
        return deleted_count

    def cleanup_old_files(self) -> dict:
        """
        Remove files older than max_age

        Entries tracked in ``session_files`` for the removed files are
        dropped as well, so tracking data does not outlive the files.

        Returns:
            Dictionary with cleanup statistics
        """
        deleted_count = 0
        deleted_bytes = 0
        removed_paths = set()
        current_time = time.time()

        try:
            filenames = os.listdir(self.upload_folder)
        except FileNotFoundError:
            return {'deleted': 0, 'freed_bytes': 0}
        except Exception as e:
            logger.error("Cleanup old files error: %s", e)
            filenames = []

        for filename in filenames:
            filepath = os.path.join(self.upload_folder, filename)
            if not os.path.isfile(filepath):
                continue
            try:
                info = os.stat(filepath)
                if current_time - info.st_mtime <= self.max_age:
                    continue
                os.remove(filepath)
            except FileNotFoundError:
                # Removed concurrently by someone else; nothing left to do.
                continue
            except Exception as e:
                logger.error("Error processing %s: %s", filepath, e)
                continue
            deleted_count += 1
            deleted_bytes += info.st_size
            removed_paths.add(self._normalize_path(filepath))
            logger.info("Deleted old file: %s", filepath)

        if removed_paths:
            self._forget_paths(removed_paths)

        if deleted_count > 0:
            logger.info(
                "Cleanup: Deleted %d old files, freed %.2f MB",
                deleted_count, deleted_bytes / 1024 / 1024,
            )
        return {
            'deleted': deleted_count,
            'freed_bytes': deleted_bytes,
        }

    @staticmethod
    def _normalize_path(path: str) -> str:
        """Return a canonical form of *path* for comparing tracked and scanned paths"""
        return os.path.normcase(os.path.abspath(path))

    def _forget_paths(self, removed_paths: set) -> None:
        """Drop tracked entries whose normalized path is in *removed_paths*"""
        with self.lock:
            # Iterate over a snapshot of the keys because sessions may be deleted.
            for session_id in list(self.session_files):
                remaining = [
                    entry for entry in self.session_files[session_id]
                    if self._normalize_path(entry['path']) not in removed_paths
                ]
                if remaining:
                    self.session_files[session_id] = remaining
                else:
                    del self.session_files[session_id]

    def _cleanup_loop(self):
        """Background cleanup thread that runs periodically"""
        # Event.wait() returns True once stop() is called, ending the loop;
        # otherwise it times out after cleanup_interval seconds and returns False.
        while not self._stop_event.wait(self.cleanup_interval):
            try:
                self.cleanup_old_files()
            except Exception as e:
                logger.error("File cleanup loop error: %s", e)

    def get_disk_usage(self) -> dict:
        """
        Get upload folder disk usage statistics

        Returns:
            Dictionary with file count and size information
        """
        total_size = 0
        file_count = 0
        files_by_age = {'recent': 0, 'day': 0, 'week': 0, 'old': 0}
        current_time = time.time()

        try:
            filenames = os.listdir(self.upload_folder)
        except Exception as e:
            logger.error("Error calculating disk usage: %s", e)
            filenames = []

        for filename in filenames:
            filepath = os.path.join(self.upload_folder, filename)
            if not os.path.isfile(filepath):
                continue
            try:
                info = os.stat(filepath)
            except FileNotFoundError:
                # Deleted while scanning; skip it instead of aborting the scan.
                continue
            except OSError as e:
                logger.error("Error calculating disk usage: %s", e)
                continue

            total_size += info.st_size
            file_count += 1

            # Categorize by age
            file_age = current_time - info.st_mtime
            if file_age < 3600:  # 1 hour
                files_by_age['recent'] += 1
            elif file_age < 86400:  # 1 day
                files_by_age['day'] += 1
            elif file_age < 604800:  # 1 week
                files_by_age['week'] += 1
            else:
                files_by_age['old'] += 1

        return {
            'file_count': file_count,
            'total_bytes': total_size,
            'total_mb': total_size / (1024 * 1024),
            'total_gb': total_size / (1024 * 1024 * 1024),
            'files_by_age': files_by_age,
            'max_age_seconds': self.max_age,
        }

    def get_session_files(self, session_id: str) -> list:
        """
        Get files associated with a session

        Args:
            session_id: Session whose tracked files to return

        Returns:
            Copies of the tracked entries, so callers cannot mutate internal
            state outside the lock; empty list for an unknown session
        """
        with self.lock:
            return [dict(entry) for entry in self.session_files.get(session_id, [])]

    def get_file_statistics(self) -> dict:
        """
        Get statistics about tracked files

        Returns:
            Dictionary with overall totals and a per-session breakdown
        """
        total_tracked = 0
        total_tracked_bytes = 0
        tracked_by_session = {}

        with self.lock:
            for session_id, files in self.session_files.items():
                session_bytes = sum(f['size_bytes'] for f in files)
                total_tracked += len(files)
                total_tracked_bytes += session_bytes
                tracked_by_session[session_id] = {
                    'file_count': len(files),
                    'total_bytes': session_bytes,
                    'total_mb': session_bytes / (1024 * 1024),
                }

        return {
            'total_tracked_files': total_tracked,
            'total_tracked_bytes': total_tracked_bytes,
            'total_tracked_mb': total_tracked_bytes / (1024 * 1024),
            'by_session': tracked_by_session,
        }