|
|
""" |
|
|
Unified RAG Configuration for SPARKNET Demo |
|
|
|
|
|
This module provides a single source of truth for RAG system configuration, |
|
|
ensuring all demo pages use the same vector store, embeddings, and models. |
|
|
|
|
|
Supports three deployment modes (plus a demo-mode fallback when none of them is available):
|
|
1. Backend API (GPU server like Lytos) - Full processing power |
|
|
2. Local Ollama (for on-premise deployments) |
|
|
3. Cloud LLM providers (for Streamlit Cloud without backend) |
|
|
""" |
|
|
|
|
|
import streamlit as st |
|
|
from pathlib import Path |
|
|
import sys |
|
|
import os |
|
|
|
|
|
# Repository root (two directory levels above this file). It is prepended to
# sys.path so project packages such as ``src.rag`` and sibling demo modules
# (``backend_client``, ``llm_providers``) can be imported from demo pages.
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

# Ollama HTTP endpoint. Defaults to a local server; set the OLLAMA_BASE_URL
# environment variable to target a remote / on-premise instance. A trailing
# slash is stripped because request paths are appended as "/api/...".
# An empty value is treated as unset.
OLLAMA_BASE_URL = (os.environ.get("OLLAMA_BASE_URL") or "http://localhost:11434").rstrip("/")

# Shared vector-store location and collection used by every demo page.
# NOTE(review): relative path — presumably resolved against the process
# working directory rather than PROJECT_ROOT; confirm.
VECTOR_STORE_PATH = "data/sparknet_unified_rag"
COLLECTION_NAME = "sparknet_documents"

# Model preferences, highest priority first. select_model() picks the first
# one that Ollama reports as installed.
EMBEDDING_MODELS = ["nomic-embed-text", "mxbai-embed-large:latest", "mxbai-embed-large"]
LLM_MODELS = ["llama3.2:latest", "llama3.1:8b", "mistral:latest", "qwen2.5:14b", "qwen2.5:32b"]
|
|
|
|
|
|
|
|
def get_secret(key: str, default: str = None):
    """Look up a configuration secret.

    ``st.secrets`` is consulted first. If it is unavailable, raises, or does
    not contain ``key``, the process environment is used instead.

    Args:
        key: Secret / environment-variable name.
        default: Value returned when neither source defines ``key``.

    Returns:
        The value from ``st.secrets`` (type as stored there), else the
        environment string, else ``default``.
    """
    try:
        if hasattr(st, "secrets") and key in st.secrets:
            return st.secrets[key]
    except Exception:
        # Accessing st.secrets can raise (e.g. when no secrets file is
        # configured). Treat that like a missing key. Catching Exception
        # rather than using a bare except keeps KeyboardInterrupt/SystemExit
        # propagating.
        pass
    return os.environ.get(key, default)
|
|
|
|
|
|
|
|
def check_ollama():
    """Probe the Ollama server at ``OLLAMA_BASE_URL`` and list its models.

    Returns:
        tuple[bool, list[str]]: ``(True, model_names)`` when
        ``GET /api/tags`` answers with HTTP 200, otherwise ``(False, [])``.
        Any failure is treated as "Ollama unavailable": ``httpx`` not
        installed, connection error, the 3-second timeout, or unparsable JSON.
    """
    try:
        import httpx

        with httpx.Client(timeout=3.0) as client:
            resp = client.get(f"{OLLAMA_BASE_URL}/api/tags")
            if resp.status_code == 200:
                payload = resp.json() or {}
                # Skip malformed entries instead of letting one bad record
                # make a running server look unavailable.
                models = [
                    m["name"]
                    for m in payload.get("models") or []
                    if isinstance(m, dict) and "name" in m
                ]
                return True, models
    except Exception:
        # Best-effort probe: any error means "not available".
        pass
    return False, []
|
|
|
|
|
|
|
|
def select_model(available_models: list, preferred_models: list) -> str:
    """Select the highest-priority preferred model that is installed.

    Ollama lists installed models with an explicit tag (``name:tag``). An
    untagged name such as ``"nomic-embed-text"`` means its ``:latest`` tag,
    so a preference matches an installed model either exactly or after
    adding an implicit ``:latest``.

    Args:
        available_models: Model names reported by Ollama (may be empty/None).
        preferred_models: Candidate names, highest priority first.

    Returns:
        str: Result depends on how the first preference is found:

        - exact match: the preferred name itself;
        - match via the implicit ``:latest`` tag: the installed name;
        - no match: ``preferred_models[0]``, or ``"llama3.2:latest"`` when
          ``preferred_models`` is empty.
    """

    def _canonical(name):
        # Untagged names refer to the ":latest" tag.
        return name if ":" in name else f"{name}:latest"

    available = list(available_models or [])
    by_canonical = {}
    for name in available:
        by_canonical.setdefault(_canonical(name), name)

    for model in preferred_models or []:
        if model in available:
            return model
        match = by_canonical.get(_canonical(model))
        if match is not None:
            return match

    return preferred_models[0] if preferred_models else "llama3.2:latest"
|
|
|
|
|
|
|
|
def check_cloud_providers():
    """Report which cloud LLM providers have credentials configured.

    Credentials are looked up with ``get_secret`` (Streamlit secrets first,
    then the environment) in a fixed order.

    Returns:
        dict: Provider name mapped to ``True`` for each provider whose
        key/token is set. Providers without credentials are omitted, so an
        empty dict means no cloud provider is usable.
    """
    # (secret name, provider label), checked in this order.
    credential_table = (
        ("GROQ_API_KEY", "groq"),
        ("GOOGLE_API_KEY", "google"),
        ("OPENROUTER_API_KEY", "openrouter"),
        ("HF_TOKEN", "huggingface"),
        ("GITHUB_TOKEN", "github"),
        ("MISTRAL_API_KEY", "mistral"),
    )
    return {
        label: True
        for secret_name, label in credential_table
        if get_secret(secret_name)
    }
|
|
|
|
|
|
|
|
def check_backend():
    """Check whether the optional GPU backend API is configured and reachable.

    Uses the project's ``backend_client`` module. It is only probed when
    ``get_backend_url()`` returns a truthy URL.

    Returns:
        tuple[bool, dict]: ``(available, status)`` as reported by
        ``check_backend_available()``, with a ``None`` status normalised to
        ``{}``. Returns ``(False, {})`` when no backend URL is configured,
        ``backend_client`` cannot be imported, or the probe raises.
    """
    try:
        from backend_client import check_backend_available, get_backend_url

        if get_backend_url():
            available, status = check_backend_available()
            # Callers use status.get(...), so never hand back None.
            return available, (status if status is not None else {})
    except Exception:
        # Best-effort: a missing or broken backend client means "no backend".
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
        pass
    return False, {}
|
|
|
|
|
|
|
|
def _error_system(message):
    """Build the result dict returned when the RAG system cannot start."""
    return {
        "status": "error",
        "error": message,
        "rag": None,
        "store": None,
        "embedder": None,
        "mode": "error",
    }


def _build_ollama_system(available_models):
    """Construct the Ollama-backed AgenticRAG and return its system dict.

    Any exception raised by the project RAG modules propagates. The caller
    turns it into an error result or a cloud fallback.
    """
    from src.rag.agentic import AgenticRAG, RAGConfig
    from src.rag.store import get_vector_store, VectorStoreConfig, reset_vector_store
    from src.rag.embeddings import get_embedding_adapter, EmbeddingConfig, reset_embedding_adapter

    embed_model = select_model(available_models, EMBEDDING_MODELS)
    llm_model = select_model(available_models, LLM_MODELS)

    # NOTE(review): reset_* presumably discard instances cached inside
    # src.rag, so that the configs built below take effect — confirm.
    reset_vector_store()
    reset_embedding_adapter()

    embed_config = EmbeddingConfig(
        ollama_model=embed_model,
        ollama_base_url=OLLAMA_BASE_URL,
    )
    embedder = get_embedding_adapter(config=embed_config)

    store_config = VectorStoreConfig(
        persist_directory=VECTOR_STORE_PATH,
        collection_name=COLLECTION_NAME,
        similarity_threshold=0.0,
    )
    store = get_vector_store(config=store_config)

    rag_config = RAGConfig(
        model=llm_model,
        base_url=OLLAMA_BASE_URL,
        max_revision_attempts=1,
        enable_query_planning=True,
        enable_reranking=True,
        enable_validation=True,
        retrieval_top_k=10,
        final_top_k=5,
        min_confidence=0.3,
        verbose=False,
    )

    rag = AgenticRAG(
        config=rag_config,
        vector_store=store,
        embedding_adapter=embedder,
    )

    return {
        "status": "ready",
        "error": None,
        "rag": rag,
        "store": store,
        "embedder": embedder,
        "embed_model": embed_model,
        "llm_model": llm_model,
        "available_models": available_models,
        "mode": "ollama",
    }


@st.cache_resource
def get_unified_rag_system():
    """
    Initialize and return the unified RAG system.

    This is cached at the Streamlit level so all pages share the same instance.

    Priority:
    1. Backend API (GPU server) - if BACKEND_URL is configured
    2. Local Ollama - if running locally
    3. Cloud LLM providers - if API keys configured
       (also used when Ollama is reachable but local RAG init fails; the
       failure text is kept under "ollama_error")
    4. Demo mode - no backend available

    Returns:
        dict: Always has "status", "error", "rag", "store", "embedder" and
        "mode". Mode-specific extras:

        - backend: status/GPU/model fields;
        - ollama: the selected models;
        - cloud: "providers" and "message".
    """
    try:
        import pydantic  # noqa: F401 -- availability check only
    except ImportError:
        return _error_system("Required dependency 'pydantic' is not installed.")

    # 1. Remote backend.
    backend_ok, backend_status = check_backend()
    if backend_ok:
        backend_status = backend_status or {}
        return {
            "status": "ready",
            "error": None,
            "rag": None,
            "store": None,
            "embedder": None,
            "mode": "backend",
            "backend_status": backend_status,
            "ollama_available": backend_status.get("ollama_available", False),
            "gpu_available": backend_status.get("gpu_available", False),
            "gpu_name": backend_status.get("gpu_name"),
            "embed_model": backend_status.get("embedding_model", "backend"),
            "llm_model": backend_status.get("llm_model", "backend"),
            "indexed_chunks": backend_status.get("indexed_chunks", 0),
        }

    ollama_ok, available_models = check_ollama()
    cloud_providers = check_cloud_providers()

    # 2. Local Ollama.
    ollama_error = None
    if ollama_ok:
        try:
            return _build_ollama_system(available_models)
        except Exception as e:
            ollama_error = f"Ollama RAG init failed: {e}"
            if not cloud_providers:
                return _error_system(ollama_error)
            # Otherwise fall through to the cloud providers below.

    # 3. Cloud LLM providers.
    if cloud_providers:
        system = {
            "status": "cloud",
            "error": None,
            "rag": None,
            "store": None,
            "embedder": None,
            "mode": "cloud",
            "providers": list(cloud_providers.keys()),
            "message": "Running in cloud mode. Document Q&A available via cloud LLM providers.",
        }
        if ollama_error:
            system["ollama_error"] = ollama_error
        return system

    # 4. Nothing configured.
    return {
        "status": "demo",
        "error": "No LLM backend configured. Add API keys to secrets.toml or start Ollama.",
        "rag": None,
        "store": None,
        "embedder": None,
        "mode": "demo",
    }
|
|
|
|
|
|
|
|
def get_store_stats():
    """Summarise the active RAG system's index for display.

    Returns:
        dict: Always contains "total_chunks" and "status". The rest depends
        on the mode:

        - backend: the chunk count reported by the backend, plus model and
          GPU info;
        - cloud: zero chunks and an explanatory "message";
        - ollama: the live ``store.count()`` and the selected model names;
        - demo/error, or a failing count: ``{"total_chunks": 0, "status": "error"}``.
    """
    system = get_unified_rag_system()

    if system["mode"] == "backend":
        return {
            "total_chunks": system.get("indexed_chunks", 0),
            "status": "ready",
            "mode": "backend",
            "embed_model": system.get("embed_model", "backend"),
            "llm_model": system.get("llm_model", "backend"),
            "gpu_available": system.get("gpu_available", False),
            "gpu_name": system.get("gpu_name"),
        }

    if system["mode"] == "cloud":
        return {
            "total_chunks": 0,
            "status": "cloud",
            "message": "Cloud mode - indexing requires backend or Ollama",
        }

    store = system.get("store")
    if system["status"] != "ready" or store is None:
        return {"total_chunks": 0, "status": "error"}

    try:
        total_chunks = store.count()
    except Exception:
        # Report an unreadable store as an error instead of crashing the page.
        return {"total_chunks": 0, "status": "error"}

    return {
        "total_chunks": total_chunks,
        "status": "ready",
        "embed_model": system.get("embed_model", "unknown"),
        "llm_model": system.get("llm_model", "unknown"),
    }
|
|
|
|
|
|
|
|
def _split_text_into_chunks(text, document_id, chunk_size=500, overlap=50, min_chars=20):
    """Split ``text`` into overlapping fixed-width character windows.

    Windows start every ``chunk_size - overlap`` characters and are at most
    ``chunk_size`` characters long. A window is kept only if its stripped
    text has more than ``min_chars`` characters. Kept chunks are numbered
    consecutively as ``{document_id}_chunk_{n}``, with ``page`` fixed to 0.

    Raises:
        ValueError: if ``overlap >= chunk_size`` (the window would not advance).
    """
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError("chunk_size must be larger than overlap")

    chunks = []
    for start in range(0, len(text), step):
        window = text[start:start + chunk_size]
        if len(window.strip()) > min_chars:
            chunks.append({
                "chunk_id": f"{document_id}_chunk_{len(chunks)}",
                "text": window,
                "page": 0,
            })
    return chunks


def index_document(text: str, document_id: str, metadata: dict = None) -> dict:
    """Index raw ``text`` under ``document_id`` in the active RAG system.

    Behaviour by mode:

    - backend: the text is chunked locally (500-char windows, 50-char
      overlap) and sent to the backend with the chunks;
    - ollama: chunking is delegated to ``AgenticRAG.index_text``;
    - cloud/demo/error: indexing is refused.

    Returns:
        dict: ``{"success": bool, "num_chunks": int, "error": str | None}``.
    """
    system = get_unified_rag_system()

    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client

            client = get_backend_client()
            chunks = _split_text_into_chunks(text, document_id)
            result = client.index_document(document_id, text, chunks, metadata)
            if result.success:
                # Guard against a successful response with no payload.
                num_chunks = (result.data or {}).get("num_chunks", 0)
                return {"success": True, "num_chunks": num_chunks, "error": None}
            return {"success": False, "error": result.error, "num_chunks": 0}
        except Exception as e:
            return {"success": False, "error": str(e), "num_chunks": 0}

    if system["mode"] == "cloud":
        return {"success": False, "error": "Indexing requires backend or Ollama", "num_chunks": 0}

    if system["status"] != "ready":
        return {"success": False, "error": system.get("error", "RAG not ready"), "num_chunks": 0}

    try:
        num_chunks = system["rag"].index_text(
            text=text,
            document_id=document_id,
            metadata=metadata or {},
        )
        return {"success": True, "num_chunks": num_chunks, "error": None}
    except Exception as e:
        return {"success": False, "error": str(e), "num_chunks": 0}
|
|
|
|
|
|
|
|
def query_rag(question: str, filters: dict = None):
    """Answer ``question`` using the active RAG mode.

    Args:
        question: Natural-language question.
        filters: Optional metadata filters, forwarded to the backend or to
            ``AgenticRAG.query``. Not used in cloud mode.

    Returns:
        tuple: ``(response, None)`` on success or ``(None, error_message)``
        on failure. The response type depends on the mode:

        - backend: an attribute-style object with ``answer``, ``citations``
          (each having ``index``, ``text_snippet``, ``relevance_score``,
          ``document_id``, ``page``), ``confidence``, ``latency_ms``,
          ``num_sources`` and ``validated``;
        - cloud: a plain dict ``{"answer", "sources", "mode"}`` with no
          retrieval (``sources`` is always empty);
        - ollama: whatever ``AgenticRAG.query`` returns.
    """
    from types import SimpleNamespace

    system = get_unified_rag_system()

    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client

            result = get_backend_client().query(question, filters=filters)
            if not result.success:
                return None, result.error

            data = result.data or {}
            sources = data.get("sources") or []
            # SimpleNamespace provides plain attribute access without
            # defining a new class for every response and citation.
            citations = [
                SimpleNamespace(
                    index=src.get("index", pos + 1),
                    text_snippet=src.get("text_snippet", ""),
                    relevance_score=src.get("relevance_score", 0),
                    document_id=src.get("document_id", ""),
                    page=src.get("page", 0),
                )
                for pos, src in enumerate(sources)
            ]
            response = SimpleNamespace(
                answer=data.get("answer", ""),
                citations=citations,
                confidence=data.get("confidence", 0),
                latency_ms=data.get("latency_ms", 0),
                num_sources=len(sources),
                validated=data.get("validated", False),
            )
            return response, None
        except Exception as e:
            return None, str(e)

    if system["mode"] == "cloud":
        # Same (None, error) contract as the other modes, including when the
        # provider module is missing or the provider call raises.
        try:
            from llm_providers import generate_response

            response, error = generate_response(question)
        except Exception as e:
            return None, str(e)
        if error:
            return None, error
        return {"answer": response, "sources": [], "mode": "cloud"}, None

    if system["status"] != "ready":
        return None, system.get("error", "RAG not ready")

    try:
        response = system["rag"].query(question, filters=filters)
        return response, None
    except Exception as e:
        return None, str(e)
|
|
|
|
|
|
|
|
def clear_index():
    """Discard the cached RAG system so it is rebuilt on next use.

    This only clears the Streamlit ``cache_resource`` entry for
    ``get_unified_rag_system``. It does NOT delete any indexed data. The
    next call re-runs initialisation, which reopens the store at
    ``VECTOR_STORE_PATH``/``COLLECTION_NAME`` or reconnects to the backend.

    NOTE(review): previously indexed chunks are therefore presumably still
    present after this call (the store is configured with a
    persist_directory) — confirm before relying on this to wipe the index.

    Returns:
        bool: Always ``True``.
    """
    get_unified_rag_system.clear()
    return True
|
|
|
|
|
|
|
|
def get_indexed_documents() -> list:
    """List the documents currently held in the vector store.

    Returns:
        list[dict]: One entry per document, with "document_id",
        "source_path" and "chunk_count". The list is empty when nothing is
        indexed, when the system is not ready (cloud/demo/error), or when
        the lookup fails.
    """
    system = get_unified_rag_system()

    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client

            result = get_backend_client().list_documents()
            if result.success:
                docs = (result.data or {}).get("documents") or []
                return [
                    {
                        # Accept either "doc_id" or "document_id" from the backend.
                        "document_id": d.get("doc_id", d.get("document_id", "")),
                        "source_path": d.get("filename", ""),
                        "chunk_count": d.get("chunk_count", 0),
                    }
                    for d in docs
                ]
        except Exception:
            # Best-effort: an unreachable backend simply lists no documents.
            pass
        return []

    store = system.get("store")
    if system["status"] != "ready" or store is None:
        return []

    try:
        # NOTE(review): reads the store's private ``_collection``. The
        # get(include=[...]) call looks like the ChromaDB collection API — confirm.
        results = store._collection.get(include=["metadatas"])
        if not results or not results.get("metadatas"):
            return []

        # Group chunk metadata by document, counting chunks per document.
        doc_info = {}
        for meta in results["metadatas"]:
            meta = meta or {}  # chunks stored without metadata count as "unknown"
            doc_id = meta.get("document_id", "unknown")
            if doc_id not in doc_info:
                doc_info[doc_id] = {
                    "document_id": doc_id,
                    "source_path": meta.get("source_path", ""),
                    "chunk_count": 0,
                }
            doc_info[doc_id]["chunk_count"] += 1

        return list(doc_info.values())
    except Exception:
        return []
|
|
|
|
|
|
|
|
def get_chunks_for_document(document_id: str) -> list:
    """Fetch every stored chunk whose metadata has the given ``document_id``.

    Only the local (Ollama) vector store is queried. In backend, cloud,
    demo and error modes there is no local store and ``[]`` is returned.

    Returns:
        list[dict]: Each item has "chunk_id", "text" and "metadata".
        ``[]`` when nothing matches or the lookup fails.
    """
    system = get_unified_rag_system()
    store = system.get("store")
    if system["status"] != "ready" or store is None:
        return []

    try:
        # NOTE(review): reads the store's private ``_collection``
        # (ChromaDB-style get(where=..., include=...)) — confirm.
        results = store._collection.get(
            where={"document_id": document_id},
            include=["documents", "metadatas"],
        )
        if not results or not results.get("ids"):
            return []

        ids = results["ids"]
        texts = results.get("documents") or ["" for _ in ids]
        # Build a separate dict per chunk so callers can't mutate a shared one.
        metadatas = results.get("metadatas") or [{} for _ in ids]

        return [
            {"chunk_id": chunk_id, "text": chunk_text, "metadata": chunk_meta}
            for chunk_id, chunk_text, chunk_meta in zip(ids, texts, metadatas)
        ]
    except Exception:
        return []
|
|
|
|
|
|
|
|
def search_similar_chunks(query: str, top_k: int = 5, doc_filter: str = None):
    """Vector-search the index for chunks similar to ``query``.

    Args:
        query: Free-text query to embed.
        top_k: Maximum number of results.
        doc_filter: Optional document ID that restricts results to one document.

    Returns:
        list: In backend mode, the backend's "results" list as returned.
        Locally, dicts with "chunk_id", "document_id", "text",
        "similarity", "page" and "metadata". ``[]`` when the system is not
        ready or the search fails.
    """
    system = get_unified_rag_system()

    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client

            result = get_backend_client().search_similar(query, top_k, doc_filter)
            if result.success:
                return (result.data or {}).get("results") or []
        except Exception:
            # Best-effort: an unreachable backend yields no results.
            pass
        return []

    embedder = system.get("embedder")
    store = system.get("store")
    if system["status"] != "ready" or embedder is None or store is None:
        return []

    try:
        query_embedding = embedder.embed_text(query)
        filters = {"document_id": doc_filter} if doc_filter else None
        results = store.search(
            query_embedding=query_embedding,
            top_k=top_k,
            filters=filters,
        )
        return [
            {
                "chunk_id": r.chunk_id,
                "document_id": r.document_id,
                "text": r.text,
                "similarity": r.similarity,
                "page": r.page,
                "metadata": r.metadata,
            }
            for r in results
        ]
    except Exception:
        return []
|
|
|
|
|
|
|
|
def compute_document_similarity(doc_id_1: str, doc_id_2: str, max_chunks: int = 10) -> dict:
    """Estimate semantic similarity between two indexed documents.

    Each document is represented by the mean embedding of its first
    ``max_chunks`` chunks (chunks with empty text are skipped). The result
    is the cosine similarity of the two mean vectors.

    Args:
        doc_id_1: First document ID, as stored in chunk metadata.
        doc_id_2: Second document ID, as stored in chunk metadata.
        max_chunks: Maximum number of chunks embedded per document
            (default 10). Each chunk costs one embedding call.

    Returns:
        dict: On success, ``{"similarity": float, "doc1_chunks": int,
        "doc2_chunks": int, "error": None}``. Otherwise
        ``{"error": str, "similarity": 0.0}``.
    """
    system = get_unified_rag_system()
    if system["status"] != "ready":
        return {"error": "RAG system not ready", "similarity": 0.0}

    embedder = system.get("embedder")
    if embedder is None:
        # Backend mode reports "ready" but has no local embedder or store, so
        # chunks can be neither fetched nor embedded here.
        return {"error": "Document similarity requires a local embedding model", "similarity": 0.0}

    try:
        import numpy as np

        chunks_1 = get_chunks_for_document(doc_id_1)
        chunks_2 = get_chunks_for_document(doc_id_2)
        if not chunks_1 or not chunks_2:
            return {"error": "One or both documents not found", "similarity": 0.0}

        def _mean_embedding(chunks):
            # Average the embeddings of the first max_chunks non-empty chunks.
            vectors = [
                embedder.embed_text(chunk["text"])
                for chunk in chunks[:max_chunks]
                if chunk.get("text")
            ]
            if not vectors:
                return None
            return np.asarray(np.mean(vectors, axis=0), dtype=float)

        emb1 = _mean_embedding(chunks_1)
        emb2 = _mean_embedding(chunks_2)
        if emb1 is None or emb2 is None:
            return {"error": "Could not compute embeddings", "similarity": 0.0}

        denom = np.linalg.norm(emb1) * np.linalg.norm(emb2)
        if denom == 0:
            # Cosine similarity is undefined for a zero vector (would be NaN).
            return {"error": "Cannot compare zero-magnitude embeddings", "similarity": 0.0}

        similarity = float(np.dot(emb1, emb2) / denom)
        return {
            "similarity": similarity,
            "doc1_chunks": len(chunks_1),
            "doc2_chunks": len(chunks_2),
            "error": None,
        }
    except Exception as e:
        return {"error": str(e), "similarity": 0.0}
|
|
|
|
|
|
|
|
def auto_index_processed_document(doc_id: str, text: str, chunks: list, metadata: dict = None):
    """
    Auto-index a processed document with pre-computed chunks.

    Args:
        doc_id: Document ID stored on every chunk.
        text: Full document text (only forwarded to the backend).
        chunks: Pre-computed chunks. Each is either a string or a dict with
            "text" and optionally "page".
        metadata: Optional document metadata. Locally only "filename" is
            used, as the chunks' "source_path". In backend mode the dict is
            forwarded unchanged.

    Local (Ollama) mode skips chunks whose stripped text is shorter than
    20 characters. Chunk IDs are ``{doc_id}_chunk_{i}``, where ``i`` is the
    position in ``chunks`` (skipped chunks keep their slot, so IDs are stable).

    Returns:
        dict: ``{"success": bool, "num_chunks": int, "error": str | None}``.
    """
    system = get_unified_rag_system()

    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client

            result = get_backend_client().index_document(doc_id, text, chunks, metadata)
            if result.success:
                # Guard against a successful response with no payload.
                num_chunks = (result.data or {}).get("num_chunks", 0)
                return {"success": True, "num_chunks": num_chunks, "error": None}
            return {"success": False, "error": result.error, "num_chunks": 0}
        except Exception as e:
            return {"success": False, "error": str(e), "num_chunks": 0}

    if system["mode"] == "cloud":
        return {"success": False, "error": "Indexing requires backend or Ollama", "num_chunks": 0}

    store = system.get("store")
    embedder = system.get("embedder")
    if system["status"] != "ready" or store is None or embedder is None:
        return {"success": False, "error": "RAG system not ready", "num_chunks": 0}

    try:
        source_path = metadata.get("filename", "") if metadata else ""
        chunk_dicts = []
        embeddings = []

        for i, chunk in enumerate(chunks or []):
            if isinstance(chunk, dict):
                # A dict without usable "text" becomes "" and is skipped below,
                # instead of the dict itself being treated as text.
                chunk_text = chunk.get("text") or ""
                page = chunk.get("page", 0)
            else:
                chunk_text = chunk or ""
                page = 0

            if len(chunk_text.strip()) < 20:
                continue

            chunk_dicts.append({
                "chunk_id": f"{doc_id}_chunk_{i}",
                "document_id": doc_id,
                "text": chunk_text,
                "page": page,
                "chunk_type": "text",
                "source_path": source_path,
                "sequence_index": i,
            })
            embeddings.append(embedder.embed_text(chunk_text))

        if not chunk_dicts:
            return {"success": False, "error": "No valid chunks to index", "num_chunks": 0}

        store.add_chunks(chunk_dicts, embeddings)
        return {"success": True, "num_chunks": len(chunk_dicts), "error": None}

    except Exception as e:
        return {"success": False, "error": str(e), "num_chunks": 0}
|
|
|