Agentic-RagBot / airflow /dags /ingest_pdfs.py
T0X1N's picture
chore: codebase audit and fixes (ruff, mypy, pytest)
9659593
"""
MediGuard AI — Airflow DAG: Ingest Medical PDFs
Runs daily: scans the configured PDF directory (``settings.pdf.pdf_directory``),
parses the PDFs found there, chunks them, generates embeddings, and indexes
the chunks into OpenSearch.
"""
from __future__ import annotations
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator
from airflow import DAG
# Task-level defaults handed to the DAG below (Airflow applies them to each
# operator in it): two retries spaced five minutes apart, no failure e-mails.
default_args = dict(
    owner="mediguard",
    retries=2,
    retry_delay=timedelta(minutes=5),
    email_on_failure=False,
)
def _ingest_pdfs(**kwargs):
    """Parse all PDFs in the configured directory and index them into OpenSearch.

    Project services are imported at call time rather than at module level,
    presumably so that scheduler-side parsing of this DAG file does not pull in
    the application stack.

    Documents that carry a parse error, or whose extracted text is empty, are
    skipped. Each skip is logged with its reason so failures are visible in
    the task log.

    Args:
        **kwargs: Airflow task context; accepted but not used.

    Returns:
        A dict with ``total`` (number of documents returned by the parser) and
        ``indexed`` (number of documents whose text was sent to the indexing
        service). PythonOperator pushes this return value to XCom by default.
    """
    import logging
    from pathlib import Path

    from src.services.embeddings.service import make_embedding_service
    from src.services.indexing.service import IndexingService
    from src.services.indexing.text_chunker import MedicalTextChunker
    from src.services.opensearch.client import make_opensearch_client
    from src.services.pdf_parser.service import make_pdf_parser_service
    from src.settings import get_settings

    # Use a standard logger instead of print(): Airflow routes it to the task log.
    logger = logging.getLogger(__name__)

    settings = get_settings()
    pdf_dir = Path(settings.pdf.pdf_directory)

    parser = make_pdf_parser_service()
    embedding_svc = make_embedding_service()
    os_client = make_opensearch_client()
    chunker = MedicalTextChunker(
        target_words=settings.chunking.chunk_size,
        overlap_words=settings.chunking.chunk_overlap,
        min_words=settings.chunking.min_chunk_size,
    )
    indexing_svc = IndexingService(chunker, embedding_svc, os_client)

    docs = parser.parse_directory(pdf_dir)
    if not docs:
        logger.warning("No PDF documents returned for directory %s", pdf_dir)

    indexed = 0
    for doc in docs:
        # Same filter as before (text present and no error), but each skip is
        # now logged instead of being dropped silently.
        if doc.error:
            logger.warning("Skipping %s: parse error: %s", doc.filename, doc.error)
            continue
        if not doc.full_text:
            logger.warning("Skipping %s: no text extracted", doc.filename)
            continue
        indexing_svc.index_text(doc.full_text, title=doc.filename, source_file=doc.filename)
        indexed += 1

    logger.info("Ingested %d/%d documents", indexed, len(docs))
    return {"total": len(docs), "indexed": indexed}
# Daily ingestion DAG. With catchup disabled, the scheduler does not back-fill
# the intervals between start_date and the first scheduled run.
dag = DAG(
    dag_id="mediguard_ingest_pdfs",
    description="Parse and index medical PDFs into OpenSearch",
    tags=["mediguard", "indexing"],
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args=default_args,
)

# Single task: run the PDF ingestion callable, attached to the DAG explicitly.
ingest = PythonOperator(
    task_id="ingest_medical_pdfs",
    python_callable=_ingest_pdfs,
    dag=dag,
)