"""Service for processing documents and ingesting to vector store.""" from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_core.documents import Document as LangChainDocument from src.db.postgres.vector_store import get_vector_store from src.storage.az_blob.az_blob import blob_storage from src.db.postgres.models import Document as DBDocument from src.config.settings import settings from sqlalchemy.ext.asyncio import AsyncSession from src.middlewares.logging import get_logger from azure.ai.documentintelligence.aio import DocumentIntelligenceClient from azure.core.credentials import AzureKeyCredential from typing import List import pypdf import docx from io import BytesIO logger = get_logger("knowledge_processing") class KnowledgeProcessingService: """Service for processing documents and ingesting to vector store.""" def __init__(self): self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200, length_function=len ) async def process_document(self, db_doc: DBDocument, db: AsyncSession) -> int: """Process document and ingest to vector store. Returns: Number of chunks ingested """ try: logger.info(f"Processing document {db_doc.id}") content = await blob_storage.download_file(db_doc.blob_name) if db_doc.file_type == "pdf": documents = await self._build_pdf_documents(content, db_doc) else: text = self._extract_text(content, db_doc.file_type) if not text.strip(): raise ValueError("No text extracted from document") chunks = self.text_splitter.split_text(text) documents = [ LangChainDocument( page_content=chunk, metadata={ "document_id": db_doc.id, "user_id": db_doc.user_id, "filename": db_doc.filename, "chunk_index": i, } ) for i, chunk in enumerate(chunks) ] if not documents: raise ValueError("No text extracted from document") vector_store = get_vector_store() await vector_store.aadd_documents(documents) logger.info(f"Processed {db_doc.id}: {len(documents)} chunks ingested") return len(documents) except Exception as e: logger.error(f"Failed to process document {db_doc.id}", error=str(e)) raise async def _build_pdf_documents( self, content: bytes, db_doc: DBDocument ) -> List[LangChainDocument]: """Build LangChain documents from PDF with page_label metadata. Uses Azure Document Intelligence (per-page) when credentials are present, falls back to pypdf (also per-page) otherwise. """ documents: List[LangChainDocument] = [] if settings.azureai_docintel_endpoint and settings.azureai_docintel_key: async with DocumentIntelligenceClient( endpoint=settings.azureai_docintel_endpoint, credential=AzureKeyCredential(settings.azureai_docintel_key), ) as client: poller = await client.begin_analyze_document( model_id="prebuilt-read", body=BytesIO(content), content_type="application/pdf", ) result = await poller.result() logger.info(f"Azure DI extracted {len(result.pages or [])} pages") for page in result.pages or []: page_text = "\n".join( line.content for line in (page.lines or []) ) if not page_text.strip(): continue for chunk in self.text_splitter.split_text(page_text): documents.append(LangChainDocument( page_content=chunk, metadata={ "document_id": db_doc.id, "user_id": db_doc.user_id, "filename": db_doc.filename, "chunk_index": len(documents), "page_label": page.page_number, } )) else: logger.warning("Azure DI not configured, using pypdf") pdf_reader = pypdf.PdfReader(BytesIO(content)) for page_num, page in enumerate(pdf_reader.pages, start=1): page_text = page.extract_text() or "" if not page_text.strip(): continue for chunk in self.text_splitter.split_text(page_text): documents.append(LangChainDocument( page_content=chunk, metadata={ "document_id": db_doc.id, "user_id": db_doc.user_id, "filename": db_doc.filename, "chunk_index": len(documents), "page_label": page_num, } )) return documents def _extract_text(self, content: bytes, file_type: str) -> str: """Extract text from DOCX or TXT content.""" if file_type == "docx": doc = docx.Document(BytesIO(content)) return "\n".join(p.text for p in doc.paragraphs) elif file_type == "txt": return content.decode("utf-8") else: raise ValueError(f"Unsupported file type: {file_type}") knowledge_processor = KnowledgeProcessingService()