#!/usr/bin/env python """Incrementally add documents to a llamaindex knowledge base.""" from __future__ import annotations import argparse import asyncio from datetime import datetime import hashlib import json import os from pathlib import Path import shutil from typing import List, Optional from dotenv import load_dotenv from deeptutor.logging import get_logger from deeptutor.services.rag.factory import DEFAULT_PROVIDER from deeptutor.services.rag.pipelines.llamaindex import LlamaIndexPipeline from deeptutor.services.path_service import PathService logger = get_logger("KnowledgeInit") DEFAULT_BASE_DIR = str(PathService.get_instance().get_knowledge_bases_dir()) class DocumentAdder: """Add documents to an existing llamaindex knowledge base.""" def __init__( self, kb_name: str, base_dir: str = DEFAULT_BASE_DIR, api_key: str | None = None, base_url: str | None = None, progress_tracker=None, rag_provider: str | None = None, ): self.kb_name = kb_name self.base_dir = Path(base_dir) self.kb_dir = self.base_dir / kb_name if not self.kb_dir.exists(): raise ValueError(f"Knowledge base does not exist: {kb_name}") self.raw_dir = self.kb_dir / "raw" self.llamaindex_storage_dir = self.kb_dir / "llamaindex_storage" self.legacy_rag_storage_dir = self.kb_dir / "rag_storage" self.metadata_file = self.kb_dir / "metadata.json" if not self.llamaindex_storage_dir.exists() and self.legacy_rag_storage_dir.exists(): raise ValueError( f"Knowledge base '{kb_name}' uses legacy index format and requires reindex before incremental add" ) if not self.llamaindex_storage_dir.exists(): raise ValueError(f"Knowledge base not initialized (llamaindex): {kb_name}") if rag_provider and rag_provider != DEFAULT_PROVIDER: logger.warning( f"Requested provider '{rag_provider}' ignored. Using '{DEFAULT_PROVIDER}' for consistency." ) self.api_key = api_key self.base_url = base_url self.progress_tracker = progress_tracker self.raw_dir.mkdir(parents=True, exist_ok=True) def _get_file_hash(self, file_path: Path) -> str: sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for byte_block in iter(lambda: f.read(65536), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() def get_ingested_hashes(self) -> dict[str, str]: if self.metadata_file.exists(): try: with open(self.metadata_file, "r", encoding="utf-8") as f: data = json.load(f) return data.get("file_hashes", {}) except Exception: return {} return {} def add_documents(self, source_files: List[str], allow_duplicates: bool = False) -> List[Path]: """Validate and stage files into raw/ before indexing.""" logger.info(f"Validating documents for '{self.kb_name}'...") ingested_hashes = self.get_ingested_hashes() files_to_process: list[Path] = [] for source in source_files: source_path = Path(source) if not source_path.exists() or not source_path.is_file(): logger.warning(f"Missing file: {source}") continue current_hash = self._get_file_hash(source_path) if current_hash in ingested_hashes.values() and not allow_duplicates: logger.info(f"Skipped (content already indexed): {source_path.name}") continue dest_path = self.raw_dir / source_path.name if dest_path.exists(): dest_hash = self._get_file_hash(dest_path) if dest_hash == current_hash: logger.info(f"Recovering staged file: {source_path.name}") files_to_process.append(dest_path) continue if not allow_duplicates: logger.info(f"Skipped (filename collision): {source_path.name}") continue shutil.copy2(source_path, dest_path) logger.info(f"Staged to raw: {source_path.name}") files_to_process.append(dest_path) return files_to_process async def process_new_documents(self, new_files: List[Path]) -> List[Path]: """Index staged files via llamaindex incremental add.""" if not new_files: return [] pipeline = LlamaIndexPipeline(kb_base_dir=str(self.base_dir)) processed_files: list[Path] = [] total_files = len(new_files) for idx, doc_file in enumerate(new_files, 1): try: if self.progress_tracker is not None: from deeptutor.knowledge.progress_tracker import ProgressStage self.progress_tracker.update( ProgressStage.PROCESSING_FILE, f"Indexing (LlamaIndex) {doc_file.name}", current=idx, total=total_files, ) success = await pipeline.add_documents(self.kb_name, [str(doc_file)]) if success: processed_files.append(doc_file) self._record_successful_hash(doc_file) logger.info(f"Processed (LlamaIndex): {doc_file.name}") else: logger.error(f"Failed to index: {doc_file.name}") except Exception as e: logger.exception(f"Failed {doc_file.name}: {e}") return processed_files def _record_successful_hash(self, file_path: Path) -> None: file_hash = self._get_file_hash(file_path) metadata: dict = {} if self.metadata_file.exists(): try: with open(self.metadata_file, "r", encoding="utf-8") as f: metadata = json.load(f) except Exception: metadata = {} metadata.setdefault("file_hashes", {})[file_path.name] = file_hash with open(self.metadata_file, "w", encoding="utf-8") as f: json.dump(metadata, f, indent=2, ensure_ascii=False) def extract_numbered_items_for_new_docs(self, processed_files: List[Path], batch_size: int = 20) -> None: """Compatibility no-op: numbered-item extraction is deprecated.""" _ = batch_size if processed_files: logger.info("Skipping numbered items extraction for incremental add (feature removed)") def update_metadata(self, added_count: int) -> None: """Update metadata after incremental add.""" metadata: dict = {} if self.metadata_file.exists(): try: with open(self.metadata_file, "r", encoding="utf-8") as f: metadata = json.load(f) except Exception: metadata = {} metadata["rag_provider"] = DEFAULT_PROVIDER metadata["needs_reindex"] = False metadata["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") history = metadata.get("update_history", []) history.append( { "timestamp": metadata["last_updated"], "action": "incremental_add", "count": added_count, "provider": DEFAULT_PROVIDER, } ) metadata["update_history"] = history with open(self.metadata_file, "w", encoding="utf-8") as f: json.dump(metadata, f, indent=2, ensure_ascii=False) async def add_documents( kb_name: str, source_files: list[str], base_dir: str = DEFAULT_BASE_DIR, api_key: Optional[str] = None, base_url: Optional[str] = None, allow_duplicates: bool = False, ) -> int: """Convenience function used by CLI wrappers.""" from deeptutor.knowledge.manager import KnowledgeBaseManager manager = KnowledgeBaseManager(base_dir=base_dir) try: manager.update_kb_status( name=kb_name, status="processing", progress={ "stage": "processing_documents", "message": "Processing uploaded documents...", "percent": 0, "current": 0, "total": max(len(source_files), 1), "file_name": "", "error": None, "timestamp": datetime.now().isoformat(), }, ) adder = DocumentAdder( kb_name=kb_name, base_dir=base_dir, api_key=api_key, base_url=base_url, rag_provider=DEFAULT_PROVIDER, ) new_files = adder.add_documents(source_files, allow_duplicates=allow_duplicates) if not new_files: manager.update_kb_status( name=kb_name, status="ready", progress={ "stage": "completed", "message": "No new unique documents to process.", "percent": 100, "current": 1, "total": 1, "file_name": "", "error": None, "timestamp": datetime.now().isoformat(), }, ) return 0 processed = await adder.process_new_documents(new_files) adder.extract_numbered_items_for_new_docs(processed) adder.update_metadata(len(processed)) manager.update_kb_status( name=kb_name, status="ready", progress={ "stage": "completed", "message": f"Successfully processed {len(processed)} files!", "percent": 100, "current": len(processed), "total": max(len(new_files), 1), "file_name": "", "error": None, "timestamp": datetime.now().isoformat(), }, ) return len(processed) except Exception as exc: manager.update_kb_status( name=kb_name, status="error", progress={ "stage": "error", "message": "Document upload failed", "percent": 0, "current": 0, "total": max(len(source_files), 1), "file_name": "", "error": str(exc), "timestamp": datetime.now().isoformat(), }, ) raise async def main() -> None: parser = argparse.ArgumentParser(description="Incrementally add documents to a KB") parser.add_argument("kb_name", help="KB Name") parser.add_argument("--docs", nargs="+", help="Files") parser.add_argument("--docs-dir", help="Directory") parser.add_argument("--base-dir", default=DEFAULT_BASE_DIR) parser.add_argument("--api-key", default=os.getenv("LLM_API_KEY")) parser.add_argument("--base-url", default=os.getenv("LLM_HOST")) parser.add_argument("--allow-duplicates", action="store_true") args = parser.parse_args() load_dotenv() doc_files: list[str] = [] if args.docs: doc_files.extend(args.docs) if args.docs_dir: p = Path(args.docs_dir) for ext in ["*.pdf", "*.txt", "*.md", "*.json", "*.csv"]: doc_files.extend([str(f) for f in p.glob(ext)]) if not doc_files: logger.error("No documents provided.") return processed_count = await add_documents( kb_name=args.kb_name, source_files=doc_files, base_dir=args.base_dir, api_key=args.api_key, base_url=args.base_url, allow_duplicates=args.allow_duplicates, ) if processed_count: logger.info(f"Done! Successfully added {processed_count} documents.") else: logger.info("No new unique documents to add.") if __name__ == "__main__": asyncio.run(main())