| """Service for processing documents and ingesting to vector store.""" |
|
|
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain_core.documents import Document as LangChainDocument |
| from src.db.postgres.vector_store import get_vector_store |
| from src.storage.az_blob.az_blob import blob_storage |
| from src.db.postgres.models import Document as DBDocument |
| from src.config.settings import settings |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from src.middlewares.logging import get_logger |
| from azure.ai.documentintelligence.aio import DocumentIntelligenceClient |
| from azure.core.credentials import AzureKeyCredential |
| from typing import List |
| import pypdf |
| import docx |
| from io import BytesIO |
|
|
| logger = get_logger("knowledge_processing") |
|
|
|
|
class KnowledgeProcessingService:
    """Service for processing documents and ingesting to vector store.

    Supported file types: ``pdf`` (extracted per page, with ``page_label``
    metadata), ``docx`` and ``txt`` (extracted as one text blob).
    """

    def __init__(self):
        # 1000-char chunks with 200-char overlap so adjacent chunks share
        # context for retrieval; len() sizes chunks in characters.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )

    @staticmethod
    def _chunk_metadata(db_doc: DBDocument, chunk_index: int, page_label=None) -> dict:
        """Build the metadata dict attached to one ingested chunk.

        Args:
            db_doc: Source document record (provides id, user_id, filename).
            chunk_index: Position of the chunk within the whole document.
            page_label: 1-based page number for paginated sources (PDF);
                the key is omitted when None.
        """
        metadata = {
            "document_id": db_doc.id,
            "user_id": db_doc.user_id,
            "filename": db_doc.filename,
            "chunk_index": chunk_index,
        }
        if page_label is not None:
            metadata["page_label"] = page_label
        return metadata

    async def process_document(self, db_doc: DBDocument, db: AsyncSession) -> int:
        """Process document and ingest to vector store.

        Downloads the blob, extracts text (per page for PDFs), splits it
        into overlapping chunks, and adds the chunks to the vector store.

        Args:
            db_doc: Database record for the uploaded document (provides
                blob_name, file_type, id, user_id, filename).
            db: Async DB session; currently unused here but kept so the
                caller-facing interface is unchanged.

        Returns:
            Number of chunks ingested.

        Raises:
            ValueError: If no text could be extracted, or the file type is
                unsupported.
        """
        try:
            logger.info(f"Processing document {db_doc.id}")
            content = await blob_storage.download_file(db_doc.blob_name)

            if db_doc.file_type == "pdf":
                documents = await self._build_pdf_documents(content, db_doc)
            else:
                text = self._extract_text(content, db_doc.file_type)
                if not text.strip():
                    raise ValueError("No text extracted from document")
                documents = [
                    LangChainDocument(
                        page_content=chunk,
                        metadata=self._chunk_metadata(db_doc, i),
                    )
                    for i, chunk in enumerate(self.text_splitter.split_text(text))
                ]

            # PDFs can also yield zero chunks (e.g. scanned pages with no
            # recoverable text) — treat that the same as empty text.
            if not documents:
                raise ValueError("No text extracted from document")

            vector_store = get_vector_store()
            await vector_store.aadd_documents(documents)

            logger.info(f"Processed {db_doc.id}: {len(documents)} chunks ingested")
            return len(documents)

        except Exception as e:
            # Log with the document id so failures are traceable, then
            # re-raise so the caller can mark the document as failed.
            logger.error(f"Failed to process document {db_doc.id}", error=str(e))
            raise

    async def _build_pdf_documents(
        self, content: bytes, db_doc: DBDocument
    ) -> List[LangChainDocument]:
        """Build LangChain documents from PDF with page_label metadata.

        Uses Azure Document Intelligence (per-page) when credentials are
        present, falls back to pypdf (also per-page) otherwise. The
        chunk_index metadata is cumulative across pages.
        """
        documents: List[LangChainDocument] = []

        if settings.azureai_docintel_endpoint and settings.azureai_docintel_key:
            async with DocumentIntelligenceClient(
                endpoint=settings.azureai_docintel_endpoint,
                credential=AzureKeyCredential(settings.azureai_docintel_key),
            ) as client:
                poller = await client.begin_analyze_document(
                    model_id="prebuilt-read",
                    body=BytesIO(content),
                    content_type="application/pdf",
                )
                result = await poller.result()
                logger.info(f"Azure DI extracted {len(result.pages or [])} pages")

                for page in result.pages or []:
                    # Reassemble the page text from OCR line fragments.
                    page_text = "\n".join(
                        line.content for line in (page.lines or [])
                    )
                    self._append_page_chunks(
                        documents, page_text, db_doc, page.page_number
                    )
        else:
            logger.warning("Azure DI not configured, using pypdf")
            pdf_reader = pypdf.PdfReader(BytesIO(content))
            for page_num, page in enumerate(pdf_reader.pages, start=1):
                self._append_page_chunks(
                    documents, page.extract_text() or "", db_doc, page_num
                )

        return documents

    def _append_page_chunks(
        self,
        documents: List[LangChainDocument],
        page_text: str,
        db_doc: DBDocument,
        page_number: int,
    ) -> None:
        """Split one page's text and append chunk documents to *documents*.

        Blank pages are skipped. chunk_index continues from len(documents)
        so it stays unique across the whole PDF.
        """
        if not page_text.strip():
            return
        for chunk in self.text_splitter.split_text(page_text):
            documents.append(LangChainDocument(
                page_content=chunk,
                metadata=self._chunk_metadata(db_doc, len(documents), page_number),
            ))

    def _extract_text(self, content: bytes, file_type: str) -> str:
        """Extract text from DOCX or TXT content.

        Raises:
            ValueError: For unsupported file types. (Non-UTF-8 txt content
                raises UnicodeDecodeError, a ValueError subclass.)
        """
        if file_type == "docx":
            doc = docx.Document(BytesIO(content))
            return "\n".join(p.text for p in doc.paragraphs)
        elif file_type == "txt":
            # utf-8-sig strips a leading BOM if present (and decodes plain
            # UTF-8 identically), so "\ufeff" never pollutes chunk text.
            return content.decode("utf-8-sig")
        else:
            raise ValueError(f"Unsupported file type: {file_type}")
|
|
|
|
# Module-level singleton shared across the app; constructing it at import
# time is cheap (only the text splitter is built, no I/O happens here).
knowledge_processor = KnowledgeProcessingService()
|
|