Spaces:
Sleeping
Sleeping
"""
MediGuard AI — Airflow DAG: Ingest Medical PDFs
Periodically scans the configured medical PDF directory, parses every PDF
it finds, chunks the extracted text, generates embeddings, and indexes the
results into OpenSearch.
"""
| from __future__ import annotations | |
| from datetime import datetime, timedelta | |
| from airflow.operators.python import PythonOperator | |
| from airflow import DAG | |
# Defaults handed to the DAG below via ``default_args``; Airflow applies them
# to every task in that DAG unless a task overrides them explicitly.
default_args = dict(
    owner="mediguard",
    retries=2,  # re-run a failed ingestion task up to two more times
    retry_delay=timedelta(minutes=5),  # wait between those retries
    email_on_failure=False,
)
def _ingest_pdfs(**kwargs):
    """Parse every PDF in the configured directory and index it into OpenSearch.

    Builds the PDF parser, embedding service, OpenSearch client and
    ``MedicalTextChunker`` from ``get_settings()``, parses everything under
    ``settings.pdf.pdf_directory`` and hands each usable document to
    ``IndexingService.index_text``. A document is indexed only when it parsed
    without error and produced non-empty ``full_text``; every other document
    is skipped and the reason is logged so it shows up in the task log.

    Args:
        **kwargs: Airflow task context; accepted but not used.

    Returns:
        dict: ``{"total": <documents returned by the parser>,
        "indexed": <documents passed to index_text>}``. PythonOperator pushes
        this return value to XCom.
    """
    # Imports are kept local so the Airflow scheduler can parse this DAG file
    # without loading the heavy project services.
    import logging
    from pathlib import Path

    from src.services.embeddings.service import make_embedding_service
    from src.services.indexing.service import IndexingService
    from src.services.indexing.text_chunker import MedicalTextChunker
    from src.services.opensearch.client import make_opensearch_client
    from src.services.pdf_parser.service import make_pdf_parser_service
    from src.settings import get_settings

    logger = logging.getLogger(__name__)

    settings = get_settings()
    pdf_dir = Path(settings.pdf.pdf_directory)

    parser = make_pdf_parser_service()
    embedding_svc = make_embedding_service()
    os_client = make_opensearch_client()
    chunker = MedicalTextChunker(
        target_words=settings.chunking.chunk_size,
        overlap_words=settings.chunking.chunk_overlap,
        min_words=settings.chunking.min_chunk_size,
    )
    indexing_svc = IndexingService(chunker, embedding_svc, os_client)

    # Materialize once: the result is both iterated and counted with len(),
    # which would break if parse_directory ever returned an iterator.
    docs = list(parser.parse_directory(pdf_dir))

    indexed = 0
    for doc in docs:
        # Previously these documents were dropped silently; log them so a
        # bad or text-less PDF can be traced from the task log.
        if doc.error:
            logger.warning("Skipping %s: parse error: %s", doc.filename, doc.error)
            continue
        if not doc.full_text:
            logger.warning("Skipping %s: no text extracted", doc.filename)
            continue
        indexing_svc.index_text(doc.full_text, title=doc.filename, source_file=doc.filename)
        indexed += 1

    logger.info("Ingested %d/%d documents", indexed, len(docs))
    return {"total": len(docs), "indexed": indexed}
# Daily DAG with a single task that runs the PDF ingestion pipeline.
# catchup=False stops Airflow from back-filling every missed interval since
# start_date; only the latest interval is scheduled.
dag = DAG(
    dag_id="mediguard_ingest_pdfs",
    description="Parse and index medical PDFs into OpenSearch",
    tags=["mediguard", "indexing"],
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args=default_args,
)

# Attach the task to the DAG explicitly instead of via a `with DAG(...)` block.
ingest = PythonOperator(
    task_id="ingest_medical_pdfs",
    python_callable=_ingest_pdfs,
    dag=dag,
)