"""
Ingestion API for UI integration.

Provides functions to ingest documents from a directory and optionally sync
them to Pinecone. Supports both legacy markdown-only loading and multi-format
loading via Docling.
"""

import json
import os
import logging
from pathlib import Path
from typing import Dict, Any, Iterator, List, Optional
from dataclasses import dataclass

from src.ingestion.load_docs import load_markdown_docs
from src.ingestion.chunker import chunk_documents, chunk_documents_with_structure
from src.ingestion.embeddings import batch_embed_chunks

logger = logging.getLogger(__name__)

# Try to import the Docling loader (optional dependency). Without it only
# markdown files can be ingested and structure-aware chunking is disabled.
DOCLING_AVAILABLE = False
try:
    from src.ingestion.docling_loader import (
        load_documents_with_docling,
        convert_to_legacy_format,
        SUPPORTED_EXTENSIONS,
    )
    DOCLING_AVAILABLE = True
except ImportError:
    logger.info("Docling not available, using markdown-only loader")
    SUPPORTED_EXTENSIONS = {".md", ".markdown"}


@dataclass
class IngestionResult:
    """Result of document ingestion."""

    status: str  # "success", "warning" or "error"
    documents: int  # number of loaded documents whose status is "OK"
    chunks: int  # number of embedded chunks written to output_path
    output_path: str
    errors: List[str]


@dataclass
class SyncResult:
    """Result of Pinecone sync."""

    status: str  # "success", "partial", "warning" or "error"
    vectors_upserted: int
    errors: List[str]


def _iter_jsonl(path: str) -> Iterator[Dict[str, Any]]:
    """Yield one parsed JSON value per non-blank line of the file at ``path``."""
    with open(path, "r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if line:
                yield json.loads(line)


def _write_chunks_jsonl(embedded: List[Dict[str, Any]], output_path: str) -> None:
    """Write embedded chunks to ``output_path`` as JSON Lines, atomically.

    Records are written to a sibling ``<name>.tmp`` file which then replaces
    the target via ``os.replace``. If anything fails mid-write, the previous
    chunks file (if any) is left untouched instead of being truncated.
    """
    save_path = Path(output_path)
    save_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = save_path.with_name(save_path.name + ".tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as fh:
            for e in embedded:
                record = {
                    "id": f"{e['filename']}::{e['chunk_id']}",
                    "filename": e["filename"],
                    "chunk_id": e["chunk_id"],
                    "text": e.get("text", ""),
                    "chars": e.get("chars", 0),
                    "element_type": e.get("element_type", "text"),
                    "section_heading": e.get("section_heading", ""),
                    "embedding": e["embedding"],
                }
                fh.write(json.dumps(record, ensure_ascii=False) + "\n")
        os.replace(tmp_path, save_path)
    finally:
        # After a successful replace the temp file no longer exists; after a
        # failure this removes the partial output.
        if tmp_path.exists():
            tmp_path.unlink()


def ingest_from_directory(
    docs_dir: str,
    output_path: str = "data/chunks.jsonl",
    provider: str = "sentence-transformers",
    dim: int = 384,
    use_docling: bool = True,
    extensions: Optional[List[str]] = None,
    use_structure: bool = True,
    recursive: bool = False,
) -> IngestionResult:
    """
    Ingest documents from a directory and save them to chunks.jsonl.

    Args:
        docs_dir: Path to directory containing documents
        output_path: Path to save chunks.jsonl
        provider: Embedding provider ("sentence-transformers" or "local")
        dim: Embedding dimension
        use_docling: Use Docling for multi-format parsing (if available)
        extensions: File extensions to process (None = all supported).
            Only passed to the Docling loader; the legacy loader ignores it.
        use_structure: Use structure-aware chunking (requires Docling)
        recursive: Search subdirectories recursively. Only passed to the
            Docling loader; the legacy loader ignores it.

    Returns:
        IngestionResult with status and counts. Exceptions raised during
        loading, chunking, embedding or writing are logged and reported as
        status "error" rather than propagated.
    """
    errors: List[str] = []

    if not os.path.isdir(docs_dir):
        return IngestionResult(
            status="error",
            documents=0,
            chunks=0,
            output_path=output_path,
            errors=[f"Directory not found: {docs_dir}"],
        )

    try:
        # Choose loader based on availability and preference.
        if use_docling and DOCLING_AVAILABLE:
            logger.info("Using Docling for multi-format document loading")
            parsed_docs = load_documents_with_docling(
                docs_dir, extensions=extensions, recursive=recursive
            )
            docs = convert_to_legacy_format(parsed_docs)
        else:
            logger.info("Using legacy markdown loader")
            docs = load_markdown_docs(docs_dir)
            use_structure = False  # No structure without Docling

        if not docs:
            return IngestionResult(
                status="warning",
                documents=0,
                chunks=0,
                output_path=output_path,
                errors=["No documents found in directory"],
            )

        # Count successful loads.
        doc_count = sum(1 for d in docs if d.get("status") == "OK")

        # Chunk documents (structure-aware or legacy).
        if use_structure and DOCLING_AVAILABLE:
            chunks = chunk_documents_with_structure(
                docs, max_tokens=300, overlap=50, use_structure=True
            )
        else:
            chunks = chunk_documents(docs, max_tokens=300, overlap=50)

        if not chunks:
            return IngestionResult(
                status="warning",
                documents=doc_count,
                chunks=0,
                output_path=output_path,
                errors=["No chunks generated from documents"],
            )

        embedded = batch_embed_chunks(chunks, provider=provider, dim=dim)

        # Merge text and metadata back into the embedded chunks, matched by
        # (filename, chunk_id).
        chunk_map = {(c["filename"], c["chunk_id"]): c for c in chunks}
        for e in embedded:
            src = chunk_map.get((e["filename"], e["chunk_id"]))
            if src is None:
                continue
            e["text"] = src.get("text", "")
            e["element_type"] = src.get("element_type", "text")
            e["section_heading"] = src.get("section_heading", "")
            # Keep the embedder's value if present; otherwise fall back to the
            # source chunk's count instead of writing 0.
            e.setdefault("chars", src.get("chars", 0))

        _write_chunks_jsonl(embedded, output_path)

        return IngestionResult(
            status="success",
            documents=doc_count,
            chunks=len(embedded),
            output_path=output_path,
            errors=errors,
        )

    except Exception as exc:
        logger.exception("Ingestion failed")
        return IngestionResult(
            status="error",
            documents=0,
            chunks=0,
            output_path=output_path,
            errors=[str(exc)],
        )


def sync_to_pinecone(
    chunks_path: str = "data/chunks.jsonl",
    index_name: Optional[str] = None,
    batch_size: int = 100,
) -> SyncResult:
    """
    Upload embeddings from chunks.jsonl to Pinecone.

    Args:
        chunks_path: Path to chunks.jsonl file
        index_name: Pinecone index name (uses config default if None)
        batch_size: Number of vectors to upsert per batch (must be > 0)

    Returns:
        SyncResult with status and count. The status is one of:
        - "success": every batch was upserted.
        - "partial": some batches failed.
        - "error": setup failed or every batch failed.
        - "warning": there was nothing to upload.
    """
    errors: List[str] = []

    if not os.path.isfile(chunks_path):
        return SyncResult(
            status="error",
            vectors_upserted=0,
            errors=[f"Chunks file not found: {chunks_path}"],
        )

    if batch_size <= 0:
        return SyncResult(
            status="error",
            vectors_upserted=0,
            errors=[f"batch_size must be a positive integer, got {batch_size}"],
        )

    try:
        # Read the local file first so an empty file does not require any
        # Pinecone configuration or network calls.
        chunks = list(_iter_jsonl(chunks_path))
        if not chunks:
            return SyncResult(
                status="warning",
                vectors_upserted=0,
                errors=["No chunks to upload"],
            )

        vectors = [
            {
                "id": chunk["id"],
                "values": chunk["embedding"],
                "metadata": {
                    "filename": chunk.get("filename", ""),
                    "chunk_id": chunk.get("chunk_id", 0),
                    # Limit metadata size; `or ""` also guards a null text.
                    "text": (chunk.get("text") or "")[:1000],
                },
            }
            for chunk in chunks
            if chunk.get("embedding")
        ]

        skipped = len(chunks) - len(vectors)
        if skipped:
            logger.warning("Skipping %d chunk(s) without embeddings", skipped)
        if not vectors:
            return SyncResult(
                status="warning",
                vectors_upserted=0,
                errors=["No chunks with embeddings to upload"],
            )

        # Lazy imports: Pinecone and config are only needed for syncing.
        import src.config as cfg
        from pinecone import Pinecone

        if index_name is None:
            index_name = cfg.PINECONE_INDEX_NAME

        pc = Pinecone(api_key=cfg.PINECONE_API_KEY)
        idx_meta = pc.describe_index(index_name)

        # The description may expose the host as an attribute or as a
        # mapping key, depending on its type.
        host = getattr(idx_meta, "host", None)
        if not host and callable(getattr(idx_meta, "get", None)):
            host = idx_meta.get("host")
        if not host:
            return SyncResult(
                status="error",
                vectors_upserted=0,
                errors=[f"Could not get host for index: {index_name}"],
            )

        index = pc.Index(host=host)

        upserted = 0
        for start in range(0, len(vectors), batch_size):
            batch = vectors[start:start + batch_size]
            batch_no = start // batch_size
            try:
                index.upsert(vectors=batch)
            except Exception as exc:
                # Keep going: one failed batch should not abort the whole sync.
                logger.warning("Pinecone upsert of batch %d failed", batch_no, exc_info=True)
                errors.append(f"Batch {batch_no} failed: {str(exc)[:100]}")
            else:
                upserted += len(batch)

        if not errors:
            status = "success"
        elif upserted:
            status = "partial"
        else:
            status = "error"  # every batch failed

        return SyncResult(status=status, vectors_upserted=upserted, errors=errors)

    except Exception as exc:
        logger.exception("Pinecone sync failed")
        return SyncResult(
            status="error",
            vectors_upserted=0,
            errors=[str(exc)],
        )


def get_supported_formats() -> Dict[str, Any]:
    """
    Get information about supported document formats.

    Returns:
        Dict with docling availability, the supported extensions (sorted, so
        the output is stable between calls and processes), and the active
        loader name.
    """
    return {
        "docling_available": DOCLING_AVAILABLE,
        "supported_extensions": sorted(SUPPORTED_EXTENSIONS),
        "loader": "docling" if DOCLING_AVAILABLE else "markdown-only",
    }


def get_index_status(chunks_path: str = "data/chunks.jsonl") -> Dict[str, Any]:
    """
    Get status of the current index.

    Args:
        chunks_path: Path to chunks.jsonl file

    Returns:
        Dict with keys "exists", "chunks", "documents" (number of distinct
        filenames) and "path". If the file exists but cannot be parsed, the
        counts are 0 and an "error" key holds the message.
    """
    if not os.path.isfile(chunks_path):
        return {
            "exists": False,
            "chunks": 0,
            "documents": 0,
            "path": chunks_path,
        }

    try:
        chunk_count = 0
        filenames = set()
        for obj in _iter_jsonl(chunks_path):
            chunk_count += 1
            filenames.add(obj.get("filename", ""))

        return {
            "exists": True,
            "chunks": chunk_count,
            "documents": len(filenames),
            "path": chunks_path,
        }

    except Exception as exc:
        logger.warning("Could not read index status from %s", chunks_path, exc_info=True)
        return {
            "exists": True,
            "chunks": 0,
            "documents": 0,
            "path": chunks_path,
            "error": str(exc),
        }