Multimodel_Rag / scripts /process_pdfs.py
Dhrumil Parikh
deploy GeminiRAG
cdc55f4
Raw
History Blame Contribute Delete
10.9 kB
"""
GeminiRAG β€” PDF Pipeline Processor
====================================
Uploads every PDF from Data set/PDF/ through the live API pipeline:
upload β†’ Celery (extract β†’ summarise β†’ chunk β†’ embed β†’ ChromaDB) β†’ COMPLETED
Logs every step end-to-end with structlog JSON.
Prints a summary table when all PDFs are done.
Usage (from the geminirag directory):
py scripts/process_pdfs.py
"""
import json
import sys
import time
import requests
from pathlib import Path
from datetime import datetime
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
from dotenv import load_dotenv
load_dotenv(ROOT / ".env")
from app.observability.logging import configure_logging, get_logger
configure_logging()
log = get_logger().bind(script="process_pdfs")
# ── config ────────────────────────────────────────────────────────────────────
API_BASE = "http://localhost:8000"
PDF_DIR = ROOT / "Data set" / "PDF"
ADMIN_EMAIL = "admin@test.com"
ADMIN_PASSWORD = "Admin1234!"
POLL_INTERVAL = 5 # seconds between status polls
POLL_TIMEOUT = 600 # max seconds to wait per file (10 min)
PDF_FILES = [
"1706.03762v7 (1).pdf",
"2303.08774v6.pdf",
"9789240094703-eng.pdf",
"WPP2024_Summary-of-Results.pdf",
]
# ─────────────────────────────────────────────────────────────────────────────
# Auth
# ─────────────────────────────────────────────────────────────────────────────
def get_token() -> str:
log.info("auth_login", email=ADMIN_EMAIL)
r = requests.post(
f"{API_BASE}/auth/login",
json={"email": ADMIN_EMAIL, "password": ADMIN_PASSWORD},
timeout=15,
)
r.raise_for_status()
token = r.json()["access_token"]
log.info("auth_login_ok", email=ADMIN_EMAIL)
return token
# ─────────────────────────────────────────────────────────────────────────────
# Upload
# ─────────────────────────────────────────────────────────────────────────────
def upload_pdf(token: str, pdf_path: Path) -> dict:
size_mb = pdf_path.stat().st_size / (1024 * 1024)
log.info("upload_start", filename=pdf_path.name, size_mb=round(size_mb, 2))
with open(pdf_path, "rb") as f:
r = requests.post(
f"{API_BASE}/v1/files/upload",
headers={"Authorization": f"Bearer {token}"},
files={"file": (pdf_path.name, f, "application/pdf")},
timeout=60,
)
r.raise_for_status()
data = r.json()
log.info(
"upload_ok",
filename=pdf_path.name,
job_id=data["job_id"],
status=data["status"],
)
return data
# ─────────────────────────────────────────────────────────────────────────────
# Poll until done
# ─────────────────────────────────────────────────────────────────────────────
def poll_job(token: str, job_id: str, filename: str) -> dict:
headers = {"Authorization": f"Bearer {token}"}
deadline = time.time() + POLL_TIMEOUT
last_step = None
while time.time() < deadline:
r = requests.get(
f"{API_BASE}/v1/jobs/{job_id}",
headers=headers,
timeout=15,
)
r.raise_for_status()
job = r.json()
status = job.get("status", "")
step = job.get("step", "")
# Log each new step transition
if step != last_step:
log.info(
"job_step",
job_id=job_id,
filename=filename,
status=status,
step=step,
retry_count=job.get("retry_count", 0),
)
last_step = step
# Print live progress to console
ts = datetime.now().strftime("%H:%M:%S")
print(f" [{ts}] {filename[:45]:45} {status:12} {step}", flush=True)
if status in ("COMPLETED", "FAILED", "FAILED_PERMANENT"):
if status == "COMPLETED":
log.info(
"job_completed",
job_id=job_id,
filename=filename,
chunk_count=job.get("chunk_count"),
)
else:
log.error(
"job_failed",
job_id=job_id,
filename=filename,
status=status,
error_type=job.get("error_type"),
error_message=job.get("error_message", "")[:300],
)
return job
time.sleep(POLL_INTERVAL)
log.error("job_poll_timeout", job_id=job_id, filename=filename, timeout_s=POLL_TIMEOUT)
return {"job_id": job_id, "status": "TIMEOUT", "filename": filename}
# ─────────────────────────────────────────────────────────────────────────────
# Summary
# ─────────────────────────────────────────────────────────────────────────────
def get_document_summary(token: str, job_id: str) -> dict:
try:
r = requests.get(
f"{API_BASE}/v1/documents/{job_id}/summary",
headers={"Authorization": f"Bearer {token}"},
timeout=15,
)
r.raise_for_status()
return r.json().get("summary", {})
except Exception as e:
log.warning("summary_fetch_failed", job_id=job_id, error=str(e))
return {}
# ─────────────────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────────────────
def main():
print("\n" + "=" * 70)
print(" GeminiRAG β€” PDF Pipeline Processor")
print(f" Processing {len(PDF_FILES)} PDFs | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 70)
# Verify PDFs exist
missing = [f for f in PDF_FILES if not (PDF_DIR / f).exists()]
if missing:
print(f"\n[ERROR] Missing files: {missing}")
sys.exit(1)
token = get_token()
results = []
print(f"\n{'FILE':47} {'STATUS':12} STEP")
print("-" * 70)
for filename in PDF_FILES:
pdf_path = PDF_DIR / filename
t_start = time.time()
try:
# 1 β€” upload
upload_data = upload_pdf(token, pdf_path)
job_id = upload_data["job_id"]
# 2 β€” poll until done
final_job = poll_job(token, job_id, filename)
elapsed = int(time.time() - t_start)
# 3 β€” fetch summary if completed
summary = {}
if final_job.get("status") == "COMPLETED":
summary = get_document_summary(token, job_id)
results.append({
"filename": filename,
"job_id": job_id,
"status": final_job.get("status"),
"chunk_count": final_job.get("chunk_count"),
"error": final_job.get("error_message"),
"elapsed_s": elapsed,
"summary": summary,
})
except Exception as e:
log.error("pipeline_error", filename=filename, error=str(e))
results.append({"filename": filename, "status": "ERROR", "error": str(e)})
# ── Final summary table ───────────────────────────────────────────────────
print("\n" + "=" * 70)
print(" RESULTS SUMMARY")
print("=" * 70)
print(f"\n {'FILE':47} {'STATUS':12} {'CHUNKS':>7} {'TIME':>6}")
print(" " + "-" * 66)
total_chunks = 0
for r in results:
chunks = r.get("chunk_count") or "β€”"
elapsed = f"{r.get('elapsed_s','?')}s"
status = r.get("status", "?")
color = "βœ“" if status == "COMPLETED" else "βœ—"
print(f" {color} {r['filename'][:45]:45} {status:12} {str(chunks):>7} {elapsed:>6}")
if isinstance(chunks, int):
total_chunks += chunks
print(f"\n Total chunks indexed to ChromaDB: {total_chunks}")
# ── Per-file summaries ────────────────────────────────────────────────────
print("\n" + "=" * 70)
print(" DOCUMENT SUMMARIES")
print("=" * 70)
for r in results:
if r.get("status") == "COMPLETED" and r.get("summary"):
s = r["summary"]
print(f"\n πŸ“„ {r['filename']}")
print(f" Title : {s.get('title', 'β€”')}")
print(f" Type : {s.get('document_type', 'β€”')}")
print(f" Summary : {s.get('summary', 'β€”')[:200]}")
kp = s.get("key_points", [])
if kp:
print(f" Key Points : {kp[0][:100]}")
for k in kp[1:3]:
print(f" {k[:100]}")
elif r.get("status") != "COMPLETED":
print(f"\n βœ— {r['filename']} β†’ {r.get('error','unknown error')[:200]}")
print("\n" + "=" * 70)
print(f" Done. Job IDs saved below for RAG queries / RAGAS evaluation.")
print("=" * 70)
for r in results:
if r.get("status") == "COMPLETED":
print(f" {r['filename'][:45]} β†’ job_id: {r['job_id']}")
print()
if __name__ == "__main__":
main()