# NOTE: exported from a Hugging Face Space (listing status: "Sleeping").
| """ | |
| Knowledge Base Indexing and Retrieval using LlamaIndex | |
| Modern LlamaIndex framework integration with: | |
| - Foundation for knowledge base indexing (VectorStoreIndex, PropertyGraphIndex) | |
| - Vector similarity search with retrieval | |
| - Document retrieval with storage context | |
| - Ingestion pipeline for data processing | |
| """ | |
| import os | |
| from typing import List, Dict, Any, Optional, Union | |
| from pathlib import Path | |
| import logging | |
| from llama_index.core import ( | |
| VectorStoreIndex, | |
| SimpleDirectoryReader, | |
| Document, | |
| Settings, | |
| StorageContext, | |
| load_index_from_storage, | |
| ) | |
| from llama_index.core.ingestion import IngestionPipeline | |
| from llama_index.core.node_parser import SimpleNodeParser | |
| from llama_index.core.extractors import TitleExtractor, KeywordExtractor | |
| from llama_index.embeddings.openai import OpenAIEmbedding | |
| from llama_index.vector_stores.pinecone import PineconeVectorStore | |
| from llama_index.llms.openai import OpenAI | |
| from pydantic import BaseModel, Field | |
| logger = logging.getLogger(__name__) | |
class IndexConfig(BaseModel):
    """Configuration for a knowledge base index, following LlamaIndex best practices.

    Groups model selection, chunking, vector-store backend, retrieval, and
    persistence settings into a single validated pydantic model.
    """

    # --- Model selection ---
    embedding_model: str = Field(
        default="text-embedding-3-small",
        description="OpenAI embedding model",
    )
    llm_model: str = Field(
        default="gpt-4-turbo",
        description="OpenAI LLM for query/synthesis",
    )

    # --- Chunking ---
    chunk_size: int = Field(
        default=1024,
        description="Size of text chunks",
    )
    chunk_overlap: int = Field(
        default=20,
        description="Overlap between chunks",
    )

    # --- Vector store backend (in-memory unless use_pinecone is True) ---
    use_pinecone: bool = Field(
        default=False,
        description="Use Pinecone for vector store",
    )
    pinecone_index_name: str = Field(
        default="ecomcp-knowledge",
        description="Pinecone index name",
    )
    # 1536 matches the output dimension of text-embedding-3-small.
    pinecone_dimension: int = Field(
        default=1536,
        description="Dimension for embeddings",
    )

    # --- Retrieval ---
    similarity_top_k: int = Field(
        default=5,
        description="Number of similar items to retrieve",
    )

    # --- Persistence ---
    persist_dir: str = Field(
        default="./kb_storage",
        description="Directory for persisting index",
    )
class KnowledgeBase:
    """
    Knowledge base for indexing and retrieving product/documentation information.

    Lifecycle: call index_documents() (or load_index()) before add_documents(),
    search(), query(), or chat(). Vector storage is either in-memory or
    Pinecone-backed, controlled by IndexConfig.use_pinecone.
    """

    def __init__(self, config: Optional[IndexConfig] = None):
        """
        Initialize knowledge base with modern LlamaIndex patterns.

        Args:
            config: IndexConfig object for customization; defaults used when omitted.
        """
        self.config = config or IndexConfig()
        self.index: Optional[VectorStoreIndex] = None
        self.retriever = None  # bound to config.similarity_top_k once an index exists
        self.storage_context: Optional[StorageContext] = None
        self.ingestion_pipeline: Optional[IngestionPipeline] = None
        self._setup_models()
        self._setup_ingestion_pipeline()

    def _setup_models(self) -> None:
        """Configure LLM and embedding models and install them as global Settings."""
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            # Construction still succeeds without a key so offline setup works;
            # any call that actually reaches OpenAI will fail later instead.
            logger.warning("OPENAI_API_KEY not set. Models may not work.")
        self.embed_model = OpenAIEmbedding(
            model=self.config.embedding_model,
            api_key=api_key,
        )
        self.llm = OpenAI(
            model=self.config.llm_model,
            api_key=api_key,
        )
        # LlamaIndex components read these process-wide Settings wherever no
        # explicit model/chunking override is supplied.
        Settings.embed_model = self.embed_model
        Settings.llm = self.llm
        Settings.chunk_size = self.config.chunk_size
        Settings.chunk_overlap = self.config.chunk_overlap

    def _setup_ingestion_pipeline(self) -> None:
        """Build the ingestion pipeline: chunking followed by metadata extraction."""
        node_parser = SimpleNodeParser.from_defaults(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap,
        )
        # Extractors annotate each node with a title and keywords; both call
        # the configured LLM under the hood.
        extractors = [
            TitleExtractor(nodes=5),
            KeywordExtractor(keywords=10),
        ]
        self.ingestion_pipeline = IngestionPipeline(
            transformations=[node_parser] + extractors,
        )

    def index_documents(self, documents_path: str) -> VectorStoreIndex:
        """
        Index documents from a directory using the ingestion pipeline.

        Args:
            documents_path: Path to directory containing documents.

        Returns:
            VectorStoreIndex: The freshly built index (also kept on self.index).

        Raises:
            FileNotFoundError: If documents_path does not exist.
        """
        logger.info(f"Indexing documents from {documents_path}")
        if not os.path.exists(documents_path):
            logger.error(f"Document path not found: {documents_path}")
            raise FileNotFoundError(f"Document path not found: {documents_path}")
        # Load documents from disk.
        reader = SimpleDirectoryReader(documents_path)
        documents = reader.load_data()
        logger.info(f"Loaded {len(documents)} documents")
        # Chunk + enrich through the ingestion pipeline.
        nodes = self.ingestion_pipeline.run(documents=documents)
        logger.info(f"Processed into {len(nodes)} nodes with metadata")
        # Pick the storage backend per config.
        if self.config.use_pinecone:
            self.storage_context = self._create_pinecone_storage()
        else:
            self.storage_context = StorageContext.from_defaults()
        self.index = VectorStoreIndex(
            nodes=nodes,
            storage_context=self.storage_context,
            show_progress=True,
        )
        # Cache a retriever pre-bound to the configured top_k.
        self.retriever = self.index.as_retriever(
            similarity_top_k=self.config.similarity_top_k
        )
        logger.info(f"Index created successfully with {len(nodes)} nodes")
        return self.index

    def _create_pinecone_storage(self) -> StorageContext:
        """
        Create a Pinecone-backed storage context.

        Falls back to in-memory storage when the pinecone client or its API
        key is unavailable.

        Returns:
            StorageContext backed by Pinecone, or an in-memory default.
        """
        try:
            from pinecone import Pinecone

            api_key = os.getenv("PINECONE_API_KEY")
            if not api_key:
                logger.warning("PINECONE_API_KEY not set. Falling back to in-memory storage.")
                return StorageContext.from_defaults()
            pc = Pinecone(api_key=api_key)
            # Create the index on first use.
            index_name = self.config.pinecone_index_name
            if index_name not in pc.list_indexes().names():
                logger.info(f"Creating Pinecone index: {index_name}")
                # NOTE(review): newer pinecone clients require a `spec=`
                # argument (e.g. ServerlessSpec) for create_index — confirm
                # against the pinned client version.
                pc.create_index(
                    name=index_name,
                    dimension=self.config.pinecone_dimension,
                    metric="cosine",
                )
            pinecone_index = pc.Index(index_name)
            vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
            return StorageContext.from_defaults(vector_store=vector_store)
        except ImportError:
            logger.warning("Pinecone not available. Falling back to in-memory storage.")
            return StorageContext.from_defaults()

    def add_documents(self, documents: List[Document]) -> None:
        """
        Add documents to the existing index.

        Args:
            documents: List of documents to add.

        Raises:
            ValueError: If no index has been built or loaded yet.
        """
        if self.index is None:
            raise ValueError("Index not initialized. Call index_documents() first.")
        logger.info(f"Adding {len(documents)} documents to index")
        for doc in documents:
            self.index.insert(doc)

    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Retrieve raw matching nodes for a query (no LLM synthesis).

        Args:
            query: Search query string.
            top_k: Number of top results to return.

        Returns:
            List of dicts with "content", "score", and "metadata" keys;
            empty list when the index is missing or retrieval fails.
        """
        if self.index is None:
            logger.error("Index not initialized")
            return []
        try:
            results = self.index.as_retriever(similarity_top_k=top_k).retrieve(query)
            output = []
            for node in results:
                output.append({
                    "content": node.get_content(),
                    # hasattr guards keep this robust to node objects that
                    # lack score/metadata.
                    "score": node.score if hasattr(node, 'score') else None,
                    "metadata": node.metadata if hasattr(node, 'metadata') else {},
                })
            return output
        except Exception as e:
            # Best-effort API: log and return empty rather than raising.
            logger.error(f"Search error: {e}")
            return []

    def query(self, query_str: str, top_k: Optional[int] = None) -> str:
        """
        Query the knowledge base with natural language (retrieval + synthesis).

        Args:
            query_str: Natural language query.
            top_k: Number of top results to use (config default when omitted).

        Returns:
            Synthesized answer string, or an error message on failure.
        """
        if self.index is None:
            return "Index not initialized"
        try:
            if top_k is None:
                top_k = self.config.similarity_top_k
            query_engine = self.index.as_query_engine(
                similarity_top_k=top_k,
                response_mode="compact",  # alternatives: "tree_summarize", "refine"
            )
            response = query_engine.query(query_str)
            return str(response)
        except Exception as e:
            logger.error(f"Query error: {e}")
            return f"Error processing query: {e}"

    def chat(self, messages: List[Dict[str, str]]) -> str:
        """
        Answer the most recent user message against the knowledge base.

        NOTE: a fresh chat engine is created on every call, so earlier turns
        in `messages` are NOT replayed — only the last "user" message is used.

        Args:
            messages: List of messages in format [{"role": "user", "content": "..."}, ...]

        Returns:
            Chat response string, or an error message on failure.
        """
        if self.index is None:
            return "Index not initialized"
        try:
            chat_engine = self.index.as_chat_engine()
            # Find the most recent user-authored message.
            last_message = None
            for msg in reversed(messages):
                if msg.get("role") == "user":
                    last_message = msg.get("content")
                    break
            if not last_message:
                return "No user message found"
            response = chat_engine.chat(last_message)
            return str(response)
        except Exception as e:
            logger.error(f"Chat error: {e}")
            return f"Error processing chat: {e}"

    def save_index(self, output_path: str) -> None:
        """
        Persist the current index to disk.

        Args:
            output_path: Directory to save into (created if missing).
        """
        if self.index is None:
            logger.warning("No index to save")
            return
        Path(output_path).mkdir(parents=True, exist_ok=True)
        self.index.storage_context.persist(persist_dir=output_path)
        logger.info(f"Index saved to {output_path}")

    def load_index(self, input_path: str) -> VectorStoreIndex:
        """
        Load a previously persisted index from disk.

        Args:
            input_path: Path to a saved index directory.

        Returns:
            The loaded VectorStoreIndex (also kept on self.index).

        Raises:
            FileNotFoundError: If input_path does not exist.
        """
        if not os.path.exists(input_path):
            logger.error(f"Index path not found: {input_path}")
            raise FileNotFoundError(f"Index path not found: {input_path}")
        self.storage_context = StorageContext.from_defaults(persist_dir=input_path)
        # Fix: load_index_from_storage has no `settings=` parameter — unknown
        # kwargs are forwarded to the index constructor and raise. The global
        # Settings configured in _setup_models() apply implicitly.
        self.index = load_index_from_storage(self.storage_context)
        self.retriever = self.index.as_retriever(
            similarity_top_k=self.config.similarity_top_k
        )
        logger.info(f"Index loaded from {input_path}")
        return self.index

    def get_index_info(self) -> Dict[str, Any]:
        """Return a small status/config summary for the current index."""
        if self.index is None:
            return {"status": "No index loaded"}
        return {
            "status": "Index loaded",
            "embedding_model": self.config.embedding_model,
            "chunk_size": self.config.chunk_size,
            "vector_store": "Pinecone" if self.config.use_pinecone else "In-memory",
        }