File size: 7,733 Bytes
"""File manager with automatic cleanup for uploaded files"""
import os
import time
import threading
from pathlib import Path
from typing import Dict, List
import logging
logger = logging.getLogger(__name__)


class FileManager:
    """Manages uploaded files with automatic cleanup and tracking.

    Files are tracked per session in ``session_files`` (guarded by ``lock``).
    A daemon thread periodically removes files in the upload folder whose
    modification time is older than ``max_age`` seconds and drops the
    matching tracking entries.
    """

    _BYTES_PER_MB = 1024 * 1024
    _BYTES_PER_GB = 1024 * 1024 * 1024

    # Upper bounds (in seconds) of the age buckets reported by get_disk_usage.
    _ONE_HOUR = 3600
    _ONE_DAY = 86400
    _ONE_WEEK = 604800

    def __init__(
        self,
        upload_folder: str,
        max_age_seconds: int = 86400,
        *,
        cleanup_interval_seconds: int = 600,
    ):
        """
        Initialize FileManager

        Args:
            upload_folder: Path to folder where files are uploaded
            max_age_seconds: Maximum age of files before cleanup (default: 24 hours)
            cleanup_interval_seconds: Delay between background cleanup passes
                (default: 10 minutes)

        Raises:
            ValueError: If cleanup_interval_seconds is not positive
        """
        if cleanup_interval_seconds <= 0:
            raise ValueError("cleanup_interval_seconds must be positive")

        self.upload_folder = upload_folder
        self.max_age = max_age_seconds
        self.cleanup_interval = cleanup_interval_seconds
        self.session_files: Dict[str, List[dict]] = {}
        self.lock = threading.Lock()
        # Set by stop(); lets the background thread wake up and exit promptly.
        self._stop_event = threading.Event()

        # Create upload folder if it doesn't exist
        os.makedirs(upload_folder, exist_ok=True)

        # Start cleanup thread (every attribute it reads is assigned above)
        self.cleanup_thread = threading.Thread(
            target=self._cleanup_loop,
            name="FileManagerCleanup",
            daemon=True,
        )
        self.cleanup_thread.start()
        logger.info("FileManager initialized for %s", upload_folder)

    def stop(self, timeout: float = 5.0) -> None:
        """
        Ask the background cleanup thread to exit and wait for it

        Args:
            timeout: Maximum number of seconds to wait for the thread to finish
        """
        self._stop_event.set()
        # A thread cannot join itself; skip the join in that case.
        if self.cleanup_thread is not threading.current_thread():
            self.cleanup_thread.join(timeout)

    def register_file(self, session_id: str, filepath: str):
        """
        Track a file for a specific session

        Args:
            session_id: Session that owns the file
            filepath: Full path to the file
        """
        # Ask for the size directly instead of exists()-then-getsize(), which
        # can race with a concurrent delete. Missing/invalid paths record 0.
        try:
            size_bytes = os.path.getsize(filepath)
        except (OSError, ValueError):
            size_bytes = 0

        entry = {
            'path': filepath,
            'created_at': time.time(),
            'size_bytes': size_bytes,
        }
        with self.lock:
            self.session_files.setdefault(session_id, []).append(entry)
        logger.debug("File registered: %s for session %s", filepath, session_id)

    def cleanup_session_files(self, session_id: str) -> int:
        """
        Delete all files associated with a session

        The session's tracking entry is removed even if some files could not
        be deleted; those failures are logged.

        Args:
            session_id: Session whose files to delete

        Returns:
            Number of files successfully deleted
        """
        # Detach the list under the lock, then do the slow disk I/O without
        # blocking other sessions.
        with self.lock:
            files = self.session_files.pop(session_id, None)
        if files is None:
            return 0

        deleted_count = 0
        total_bytes = 0
        for file_data in files:
            path = file_data['path']
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone (e.g. removed by the age-based cleanup).
                continue
            except Exception as e:
                logger.error("Failed to delete %s: %s", path, e)
                continue
            # Only count bytes for files that were actually removed.
            deleted_count += 1
            total_bytes += file_data['size_bytes']
            logger.info("Deleted: %s", path)

        if deleted_count > 0:
            logger.info(
                "Cleaned %d files (%.2f MB) for session %s",
                deleted_count,
                total_bytes / self._BYTES_PER_MB,
                session_id,
            )
        return deleted_count

    def cleanup_old_files(self) -> dict:
        """
        Remove files older than max_age

        Only regular files directly inside the upload folder are considered.
        Tracking entries that point at a removed file are dropped as well.

        Returns:
            Dictionary with cleanup statistics ('deleted', 'freed_bytes')
        """
        deleted_paths = []
        deleted_bytes = 0
        current_time = time.time()

        try:
            filenames = os.listdir(self.upload_folder)
        except FileNotFoundError:
            return {'deleted': 0, 'freed_bytes': 0}
        except OSError as e:
            logger.error("Cleanup old files error: %s", e)
            return {'deleted': 0, 'freed_bytes': 0}

        for filename in filenames:
            filepath = os.path.join(self.upload_folder, filename)
            if not os.path.isfile(filepath):
                continue
            try:
                info = os.stat(filepath)
                if current_time - info.st_mtime <= self.max_age:
                    continue
                os.remove(filepath)
            except FileNotFoundError:
                # Removed by someone else between listing and deletion.
                continue
            except OSError as e:
                logger.error("Error processing %s: %s", filepath, e)
                continue
            deleted_paths.append(filepath)
            deleted_bytes += info.st_size
            logger.info("Deleted old file: %s", filepath)

        self._forget_paths(deleted_paths)

        deleted_count = len(deleted_paths)
        if deleted_count > 0:
            logger.info(
                "Cleanup: Deleted %d old files, freed %.2f MB",
                deleted_count,
                deleted_bytes / self._BYTES_PER_MB,
            )
        return {
            'deleted': deleted_count,
            'freed_bytes': deleted_bytes,
        }

    def _forget_paths(self, paths) -> None:
        """Drop tracking entries whose file is one of the given paths."""
        if not paths:
            return
        # Compare absolute paths so relative and absolute spellings match.
        gone = {os.path.abspath(p) for p in paths}
        with self.lock:
            for session_id in list(self.session_files):
                kept = [
                    f for f in self.session_files[session_id]
                    if os.path.abspath(f['path']) not in gone
                ]
                if kept:
                    self.session_files[session_id] = kept
                else:
                    del self.session_files[session_id]

    def _cleanup_loop(self):
        """Background cleanup thread that runs periodically until stop()"""
        # wait() returns False on timeout (run a pass) and True once stop()
        # has been called (exit the loop).
        while not self._stop_event.wait(self.cleanup_interval):
            try:
                self.cleanup_old_files()
            except Exception as e:
                logger.error("File cleanup loop error: %s", e)

    def get_disk_usage(self) -> dict:
        """
        Get upload folder disk usage statistics

        Files that disappear while the folder is being scanned are skipped
        rather than aborting the whole scan.

        Returns:
            Dictionary with file count and size information
        """
        total_size = 0
        file_count = 0
        files_by_age = {'recent': 0, 'day': 0, 'week': 0, 'old': 0}
        current_time = time.time()

        try:
            filenames = os.listdir(self.upload_folder)
        except OSError as e:
            logger.error("Error calculating disk usage: %s", e)
            filenames = []

        for filename in filenames:
            filepath = os.path.join(self.upload_folder, filename)
            if not os.path.isfile(filepath):
                continue
            try:
                # One stat() gives a consistent size/mtime pair for the file.
                info = os.stat(filepath)
            except FileNotFoundError:
                continue
            except OSError as e:
                logger.error("Error calculating disk usage: %s", e)
                continue

            total_size += info.st_size
            file_count += 1

            # Categorize by age
            file_age = current_time - info.st_mtime
            if file_age < self._ONE_HOUR:
                files_by_age['recent'] += 1
            elif file_age < self._ONE_DAY:
                files_by_age['day'] += 1
            elif file_age < self._ONE_WEEK:
                files_by_age['week'] += 1
            else:
                files_by_age['old'] += 1

        return {
            'file_count': file_count,
            'total_bytes': total_size,
            'total_mb': total_size / self._BYTES_PER_MB,
            'total_gb': total_size / self._BYTES_PER_GB,
            'files_by_age': files_by_age,
            'max_age_seconds': self.max_age,
        }

    def get_session_files(self, session_id: str) -> list:
        """
        Get files associated with a session

        Args:
            session_id: Session to look up

        Returns:
            A copy of the session's tracking entries (empty list if unknown),
            so callers cannot mutate internal state outside the lock
        """
        with self.lock:
            return [dict(f) for f in self.session_files.get(session_id, [])]

    def get_file_statistics(self) -> dict:
        """
        Get statistics about tracked files

        Returns:
            Dictionary with overall totals and a per-session breakdown
        """
        total_tracked = 0
        total_tracked_bytes = 0
        tracked_by_session = {}

        with self.lock:
            for session_id, files in self.session_files.items():
                session_bytes = sum(f['size_bytes'] for f in files)
                total_tracked += len(files)
                total_tracked_bytes += session_bytes
                tracked_by_session[session_id] = {
                    'file_count': len(files),
                    'total_bytes': session_bytes,
                    'total_mb': session_bytes / self._BYTES_PER_MB,
                }

        return {
            'total_tracked_files': total_tracked,
            'total_tracked_bytes': total_tracked_bytes,
            'total_tracked_mb': total_tracked_bytes / self._BYTES_PER_MB,
            'by_session': tracked_by_session,
        }
|