"""
Unified RAG Configuration for SPARKNET Demo

This module provides a single source of truth for RAG system configuration,
ensuring all demo pages use the same vector store, embeddings, and models.

Supports three deployment modes:
1. Backend API (GPU server like Lytos) - full processing power
2. Local Ollama (for on-premise deployments)
3. Cloud LLM providers (for Streamlit Cloud without a backend)

When none of these is available the system reports a "demo" mode in which
indexing and querying return an error instead of doing work.
"""

import os
import sys
from pathlib import Path
from types import SimpleNamespace
from typing import Optional

import streamlit as st

PROJECT_ROOT = Path(__file__).parent.parent
# Put the project root on sys.path so project packages such as `src.rag`
# (imported lazily inside the functions below) can be resolved.
sys.path.insert(0, str(PROJECT_ROOT))

# Configuration constants
OLLAMA_BASE_URL = "http://localhost:11434"
VECTOR_STORE_PATH = "data/sparknet_unified_rag"
COLLECTION_NAME = "sparknet_documents"

# Model preferences (in order of preference)
EMBEDDING_MODELS = ["nomic-embed-text", "mxbai-embed-large:latest", "mxbai-embed-large"]
LLM_MODELS = ["llama3.2:latest", "llama3.1:8b", "mistral:latest", "qwen2.5:14b", "qwen2.5:32b"]

# Secret/env-var name whose presence enables each cloud LLM provider.
# Insertion order is the order reported by check_cloud_providers().
_CLOUD_PROVIDER_SECRETS = {
    "groq": "GROQ_API_KEY",
    "google": "GOOGLE_API_KEY",
    "openrouter": "OPENROUTER_API_KEY",
    "huggingface": "HF_TOKEN",
    "github": "GITHUB_TOKEN",
    "mistral": "MISTRAL_API_KEY",
}

# Fixed-size character windows used when sending raw text to the backend.
_BACKEND_CHUNK_SIZE = 500
_BACKEND_CHUNK_OVERLAP = 50
# Windows whose stripped length is not greater than this are dropped.
_BACKEND_MIN_CHUNK_CHARS = 20


def get_secret(key: str, default: Optional[str] = None):
    """Look up *key* in Streamlit secrets, falling back to environment variables.

    Args:
        key: Secret / environment variable name.
        default: Value returned when the key is found in neither place.

    Returns:
        The secret value from ``st.secrets`` if present, otherwise
        ``os.environ.get(key, default)``.
    """
    try:
        if hasattr(st, "secrets") and key in st.secrets:
            return st.secrets[key]
    except Exception:
        # Accessing st.secrets can raise (e.g. when no secrets file is
        # configured); treat that as "not set" and use the environment.
        pass
    return os.environ.get(key, default)


def check_ollama():
    """Probe the local Ollama server.

    Returns:
        ``(True, model_names)`` when ``GET /api/tags`` answers with HTTP 200,
        otherwise ``(False, [])``.
    """
    try:
        import httpx

        with httpx.Client(timeout=3.0) as client:
            resp = client.get(f"{OLLAMA_BASE_URL}/api/tags")
            if resp.status_code == 200:
                models = [m["name"] for m in resp.json().get("models", [])]
                return True, models
    except Exception:
        # Missing httpx, connection refused, timeout or malformed JSON all
        # mean "Ollama is not usable" for our purposes.
        pass
    return False, []


def select_model(available_models: list, preferred_models: list) -> str:
    """Return the first entry of *preferred_models* present in *available_models*.

    Matching is by exact string. If nothing matches, the first preference is
    returned (or ``"llama3.2:latest"`` when the preference list is empty), so
    callers always receive a model name to try.
    """
    # NOTE(review): Ollama's /api/tags appears to report tagged names such as
    # "nomic-embed-text:latest", so an untagged preference like
    # "nomic-embed-text" may never match exactly. Normalising tags here would
    # change which embedding model existing installs pick (and possibly the
    # embedding dimension of an already-persisted collection) - confirm first.
    installed = set(available_models or ())
    fallback = preferred_models[0] if preferred_models else "llama3.2:latest"
    return next((model for model in preferred_models if model in installed), fallback)


def check_cloud_providers():
    """Report which cloud LLM providers have credentials configured.

    Returns:
        Dict mapping provider name to ``True`` for every provider whose
        secret (see ``_CLOUD_PROVIDER_SECRETS``) is set and non-empty.
    """
    return {
        provider: True
        for provider, secret_name in _CLOUD_PROVIDER_SECRETS.items()
        if get_secret(secret_name)
    }


def check_backend():
    """Check whether the remote backend API is configured and reachable.

    Returns:
        ``(available, status)`` from ``backend_client.check_backend_available``
        when a backend URL is configured; ``(False, {})`` otherwise or on any
        error (including ``backend_client`` not being importable).
    """
    try:
        from backend_client import check_backend_available, get_backend_url

        if get_backend_url():
            available, status = check_backend_available()
            return available, status
    except Exception:
        pass
    return False, {}


def _error_system(message: str) -> dict:
    """Build the system dict used when the RAG system cannot be initialised."""
    return {
        "status": "error",
        "error": message,
        "rag": None,
        "store": None,
        "embedder": None,
        "mode": "error",
    }


def _build_backend_system(backend_status: dict) -> dict:
    """Build the system dict for backend-API mode from the backend's status payload."""
    return {
        "status": "ready",
        "error": None,
        "rag": None,  # Use backend API instead
        "store": None,
        "embedder": None,
        "mode": "backend",
        "backend_status": backend_status,
        "ollama_available": backend_status.get("ollama_available", False),
        "gpu_available": backend_status.get("gpu_available", False),
        "gpu_name": backend_status.get("gpu_name"),
        "embed_model": backend_status.get("embedding_model", "backend"),
        "llm_model": backend_status.get("llm_model", "backend"),
        "indexed_chunks": backend_status.get("indexed_chunks", 0),
    }


def _build_ollama_system(available_models: list) -> dict:
    """Initialise the local Ollama-backed AgenticRAG stack.

    Returns the "ready" system dict, or an error dict if any import or
    initialisation step raises.
    """
    try:
        from src.rag.agentic import AgenticRAG, RAGConfig
        from src.rag.store import get_vector_store, VectorStoreConfig, reset_vector_store
        from src.rag.embeddings import get_embedding_adapter, EmbeddingConfig, reset_embedding_adapter

        embed_model = select_model(available_models, EMBEDDING_MODELS)
        llm_model = select_model(available_models, LLM_MODELS)

        # Reset the module-level singletons so the configs built below are
        # used instead of any previously created instances.
        reset_vector_store()
        reset_embedding_adapter()

        embedder = get_embedding_adapter(
            config=EmbeddingConfig(
                ollama_model=embed_model,
                ollama_base_url=OLLAMA_BASE_URL,
            )
        )

        store = get_vector_store(
            config=VectorStoreConfig(
                persist_directory=VECTOR_STORE_PATH,
                collection_name=COLLECTION_NAME,
                similarity_threshold=0.0,
            )
        )

        rag_config = RAGConfig(
            model=llm_model,
            base_url=OLLAMA_BASE_URL,
            max_revision_attempts=1,
            enable_query_planning=True,
            enable_reranking=True,
            enable_validation=True,
            retrieval_top_k=10,
            final_top_k=5,
            min_confidence=0.3,
            verbose=False,
        )

        rag = AgenticRAG(
            config=rag_config,
            vector_store=store,
            embedding_adapter=embedder,
        )

        return {
            "status": "ready",
            "error": None,
            "rag": rag,
            "store": store,
            "embedder": embedder,
            "embed_model": embed_model,
            "llm_model": llm_model,
            "available_models": available_models,
            "mode": "ollama",
        }
    except Exception as e:
        return _error_system(f"Ollama RAG init failed: {e}")


@st.cache_resource
def get_unified_rag_system():
    """
    Initialize and return the unified RAG system.

    Cached with ``st.cache_resource``, so every page shares one result until
    ``clear_index()`` clears the cache. Note that error/demo results are
    cached too.

    Priority:
    1. Backend API (GPU server) - if BACKEND_URL is configured
    2. Local Ollama - if running locally
    3. Cloud LLM providers - if API keys configured
    4. Demo mode - no backend available

    Returns:
        Dict with at least ``status``, ``error``, ``rag``, ``store``,
        ``embedder`` and ``mode`` keys; ``mode`` is one of ``"backend"``,
        ``"ollama"``, ``"cloud"``, ``"demo"`` or ``"error"``.
    """
    # Check for required dependencies first (presence check only).
    try:
        import pydantic  # noqa: F401
    except ImportError:
        return _error_system("Required dependency 'pydantic' is not installed.")

    # 1. Backend API (GPU server)
    backend_ok, backend_status = check_backend()
    if backend_ok:
        return _build_backend_system(backend_status or {})

    # 2. Local Ollama for full RAG functionality
    ollama_ok, available_models = check_ollama()
    if ollama_ok:
        return _build_ollama_system(available_models)

    # 3. Cloud LLM providers: no local vector store, but basic document Q&A
    #    can still go through a cloud LLM.
    cloud_providers = check_cloud_providers()
    if cloud_providers:
        return {
            "status": "cloud",
            "error": None,
            "rag": None,
            "store": None,
            "embedder": None,
            "mode": "cloud",
            "providers": list(cloud_providers.keys()),
            "message": "Running in cloud mode. Document Q&A available via cloud LLM providers.",
        }

    # 4. No backend available
    return {
        "status": "demo",
        "error": "No LLM backend configured. Add API keys to secrets.toml or start Ollama.",
        "rag": None,
        "store": None,
        "embedder": None,
        "mode": "demo",
    }


def get_store_stats():
    """Return vector store statistics for the active mode.

    Returns:
        Dict with ``total_chunks`` and ``status``; backend and Ollama modes
        also report model names, backend mode adds GPU information.
    """
    system = get_unified_rag_system()

    if system["mode"] == "backend":
        return {
            "total_chunks": system.get("indexed_chunks", 0),
            "status": "ready",
            "mode": "backend",
            "embed_model": system.get("embed_model", "backend"),
            "llm_model": system.get("llm_model", "backend"),
            "gpu_available": system.get("gpu_available", False),
            "gpu_name": system.get("gpu_name"),
        }

    if system["mode"] == "cloud":
        return {
            "total_chunks": 0,
            "status": "cloud",
            "message": "Cloud mode - indexing requires backend or Ollama",
        }

    if system["status"] != "ready":
        return {"total_chunks": 0, "status": "error"}

    try:
        return {
            "total_chunks": system["store"].count(),
            "status": "ready",
            "embed_model": system.get("embed_model", "unknown"),
            "llm_model": system.get("llm_model", "unknown"),
        }
    except Exception:
        return {"total_chunks": 0, "status": "error"}


def _split_text_into_chunks(
    text: str,
    document_id: str,
    chunk_size: int = _BACKEND_CHUNK_SIZE,
    overlap: int = _BACKEND_CHUNK_OVERLAP,
    min_chars: int = _BACKEND_MIN_CHUNK_CHARS,
) -> list:
    """Split *text* into overlapping fixed-size character windows.

    Windows whose stripped length is not greater than *min_chars* are
    dropped. Chunk ids are numbered consecutively over the kept windows.

    Raises:
        ValueError: if ``overlap >= chunk_size`` (the window would never advance).
    """
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError("chunk overlap must be smaller than chunk size")

    chunks = []
    for start in range(0, len(text), step):
        window = text[start:start + chunk_size]
        if len(window.strip()) > min_chars:
            chunks.append({
                "chunk_id": f"{document_id}_chunk_{len(chunks)}",
                "text": window,
                "page": 0,
            })
        if start + chunk_size >= len(text):
            # This window already reaches the end of the text; any later
            # window would be a strict suffix of it (pure duplicate content).
            break
    return chunks


def index_document(text: str, document_id: str, metadata: dict = None) -> dict:
    """Index a document into the unified RAG system.

    In backend mode the text is split locally into character windows and
    sent to the backend; in Ollama mode ``AgenticRAG.index_text`` does the work.

    Returns:
        ``{"success": bool, "num_chunks": int, "error": str | None}``.
    """
    system = get_unified_rag_system()

    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client

            client = get_backend_client()
            chunks = _split_text_into_chunks(text, document_id)
            result = client.index_document(document_id, text, chunks, metadata)
            if result.success:
                return {"success": True, "num_chunks": result.data.get("num_chunks", 0), "error": None}
            return {"success": False, "error": result.error, "num_chunks": 0}
        except Exception as e:
            return {"success": False, "error": str(e), "num_chunks": 0}

    if system["mode"] == "cloud":
        return {"success": False, "error": "Indexing requires backend or Ollama", "num_chunks": 0}

    if system["status"] != "ready":
        return {"success": False, "error": system.get("error", "RAG not ready"), "num_chunks": 0}

    try:
        num_chunks = system["rag"].index_text(
            text=text,
            document_id=document_id,
            metadata=metadata or {},
        )
        return {"success": True, "num_chunks": num_chunks, "error": None}
    except Exception as e:
        return {"success": False, "error": str(e), "num_chunks": 0}


def query_rag(question: str, filters: dict = None):
    """Query the unified RAG system.

    Returns:
        ``(response, None)`` on success or ``(None, error_message)`` on failure.
        In backend mode ``response`` is a ``SimpleNamespace`` exposing
        ``answer``, ``citations``, ``confidence``, ``latency_ms``,
        ``num_sources`` and ``validated``; in cloud mode it is a dict with
        ``answer``, ``sources`` and ``mode``; in Ollama mode it is whatever
        ``AgenticRAG.query`` returns.
    """
    system = get_unified_rag_system()

    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client

            client = get_backend_client()
            result = client.query(question, filters=filters)
            if not result.success:
                return None, result.error

            data = result.data
            sources = data.get("sources") or []
            # Attribute-style objects so callers can treat backend answers
            # like local RAG responses.
            citations = [
                SimpleNamespace(
                    index=source.get("index", position + 1),
                    text_snippet=source.get("text_snippet", ""),
                    relevance_score=source.get("relevance_score", 0),
                    document_id=source.get("document_id", ""),
                    page=source.get("page", 0),
                )
                for position, source in enumerate(sources)
            ]
            response = SimpleNamespace(
                answer=data.get("answer", ""),
                citations=citations,
                confidence=data.get("confidence", 0),
                latency_ms=data.get("latency_ms", 0),
                num_sources=len(sources),
                validated=data.get("validated", False),
            )
            return response, None
        except Exception as e:
            return None, str(e)

    if system["mode"] == "cloud":
        # Cloud LLM Q&A; failures are reported like every other path
        # instead of propagating to the caller.
        try:
            from llm_providers import generate_response

            response, error = generate_response(question)
        except Exception as e:
            return None, str(e)
        if error:
            return None, error
        return {"answer": response, "sources": [], "mode": "cloud"}, None

    if system["status"] != "ready":
        return None, system.get("error", "RAG not ready")

    try:
        response = system["rag"].query(question, filters=filters)
        return response, None
    except Exception as e:
        return None, str(e)


def clear_index():
    """Drop the cached RAG system so the next access re-initialises it.

    This only clears the ``st.cache_resource`` entry of
    ``get_unified_rag_system``; it does not itself delete documents from the
    vector store. Whether persisted vectors under ``VECTOR_STORE_PATH``
    survive re-initialisation depends on ``src.rag.store`` (TODO confirm).

    Returns:
        Always ``True``.
    """
    get_unified_rag_system.clear()
    return True


def get_indexed_documents() -> list:
    """List indexed documents with their chunk counts.

    Returns:
        List of ``{"document_id", "source_path", "chunk_count"}`` dicts;
        ``[]`` when the system is not ready or on any error.
    """
    system = get_unified_rag_system()

    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client

            client = get_backend_client()
            result = client.list_documents()
            if result.success:
                docs = result.data.get("documents", [])
                return [
                    {
                        "document_id": d.get("doc_id", d.get("document_id", "")),
                        "source_path": d.get("filename", ""),
                        "chunk_count": d.get("chunk_count", 0),
                    }
                    for d in docs
                ]
        except Exception:
            pass
        return []

    if system["status"] != "ready":
        return []

    try:
        # Reads the store's underlying collection directly (private attribute).
        collection = system["store"]._collection
        results = collection.get(include=["metadatas"])
        if not results or not results.get("metadatas"):
            return []

        doc_info = {}
        for meta in results["metadatas"]:
            meta = meta or {}  # a chunk may have no metadata at all
            doc_id = meta.get("document_id", "unknown")
            if doc_id not in doc_info:
                doc_info[doc_id] = {
                    "document_id": doc_id,
                    "source_path": meta.get("source_path", ""),
                    "chunk_count": 0,
                }
            doc_info[doc_id]["chunk_count"] += 1

        return list(doc_info.values())
    except Exception:
        return []


def get_chunks_for_document(document_id: str) -> list:
    """Return all locally stored chunks for *document_id*.

    Returns:
        List of ``{"chunk_id", "text", "metadata"}`` dicts; ``[]`` when no
        local store is available (including backend mode) or on any error.
    """
    system = get_unified_rag_system()

    if system["status"] != "ready" or system.get("store") is None:
        return []

    try:
        collection = system["store"]._collection
        results = collection.get(
            where={"document_id": document_id},
            include=["documents", "metadatas"],
        )
        if not results or not results.get("ids"):
            return []

        documents = results.get("documents")
        metadatas = results.get("metadatas")
        return [
            {
                "chunk_id": chunk_id,
                "text": documents[i] if documents else "",
                "metadata": metadatas[i] if metadatas else {},
            }
            for i, chunk_id in enumerate(results["ids"])
        ]
    except Exception:
        return []


def search_similar_chunks(query: str, top_k: int = 5, doc_filter: str = None):
    """Search for chunks similar to *query*, optionally limited to one document.

    Returns:
        List of result dicts (chunk_id, document_id, text, similarity, page,
        metadata in local mode; the backend's own result shape in backend
        mode); ``[]`` when unavailable or on any error.
    """
    system = get_unified_rag_system()

    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client

            client = get_backend_client()
            result = client.search_similar(query, top_k, doc_filter)
            if result.success:
                return result.data.get("results", [])
        except Exception:
            pass
        return []

    if system["status"] != "ready":
        return []

    try:
        embedder = system["embedder"]
        store = system["store"]

        query_embedding = embedder.embed_text(query)
        filters = {"document_id": doc_filter} if doc_filter else None

        results = store.search(
            query_embedding=query_embedding,
            top_k=top_k,
            filters=filters,
        )

        return [
            {
                "chunk_id": r.chunk_id,
                "document_id": r.document_id,
                "text": r.text,
                "similarity": r.similarity,
                "page": r.page,
                "metadata": r.metadata,
            }
            for r in results
        ]
    except Exception:
        return []


def compute_document_similarity(doc_id_1: str, doc_id_2: str) -> dict:
    """Compute the cosine similarity between two documents' mean chunk embeddings.

    Only the first 10 chunks of each document are embedded.

    Returns:
        ``{"similarity", "doc1_chunks", "doc2_chunks", "error": None}`` on
        success, otherwise ``{"error": message, "similarity": 0.0}``.
    """
    system = get_unified_rag_system()

    if system["status"] != "ready":
        return {"error": "RAG system not ready", "similarity": 0.0}

    embedder = system.get("embedder")
    if embedder is None:
        # Backend mode is "ready" but has no local embedder or chunk store.
        return {"error": "Document similarity requires the local Ollama RAG system", "similarity": 0.0}

    try:
        import numpy as np

        chunks_1 = get_chunks_for_document(doc_id_1)
        chunks_2 = get_chunks_for_document(doc_id_2)

        if not chunks_1 or not chunks_2:
            return {"error": "One or both documents not found", "similarity": 0.0}

        def mean_embedding(chunks):
            # Embed at most the first 10 chunks to keep the number of
            # embedding calls bounded.
            vectors = [embedder.embed_text(chunk["text"]) for chunk in chunks[:10]]
            if not vectors:
                return None
            return np.mean(np.asarray(vectors, dtype=float), axis=0)

        emb1 = mean_embedding(chunks_1)
        emb2 = mean_embedding(chunks_2)

        if emb1 is None or emb2 is None:
            return {"error": "Could not compute embeddings", "similarity": 0.0}

        denominator = np.linalg.norm(emb1) * np.linalg.norm(emb2)
        if denominator == 0:
            # Cosine similarity is undefined for a zero vector; avoid NaN/inf.
            return {"error": "Zero-norm embedding; similarity undefined", "similarity": 0.0}

        similarity = float(np.dot(emb1, emb2) / denominator)

        return {
            "similarity": similarity,
            "doc1_chunks": len(chunks_1),
            "doc2_chunks": len(chunks_2),
            "error": None,
        }
    except Exception as e:
        return {"error": str(e), "similarity": 0.0}


def auto_index_processed_document(doc_id: str, text: str, chunks: list, metadata: dict = None):
    """
    Auto-index a processed document with pre-computed chunks.

    Args:
        doc_id: Document identifier used for chunk ids and the document_id field.
        text: Full document text (forwarded to the backend in backend mode).
        chunks: Items that are either plain strings or dicts with a ``"text"``
            key and optional ``"page"``. Items without usable text, or whose
            stripped text is shorter than 20 characters, are skipped.
        metadata: Optional dict; locally only ``metadata["filename"]`` is used
            (as ``source_path``), the backend receives it whole.

    Returns:
        ``{"success": bool, "num_chunks": int, "error": str | None}``.
    """
    system = get_unified_rag_system()

    if system["mode"] == "backend":
        try:
            from backend_client import get_backend_client

            client = get_backend_client()
            result = client.index_document(doc_id, text, chunks, metadata)
            if result.success:
                return {"success": True, "num_chunks": result.data.get("num_chunks", 0), "error": None}
            return {"success": False, "error": result.error, "num_chunks": 0}
        except Exception as e:
            return {"success": False, "error": str(e), "num_chunks": 0}

    if system["mode"] == "cloud":
        return {"success": False, "error": "Indexing requires backend or Ollama", "num_chunks": 0}

    if system["status"] != "ready":
        return {"success": False, "error": "RAG system not ready", "num_chunks": 0}

    try:
        store = system["store"]
        embedder = system["embedder"]
        source_path = metadata.get("filename", "") if metadata else ""

        chunk_dicts = []
        embeddings = []

        for i, chunk in enumerate(chunks):
            is_dict = isinstance(chunk, dict)
            # A dict without "text" (or with None) used to fall through as the
            # dict itself and crash on .strip(); skip such entries instead.
            chunk_text = (chunk.get("text") or "") if is_dict else chunk
            if not isinstance(chunk_text, str) or len(chunk_text.strip()) < 20:
                continue

            chunk_dicts.append({
                # Index i counts skipped entries too, so ids stay tied to the
                # position in the caller's chunk list.
                "chunk_id": f"{doc_id}_chunk_{i}",
                "document_id": doc_id,
                "text": chunk_text,
                "page": chunk.get("page", 0) if is_dict else 0,
                "chunk_type": "text",
                "source_path": source_path,
                "sequence_index": i,
            })
            embeddings.append(embedder.embed_text(chunk_text))

        if not chunk_dicts:
            return {"success": False, "error": "No valid chunks to index", "num_chunks": 0}

        store.add_chunks(chunk_dicts, embeddings)

        return {"success": True, "num_chunks": len(chunk_dicts), "error": None}

    except Exception as e:
        return {"success": False, "error": str(e), "num_chunks": 0}