aayushprsingh commited on
Commit
36bf090
·
1 Parent(s): d2b3700

chore(api): add auto-cleanup job for old vector collections (#99)

Browse files
backend/app/database.py CHANGED
@@ -44,14 +44,29 @@ def _migrate_schema():
44
  for non-destructive changes such as new nullable columns.
45
  """
46
  inspector = inspect(engine)
47
- existing_columns = {c["name"] for c in inspector.get_columns("users")}
48
-
49
- migrations = [
50
  ("users", "hf_token", "ALTER TABLE users ADD COLUMN hf_token VARCHAR(255)"),
51
  ]
 
 
 
 
 
 
 
 
 
 
52
 
53
- for table, column, ddl in migrations:
54
- if column not in existing_columns:
 
 
 
 
 
55
  try:
56
  with engine.begin() as conn:
57
  conn.execute(text(ddl))
 
44
  for non-destructive changes such as new nullable columns.
45
  """
46
  inspector = inspect(engine)
47
+ # Migrate users
48
+ existing_users_columns = {c["name"] for c in inspector.get_columns("users")}
49
+ users_migrations = [
50
  ("users", "hf_token", "ALTER TABLE users ADD COLUMN hf_token VARCHAR(255)"),
51
  ]
52
+ for table, column, ddl in users_migrations:
53
+ if column not in existing_users_columns:
54
+ try:
55
+ with engine.begin() as conn:
56
+ conn.execute(text(ddl))
57
+ logger.info("Migration: added column %s.%s", table, column)
58
+ except Exception:
59
+ logger.warning(
60
+ "Migration skipped (may already exist): %s.%s", table, column
61
+ )
62
 
63
+ # Migrate documents
64
+ existing_docs_columns = {c["name"] for c in inspector.get_columns("documents")}
65
+ docs_migrations = [
66
+ ("documents", "last_accessed_at", "ALTER TABLE documents ADD COLUMN last_accessed_at TIMESTAMP"),
67
+ ]
68
+ for table, column, ddl in docs_migrations:
69
+ if column not in existing_docs_columns:
70
  try:
71
  with engine.begin() as conn:
72
  conn.execute(text(ddl))
backend/app/main.py CHANGED
@@ -31,6 +31,63 @@ logger = logging.getLogger(__name__)
31
  settings = get_settings()
32
 
33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
  @asynccontextmanager
35
  async def lifespan(app: FastAPI):
36
  """Application startup/shutdown lifecycle."""
@@ -53,10 +110,21 @@ async def lifespan(app: FastAPI):
53
  except Exception as e:
54
  logger.warning(f"Failed to pre-load embedding model: {e}")
55
 
 
 
 
 
56
  yield
57
 
58
  # ── Shutdown ─────────────────────────────────────
59
  logger.info("Shutting down")
 
 
 
 
 
 
 
60
 
61
 
62
  # ── Create App ───────────────────────────────────────
 
31
  settings = get_settings()
32
 
33
 
34
+ async def document_cleanup_job():
35
+ """Background loop to periodically purge documents not accessed in 30 days."""
36
+ import asyncio
37
+ from datetime import datetime, timedelta, timezone
38
+ logger.info("Starting document cleanup background job loop")
39
+ while True:
40
+ try:
41
+ from app.database import SessionLocal
42
+ from app.models import Document
43
+ from app.rag.vectorstore import delete_document_chunks
44
+ from sqlalchemy import or_
45
+
46
+ db = SessionLocal()
47
+ try:
48
+ cutoff = datetime.now(timezone.utc) - timedelta(days=30)
49
+ expired_docs = db.query(Document).filter(
50
+ or_(
51
+ Document.last_accessed_at < cutoff,
52
+ Document.last_accessed_at.is_(None) & (Document.uploaded_at < cutoff)
53
+ )
54
+ ).all()
55
+
56
+ for doc in expired_docs:
57
+ logger.info(f"Auto-cleanup: Purging document {doc.id} ('{doc.original_name}') due to inactivity since {doc.last_accessed_at or doc.uploaded_at}")
58
+
59
+ # Delete physical file
60
+ filepath = os.path.join(settings.UPLOAD_DIR, doc.user_id, doc.filename)
61
+ if os.path.exists(filepath):
62
+ try:
63
+ os.remove(filepath)
64
+ except Exception as e:
65
+ logger.warning(f"Auto-cleanup: Failed to delete physical file {filepath}: {e}")
66
+
67
+ # Delete vectors
68
+ try:
69
+ delete_document_chunks(document_id=doc.id, user_id=doc.user_id)
70
+ except Exception as e:
71
+ logger.warning(f"Auto-cleanup: Error deleting vectors for document {doc.id}: {e}")
72
+
73
+ # Delete database record
74
+ db.delete(doc)
75
+
76
+ db.commit()
77
+ if expired_docs:
78
+ logger.info(f"Auto-cleanup: Purged {len(expired_docs)} documents.")
79
+ except Exception as exc:
80
+ logger.error(f"Auto-cleanup job encountered error: {exc}", exc_info=True)
81
+ finally:
82
+ db.close()
83
+
84
+ except Exception as e:
85
+ logger.error(f"Error in document cleanup background loop: {e}", exc_info=True)
86
+
87
+ # Run every 24 hours (86400 seconds)
88
+ await asyncio.sleep(86400)
89
+
90
+
91
  @asynccontextmanager
92
  async def lifespan(app: FastAPI):
93
  """Application startup/shutdown lifecycle."""
 
110
  except Exception as e:
111
  logger.warning(f"Failed to pre-load embedding model: {e}")
112
 
113
+ # Start background cleanup task
114
+ import asyncio
115
+ cleanup_task = asyncio.create_task(document_cleanup_job())
116
+
117
  yield
118
 
119
  # ── Shutdown ─────────────────────────────────────
120
  logger.info("Shutting down")
121
+ cleanup_task.cancel()
122
+ try:
123
+ await cleanup_task
124
+ except asyncio.CancelledError:
125
+ pass
126
+ except Exception as e:
127
+ logger.warning(f"Error cancelling cleanup task: {e}")
128
 
129
 
130
  # ── Create App ───────────────────────────────────────
backend/app/models.py CHANGED
@@ -90,6 +90,7 @@ class Document(Base):
90
  status = Column(String(20), default="pending") # pending | processing | ready | failed
91
  error_message = Column(Text, nullable=True)
92
  uploaded_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
 
93
  summary = Column(Text, nullable=True) # Optional summary of the document's content
94
 
95
  # Relationships
 
90
  status = Column(String(20), default="pending") # pending | processing | ready | failed
91
  error_message = Column(Text, nullable=True)
92
  uploaded_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
93
+ last_accessed_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), nullable=True)
94
  summary = Column(Text, nullable=True) # Optional summary of the document's content
95
 
96
  # Relationships
backend/app/routes/chat.py CHANGED
@@ -146,6 +146,10 @@ def ask_question(
146
  status_code=400,
147
  detail=f"Document is still {doc.status}. Please wait for processing to complete.",
148
  )
 
 
 
 
149
 
150
  result = generate_answer(
151
  question=payload.question,
@@ -224,6 +228,10 @@ def ask_question_stream(
224
  status_code=400,
225
  detail=f"Document is still {doc.status}. Please wait for processing to complete.",
226
  )
 
 
 
 
227
 
228
  started_at = time.perf_counter()
229
 
 
146
  status_code=400,
147
  detail=f"Document is still {doc.status}. Please wait for processing to complete.",
148
  )
149
+
150
+ # Update last_accessed_at timestamp
151
+ doc.last_accessed_at = datetime.now(timezone.utc)
152
+ db.commit()
153
 
154
  result = generate_answer(
155
  question=payload.question,
 
228
  status_code=400,
229
  detail=f"Document is still {doc.status}. Please wait for processing to complete.",
230
  )
231
+
232
+ # Update last_accessed_at timestamp
233
+ doc.last_accessed_at = datetime.now(timezone.utc)
234
+ db.commit()
235
 
236
  started_at = time.perf_counter()
237
 
backend/scripts/document_cleanup.py ADDED
@@ -0,0 +1,137 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Cleanup script for ChromaDB vectors and records of documents inactive for 30 days.
3
+
4
+ By default, a document is considered inactive if it has not been accessed (last_accessed_at)
5
+ or uploaded (if last_accessed_at is missing) for 30 days.
6
+
7
+ Run manually:
8
+ python backend/scripts/document_cleanup.py
9
+
10
+ Environment:
11
+ DOCUMENT_CLEANUP_INACTIVE_DAYS=30
12
+ DOCUMENT_CLEANUP_DRY_RUN=true
13
+ """
14
+ from __future__ import annotations
15
+
16
+ import logging
17
+ import os
18
+ import sys
19
+ from datetime import datetime, timedelta, timezone
20
+ from pathlib import Path
21
+
22
+ from sqlalchemy import or_, inspect, text
23
+
24
+ # Allow running this file directly from the repository root.
25
+ BACKEND_DIR = Path(__file__).resolve().parents[1]
26
+ if str(BACKEND_DIR) not in sys.path:
27
+ sys.path.insert(0, str(BACKEND_DIR))
28
+
29
+ from app.database import SessionLocal # noqa: E402
30
+ from app.models import Document # noqa: E402
31
+ from app.rag.vectorstore import delete_document_chunks # noqa: E402
32
+ from app.config import get_settings # noqa: E402
33
+
34
+ logger = logging.getLogger("document_cleanup")
35
+ logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
36
+
37
+ settings = get_settings()
38
+
39
+ def _env_bool(name: str, default: bool = False) -> bool:
40
+ value = os.getenv(name)
41
+ if value is None:
42
+ return default
43
+ return value.strip().lower() in {"1", "true", "yes", "on"}
44
+
45
+
46
+ def ensure_last_accessed_at_column() -> None:
47
+ """Ensure last_accessed_at column exists in database (handles SQLite local installs)."""
48
+ db = SessionLocal()
49
+ try:
50
+ bind = db.get_bind()
51
+ inspector = inspect(bind)
52
+ columns = {column["name"] for column in inspector.get_columns("documents")}
53
+ if "last_accessed_at" not in columns:
54
+ logger.info("Adding missing documents.last_accessed_at column")
55
+ db.execute(text("ALTER TABLE documents ADD COLUMN last_accessed_at TIMESTAMP"))
56
+ db.commit()
57
+ finally:
58
+ db.close()
59
+
60
+
61
+ def cleanup_inactive_documents(
62
+ inactive_days: int | None = None,
63
+ dry_run: bool | None = None,
64
+ ) -> dict[str, int]:
65
+ """Delete database records, physical files, and Chroma collections for inactive documents."""
66
+ ensure_last_accessed_at_column()
67
+
68
+ days = inactive_days or int(os.getenv("DOCUMENT_CLEANUP_INACTIVE_DAYS", "30"))
69
+ is_dry_run = _env_bool("DOCUMENT_CLEANUP_DRY_RUN", False) if dry_run is None else dry_run
70
+ cutoff = datetime.now(timezone.utc) - timedelta(days=days)
71
+
72
+ stats = {
73
+ "scanned": 0,
74
+ "eligible": 0,
75
+ "deleted": 0,
76
+ "failed": 0,
77
+ }
78
+
79
+ db = SessionLocal()
80
+ try:
81
+ # Select documents whose last_accessed_at or uploaded_at is before cutoff
82
+ docs = db.query(Document).filter(
83
+ or_(
84
+ Document.last_accessed_at < cutoff,
85
+ Document.last_accessed_at.is_(None) & (Document.uploaded_at < cutoff)
86
+ )
87
+ ).all()
88
+
89
+ for doc in docs:
90
+ stats["scanned"] += 1
91
+ last_activity = doc.last_accessed_at or doc.uploaded_at
92
+
93
+ stats["eligible"] += 1
94
+ logger.info(
95
+ "Document %s ('%s') inactive since %s; purging dry_run=%s",
96
+ doc.id,
97
+ doc.original_name,
98
+ last_activity,
99
+ is_dry_run,
100
+ )
101
+
102
+ if is_dry_run:
103
+ continue
104
+
105
+ try:
106
+ # 1. Delete file from disk
107
+ filepath = os.path.join(settings.UPLOAD_DIR, doc.user_id, doc.filename)
108
+ if os.path.exists(filepath):
109
+ os.remove(filepath)
110
+ logger.info("Deleted physical file: %s", filepath)
111
+
112
+ # 2. Delete vectors from ChromaDB
113
+ delete_document_chunks(document_id=doc.id, user_id=doc.user_id)
114
+
115
+ # 3. Delete from SQL database
116
+ db.delete(doc)
117
+ stats["deleted"] += 1
118
+ except Exception as exc:
119
+ stats["failed"] += 1
120
+ logger.warning(
121
+ "Failed purging document %s: %s",
122
+ doc.id,
123
+ exc,
124
+ exc_info=True,
125
+ )
126
+
127
+ if not is_dry_run:
128
+ db.commit()
129
+
130
+ logger.info("Document cleanup complete: %s", stats)
131
+ return stats
132
+ finally:
133
+ db.close()
134
+
135
+
136
+ if __name__ == "__main__":
137
+ cleanup_inactive_documents()