# ecomcp/src/core/knowledge_base.py
# Author: vinhnx90 — "Enhance server and UI with logging, error handling, and new features" (commit 216bd52)
"""
Knowledge Base Indexing and Retrieval using LlamaIndex
Modern LlamaIndex framework integration with:
- Foundation for knowledge base indexing (VectorStoreIndex, PropertyGraphIndex)
- Vector similarity search with retrieval
- Document retrieval with storage context
- Ingestion pipeline for data processing
"""
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    Document,
    Settings,
    StorageContext,
    load_index_from_storage,
)
from llama_index.core.extractors import TitleExtractor, KeywordExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.llms import ChatMessage
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.vector_stores.pinecone import PineconeVectorStore
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
class IndexConfig(BaseModel):
    """Configuration for knowledge base index following LlamaIndex best practices.

    All fields have sensible defaults; override any subset at construction
    time. Pydantic validates field types on instantiation.
    """
    # Embedding settings
    embedding_model: str = Field(
        default="text-embedding-3-small",
        description="OpenAI embedding model"
    )
    # LLM settings
    llm_model: str = Field(
        default="gpt-4-turbo",
        description="OpenAI LLM for query/synthesis"
    )
    # Chunking settings — applied both to global Settings and the
    # ingestion pipeline's node parser (see KnowledgeBase._setup_models).
    chunk_size: int = Field(
        default=1024,
        description="Size of text chunks"
    )
    chunk_overlap: int = Field(
        default=20,
        description="Overlap between chunks"
    )
    # Vector store backend; defaults to in-memory storage when False
    use_pinecone: bool = Field(
        default=False,
        description="Use Pinecone for vector store"
    )
    pinecone_index_name: str = Field(
        default="ecomcp-knowledge",
        description="Pinecone index name"
    )
    # NOTE(review): must match the embedding model's output dimension
    # (1536 for text-embedding-3-small) — verify if embedding_model changes.
    pinecone_dimension: int = Field(
        default=1536,
        description="Dimension for embeddings"
    )
    # Retrieval settings
    similarity_top_k: int = Field(
        default=5,
        description="Number of similar items to retrieve"
    )
    # Storage settings
    persist_dir: str = Field(
        default="./kb_storage",
        description="Directory for persisting index"
    )
class KnowledgeBase:
    """
    Knowledge base for indexing and retrieving product/documentation information.

    Combines a LlamaIndex ingestion pipeline (chunking + title/keyword
    metadata extraction), a VectorStoreIndex (in-memory by default, Pinecone
    when configured), and retriever / query-engine / chat-engine front-ends.
    """

    def __init__(self, config: Optional[IndexConfig] = None):
        """
        Initialize knowledge base with modern LlamaIndex patterns.

        Args:
            config: IndexConfig object for customization; defaults are used
                when omitted.
        """
        self.config = config or IndexConfig()
        self.index: Optional[VectorStoreIndex] = None
        self.retriever = None
        self.storage_context: Optional[StorageContext] = None
        self.ingestion_pipeline: Optional[IngestionPipeline] = None
        self._setup_models()
        self._setup_ingestion_pipeline()

    def _setup_models(self):
        """Configure LLM and embedding models and register them globally.

        Reads OPENAI_API_KEY from the environment; a missing key is only
        logged as a warning here — calls will fail later, not at construction.
        """
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            logger.warning("OPENAI_API_KEY not set. Models may not work.")
        # Embedding model used for both indexing and retrieval
        self.embed_model = OpenAIEmbedding(
            model=self.config.embedding_model,
            api_key=api_key,
        )
        # LLM used for response synthesis in query/chat engines
        self.llm = OpenAI(
            model=self.config.llm_model,
            api_key=api_key,
        )
        # Global LlamaIndex settings: every index/engine created afterwards
        # picks these up implicitly (so no per-call model wiring is needed).
        Settings.embed_model = self.embed_model
        Settings.llm = self.llm
        Settings.chunk_size = self.config.chunk_size
        Settings.chunk_overlap = self.config.chunk_overlap

    def _setup_ingestion_pipeline(self):
        """Setup ingestion pipeline: chunking followed by metadata extraction."""
        # Node parser splits documents into overlapping text chunks
        node_parser = SimpleNodeParser.from_defaults(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap,
        )
        # LLM-based extractors that enrich each node's metadata.
        # TitleExtractor considers the first 5 nodes per document;
        # KeywordExtractor pulls up to 10 keywords per node.
        extractors = [
            TitleExtractor(nodes=5),
            KeywordExtractor(keywords=10),
        ]
        # Transformations run in order: parse first, then extract metadata
        self.ingestion_pipeline = IngestionPipeline(
            transformations=[node_parser] + extractors,
        )

    def index_documents(self, documents_path: str) -> VectorStoreIndex:
        """
        Index documents from a directory using the ingestion pipeline.

        Args:
            documents_path: Path to directory containing documents

        Returns:
            VectorStoreIndex: Indexed documents

        Raises:
            FileNotFoundError: If documents_path does not exist.
        """
        logger.info(f"Indexing documents from {documents_path}")
        if not os.path.exists(documents_path):
            logger.error(f"Document path not found: {documents_path}")
            raise FileNotFoundError(f"Document path not found: {documents_path}")
        # Load raw documents from disk
        reader = SimpleDirectoryReader(documents_path)
        documents = reader.load_data()
        logger.info(f"Loaded {len(documents)} documents")
        # Chunk + extract metadata (runs LLM-based extractors — may be slow)
        nodes = self.ingestion_pipeline.run(documents=documents)
        logger.info(f"Processed into {len(nodes)} nodes with metadata")
        # Choose the vector store backend
        if self.config.use_pinecone:
            self.storage_context = self._create_pinecone_storage()
        else:
            self.storage_context = StorageContext.from_defaults()
        # Build the index directly from the pre-processed nodes
        self.index = VectorStoreIndex(
            nodes=nodes,
            storage_context=self.storage_context,
            show_progress=True,
        )
        # Default retriever, reused by callers that don't override top_k
        self.retriever = self.index.as_retriever(
            similarity_top_k=self.config.similarity_top_k
        )
        logger.info(f"Index created successfully with {len(nodes)} nodes")
        return self.index

    def _create_pinecone_storage(self) -> StorageContext:
        """
        Create a Pinecone-backed storage context.

        Falls back to in-memory storage when the pinecone package is not
        installed, PINECONE_API_KEY is unset, or the Pinecone API errors.

        Returns:
            StorageContext backed by Pinecone (or in-memory on fallback).
        """
        try:
            from pinecone import Pinecone
            api_key = os.getenv("PINECONE_API_KEY")
            if not api_key:
                logger.warning("PINECONE_API_KEY not set. Falling back to in-memory storage.")
                return StorageContext.from_defaults()
            pc = Pinecone(api_key=api_key)
            # Get or create the target index
            index_name = self.config.pinecone_index_name
            if index_name not in pc.list_indexes().names():
                logger.info(f"Creating Pinecone index: {index_name}")
                # NOTE(review): recent pinecone clients require a `spec=`
                # (ServerlessSpec/PodSpec) argument on create_index — confirm
                # against the pinned client version.
                pc.create_index(
                    name=index_name,
                    dimension=self.config.pinecone_dimension,
                    metric="cosine"
                )
            pinecone_index = pc.Index(index_name)
            vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
            return StorageContext.from_defaults(vector_store=vector_store)
        except ImportError:
            logger.warning("Pinecone not available. Falling back to in-memory storage.")
            return StorageContext.from_defaults()
        except Exception as e:
            # Fix: previously only ImportError was caught, so any Pinecone
            # API/network failure crashed index_documents() instead of using
            # the documented in-memory fallback.
            logger.error(f"Pinecone setup failed: {e}. Falling back to in-memory storage.")
            return StorageContext.from_defaults()

    def add_documents(self, documents: List[Document]) -> None:
        """
        Add documents to the existing index.

        NOTE: inserts bypass the ingestion pipeline, so these documents get
        default chunking but no title/keyword metadata extraction.

        Args:
            documents: List of documents to add

        Raises:
            ValueError: If no index has been built or loaded yet.
        """
        if self.index is None:
            raise ValueError("Index not initialized. Call index_documents() first.")
        logger.info(f"Adding {len(documents)} documents to index")
        for doc in documents:
            self.index.insert(doc)

    def search(self, query: str, top_k: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Search knowledge base by query (pure retrieval, no LLM synthesis).

        Args:
            query: Search query string
            top_k: Number of top results to return. Falls back to
                config.similarity_top_k (default 5) when omitted — now
                consistent with query().

        Returns:
            List of results with content, score, and metadata; empty list on
            error or when no index is loaded.
        """
        if self.index is None:
            logger.error("Index not initialized")
            return []
        if top_k is None:
            top_k = self.config.similarity_top_k
        try:
            results = self.index.as_retriever(similarity_top_k=top_k).retrieve(query)
            # Scores/metadata may be absent depending on retriever internals
            return [
                {
                    "content": node.get_content(),
                    "score": getattr(node, "score", None),
                    "metadata": getattr(node, "metadata", {}),
                }
                for node in results
            ]
        except Exception as e:
            logger.error(f"Search error: {e}")
            return []

    def query(self, query_str: str, top_k: Optional[int] = None) -> str:
        """
        Query knowledge base with natural language using a query engine.

        Args:
            query_str: Natural language query
            top_k: Number of top results to use (uses config if not specified)

        Returns:
            Synthesized response string (or an error description).
        """
        if self.index is None:
            return "Index not initialized"
        try:
            if top_k is None:
                top_k = self.config.similarity_top_k
            # Query engine retrieves top_k nodes, then synthesizes an answer
            query_engine = self.index.as_query_engine(
                similarity_top_k=top_k,
                response_mode="compact",  # or "tree_summarize", "refine"
            )
            response = query_engine.query(query_str)
            return str(response)
        except Exception as e:
            logger.error(f"Query error: {e}")
            return f"Error processing query: {e}"

    def chat(self, messages: List[Dict[str, str]]) -> str:
        """
        Multi-turn chat with knowledge base.

        Args:
            messages: List of messages in format [{"role": "user", "content": "..."}, ...]

        Returns:
            Chat response string (or an error description).
        """
        if self.index is None:
            return "Index not initialized"
        try:
            # Chat engine provides the conversational interface over the index
            chat_engine = self.index.as_chat_engine()
            # Find the latest user message to answer
            last_message = None
            last_index = -1
            for i in range(len(messages) - 1, -1, -1):
                if messages[i].get("role") == "user":
                    last_message = messages[i].get("content")
                    last_index = i
                    break
            if not last_message:
                return "No user message found"
            # Fix: forward the preceding turns as chat history instead of
            # silently discarding the conversation context.
            history = [
                ChatMessage(role=m.get("role", "user"), content=m.get("content", ""))
                for m in messages[:last_index]
            ]
            response = chat_engine.chat(last_message, chat_history=history or None)
            return str(response)
        except Exception as e:
            logger.error(f"Chat error: {e}")
            return f"Error processing chat: {e}"

    def save_index(self, output_path: str) -> None:
        """
        Persist the current index to disk.

        Args:
            output_path: Directory to save the index into (created if needed).
        """
        if self.index is None:
            logger.warning("No index to save")
            return
        Path(output_path).mkdir(parents=True, exist_ok=True)
        self.index.storage_context.persist(persist_dir=output_path)
        logger.info(f"Index saved to {output_path}")

    def load_index(self, input_path: str) -> VectorStoreIndex:
        """
        Load a previously persisted index from disk.

        Args:
            input_path: Path to saved index

        Returns:
            Loaded VectorStoreIndex

        Raises:
            FileNotFoundError: If input_path does not exist.
        """
        if not os.path.exists(input_path):
            logger.error(f"Index path not found: {input_path}")
            raise FileNotFoundError(f"Index path not found: {input_path}")
        # Rehydrate the storage context from the persisted directory
        self.storage_context = StorageContext.from_defaults(persist_dir=input_path)
        # Fix: load_index_from_storage() has no `settings=` kwarg — unknown
        # kwargs are forwarded to the index constructor and can raise
        # TypeError. The global Settings from _setup_models() apply anyway.
        self.index = load_index_from_storage(self.storage_context)
        self.retriever = self.index.as_retriever(
            similarity_top_k=self.config.similarity_top_k
        )
        logger.info(f"Index loaded from {input_path}")
        return self.index

    def get_index_info(self) -> Dict[str, Any]:
        """Return a small status/config summary for the current index."""
        if self.index is None:
            return {"status": "No index loaded"}
        return {
            "status": "Index loaded",
            "embedding_model": self.config.embedding_model,
            "chunk_size": self.config.chunk_size,
            "vector_store": "Pinecone" if self.config.use_pinecone else "In-memory",
        }