Spaces:
Sleeping
Sleeping
| """ | |
| GeminiRAG β PDF Pipeline Processor | |
| ==================================== | |
| Uploads every PDF from Data set/PDF/ through the live API pipeline: | |
| upload β Celery (extract β summarise β chunk β embed β ChromaDB) β COMPLETED | |
| Logs every step end-to-end with structlog JSON. | |
| Prints a summary table when all PDFs are done. | |
| Usage (from the geminirag directory): | |
| py scripts/process_pdfs.py | |
| """ | |
| import json | |
| import sys | |
| import time | |
| import requests | |
| from pathlib import Path | |
| from datetime import datetime | |
| ROOT = Path(__file__).resolve().parent.parent | |
| sys.path.insert(0, str(ROOT)) | |
| from dotenv import load_dotenv | |
| load_dotenv(ROOT / ".env") | |
| from app.observability.logging import configure_logging, get_logger | |
| configure_logging() | |
| log = get_logger().bind(script="process_pdfs") | |
| # ββ config ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| API_BASE = "http://localhost:8000" | |
| PDF_DIR = ROOT / "Data set" / "PDF" | |
| ADMIN_EMAIL = "admin@test.com" | |
| ADMIN_PASSWORD = "Admin1234!" | |
| POLL_INTERVAL = 5 # seconds between status polls | |
| POLL_TIMEOUT = 600 # max seconds to wait per file (10 min) | |
| PDF_FILES = [ | |
| "1706.03762v7 (1).pdf", | |
| "2303.08774v6.pdf", | |
| "9789240094703-eng.pdf", | |
| "WPP2024_Summary-of-Results.pdf", | |
| ] | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Auth | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_token() -> str: | |
| log.info("auth_login", email=ADMIN_EMAIL) | |
| r = requests.post( | |
| f"{API_BASE}/auth/login", | |
| json={"email": ADMIN_EMAIL, "password": ADMIN_PASSWORD}, | |
| timeout=15, | |
| ) | |
| r.raise_for_status() | |
| token = r.json()["access_token"] | |
| log.info("auth_login_ok", email=ADMIN_EMAIL) | |
| return token | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Upload | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def upload_pdf(token: str, pdf_path: Path) -> dict: | |
| size_mb = pdf_path.stat().st_size / (1024 * 1024) | |
| log.info("upload_start", filename=pdf_path.name, size_mb=round(size_mb, 2)) | |
| with open(pdf_path, "rb") as f: | |
| r = requests.post( | |
| f"{API_BASE}/v1/files/upload", | |
| headers={"Authorization": f"Bearer {token}"}, | |
| files={"file": (pdf_path.name, f, "application/pdf")}, | |
| timeout=60, | |
| ) | |
| r.raise_for_status() | |
| data = r.json() | |
| log.info( | |
| "upload_ok", | |
| filename=pdf_path.name, | |
| job_id=data["job_id"], | |
| status=data["status"], | |
| ) | |
| return data | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Poll until done | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def poll_job(token: str, job_id: str, filename: str) -> dict: | |
| headers = {"Authorization": f"Bearer {token}"} | |
| deadline = time.time() + POLL_TIMEOUT | |
| last_step = None | |
| while time.time() < deadline: | |
| r = requests.get( | |
| f"{API_BASE}/v1/jobs/{job_id}", | |
| headers=headers, | |
| timeout=15, | |
| ) | |
| r.raise_for_status() | |
| job = r.json() | |
| status = job.get("status", "") | |
| step = job.get("step", "") | |
| # Log each new step transition | |
| if step != last_step: | |
| log.info( | |
| "job_step", | |
| job_id=job_id, | |
| filename=filename, | |
| status=status, | |
| step=step, | |
| retry_count=job.get("retry_count", 0), | |
| ) | |
| last_step = step | |
| # Print live progress to console | |
| ts = datetime.now().strftime("%H:%M:%S") | |
| print(f" [{ts}] {filename[:45]:45} {status:12} {step}", flush=True) | |
| if status in ("COMPLETED", "FAILED", "FAILED_PERMANENT"): | |
| if status == "COMPLETED": | |
| log.info( | |
| "job_completed", | |
| job_id=job_id, | |
| filename=filename, | |
| chunk_count=job.get("chunk_count"), | |
| ) | |
| else: | |
| log.error( | |
| "job_failed", | |
| job_id=job_id, | |
| filename=filename, | |
| status=status, | |
| error_type=job.get("error_type"), | |
| error_message=job.get("error_message", "")[:300], | |
| ) | |
| return job | |
| time.sleep(POLL_INTERVAL) | |
| log.error("job_poll_timeout", job_id=job_id, filename=filename, timeout_s=POLL_TIMEOUT) | |
| return {"job_id": job_id, "status": "TIMEOUT", "filename": filename} | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Summary | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_document_summary(token: str, job_id: str) -> dict: | |
| try: | |
| r = requests.get( | |
| f"{API_BASE}/v1/documents/{job_id}/summary", | |
| headers={"Authorization": f"Bearer {token}"}, | |
| timeout=15, | |
| ) | |
| r.raise_for_status() | |
| return r.json().get("summary", {}) | |
| except Exception as e: | |
| log.warning("summary_fetch_failed", job_id=job_id, error=str(e)) | |
| return {} | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Main | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def main(): | |
| print("\n" + "=" * 70) | |
| print(" GeminiRAG β PDF Pipeline Processor") | |
| print(f" Processing {len(PDF_FILES)} PDFs | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| print("=" * 70) | |
| # Verify PDFs exist | |
| missing = [f for f in PDF_FILES if not (PDF_DIR / f).exists()] | |
| if missing: | |
| print(f"\n[ERROR] Missing files: {missing}") | |
| sys.exit(1) | |
| token = get_token() | |
| results = [] | |
| print(f"\n{'FILE':47} {'STATUS':12} STEP") | |
| print("-" * 70) | |
| for filename in PDF_FILES: | |
| pdf_path = PDF_DIR / filename | |
| t_start = time.time() | |
| try: | |
| # 1 β upload | |
| upload_data = upload_pdf(token, pdf_path) | |
| job_id = upload_data["job_id"] | |
| # 2 β poll until done | |
| final_job = poll_job(token, job_id, filename) | |
| elapsed = int(time.time() - t_start) | |
| # 3 β fetch summary if completed | |
| summary = {} | |
| if final_job.get("status") == "COMPLETED": | |
| summary = get_document_summary(token, job_id) | |
| results.append({ | |
| "filename": filename, | |
| "job_id": job_id, | |
| "status": final_job.get("status"), | |
| "chunk_count": final_job.get("chunk_count"), | |
| "error": final_job.get("error_message"), | |
| "elapsed_s": elapsed, | |
| "summary": summary, | |
| }) | |
| except Exception as e: | |
| log.error("pipeline_error", filename=filename, error=str(e)) | |
| results.append({"filename": filename, "status": "ERROR", "error": str(e)}) | |
| # ββ Final summary table βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("\n" + "=" * 70) | |
| print(" RESULTS SUMMARY") | |
| print("=" * 70) | |
| print(f"\n {'FILE':47} {'STATUS':12} {'CHUNKS':>7} {'TIME':>6}") | |
| print(" " + "-" * 66) | |
| total_chunks = 0 | |
| for r in results: | |
| chunks = r.get("chunk_count") or "β" | |
| elapsed = f"{r.get('elapsed_s','?')}s" | |
| status = r.get("status", "?") | |
| color = "β" if status == "COMPLETED" else "β" | |
| print(f" {color} {r['filename'][:45]:45} {status:12} {str(chunks):>7} {elapsed:>6}") | |
| if isinstance(chunks, int): | |
| total_chunks += chunks | |
| print(f"\n Total chunks indexed to ChromaDB: {total_chunks}") | |
| # ββ Per-file summaries ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("\n" + "=" * 70) | |
| print(" DOCUMENT SUMMARIES") | |
| print("=" * 70) | |
| for r in results: | |
| if r.get("status") == "COMPLETED" and r.get("summary"): | |
| s = r["summary"] | |
| print(f"\n π {r['filename']}") | |
| print(f" Title : {s.get('title', 'β')}") | |
| print(f" Type : {s.get('document_type', 'β')}") | |
| print(f" Summary : {s.get('summary', 'β')[:200]}") | |
| kp = s.get("key_points", []) | |
| if kp: | |
| print(f" Key Points : {kp[0][:100]}") | |
| for k in kp[1:3]: | |
| print(f" {k[:100]}") | |
| elif r.get("status") != "COMPLETED": | |
| print(f"\n β {r['filename']} β {r.get('error','unknown error')[:200]}") | |
| print("\n" + "=" * 70) | |
| print(f" Done. Job IDs saved below for RAG queries / RAGAS evaluation.") | |
| print("=" * 70) | |
| for r in results: | |
| if r.get("status") == "COMPLETED": | |
| print(f" {r['filename'][:45]} β job_id: {r['job_id']}") | |
| print() | |
| if __name__ == "__main__": | |
| main() | |