# Captured from a hosted file viewer ("Spaces", status: Running).
# File size: 11,974 Bytes
# Revision shown by the viewer: c001f24
# (The viewer's line-number gutter, 1-287, was removed from this header.)
import sqlite3
import os
from datetime import datetime, timedelta
from rich.console import Console
from rich.table import Table
# --- Configuration ---
# SQLite database file opened by get_db_connection().
DB_PATH = "database.db"
# Folder scanned as "Uploaded Originals"; session image files are removed from here.
UPLOAD_FOLDER = "uploads"
# Folder scanned as "Processed Images"; processed image files are removed from here.
PROCESSED_FOLDER = "processed"
# Folder scanned as "Generated PDFs"; generated PDF files are removed from here.
OUTPUT_FOLDER = "output"
# Items created within this many days of "now" are never deletion candidates.
OLDER_THAN_DAYS = 5
# Set to False to perform actual deletion; True only reports what would be removed.
DRY_RUN = True

# --- Immunity Reasons ---
# Labels for why an item is kept. main() only tests whether a reason was
# assigned; the text itself is not printed anywhere in this file.
REASON_PERSISTED = "Persisted"
REASON_NEETPREP = "NeetPrep/JSON"
REASON_CLASSIFIED = "Classified"
REASON_RECENT = "Too Recent"
def get_db_connection():
    """Open the SQLite database at DB_PATH and return the connection.

    The connection's row factory is set to sqlite3.Row, so result columns
    can be read by name (row['id']) as well as by position. The caller is
    responsible for closing the connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def is_classified_session(conn, session_id):
    """Report whether a session has at least one classified question.

    A question counts as classified when both its subject and its chapter
    are non-NULL. A falsy session_id (None, empty string, 0) short-circuits
    to False without touching the database.
    """
    if session_id:
        lookup = conn.cursor()
        lookup.execute("""
            SELECT 1 FROM questions
            WHERE session_id = ? AND subject IS NOT NULL AND chapter IS NOT NULL
            LIMIT 1
        """, (session_id,))
        first_match = lookup.fetchone()
        return first_match is not None
    return False
def _sizeof_fmt(num, suffix="B"):
    """Format a byte count as a human-readable string using 1024-based units."""
    for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f}Y{suffix}"


def _folder_usage(folder):
    """Return (total_bytes, file_count) for the non-symlink files under folder."""
    total_size = 0
    file_count = 0
    # os.walk ignores listing errors by default, so a missing folder simply
    # yields nothing and is reported as 0 files / 0 bytes.
    for dirpath, _, filenames in os.walk(folder):
        for name in filenames:
            fp = os.path.join(dirpath, name)
            if os.path.islink(fp):
                continue
            try:
                total_size += os.path.getsize(fp)
            except OSError:
                # Vanished or unreadable file: leave it out of the totals
                # rather than abort the whole report.
                continue
            file_count += 1
    return total_size, file_count


def show_disk_usage_report(console):
    """Calculate and display a report of disk usage by category.

    Prints two tables to `console`:

    1. Total size and file count of UPLOAD_FOLDER, PROCESSED_FOLDER and
       OUTPUT_FOLDER, plus a grand total.
    2. A per-session breakdown of the 'original' images recorded in the
       database, measured in UPLOAD_FOLDER and sorted largest first.
       Sessions with no measurable files are omitted.

    Args:
        console: object providing print() and status(), as rich's Console does.

    The database connection opened for the breakdown is always closed, even
    if a query or a size lookup fails.
    """
    console.print("\n[bold cyan]Disk Usage Report[/bold cyan]")

    # --- Summary Report ---
    folders_to_scan = {
        "Uploaded Originals": UPLOAD_FOLDER,
        "Processed Images": PROCESSED_FOLDER,
        "Generated PDFs": OUTPUT_FOLDER,
    }
    usage_data = {}
    for category, folder in folders_to_scan.items():
        total_size, file_count = _folder_usage(folder)
        usage_data[category] = {"size": total_size, "count": file_count}

    summary_table = Table(title="Disk Space Usage by Category")
    summary_table.add_column("Category", style="cyan")
    summary_table.add_column("File Count", style="magenta", justify="right")
    summary_table.add_column("Total Size", style="green", justify="right")

    total_size_all = 0
    total_count_all = 0
    for category, data in usage_data.items():
        summary_table.add_row(category, str(data["count"]), _sizeof_fmt(data["size"]))
        total_size_all += data["size"]
        total_count_all += data["count"]
    summary_table.add_section()
    summary_table.add_row("Total", f"[bold]{total_count_all}[/bold]", f"[bold]{_sizeof_fmt(total_size_all)}[/bold]")
    console.print(summary_table)

    # --- Detailed Breakdown for Uploaded Originals ---
    console.print("\n[bold]Breakdown of 'Uploaded Originals':[/bold]")
    session_sizes = []
    conn = get_db_connection()
    try:
        sessions = conn.execute('SELECT id, original_filename FROM sessions').fetchall()
        with console.status("[cyan]Calculating size per session...[/cyan]"):
            for session in sessions:
                session_id = session['id']
                images = conn.execute("SELECT filename FROM images WHERE session_id = ? AND image_type = 'original'", (session_id,)).fetchall()
                total_size = 0
                file_count = 0
                for img in images:
                    if not img['filename']:
                        continue
                    fp = os.path.join(UPLOAD_FOLDER, img['filename'])
                    if os.path.islink(fp):
                        continue
                    try:
                        total_size += os.path.getsize(fp)
                    except OSError:
                        # File may not exist (or be unreadable); that's okay.
                        continue
                    file_count += 1
                if file_count > 0:
                    session_sizes.append({
                        "id": session_id,
                        "name": session['original_filename'],
                        "size": total_size,
                        "count": file_count,
                    })
    finally:
        # Only plain values were copied out of the rows, so the connection
        # can be released before rendering.
        conn.close()

    # Sort sessions by size, descending
    session_sizes.sort(key=lambda x: x['size'], reverse=True)

    breakdown_table = Table(show_header=True, header_style="bold magenta")
    breakdown_table.add_column("Session ID", style="dim", min_width=15)
    breakdown_table.add_column("Original Filename", style="cyan", min_width=30)
    breakdown_table.add_column("File Count", style="magenta", justify="right")
    breakdown_table.add_column("Total Size", style="green", justify="right")
    for session_data in session_sizes:
        breakdown_table.add_row(
            # Table cells must be renderable; str() keeps integer ids from
            # being rejected, and a NULL filename is shown as an empty cell.
            str(session_data['id']),
            session_data['name'] or "",
            str(session_data['count']),
            _sizeof_fmt(session_data['size']),
        )
    console.print(breakdown_table)
# Internal marker for rows whose created_at cannot be interpreted; such rows
# are always kept because their age cannot be established.
_REASON_UNKNOWN_AGE = "Unknown Age"


def _parse_created_at(raw_value):
    """Parse a stored created_at value into a naive local datetime, or None if unusable."""
    if not raw_value:
        return None
    try:
        parsed = datetime.fromisoformat(str(raw_value))
    except ValueError:
        return None
    if parsed.tzinfo is not None:
        # The cutoff is a naive local datetime; comparing it with an aware
        # value would raise TypeError, so convert to local time first.
        parsed = parsed.astimezone().replace(tzinfo=None)
    return parsed


def _immunity_reason(conn, created_at, cutoff_date, persist, source_name, session_id, notes=None):
    """Return the reason an item must be kept, or "" when it may be deleted."""
    if created_at is None:
        return _REASON_UNKNOWN_AGE
    if created_at > cutoff_date:
        return REASON_RECENT
    if persist == 1:
        return REASON_PERSISTED
    lowered_name = (source_name or "").lower()
    if '.json' in lowered_name or 'neetprep' in lowered_name:
        return REASON_NEETPREP
    if notes and 'json upload' in notes.lower():
        return REASON_NEETPREP
    if is_classified_session(conn, session_id):
        return REASON_CLASSIFIED
    return ""


def _remove_file(console, folder, filename, label):
    """Delete folder/filename and print the outcome; OS errors are reported, not raised."""
    f_path = os.path.join(folder, filename)
    try:
        os.remove(f_path)
    except OSError as e:
        console.print(f" - [red]Error deleting {f_path}: {e}[/red]")
    else:
        console.print(f" - Deleted {label}: [dim]{f_path}[/dim]")


def _delete_session(conn, console, session_id):
    """Remove a session's image files, then its questions/images/sessions rows in one transaction."""
    console.print(f"Deleting session [cyan]{session_id}[/cyan]...")
    images_to_delete = conn.execute('SELECT filename, processed_filename FROM images WHERE session_id = ?', (session_id,)).fetchall()
    for img in images_to_delete:
        if img['filename']:
            _remove_file(console, UPLOAD_FOLDER, img['filename'], "upload")
        if img['processed_filename']:
            _remove_file(console, PROCESSED_FOLDER, img['processed_filename'], "processed")
    # Commit per session: the files above are already gone for good, so the
    # matching rows must not be lost to a failure later in the run.
    with conn:
        conn.execute('DELETE FROM questions WHERE session_id = ?', (session_id,))
        conn.execute('DELETE FROM images WHERE session_id = ?', (session_id,))
        conn.execute('DELETE FROM sessions WHERE id = ?', (session_id,))
    console.print(f" - Deleted DB records for session {session_id}")


def _delete_pdf(conn, console, pdf_id, pdf_filename):
    """Remove a generated PDF file (if it has a filename) and its generated_pdfs row."""
    console.print(f"Deleting generated PDF [cyan]{pdf_filename}[/cyan]...")
    if pdf_filename:
        _remove_file(console, OUTPUT_FOLDER, pdf_filename, "file")
    with conn:
        conn.execute('DELETE FROM generated_pdfs WHERE id = ?', (pdf_id,))
    console.print(f" - Deleted DB record for PDF {pdf_id}")


def main():
    """Identify old sessions and generated PDFs and, unless DRY_RUN, delete them.

    Steps:
      1. Print the disk usage report.
      2. Collect sessions and generated PDFs that have no immunity reason.
         An item is kept when it is newer than OLDER_THAN_DAYS, has
         persist == 1, has '.json' or 'neetprep' in its (source) filename,
         has 'json upload' in its notes (PDFs only), belongs to a session
         with classified questions, or has a created_at value that cannot
         be parsed.
      3. Print a table of the candidates.
      4. If DRY_RUN is False, delete each candidate's files and database
         rows, committing after every item.

    The database connection is closed on every exit path.

    NOTE(review): created_at is compared against the local, naive
    datetime.now(). If rows are written in UTC the effective cutoff is
    shifted by the local UTC offset — confirm how the timestamps are stored.
    """
    console = Console()
    console.print("[bold cyan]Starting Cleanup Process...[/bold cyan]")
    console.print(f"Mode: [bold {'yellow' if DRY_RUN else 'red'}]{'DRY RUN' if DRY_RUN else 'DELETION ENABLED'}[/]")
    console.print(f"Looking for items older than {OLDER_THAN_DAYS} days.")

    show_disk_usage_report(console)

    # One "now" for both the cutoff and the displayed ages keeps them consistent.
    now = datetime.now()
    cutoff_date = now - timedelta(days=OLDER_THAN_DAYS)

    conn = get_db_connection()
    try:
        sessions_to_delete = []  # (row, parsed created_at) pairs
        pdfs_to_delete = []      # (row, parsed created_at) pairs
        unknown_age_count = 0

        # --- 1. Identify Sessions to Delete ---
        all_sessions = conn.execute('SELECT id, created_at, original_filename, persist FROM sessions').fetchall()
        with console.status("[cyan]Analyzing sessions...[/cyan]") as status:
            for session in all_sessions:
                created_at = _parse_created_at(session['created_at'])
                reason = _immunity_reason(
                    conn, created_at, cutoff_date,
                    session['persist'], session['original_filename'], session['id'],
                )
                if reason == _REASON_UNKNOWN_AGE:
                    unknown_age_count += 1
                elif not reason:
                    sessions_to_delete.append((session, created_at))
            status.update(f"[cyan]Analyzed {len(all_sessions)} sessions. Found {len(sessions_to_delete)} candidates for deletion.[/cyan]")

        # --- 2. Identify Generated PDFs to Delete ---
        all_pdfs = conn.execute('SELECT id, session_id, filename, created_at, persist, source_filename, notes FROM generated_pdfs').fetchall()
        with console.status("[cyan]Analyzing generated PDFs...[/cyan]") as status:
            for pdf in all_pdfs:
                created_at = _parse_created_at(pdf['created_at'])
                reason = _immunity_reason(
                    conn, created_at, cutoff_date,
                    pdf['persist'], pdf['source_filename'], pdf['session_id'],
                    notes=pdf['notes'],
                )
                if reason == _REASON_UNKNOWN_AGE:
                    unknown_age_count += 1
                elif not reason:
                    pdfs_to_delete.append((pdf, created_at))
            status.update(f"[cyan]Analyzed {len(all_pdfs)} PDFs. Found {len(pdfs_to_delete)} candidates for deletion.[/cyan]")

        if unknown_age_count:
            console.print(f"[yellow]Kept {unknown_age_count} item(s) whose created_at is missing or unreadable.[/yellow]")

        # --- 3. Display Findings ---
        if not sessions_to_delete and not pdfs_to_delete:
            console.print("\n[bold green]No items found to delete. Everything is up to date.[/bold green]")
            return

        table = Table(title="Items Marked for Deletion", show_header=True, header_style="bold magenta")
        table.add_column("Type", style="dim", min_width=10)
        table.add_column("ID / Filename", style="cyan", min_width=30)
        table.add_column("Created At", style="green", min_width=20)
        table.add_column("Age (Days)", style="yellow", min_width=10)
        table.add_column("Details", min_width=30)

        # Cells are converted to str so non-string ids or NULL names cannot
        # make the table reject a row.
        for session, created_at in sessions_to_delete:
            age = (now - created_at).days
            table.add_row("Session", str(session['id']), str(session['created_at']), str(age), session['original_filename'] or "")
        for pdf, created_at in pdfs_to_delete:
            age = (now - created_at).days
            table.add_row("Generated PDF", str(pdf['filename'] or ""), str(pdf['created_at']), str(age), f"Source: {pdf['source_filename']}")
        console.print(table)

        if DRY_RUN:
            console.print("\n[bold yellow]This was a DRY RUN. No files or database records were deleted.[/bold yellow]")
            console.print("To run the deletion, change the [code]DRY_RUN[/code] flag to [code]False[/code] in the script.")
            return

        # --- 4. Perform Deletion ---
        console.print("\n[bold red]PERFORMING DELETION...[/bold red]")
        # Delete Sessions and associated files
        for session, _ in sessions_to_delete:
            _delete_session(conn, console, session['id'])
        # Delete Generated PDFs and their files
        for pdf, _ in pdfs_to_delete:
            _delete_pdf(conn, console, pdf['id'], pdf['filename'])
        console.print("\n[bold green]Deletion complete.[/bold green]")
    finally:
        conn.close()
# Run the cleanup only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()