Spaces:
Running
Running
| """ | |
| MediGuard AI — Indexing Service | |
| Orchestrates: PDF parse → chunk → embed → index into OpenSearch. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import uuid | |
| from datetime import UTC, datetime | |
| from src.services.indexing.text_chunker import MedicalChunk | |
| logger = logging.getLogger(__name__) | |
| class IndexingService: | |
| """Coordinates chunking β embedding β OpenSearch indexing.""" | |
| def __init__(self, chunker, embedding_service, opensearch_client): | |
| self.chunker = chunker | |
| self.embedding_service = embedding_service | |
| self.opensearch_client = opensearch_client | |
| def index_text( | |
| self, | |
| text: str, | |
| *, | |
| document_id: str = "", | |
| title: str = "", | |
| source_file: str = "", | |
| ) -> int: | |
| """Chunk, embed, and index a single document's text. Returns count of indexed chunks.""" | |
| if not document_id: | |
| document_id = str(uuid.uuid4()) | |
| chunks = self.chunker.chunk_text( | |
| text, | |
| document_id=document_id, | |
| title=title, | |
| source_file=source_file, | |
| ) | |
| if not chunks: | |
| logger.warning("No chunks generated for document '%s'", title) | |
| return 0 | |
| # Embed all chunks | |
| texts = [c.text for c in chunks] | |
| embeddings = self.embedding_service.embed_documents(texts) | |
| # Prepare OpenSearch documents | |
| now = datetime.now(UTC).isoformat() | |
| docs: list[dict] = [] | |
| for chunk, emb in zip(chunks, embeddings): | |
| doc = chunk.to_dict() | |
| doc["_id"] = f"{document_id}_{chunk.chunk_index}" | |
| doc["embedding"] = emb | |
| doc["indexed_at"] = now | |
| docs.append(doc) | |
| indexed = self.opensearch_client.bulk_index(docs) | |
| logger.info( | |
| "Indexed %d chunks for '%s' (document_id=%s)", | |
| indexed, | |
| title, | |
| document_id, | |
| ) | |
| return indexed | |
| def index_chunks(self, chunks: list[MedicalChunk]) -> int: | |
| """Embed and index pre-built chunks.""" | |
| if not chunks: | |
| return 0 | |
| texts = [c.text for c in chunks] | |
| embeddings = self.embedding_service.embed_documents(texts) | |
| now = datetime.now(UTC).isoformat() | |
| docs: list[dict] = [] | |
| for chunk, emb in zip(chunks, embeddings): | |
| doc = chunk.to_dict() | |
| doc["_id"] = f"{chunk.document_id}_{chunk.chunk_index}" | |
| doc["embedding"] = emb | |
| doc["indexed_at"] = now | |
| docs.append(doc) | |
| return self.opensearch_client.bulk_index(docs) | |