Srushti-Kamble commited on
Commit
fc1232d
·
1 Parent(s): f72065c

Add hourly Google Drive PDF ingestion

Browse files
backend/app/config.py CHANGED
@@ -24,6 +24,11 @@ class Settings(BaseSettings):
24
  JWT_REFRESH_EXPIRY_DAYS: int = 7
25
  GOOGLE_CLIENT_ID: str = ""
26
 
 
 
 
 
 
27
  # ── File Upload ──────────────────────────────────────
28
  UPLOAD_DIR: str = "./data/uploads"
29
  MAX_UPLOAD_SIZE_MB: int = 20
 
24
  JWT_REFRESH_EXPIRY_DAYS: int = 7
25
  GOOGLE_CLIENT_ID: str = ""
26
 
27
+ # Google Drive background sync
28
+ DRIVE_SYNC_ENABLED: bool = False
29
+ DRIVE_SYNC_INTERVAL_MINUTES: int = 60
30
+ GOOGLE_SERVICE_ACCOUNT_FILE: str = ""
31
+
32
  # ── File Upload ──────────────────────────────────────
33
  UPLOAD_DIR: str = "./data/uploads"
34
  MAX_UPLOAD_SIZE_MB: int = 20
backend/app/database.py CHANGED
@@ -44,13 +44,15 @@ def _migrate_schema():
44
  for non-destructive changes such as new nullable columns.
45
  """
46
  inspector = inspect(engine)
47
- existing_columns = {c["name"] for c in inspector.get_columns("users")}
48
-
49
  migrations = [
50
  ("users", "hf_token", "ALTER TABLE users ADD COLUMN hf_token VARCHAR(255)"),
 
 
 
51
  ]
52
 
53
  for table, column, ddl in migrations:
 
54
  if column not in existing_columns:
55
  try:
56
  with engine.begin() as conn:
 
44
  for non-destructive changes such as new nullable columns.
45
  """
46
  inspector = inspect(engine)
 
 
47
  migrations = [
48
  ("users", "hf_token", "ALTER TABLE users ADD COLUMN hf_token VARCHAR(255)"),
49
+ ("documents", "drive_file_id", "ALTER TABLE documents ADD COLUMN drive_file_id VARCHAR(255)"),
50
+ ("documents", "drive_folder_id", "ALTER TABLE documents ADD COLUMN drive_folder_id VARCHAR(255)"),
51
+ ("documents", "drive_synced_at", "ALTER TABLE documents ADD COLUMN drive_synced_at TIMESTAMP"),
52
  ]
53
 
54
  for table, column, ddl in migrations:
55
+ existing_columns = {c["name"] for c in inspector.get_columns(table)}
56
  if column not in existing_columns:
57
  try:
58
  with engine.begin() as conn:
backend/app/main.py CHANGED
@@ -20,6 +20,7 @@ from app.config import get_settings
20
  from app.rate_limit import limiter
21
  from app.database import init_db, get_db
22
  from app.rag.vectorstore import get_chroma_client
 
23
 
24
  # Configure logging
25
  logging.basicConfig(
@@ -53,9 +54,12 @@ async def lifespan(app: FastAPI):
53
  except Exception as e:
54
  logger.warning(f"Failed to pre-load embedding model: {e}")
55
 
 
 
56
  yield
57
 
58
  # ── Shutdown ─────────────────────────────────────
 
59
  logger.info("Shutting down")
60
 
61
 
 
20
  from app.rate_limit import limiter
21
  from app.database import init_db, get_db
22
  from app.rag.vectorstore import get_chroma_client
23
+ from app.scheduler import start_scheduler, stop_scheduler
24
 
25
  # Configure logging
26
  logging.basicConfig(
 
54
  except Exception as e:
55
  logger.warning(f"Failed to pre-load embedding model: {e}")
56
 
57
+ start_scheduler()
58
+
59
  yield
60
 
61
  # ── Shutdown ─────────────────────────────────────
62
+ stop_scheduler()
63
  logger.info("Shutting down")
64
 
65
 
backend/app/models.py CHANGED
@@ -28,6 +28,7 @@ class User(Base):
28
  documents = relationship("Document", back_populates="owner", cascade="all, delete-orphan")
29
  messages = relationship("ChatMessage", back_populates="user", cascade="all, delete-orphan")
30
  api_keys = relationship("ApiKey", back_populates="user", cascade="all, delete-orphan")
 
31
 
32
 
33
  class ApiKey(Base):
@@ -58,6 +59,9 @@ class Document(Base):
58
  error_message = Column(Text, nullable=True)
59
  uploaded_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
60
  summary = Column(Text, nullable=True) # Optional summary of the document's content
 
 
 
61
 
62
  # Relationships
63
  owner = relationship("User", back_populates="documents")
@@ -81,6 +85,22 @@ class ChatMessage(Base):
81
  shared_message = relationship("SharedMessage", back_populates="message", uselist=False, cascade="all, delete-orphan")
82
 
83
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  class SharedMessage(Base):
85
  __tablename__ = "shared_messages"
86
 
 
28
  documents = relationship("Document", back_populates="owner", cascade="all, delete-orphan")
29
  messages = relationship("ChatMessage", back_populates="user", cascade="all, delete-orphan")
30
  api_keys = relationship("ApiKey", back_populates="user", cascade="all, delete-orphan")
31
+ drive_connections = relationship("DriveConnection", back_populates="user", cascade="all, delete-orphan")
32
 
33
 
34
  class ApiKey(Base):
 
59
  error_message = Column(Text, nullable=True)
60
  uploaded_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
61
  summary = Column(Text, nullable=True) # Optional summary of the document's content
62
+ drive_file_id = Column(String(255), unique=True, nullable=True, index=True)
63
+ drive_folder_id = Column(String(255), nullable=True, index=True)
64
+ drive_synced_at = Column(DateTime, nullable=True)
65
 
66
  # Relationships
67
  owner = relationship("User", back_populates="documents")
 
85
  shared_message = relationship("SharedMessage", back_populates="message", uselist=False, cascade="all, delete-orphan")
86
 
87
 
88
+ class DriveConnection(Base):
89
+ __tablename__ = "drive_connections"
90
+
91
+ id = Column(String, primary_key=True, default=generate_uuid)
92
+ user_id = Column(String, ForeignKey("users.id"), nullable=False, index=True)
93
+ folder_id = Column(String(255), nullable=False, index=True)
94
+ credentials_json = Column(Text, nullable=True)
95
+ service_account_file = Column(String(500), nullable=True)
96
+ enabled = Column(Boolean, default=True, nullable=False)
97
+ last_synced_at = Column(DateTime, nullable=True)
98
+ created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
99
+ updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
100
+
101
+ user = relationship("User", back_populates="drive_connections")
102
+
103
+
104
  class SharedMessage(Base):
105
  __tablename__ = "shared_messages"
106
 
backend/app/routes/documents.py CHANGED
@@ -19,8 +19,8 @@ from app.models import User, Document
19
  from app.schemas import DocumentResponse, DocumentListResponse, DocumentStatusResponse
20
  from app.auth import get_current_user
21
  from app.config import get_settings
22
- from app.rag.chunker import chunk_document, get_page_count
23
- from app.rag.vectorstore import store_chunks, delete_document_chunks
24
  from sqlalchemy import select
25
  logger = logging.getLogger(__name__)
26
  settings = get_settings()
@@ -113,96 +113,7 @@ async def validate_upload(file: UploadFile):
113
  pass
114
 
115
 
116
- def _ingest_document(document_id: str, filepath: str, original_name: str, user_id: str):
117
- """
118
- Process a document in the background: chunk document, generate embeddings, and store in ChromaDB,
119
- calls document summary function, and update the database record.
120
-
121
- This function is intended to be run as a background task.
122
- It creates its own database session, updates the
123
- document status, extracts text, splits into chunks, generates embeddings,
124
- stores everything in ChromaDB, calls summary function, updates the document record with page count,
125
- chunk count, and summary, and marks the document as 'ready'.
126
- On failure, it sets status to 'failed' and records the error message.
127
-
128
- Args:
129
- document_id: Unique identifier of the document in the database.
130
- filepath: Absolute or relative path to the uploaded file on disk.
131
- original_name: original filename provided by the user (for logging and metadata).
132
- user_id: Identifier of the user who owns the document.
133
-
134
- Returns:
135
- None
136
-
137
- Note:
138
- This function does not raise exceptions to the caller;
139
- all errors are logged and the database record is updated accordingly.
140
- """
141
- from app.database import SessionLocal
142
-
143
- db = SessionLocal()
144
- try:
145
- doc = db.query(Document).filter(Document.id == document_id).first()
146
- if not doc:
147
- logger.error(f"Document {document_id} not found for ingestion")
148
- return
149
-
150
- # Update status to processing
151
- doc.status = "processing"
152
- db.commit()
153
-
154
- # Get page count
155
- page_count = get_page_count(filepath)
156
- doc.page_count = page_count
157
-
158
- # Chunk the document
159
- chunks = chunk_document(filepath)
160
-
161
- if not chunks:
162
- doc.status = "failed"
163
- doc.error_message = "No text could be extracted from the document"
164
- db.commit()
165
- return
166
-
167
- # Store embeddings in ChromaDB
168
- chunk_count = store_chunks(
169
- chunks=chunks,
170
- document_id=document_id,
171
- filename=original_name,
172
- user_id=user_id,
173
- )
174
-
175
- # Generate summary and update document record
176
- try:
177
- from app.rag.summarizer import generate_document_summary
178
-
179
- summary = generate_document_summary(filepath, max_sentences=2)
180
- if summary:
181
- doc.summary = summary
182
- db.commit() # Update document record with summary
183
- except Exception as e:
184
- logger.warning(f"Could not import summarizer for document {document_id}: {e}")
185
- doc.summary = None
186
-
187
- # Update document record
188
- doc.chunk_count = chunk_count
189
- doc.status = "ready"
190
- db.commit()
191
-
192
- logger.info(f"Document {document_id} ingested: {page_count} pages, {chunk_count} chunks")
193
-
194
- except Exception as e:
195
- logger.error(f"Ingestion error for {document_id}: {e}")
196
- try:
197
- doc = db.query(Document).filter(Document.id == document_id).first()
198
- if doc:
199
- doc.status = "failed"
200
- doc.error_message = str(e)[:500]
201
- db.commit()
202
- except Exception:
203
- pass
204
- finally:
205
- db.close()
206
 
207
 
208
  @router.post("/upload", response_model=DocumentResponse, status_code=status.HTTP_202_ACCEPTED)
 
19
  from app.schemas import DocumentResponse, DocumentListResponse, DocumentStatusResponse
20
  from app.auth import get_current_user
21
  from app.config import get_settings
22
+ from app.rag.vectorstore import delete_document_chunks
23
+ from app.services.document_ingestion import ingest_document
24
  from sqlalchemy import select
25
  logger = logging.getLogger(__name__)
26
  settings = get_settings()
 
113
  pass
114
 
115
 
116
+ _ingest_document = ingest_document
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
 
118
 
119
  @router.post("/upload", response_model=DocumentResponse, status_code=status.HTTP_202_ACCEPTED)
backend/app/scheduler.py ADDED
@@ -0,0 +1,58 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Application background scheduler."""
2
+ import logging
3
+
4
+ from app.config import get_settings
5
+ from app.services.drive_sync import sync_drive_pdfs_with_session
6
+
7
+ logger = logging.getLogger(__name__)
8
+ settings = get_settings()
9
+ _scheduler = None
10
+
11
+
12
+ def start_scheduler():
13
+ """Start recurring backend jobs."""
14
+ global _scheduler
15
+
16
+ if _scheduler and _scheduler.running:
17
+ return _scheduler
18
+
19
+ try:
20
+ from apscheduler.schedulers.background import BackgroundScheduler
21
+ from apscheduler.triggers.interval import IntervalTrigger
22
+ except ImportError:
23
+ if settings.DRIVE_SYNC_ENABLED:
24
+ logger.warning("Drive PDF sync enabled but APScheduler is not installed")
25
+ return None
26
+
27
+ _scheduler = BackgroundScheduler(timezone="UTC")
28
+
29
+ if settings.DRIVE_SYNC_ENABLED:
30
+ _scheduler.add_job(
31
+ sync_drive_pdfs_with_session,
32
+ trigger=IntervalTrigger(minutes=settings.DRIVE_SYNC_INTERVAL_MINUTES),
33
+ id="drive_pdf_sync",
34
+ name="Hourly Google Drive PDF sync",
35
+ replace_existing=True,
36
+ max_instances=1,
37
+ coalesce=True,
38
+ misfire_grace_time=300,
39
+ )
40
+ logger.info(
41
+ "Drive PDF sync scheduled every %s minutes",
42
+ settings.DRIVE_SYNC_INTERVAL_MINUTES,
43
+ )
44
+ else:
45
+ logger.info("Drive PDF sync disabled")
46
+
47
+ _scheduler.start()
48
+ return _scheduler
49
+
50
+
51
+ def stop_scheduler():
52
+ """Stop recurring backend jobs."""
53
+ global _scheduler
54
+
55
+ if _scheduler and _scheduler.running:
56
+ _scheduler.shutdown(wait=False)
57
+ logger.info("Background scheduler stopped")
58
+ _scheduler = None
backend/app/services/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ """Backend service layer helpers."""
backend/app/services/document_ingestion.py ADDED
@@ -0,0 +1,80 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Reusable document ingestion pipeline."""
2
+ import logging
3
+
4
+ from app.models import Document
5
+ from app.rag.chunker import chunk_document, get_page_count
6
+ from app.rag.vectorstore import store_chunks
7
+
8
+ logger = logging.getLogger(__name__)
9
+
10
+
11
+ def ingest_document(document_id: str, filepath: str, original_name: str, user_id: str):
12
+ """
13
+ Process a document: chunk it, generate embeddings, store vectors, summarize,
14
+ and update the database record.
15
+ """
16
+ from app.database import SessionLocal
17
+
18
+ db = SessionLocal()
19
+ try:
20
+ doc = db.query(Document).filter(Document.id == document_id).first()
21
+ if not doc:
22
+ logger.error("Document %s not found for ingestion", document_id)
23
+ return
24
+
25
+ doc.status = "processing"
26
+ db.commit()
27
+
28
+ page_count = get_page_count(filepath)
29
+ doc.page_count = page_count
30
+
31
+ chunks = chunk_document(filepath)
32
+
33
+ if not chunks:
34
+ doc.status = "failed"
35
+ doc.error_message = "No text could be extracted from the document"
36
+ db.commit()
37
+ return
38
+
39
+ chunk_count = store_chunks(
40
+ chunks=chunks,
41
+ document_id=document_id,
42
+ filename=original_name,
43
+ user_id=user_id,
44
+ )
45
+
46
+ try:
47
+ from app.rag.summarizer import generate_document_summary
48
+
49
+ summary = generate_document_summary(filepath, max_sentences=2)
50
+ if summary:
51
+ doc.summary = summary
52
+ db.commit()
53
+ except Exception as e:
54
+ logger.warning("Could not generate summary for document %s: %s", document_id, e)
55
+ doc.summary = None
56
+
57
+ doc.chunk_count = chunk_count
58
+ doc.status = "ready"
59
+ doc.error_message = None
60
+ db.commit()
61
+
62
+ logger.info(
63
+ "Document %s ingested: %s pages, %s chunks",
64
+ document_id,
65
+ page_count,
66
+ chunk_count,
67
+ )
68
+
69
+ except Exception as e:
70
+ logger.error("Ingestion error for %s: %s", document_id, e)
71
+ try:
72
+ doc = db.query(Document).filter(Document.id == document_id).first()
73
+ if doc:
74
+ doc.status = "failed"
75
+ doc.error_message = str(e)[:500]
76
+ db.commit()
77
+ except Exception:
78
+ logger.exception("Failed to mark document %s as failed", document_id)
79
+ finally:
80
+ db.close()
backend/app/services/drive_sync.py ADDED
@@ -0,0 +1,188 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Google Drive PDF discovery and ingestion service."""
2
+ import io
3
+ import json
4
+ import logging
5
+ import os
6
+ import threading
7
+ import uuid
8
+ from datetime import datetime, timezone
9
+ from pathlib import Path
10
+
11
+ from sqlalchemy.orm import Session
12
+
13
+ from app.config import get_settings
14
+ from app.models import Document, DriveConnection
15
+ from app.services.document_ingestion import ingest_document
16
+
17
+ logger = logging.getLogger(__name__)
18
+ settings = get_settings()
19
+ _sync_lock = threading.Lock()
20
+
21
+ PDF_MIME_TYPE = "application/pdf"
22
+ DRIVE_SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]
23
+
24
+
25
+ def _build_drive_service(connection: DriveConnection):
26
+ """Build a Google Drive API client for a saved connection."""
27
+ try:
28
+ from google.oauth2.credentials import Credentials
29
+ from google.oauth2 import service_account
30
+ from googleapiclient.discovery import build
31
+ except ImportError as exc:
32
+ raise RuntimeError(
33
+ "Google Drive sync requires google-api-python-client and google-auth"
34
+ ) from exc
35
+
36
+ if connection.credentials_json:
37
+ credentials = Credentials.from_authorized_user_info(
38
+ json.loads(connection.credentials_json),
39
+ scopes=DRIVE_SCOPES,
40
+ )
41
+ else:
42
+ service_account_file = (
43
+ connection.service_account_file or settings.GOOGLE_SERVICE_ACCOUNT_FILE
44
+ )
45
+ if not service_account_file:
46
+ raise RuntimeError("Drive connection has no OAuth credentials or service account file")
47
+ credentials = service_account.Credentials.from_service_account_file(
48
+ service_account_file,
49
+ scopes=DRIVE_SCOPES,
50
+ )
51
+
52
+ return build("drive", "v3", credentials=credentials, cache_discovery=False)
53
+
54
+
55
+ def _list_pdf_files(service, folder_id: str) -> list[dict]:
56
+ """Return PDF files directly inside a Drive folder."""
57
+ files: list[dict] = []
58
+ page_token = None
59
+ query = (
60
+ f"'{folder_id}' in parents and "
61
+ f"mimeType = '{PDF_MIME_TYPE}' and trashed = false"
62
+ )
63
+
64
+ while True:
65
+ response = (
66
+ service.files()
67
+ .list(
68
+ q=query,
69
+ spaces="drive",
70
+ fields="nextPageToken, files(id, name, mimeType, size, modifiedTime)",
71
+ pageToken=page_token,
72
+ )
73
+ .execute()
74
+ )
75
+ files.extend(response.get("files", []))
76
+ page_token = response.get("nextPageToken")
77
+ if not page_token:
78
+ return files
79
+
80
+
81
+ def _download_drive_file(service, file_id: str, destination: str):
82
+ """Download a Drive file to disk."""
83
+ from googleapiclient.http import MediaIoBaseDownload
84
+
85
+ request = service.files().get_media(fileId=file_id)
86
+ with io.FileIO(destination, "wb") as file_handle:
87
+ downloader = MediaIoBaseDownload(file_handle, request)
88
+ done = False
89
+ while not done:
90
+ _, done = downloader.next_chunk()
91
+
92
+
93
+ def sync_drive_pdfs(db: Session) -> dict:
94
+ """
95
+ Discover new PDFs from enabled Drive connections, download them, and ingest them.
96
+ """
97
+ if not _sync_lock.acquire(blocking=False):
98
+ logger.info("Drive sync skipped because a previous run is still active")
99
+ return {"connections": 0, "discovered": 0, "ingested": 0, "skipped": 0, "failed": 0}
100
+
101
+ stats = {"connections": 0, "discovered": 0, "ingested": 0, "skipped": 0, "failed": 0}
102
+ try:
103
+ connections = (
104
+ db.query(DriveConnection)
105
+ .filter(DriveConnection.enabled.is_(True))
106
+ .all()
107
+ )
108
+ stats["connections"] = len(connections)
109
+
110
+ for connection in connections:
111
+ try:
112
+ service = _build_drive_service(connection)
113
+ files = _list_pdf_files(service, connection.folder_id)
114
+ stats["discovered"] += len(files)
115
+
116
+ for drive_file in files:
117
+ existing = (
118
+ db.query(Document)
119
+ .filter(Document.drive_file_id == drive_file["id"])
120
+ .first()
121
+ )
122
+ if existing:
123
+ stats["skipped"] += 1
124
+ continue
125
+
126
+ user_dir = os.path.join(settings.UPLOAD_DIR, connection.user_id)
127
+ os.makedirs(user_dir, exist_ok=True)
128
+ stored_filename = f"{uuid.uuid4().hex}.pdf"
129
+ filepath = os.path.join(user_dir, stored_filename)
130
+
131
+ document = Document(
132
+ user_id=connection.user_id,
133
+ filename=stored_filename,
134
+ original_name=drive_file.get("name") or stored_filename,
135
+ file_size=int(drive_file.get("size") or 0),
136
+ status="pending",
137
+ drive_file_id=drive_file["id"],
138
+ drive_folder_id=connection.folder_id,
139
+ drive_synced_at=datetime.now(timezone.utc),
140
+ )
141
+ db.add(document)
142
+ db.commit()
143
+ db.refresh(document)
144
+
145
+ try:
146
+ _download_drive_file(service, drive_file["id"], filepath)
147
+ document.file_size = Path(filepath).stat().st_size
148
+ db.commit()
149
+ ingest_document(
150
+ document_id=document.id,
151
+ filepath=filepath,
152
+ original_name=document.original_name,
153
+ user_id=connection.user_id,
154
+ )
155
+ stats["ingested"] += 1
156
+ except Exception as exc:
157
+ logger.exception(
158
+ "Drive ingestion failed for file %s",
159
+ drive_file.get("id"),
160
+ )
161
+ db.refresh(document)
162
+ document.status = "failed"
163
+ document.error_message = str(exc)[:500]
164
+ db.commit()
165
+ stats["failed"] += 1
166
+
167
+ connection.last_synced_at = datetime.now(timezone.utc)
168
+ connection.updated_at = datetime.now(timezone.utc)
169
+ db.commit()
170
+ except Exception:
171
+ logger.exception("Drive sync failed for connection %s", connection.id)
172
+ stats["failed"] += 1
173
+
174
+ logger.info("Drive sync complete: %s", stats)
175
+ return stats
176
+ finally:
177
+ _sync_lock.release()
178
+
179
+
180
+ def sync_drive_pdfs_with_session() -> dict:
181
+ """Run Drive sync with an owned database session."""
182
+ from app.database import SessionLocal
183
+
184
+ db = SessionLocal()
185
+ try:
186
+ return sync_drive_pdfs(db)
187
+ finally:
188
+ db.close()
backend/requirements.txt CHANGED
@@ -15,6 +15,8 @@ pyjwt
15
  passlib[bcrypt]
16
  python-dotenv
17
  google-auth
 
 
18
 
19
  # Config
20
  pydantic-settings
 
15
  passlib[bcrypt]
16
  python-dotenv
17
  google-auth
18
+ google-api-python-client
19
+ APScheduler
20
 
21
  # Config
22
  pydantic-settings
backend/tests/test_drive_sync.py ADDED
@@ -0,0 +1,43 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pathlib import Path
2
+
3
+ from app.models import Document, DriveConnection
4
+ from app.services import drive_sync
5
+
6
+
7
+ def test_drive_sync_ingests_new_pdf_and_skips_existing(db_session, user, tmp_path, monkeypatch):
8
+ monkeypatch.setattr(drive_sync.settings, "UPLOAD_DIR", str(tmp_path))
9
+ monkeypatch.setattr(drive_sync, "_build_drive_service", lambda connection: object())
10
+ monkeypatch.setattr(
11
+ drive_sync,
12
+ "_list_pdf_files",
13
+ lambda service, folder_id: [
14
+ {"id": "drive-file-1", "name": "Guide.pdf", "size": "0"},
15
+ ],
16
+ )
17
+
18
+ def fake_download(service, file_id, destination):
19
+ Path(destination).write_bytes(b"%PDF-1.4\n")
20
+
21
+ def fake_ingest(document_id, filepath, original_name, user_id):
22
+ document = db_session.query(Document).filter(Document.id == document_id).one()
23
+ document.status = "ready"
24
+ document.page_count = 1
25
+ document.chunk_count = 1
26
+ db_session.commit()
27
+
28
+ monkeypatch.setattr(drive_sync, "_download_drive_file", fake_download)
29
+ monkeypatch.setattr(drive_sync, "ingest_document", fake_ingest)
30
+
31
+ connection = DriveConnection(user_id=user.id, folder_id="folder-1")
32
+ db_session.add(connection)
33
+ db_session.commit()
34
+
35
+ first_stats = drive_sync.sync_drive_pdfs(db_session)
36
+ second_stats = drive_sync.sync_drive_pdfs(db_session)
37
+
38
+ document = db_session.query(Document).filter(Document.drive_file_id == "drive-file-1").one()
39
+ assert first_stats["ingested"] == 1
40
+ assert second_stats["skipped"] == 1
41
+ assert document.status == "ready"
42
+ assert document.original_name == "Guide.pdf"
43
+ assert document.drive_folder_id == "folder-1"