"""
Knowledge Base Indexing and Retrieval using LlamaIndex

Modern LlamaIndex framework integration with:
- Foundation for knowledge base indexing (VectorStoreIndex, PropertyGraphIndex)
- Vector similarity search with retrieval
- Document retrieval with storage context
- Ingestion pipeline for data processing
"""

import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from llama_index.core import (
    Document,
    Settings,
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.core.extractors import KeywordExtractor, TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.llms import ChatMessage
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.vector_stores.pinecone import PineconeVectorStore
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)


class IndexConfig(BaseModel):
    """Configuration for knowledge base index following LlamaIndex best practices"""

    # Embedding settings
    embedding_model: str = Field(
        default="text-embedding-3-small",
        description="OpenAI embedding model"
    )

    # LLM settings
    llm_model: str = Field(
        default="gpt-4-turbo",
        description="OpenAI LLM for query/synthesis"
    )

    # Chunking settings
    chunk_size: int = Field(
        default=1024,
        description="Size of text chunks"
    )
    chunk_overlap: int = Field(
        default=20,
        description="Overlap between chunks"
    )

    # Vector store backend
    use_pinecone: bool = Field(
        default=False,
        description="Use Pinecone for vector store"
    )
    pinecone_index_name: str = Field(
        default="ecomcp-knowledge",
        description="Pinecone index name"
    )
    pinecone_dimension: int = Field(
        default=1536,
        description="Dimension for embeddings"
    )
    # Only used when the Pinecone index does not exist yet and must be created.
    pinecone_cloud: str = Field(
        default="aws",
        description="Cloud provider for a newly created serverless Pinecone index"
    )
    pinecone_region: str = Field(
        default="us-east-1",
        description="Cloud region for a newly created serverless Pinecone index"
    )

    # Retrieval settings
    similarity_top_k: int = Field(
        default=5,
        description="Number of similar items to retrieve"
    )

    # Storage settings
    persist_dir: str = Field(
        default="./kb_storage",
        description="Directory for persisting index"
    )


class KnowledgeBase:
    """
    Knowledge base for indexing and retrieving product/documentation information
    """

    # Roles forwarded to the chat engine as conversation history.
    _HISTORY_ROLES = frozenset({"system", "user", "assistant"})

    def __init__(self, config: Optional[IndexConfig] = None):
        """
        Initialize knowledge base with modern LlamaIndex patterns

        Args:
            config: IndexConfig object for customization; a default
                IndexConfig is used when omitted
        """
        self.config = config or IndexConfig()
        self.index = None
        self.retriever = None
        self.storage_context = None
        self.ingestion_pipeline = None

        self._setup_models()
        self._setup_ingestion_pipeline()

    def _setup_models(self):
        """Configure LLM and embedding models following LlamaIndex patterns

        Builds the embedding model and LLM from the config and also installs
        them (plus chunking parameters) on the global ``Settings`` object.
        """
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            logger.warning("OPENAI_API_KEY not set. Models may not work.")

        # Setup embedding model
        self.embed_model = OpenAIEmbedding(
            model=self.config.embedding_model,
            api_key=api_key,
        )

        # Setup LLM
        self.llm = OpenAI(
            model=self.config.llm_model,
            api_key=api_key,
        )

        # Configure global settings for LlamaIndex
        Settings.embed_model = self.embed_model
        Settings.llm = self.llm
        Settings.chunk_size = self.config.chunk_size
        Settings.chunk_overlap = self.config.chunk_overlap

    def _setup_ingestion_pipeline(self):
        """Setup ingestion pipeline with metadata extraction"""
        # Node parser splits documents using the configured chunking
        node_parser = SimpleNodeParser.from_defaults(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap,
        )

        # Metadata extractors run after chunking
        extractors = [
            TitleExtractor(nodes=5),
            KeywordExtractor(keywords=10),
        ]

        self.ingestion_pipeline = IngestionPipeline(
            transformations=[node_parser] + extractors,
        )

    def index_documents(self, documents_path: str) -> VectorStoreIndex:
        """
        Index documents from a directory using ingestion pipeline

        Args:
            documents_path: Path to directory containing documents

        Returns:
            VectorStoreIndex: Indexed documents

        Raises:
            FileNotFoundError: If ``documents_path`` does not exist
        """
        logger.info(f"Indexing documents from {documents_path}")

        if not os.path.exists(documents_path):
            logger.error(f"Document path not found: {documents_path}")
            raise FileNotFoundError(f"Document path not found: {documents_path}")

        # Load documents
        reader = SimpleDirectoryReader(documents_path)
        documents = reader.load_data()
        logger.info(f"Loaded {len(documents)} documents")

        # Process through ingestion pipeline
        nodes = self.ingestion_pipeline.run(documents=documents)
        logger.info(f"Processed into {len(nodes)} nodes with metadata")

        # Create storage context
        if self.config.use_pinecone:
            self.storage_context = self._create_pinecone_storage()
        else:
            self.storage_context = StorageContext.from_defaults()

        # Create index from nodes. The embedding model is passed explicitly so
        # this instance does not depend on the global Settings still holding
        # the model it configured in __init__.
        self.index = VectorStoreIndex(
            nodes=nodes,
            storage_context=self.storage_context,
            embed_model=self.embed_model,
            show_progress=True,
        )

        # Create retriever with configured top_k
        self.retriever = self.index.as_retriever(
            similarity_top_k=self.config.similarity_top_k
        )

        logger.info(f"Index created successfully with {len(nodes)} nodes")
        return self.index

    def _create_pinecone_storage(self) -> StorageContext:
        """
        Create Pinecone-backed storage context

        Falls back to a default (in-memory) storage context when the Pinecone
        client cannot be imported or ``PINECONE_API_KEY`` is not set.

        Returns:
            StorageContext backed by Pinecone, or the default storage context
            on fallback
        """
        # Keep the try body to the import only, so an ImportError raised deeper
        # in the call chain is not mistaken for "Pinecone not installed".
        try:
            from pinecone import Pinecone, ServerlessSpec
        except ImportError:
            logger.warning("Pinecone not available. Falling back to in-memory storage.")
            return StorageContext.from_defaults()

        api_key = os.getenv("PINECONE_API_KEY")
        if not api_key:
            logger.warning("PINECONE_API_KEY not set. Falling back to in-memory storage.")
            return StorageContext.from_defaults()

        pc = Pinecone(api_key=api_key)

        # Get or create index
        index_name = self.config.pinecone_index_name
        if index_name not in pc.list_indexes().names():
            logger.info(f"Creating Pinecone index: {index_name}")
            # The Pinecone client requires a deployment spec when creating an
            # index; omitting it makes create_index fail.
            pc.create_index(
                name=index_name,
                dimension=self.config.pinecone_dimension,
                metric="cosine",
                spec=ServerlessSpec(
                    cloud=self.config.pinecone_cloud,
                    region=self.config.pinecone_region,
                ),
            )

        pinecone_index = pc.Index(index_name)
        vector_store = PineconeVectorStore(pinecone_index=pinecone_index)

        return StorageContext.from_defaults(vector_store=vector_store)

    def add_documents(self, documents: List[Document]) -> None:
        """
        Add documents to existing index

        Each document is handed to ``self.index.insert``. This path does not
        call ``self.ingestion_pipeline``.

        Args:
            documents: List of documents to add

        Raises:
            ValueError: If no index has been created or loaded yet
        """
        if self.index is None:
            raise ValueError("Index not initialized. Call index_documents() first.")

        logger.info(f"Adding {len(documents)} documents to index")
        for doc in documents:
            self.index.insert(doc)

    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Search knowledge base by query

        Best-effort: returns an empty list (after logging) when no index is
        loaded or when retrieval raises.

        Args:
            query: Search query string
            top_k: Number of top results to return

        Returns:
            List of dicts with "content", "score" and "metadata" keys
        """
        if self.index is None:
            logger.error("Index not initialized")
            return []

        try:
            results = self.index.as_retriever(similarity_top_k=top_k).retrieve(query)

            return [
                {
                    "content": node.get_content(),
                    "score": node.score if hasattr(node, 'score') else None,
                    "metadata": node.metadata if hasattr(node, 'metadata') else {},
                }
                for node in results
            ]
        except Exception as e:
            logger.error(f"Search error: {e}")
            return []

    def query(self, query_str: str, top_k: Optional[int] = None) -> str:
        """
        Query knowledge base with natural language using query engine

        Args:
            query_str: Natural language query
            top_k: Number of top results to use (uses config if not specified)

        Returns:
            Query response string, or a message describing why the query
            could not be answered
        """
        if self.index is None:
            return "Index not initialized"

        try:
            if top_k is None:
                top_k = self.config.similarity_top_k

            # Create query engine with response synthesis
            query_engine = self.index.as_query_engine(
                similarity_top_k=top_k,
                response_mode="compact",  # or "tree_summarize", "refine"
            )
            response = query_engine.query(query_str)
            return str(response)
        except Exception as e:
            logger.error(f"Query error: {e}")
            return f"Error processing query: {e}"

    def chat(self, messages: List[Dict[str, str]]) -> str:
        """
        Multi-turn chat with knowledge base

        The last message with role "user" is sent to the chat engine. The
        messages before it with a system/user/assistant role and non-empty
        content are supplied as chat history, so earlier turns are not lost
        even though a new chat engine is created per call.

        Args:
            messages: List of messages in format
                [{"role": "user", "content": "..."}, ...]

        Returns:
            Chat response string, or a message describing why no response
            could be produced
        """
        if self.index is None:
            return "Index not initialized"

        try:
            # Create chat engine for conversational interface
            chat_engine = self.index.as_chat_engine()

            # Locate the most recent user message
            last_user_idx = next(
                (
                    idx
                    for idx in range(len(messages) - 1, -1, -1)
                    if messages[idx].get("role") == "user"
                ),
                None,
            )
            last_message = (
                messages[last_user_idx].get("content")
                if last_user_idx is not None
                else None
            )

            if not last_message:
                return "No user message found"

            # Everything before the current user message is prior context
            history = [
                ChatMessage(role=msg["role"], content=msg["content"])
                for msg in messages[:last_user_idx]
                if msg.get("role") in self._HISTORY_ROLES and msg.get("content")
            ]

            # Pass None for a single-turn call so that case behaves exactly
            # like a plain chat(message) call.
            response = chat_engine.chat(last_message, chat_history=history or None)
            return str(response)
        except Exception as e:
            logger.error(f"Chat error: {e}")
            return f"Error processing chat: {e}"

    def save_index(self, output_path: Optional[str] = None) -> None:
        """
        Save index to disk

        Logs a warning and does nothing when no index is loaded.

        Args:
            output_path: Path to save index (uses config.persist_dir if not
                specified)
        """
        if self.index is None:
            logger.warning("No index to save")
            return

        if output_path is None:
            output_path = self.config.persist_dir

        Path(output_path).mkdir(parents=True, exist_ok=True)
        self.index.storage_context.persist(persist_dir=output_path)
        logger.info(f"Index saved to {output_path}")

    def load_index(self, input_path: Optional[str] = None) -> VectorStoreIndex:
        """
        Load index from disk

        Args:
            input_path: Path to saved index (uses config.persist_dir if not
                specified)

        Returns:
            Loaded VectorStoreIndex

        Raises:
            FileNotFoundError: If the path does not exist
        """
        if input_path is None:
            input_path = self.config.persist_dir

        if not os.path.exists(input_path):
            logger.error(f"Index path not found: {input_path}")
            raise FileNotFoundError(f"Index path not found: {input_path}")

        # Load storage context from disk
        self.storage_context = StorageContext.from_defaults(persist_dir=input_path)
        # Extra keyword arguments are forwarded to the index constructor, so
        # the embedding model is named explicitly here; a `settings=` keyword
        # is not one of its parameters.
        self.index = load_index_from_storage(
            self.storage_context,
            embed_model=self.embed_model,
        )

        self.retriever = self.index.as_retriever(
            similarity_top_k=self.config.similarity_top_k
        )

        logger.info(f"Index loaded from {input_path}")
        return self.index

    def get_index_info(self) -> Dict[str, Any]:
        """Get information about current index

        The reported vector store reflects the store the index actually uses,
        not the ``use_pinecone`` config flag, so an in-memory fallback or an
        index loaded from disk is reported correctly.
        """
        if self.index is None:
            return {"status": "No index loaded"}

        vector_store = self.index.storage_context.vector_store
        backend = (
            "Pinecone" if isinstance(vector_store, PineconeVectorStore) else "In-memory"
        )

        return {
            "status": "Index loaded",
            "embedding_model": self.config.embedding_model,
            "chunk_size": self.config.chunk_size,
            "vector_store": backend,
        }