# rag_config.py — SPARKNET demo
"""
Unified RAG Configuration for SPARKNET Demo
This module provides a single source of truth for RAG system configuration,
ensuring all demo pages use the same vector store, embeddings, and models.
Supports three deployment modes:
1. Backend API (GPU server like Lytos) - Full processing power
2. Local Ollama (for on-premise deployments)
3. Cloud LLM providers (for Streamlit Cloud without backend)
"""
import streamlit as st
from pathlib import Path
import sys
import os
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
# Configuration constants
OLLAMA_BASE_URL = "http://localhost:11434"
VECTOR_STORE_PATH = "data/sparknet_unified_rag"
COLLECTION_NAME = "sparknet_documents"
# Model preferences (in order of preference)
EMBEDDING_MODELS = ["nomic-embed-text", "mxbai-embed-large:latest", "mxbai-embed-large"]
LLM_MODELS = ["llama3.2:latest", "llama3.1:8b", "mistral:latest", "qwen2.5:14b", "qwen2.5:32b"]
def get_secret(key: str, default: str = None):
    """Look up a secret value by key.

    Checks Streamlit secrets first (when a secrets file is configured),
    then falls back to environment variables, then to *default*.

    Args:
        key: Secret / environment variable name.
        default: Value returned when the key is found nowhere.

    Returns:
        The secret value, or *default* when the key is unset.
    """
    try:
        # st.secrets can raise when no secrets file exists; treat any
        # failure here as "no Streamlit secret" and fall through.
        if hasattr(st, 'secrets') and key in st.secrets:
            return st.secrets[key]
    except Exception:
        pass
    return os.environ.get(key, default)
def check_ollama():
    """Probe the local Ollama server.

    Returns:
        Tuple ``(available, models)``: *available* is True when the server
        answered ``/api/tags`` with HTTP 200, and *models* is the list of
        model names it reported (empty list when unavailable).
    """
    try:
        import httpx  # optional dependency; absence simply disables Ollama mode
        with httpx.Client(timeout=3.0) as client:
            resp = client.get(f"{OLLAMA_BASE_URL}/api/tags")
            if resp.status_code == 200:
                models = [m["name"] for m in resp.json().get("models", [])]
                return True, models
    except Exception:
        # Missing httpx, connection refused, timeout, bad JSON — all mean
        # "Ollama not usable"; treat them uniformly as unavailable.
        pass
    return False, []
def select_model(available_models: list, preferred_models: list) -> str:
    """Pick the first preferred model that is actually available.

    Falls back to the top preference when nothing matches, and to a
    hard-coded default when the preference list itself is empty.
    """
    available = set(available_models)
    for candidate in preferred_models:
        if candidate in available:
            return candidate
    if preferred_models:
        return preferred_models[0]
    return "llama3.2:latest"
def check_cloud_providers():
    """Return a dict marking which cloud LLM providers have credentials set."""
    # Provider name -> secret key that enables it.
    provider_keys = (
        ("groq", "GROQ_API_KEY"),
        ("google", "GOOGLE_API_KEY"),
        ("openrouter", "OPENROUTER_API_KEY"),
        ("huggingface", "HF_TOKEN"),
        ("github", "GITHUB_TOKEN"),
        ("mistral", "MISTRAL_API_KEY"),
    )
    return {name: True for name, secret in provider_keys if get_secret(secret)}
def check_backend():
    """Check whether the backend API server is reachable.

    Returns:
        Tuple ``(available, status)``: *status* is the backend's status dict
        when available, otherwise ``{}``.
    """
    try:
        from backend_client import check_backend_available, get_backend_url
        # Only probe when a backend URL is actually configured.
        if get_backend_url():
            available, status = check_backend_available()
            return available, status
    except Exception:
        # backend_client missing or backend unreachable — fall through.
        pass
    return False, {}
@st.cache_resource
def get_unified_rag_system():
    """
    Initialize and return the unified RAG system.

    Cached via st.cache_resource so all Streamlit pages share one instance.

    Priority:
    1. Backend API (GPU server) - if BACKEND_URL is configured
    2. Local Ollama - if running locally
    3. Cloud LLM providers - if API keys configured
    4. Demo mode - no backend available

    Returns:
        Dict with at least ``status``, ``error``, ``rag``, ``store``,
        ``embedder`` and ``mode`` keys; extra keys depend on the mode.
    """
    # Check for required dependencies first
    try:
        import pydantic
    except ImportError:
        return {
            "status": "error",
            "error": "Required dependency 'pydantic' is not installed.",
            "rag": None,
            "store": None,
            "embedder": None,
            "mode": "error",
        }

    # Check backend API first (GPU server)
    backend_ok, backend_status = check_backend()
    if backend_ok:
        return {
            "status": "ready",
            "error": None,
            "rag": None,  # Use backend API instead
            "store": None,
            "embedder": None,
            "mode": "backend",
            "backend_status": backend_status,
            "ollama_available": backend_status.get("ollama_available", False),
            "gpu_available": backend_status.get("gpu_available", False),
            "gpu_name": backend_status.get("gpu_name"),
            "embed_model": backend_status.get("embedding_model", "backend"),
            "llm_model": backend_status.get("llm_model", "backend"),
            "indexed_chunks": backend_status.get("indexed_chunks", 0),
        }

    # Check Ollama availability and cloud providers
    ollama_ok, available_models = check_ollama()
    cloud_providers = check_cloud_providers()

    if ollama_ok:
        # Use Ollama for full RAG functionality
        try:
            from src.rag.agentic import AgenticRAG, RAGConfig
            from src.rag.store import get_vector_store, VectorStoreConfig, reset_vector_store
            from src.rag.embeddings import get_embedding_adapter, EmbeddingConfig, reset_embedding_adapter

            # Select the best available models from the preference lists
            embed_model = select_model(available_models, EMBEDDING_MODELS)
            llm_model = select_model(available_models, LLM_MODELS)
            # Reset singletons to ensure fresh config
            reset_vector_store()
            reset_embedding_adapter()
            # Initialize embedding adapter
            embed_config = EmbeddingConfig(
                ollama_model=embed_model,
                ollama_base_url=OLLAMA_BASE_URL,
            )
            embedder = get_embedding_adapter(config=embed_config)
            # Initialize vector store
            store_config = VectorStoreConfig(
                persist_directory=VECTOR_STORE_PATH,
                collection_name=COLLECTION_NAME,
                similarity_threshold=0.0,
            )
            store = get_vector_store(config=store_config)
            # Initialize RAG config
            rag_config = RAGConfig(
                model=llm_model,
                base_url=OLLAMA_BASE_URL,
                max_revision_attempts=1,
                enable_query_planning=True,
                enable_reranking=True,
                enable_validation=True,
                retrieval_top_k=10,
                final_top_k=5,
                min_confidence=0.3,
                verbose=False,
            )
            # Initialize RAG system
            rag = AgenticRAG(
                config=rag_config,
                vector_store=store,
                embedding_adapter=embedder,
            )
            return {
                "status": "ready",
                "error": None,
                "rag": rag,
                "store": store,
                "embedder": embedder,
                "embed_model": embed_model,
                "llm_model": llm_model,
                "available_models": available_models,
                "mode": "ollama",
            }
        except Exception as e:
            return {
                "status": "error",
                "error": f"Ollama RAG init failed: {str(e)}",
                "rag": None,
                "store": None,
                "embedder": None,
                "mode": "error",
            }
    elif cloud_providers:
        # Cloud mode - use cloud LLM providers.
        # RAG with vector store requires local processing,
        # but we can still do basic document Q&A with cloud LLMs.
        return {
            "status": "cloud",
            "error": None,
            "rag": None,
            "store": None,
            "embedder": None,
            "mode": "cloud",
            "providers": list(cloud_providers.keys()),
            "message": "Running in cloud mode. Document Q&A available via cloud LLM providers.",
        }
    else:
        # No backend available
        return {
            "status": "demo",
            "error": "No LLM backend configured. Add API keys to secrets.toml or start Ollama.",
            "rag": None,
            "store": None,
            "embedder": None,
            "mode": "demo",
        }
def get_store_stats():
    """Get current vector store statistics.

    Returns:
        Dict whose keys depend on the active mode: backend mode reports
        GPU/model info; cloud mode reports zero chunks with a message;
        local Ollama mode reports live chunk counts from the store.
    """
    system = get_unified_rag_system()
    # Use backend status if available
    if system["mode"] == "backend":
        return {
            "total_chunks": system.get("indexed_chunks", 0),
            "status": "ready",
            "mode": "backend",
            "embed_model": system.get("embed_model", "backend"),
            "llm_model": system.get("llm_model", "backend"),
            "gpu_available": system.get("gpu_available", False),
            "gpu_name": system.get("gpu_name"),
        }
    if system["mode"] == "cloud":
        return {
            "total_chunks": 0,
            "status": "cloud",
            "message": "Cloud mode - indexing requires backend or Ollama",
        }
    if system["status"] != "ready":
        return {"total_chunks": 0, "status": "error"}
    try:
        return {
            "total_chunks": system["store"].count(),
            "status": "ready",
            "embed_model": system.get("embed_model", "unknown"),
            "llm_model": system.get("llm_model", "unknown"),
        }
    except Exception:
        # Store query failed (e.g. collection dropped) — report as error.
        return {"total_chunks": 0, "status": "error"}
def index_document(text: str, document_id: str, metadata: dict = None) -> dict:
    """Index a document into the unified RAG system.

    Returns a dict with ``success``, ``num_chunks`` and ``error`` keys.
    """
    system = get_unified_rag_system()

    # Backend mode: delegate to the API, sending simple fixed-size chunks.
    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client
            client = get_backend_client()
            chunk_size, overlap = 500, 50
            step = chunk_size - overlap
            chunks = []
            position = 0
            while position < len(text):
                piece = text[position:position + chunk_size]
                # Skip near-empty fragments.
                if len(piece.strip()) > 20:
                    chunks.append({
                        "chunk_id": f"{document_id}_chunk_{len(chunks)}",
                        "text": piece,
                        "page": 0,
                    })
                position += step
            result = client.index_document(document_id, text, chunks, metadata)
            if result.success:
                return {"success": True, "num_chunks": result.data.get("num_chunks", 0), "error": None}
            return {"success": False, "error": result.error, "num_chunks": 0}
        except Exception as e:
            return {"success": False, "error": str(e), "num_chunks": 0}

    if system["mode"] == "cloud":
        return {"success": False, "error": "Indexing requires backend or Ollama", "num_chunks": 0}
    if system["status"] != "ready":
        return {"success": False, "error": system.get("error", "RAG not ready"), "num_chunks": 0}

    # Local Ollama mode: let the RAG pipeline chunk and index the text.
    try:
        num_chunks = system["rag"].index_text(
            text=text,
            document_id=document_id,
            metadata=metadata or {},
        )
        return {"success": True, "num_chunks": num_chunks, "error": None}
    except Exception as e:
        return {"success": False, "error": str(e), "num_chunks": 0}
def query_rag(question: str, filters: dict = None):
    """Query the unified RAG system.

    Args:
        question: Natural-language question to answer.
        filters: Optional metadata filters passed through to retrieval.

    Returns:
        Tuple ``(response, error)``: on success *response* is a RAG
        response object (or, in cloud mode, a plain dict) and *error* is
        None; on failure *response* is None and *error* is a message string.
    """
    system = get_unified_rag_system()
    # Use backend API if available
    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client
            client = get_backend_client()
            result = client.query(question, filters=filters)
            if result.success:
                data = result.data
                # Create a response object-like dict: ad-hoc classes built
                # with type() mimic the attribute interface of the local RAG
                # response (answer/citations/confidence/...) so callers can
                # use the same attribute-access pattern in both modes.
                return type('RAGResponse', (), {
                    'answer': data.get('answer', ''),
                    'citations': [
                        type('Citation', (), {
                            'index': s.get('index', i+1),
                            'text_snippet': s.get('text_snippet', ''),
                            'relevance_score': s.get('relevance_score', 0),
                            'document_id': s.get('document_id', ''),
                            'page': s.get('page', 0),
                        })() for i, s in enumerate(data.get('sources', []))
                    ],
                    'confidence': data.get('confidence', 0),
                    'latency_ms': data.get('latency_ms', 0),
                    'num_sources': len(data.get('sources', [])),
                    'validated': data.get('validated', False),
                })(), None
            else:
                return None, result.error
        except Exception as e:
            return None, str(e)
    if system["mode"] == "cloud":
        # Use cloud LLM for Q&A (no retrieval — sources list stays empty)
        from llm_providers import generate_response
        response, error = generate_response(question)
        if error:
            return None, error
        return {"answer": response, "sources": [], "mode": "cloud"}, None
    if system["status"] != "ready":
        return None, system.get("error", "RAG not ready")
    try:
        response = system["rag"].query(question, filters=filters)
        return response, None
    except Exception as e:
        return None, str(e)
def clear_index():
    """Clear the vector store index."""
    # Clearing the st.cache_resource entry forces get_unified_rag_system()
    # to rebuild the RAG system on the next call.
    # NOTE(review): this only drops the cached system object; it does not
    # appear to delete persisted vector-store data on disk — confirm that
    # re-initialization actually resets the collection.
    get_unified_rag_system.clear()
    return True
def get_indexed_documents() -> list:
    """Get list of indexed document IDs from vector store.

    Returns:
        List of dicts with ``document_id``, ``source_path`` and
        ``chunk_count`` keys; empty list when nothing is indexed or on error.
    """
    system = get_unified_rag_system()
    # Use backend API if available
    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client
            client = get_backend_client()
            result = client.list_documents()
            if result.success:
                docs = result.data.get("documents", [])
                return [
                    {
                        "document_id": d.get("doc_id", d.get("document_id", "")),
                        "source_path": d.get("filename", ""),
                        "chunk_count": d.get("chunk_count", 0),
                    }
                    for d in docs
                ]
        except Exception:
            # Backend listing failed — treat as "no documents".
            pass
        return []
    if system["status"] != "ready":
        return []
    try:
        store = system["store"]
        # NOTE(review): reads the store's private _collection (Chroma-style
        # .get() API assumed) — confirm against the store implementation.
        collection = store._collection
        results = collection.get(include=["metadatas"])
        if not results or not results.get("metadatas"):
            return []
        # Aggregate per-chunk metadata into one record per document.
        doc_info = {}
        for meta in results["metadatas"]:
            doc_id = meta.get("document_id", "unknown")
            if doc_id not in doc_info:
                doc_info[doc_id] = {
                    "document_id": doc_id,
                    "source_path": meta.get("source_path", ""),
                    "chunk_count": 0,
                }
            doc_info[doc_id]["chunk_count"] += 1
        return list(doc_info.values())
    except Exception:
        return []
def get_chunks_for_document(document_id: str) -> list:
    """Get all chunks for a specific document.

    Args:
        document_id: ID the chunks were indexed under.

    Returns:
        List of dicts with ``chunk_id``, ``text`` and ``metadata`` keys;
        empty list when the system is not ready, nothing matches, or on error.
    """
    system = get_unified_rag_system()
    if system["status"] != "ready":
        return []
    try:
        store = system["store"]
        # NOTE(review): reads the store's private _collection (Chroma-style
        # .get() API assumed) — confirm against the store implementation.
        collection = store._collection
        results = collection.get(
            where={"document_id": document_id},
            include=["documents", "metadatas"]
        )
        if not results or not results.get("ids"):
            return []
        chunks = []
        for i, chunk_id in enumerate(results["ids"]):
            chunks.append({
                "chunk_id": chunk_id,
                "text": results["documents"][i] if results.get("documents") else "",
                "metadata": results["metadatas"][i] if results.get("metadatas") else {},
            })
        return chunks
    except Exception:
        return []
def search_similar_chunks(query: str, top_k: int = 5, doc_filter: str = None):
    """Search for similar chunks with optional document filter.

    Args:
        query: Free-text search query.
        top_k: Maximum number of results to return.
        doc_filter: When set, restrict results to this document_id.

    Returns:
        List of result dicts (``chunk_id``, ``document_id``, ``text``,
        ``similarity``, ``page``, ``metadata``); empty list on error.
    """
    system = get_unified_rag_system()
    # Use backend API if available
    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client
            client = get_backend_client()
            result = client.search_similar(query, top_k, doc_filter)
            if result.success:
                return result.data.get("results", [])
        except Exception:
            # Backend search failed — fall through to empty result.
            pass
        return []
    if system["status"] != "ready":
        return []
    try:
        embedder = system["embedder"]
        store = system["store"]
        # Embed the query locally, then run a vector search on the store.
        query_embedding = embedder.embed_text(query)
        filters = None
        if doc_filter:
            filters = {"document_id": doc_filter}
        results = store.search(
            query_embedding=query_embedding,
            top_k=top_k,
            filters=filters,
        )
        return [
            {
                "chunk_id": r.chunk_id,
                "document_id": r.document_id,
                "text": r.text,
                "similarity": r.similarity,
                "page": r.page,
                "metadata": r.metadata,
            }
            for r in results
        ]
    except Exception:
        return []
def compute_document_similarity(doc_id_1: str, doc_id_2: str) -> dict:
    """Compute semantic similarity between two documents.

    Uses cosine similarity between the mean embeddings of (up to) the
    first 10 chunks of each document.

    Returns:
        Dict with ``similarity`` (float), ``doc1_chunks``, ``doc2_chunks``
        and ``error`` keys; on failure ``similarity`` is 0.0 and ``error``
        holds a message.
    """
    system = get_unified_rag_system()
    if system["status"] != "ready":
        return {"error": "RAG system not ready", "similarity": 0.0}
    try:
        import numpy as np

        chunks_1 = get_chunks_for_document(doc_id_1)
        chunks_2 = get_chunks_for_document(doc_id_2)
        if not chunks_1 or not chunks_2:
            return {"error": "One or both documents not found", "similarity": 0.0}
        embedder = system["embedder"]

        def avg_embedding(chunks):
            # Cap at 10 chunks to bound embedding cost for large documents.
            embeddings = [embedder.embed_text(chunk["text"]) for chunk in chunks[:10]]
            if not embeddings:
                return None
            return np.mean(embeddings, axis=0)

        emb1 = avg_embedding(chunks_1)
        emb2 = avg_embedding(chunks_2)
        if emb1 is None or emb2 is None:
            return {"error": "Could not compute embeddings", "similarity": 0.0}
        denom = np.linalg.norm(emb1) * np.linalg.norm(emb2)
        if denom == 0:
            # Degenerate (all-zero) embedding — avoid division by zero.
            return {"error": "Zero-norm embedding", "similarity": 0.0}
        similarity = np.dot(emb1, emb2) / denom
        return {
            "similarity": float(similarity),
            "doc1_chunks": len(chunks_1),
            "doc2_chunks": len(chunks_2),
            "error": None,
        }
    except Exception as e:
        return {"error": str(e), "similarity": 0.0}
def auto_index_processed_document(doc_id: str, text: str, chunks: list, metadata: dict = None):
    """
    Auto-index a processed document with pre-computed chunks.

    Args:
        doc_id: Document identifier to index under.
        text: Full document text (used by the backend API path).
        chunks: Pre-computed chunks — either plain strings or dicts with
            at least a ``text`` key (``page`` optional).
        metadata: Optional metadata; ``filename`` is recorded as source_path.

    Returns:
        Dict with ``success``, ``num_chunks`` and ``error`` keys.
    """
    system = get_unified_rag_system()
    # Use backend API if available
    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client
            client = get_backend_client()
            result = client.index_document(doc_id, text, chunks, metadata)
            if result.success:
                return {"success": True, "num_chunks": result.data.get("num_chunks", 0), "error": None}
            else:
                return {"success": False, "error": result.error, "num_chunks": 0}
        except Exception as e:
            return {"success": False, "error": str(e), "num_chunks": 0}
    if system["mode"] == "cloud":
        return {"success": False, "error": "Indexing requires backend or Ollama", "num_chunks": 0}
    if system["status"] != "ready":
        return {"success": False, "error": "RAG system not ready", "num_chunks": 0}
    try:
        store = system["store"]
        embedder = system["embedder"]
        chunk_dicts = []
        embeddings = []
        for i, chunk in enumerate(chunks):
            # Accept plain strings or dicts; a dict without a "text" key
            # yields "" and is skipped below (previously it crashed the
            # whole call on .strip()).
            chunk_text = chunk.get("text", "") if isinstance(chunk, dict) else chunk
            if len(chunk_text.strip()) < 20:
                continue
            chunk_id = f"{doc_id}_chunk_{i}"
            chunk_dict = {
                "chunk_id": chunk_id,
                "document_id": doc_id,
                "text": chunk_text,
                "page": chunk.get("page", 0) if isinstance(chunk, dict) else 0,
                "chunk_type": "text",
                "source_path": metadata.get("filename", "") if metadata else "",
                "sequence_index": i,
            }
            chunk_dicts.append(chunk_dict)
            embedding = embedder.embed_text(chunk_text)
            embeddings.append(embedding)
        if not chunk_dicts:
            return {"success": False, "error": "No valid chunks to index", "num_chunks": 0}
        store.add_chunks(chunk_dicts, embeddings)
        return {"success": True, "num_chunks": len(chunk_dicts), "error": None}
    except Exception as e:
        return {"success": False, "error": str(e), "num_chunks": 0}