""" 🤖 Intelligent Multi-Source Knowledge Orchestration System (IMSKOS) ================================================================ Advanced Agentic RAG Framework with Dynamic Routing & Distributed Vector Storage An enterprise-grade, production-ready intelligent query routing system that leverages: - LangGraph for stateful workflow orchestration - DataStax Astra DB for distributed vector storage - Groq LLM for high-performance inference - Adaptive routing between proprietary knowledge base and Wikipedia - Real-time semantic search with HuggingFace embeddings """ import streamlit as st import os from typing import List, Dict, Any, Optional from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() # Set USER_AGENT to suppress warnings from web loaders if not os.getenv("USER_AGENT"): os.environ["USER_AGENT"] = "IMSKOS/1.0 (Intelligent Multi-Source Knowledge Orchestration System)" # Compatibility shim for different typing.ForwardRef._evaluate signatures # ------------------------------------------------------------ # Some Python/typing/pydantic versions expect ForwardRef._evaluate to accept # recursive_guard as a keyword-only argument, while other versions accept it # positionally. When a third-party library calls ForwardRef._evaluate using the # older calling convention, it can raise: # TypeError: ForwardRef._evaluate() missing 1 required keyword-only argument: 'recursive_guard' # # This shim wraps/monkeypatches typing.ForwardRef._evaluate so it accepts both # calling conventions. It should be safe and only applied at import time. try: from typing import ForwardRef as _ForwardRef _orig_forwardref_evaluate = getattr(_ForwardRef, "_evaluate", None) if _orig_forwardref_evaluate is not None: def _evaluate_compat(self, globalns, localns, *args, **kwargs): """ Compatibility wrapper that attempts to call the original _evaluate with whatever args/kwargs were passed. If a TypeError occurs (typical when the underlying implementation requires recursive_guard as keyword-only), call the original with recursive_guard provided as a keyword using the first positional arg if available or an empty set. """ try: return _orig_forwardref_evaluate(self, globalns, localns, *args, **kwargs) except TypeError: # Older callers passed recursive_guard positionally; newer # implementations require recursive_guard as a keyword-only arg. recursive_guard = args[0] if args else set() return _orig_forwardref_evaluate(self, globalns, localns, recursive_guard=recursive_guard) # Monkeypatch the ForwardRef implementation with the compatibility wrapper _ForwardRef._evaluate = _evaluate_compat except Exception: # If anything goes wrong here, do not prevent app import — let original # behavior surface later (so the original error will be visible). pass # ------------------------------------------------------------ import cassio from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.document_loaders import WebBaseLoader from langchain_community.vectorstores import Cassandra from langchain_huggingface import HuggingFaceEmbeddings from langchain_community.utilities import WikipediaAPIWrapper from langchain_community.tools import WikipediaQueryRun from langchain_core.prompts import ChatPromptTemplate from langchain_groq import ChatGroq from langchain_core.documents import Document from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough from langgraph.graph import END, StateGraph, START from typing_extensions import TypedDict from pydantic import BaseModel, Field from typing import Literal import time import json from datetime import datetime import traceback # Page Configuration st.set_page_config( page_title="IMSKOS - Intelligent Knowledge Orchestration", page_icon="🧠", layout="wide", initial_sidebar_state="expanded" ) # Custom CSS for modern UI st.markdown(""" """, unsafe_allow_html=True) # ==================== Configuration & Initialization ==================== class Config: """Centralized configuration management""" @staticmethod def load_env_variables(): """Load and validate environment variables from multiple sources Priority order: 1. Streamlit secrets (for Streamlit Cloud / HuggingFace Spaces) 2. Environment variables (for local development / Docker) """ def get_secret(key: str) -> Optional[str]: """Get secret from Streamlit secrets or environment variables""" # First check Streamlit secrets (works on HuggingFace Spaces) try: if hasattr(st, 'secrets') and key in st.secrets: return st.secrets[key] except Exception: pass # Fall back to environment variables return os.getenv(key) required_vars = { "ASTRA_DB_APPLICATION_TOKEN": get_secret("ASTRA_DB_APPLICATION_TOKEN"), "ASTRA_DB_ID": get_secret("ASTRA_DB_ID"), "GROQ_API_KEY": get_secret("GROQ_API_KEY") } missing_vars = [key for key, value in required_vars.items() if not value] if missing_vars: st.error(f"⚠️ Missing environment variables: {', '.join(missing_vars)}") st.info(""" **Setup Instructions:** 1. **Local Development:** Create a `.env` file with your credentials 2. **Streamlit Cloud:** Add secrets in the app settings Required variables: - `ASTRA_DB_APPLICATION_TOKEN` - Get from [DataStax Astra](https://astra.datastax.com) - `ASTRA_DB_ID` - Your Astra DB database ID - `GROQ_API_KEY` - Get from [Groq Console](https://console.groq.com) """) st.stop() return required_vars @staticmethod def get_default_urls(): """Default knowledge base URLs""" return [ "https://lilianweng.github.io/posts/2023-06-23-agent/", "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/", "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/", ] # ==================== State Management Classes ==================== class RouteQuery(BaseModel): """Pydantic model for query routing decisions""" datasource: Literal["vectorstore", "wiki_search"] = Field( ..., description="Route query to wikipedia or vectorstore based on content", ) class GraphState(TypedDict): """LangGraph state schema""" question: str generation: str documents: List[str] route: str # ==================== Core System Classes ==================== class KnowledgeBaseManager: """Manages document ingestion and vector storage""" def __init__(self, astra_token: str, astra_db_id: str): self.astra_token = astra_token self.astra_db_id = astra_db_id self.embeddings = None self.vector_store = None def initialize_cassandra(self): """Initialize Cassandra connection""" cassio.init(token=self.astra_token, database_id=self.astra_db_id) def setup_embeddings(self): """Initialize HuggingFace embeddings""" self.embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2") def load_and_process_documents(self, urls: List[str], progress_callback=None): """Load, split, and index documents""" if progress_callback: progress_callback("Loading documents from URLs...") docs = [] for i, url in enumerate(urls): try: loader = WebBaseLoader(url) docs.extend(loader.load()) if progress_callback: progress_callback(f"Loaded {i+1}/{len(urls)} documents") except Exception as e: st.warning(f"Failed to load {url}: {str(e)}") if progress_callback: progress_callback("Splitting documents into chunks...") text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder( chunk_size=500, chunk_overlap=50 ) doc_splits = text_splitter.split_documents(docs) return doc_splits def create_vector_store(self): """Initialize Astra DB vector store""" self.vector_store = Cassandra( embedding=self.embeddings, table_name="intelligent_knowledge_base", session=None, keyspace=None ) return self.vector_store def add_documents(self, documents: List[Document], progress_callback=None): """Add documents to vector store""" if progress_callback: progress_callback("Indexing documents in Astra DB...") self.vector_store.add_documents(documents) if progress_callback: progress_callback(f"Successfully indexed {len(documents)} document chunks") class IntelligentRouter: """LLM-powered query router""" def __init__(self, groq_api_key: str): self.groq_api_key = groq_api_key self.llm = None self.question_router = None self.generation_chain = None def initialize(self): """Set up LLM and routing chain""" self.llm = ChatGroq( groq_api_key=self.groq_api_key, model_name="llama-3.1-8b-instant", temperature=0 ) structured_llm = self.llm.with_structured_output(RouteQuery) system_prompt = """You are an expert at routing user questions to the most relevant data source. The vectorstore contains specialized documents about: - AI Agents and their architectures - Prompt Engineering techniques and best practices - Adversarial attacks on Large Language Models - Machine Learning security concepts Route to 'vectorstore' for questions about these topics. Route to 'wiki_search' for general knowledge, current events, people, places, or topics outside the vectorstore domain. Be precise in your routing decisions.""" route_prompt = ChatPromptTemplate.from_messages([ ("system", system_prompt), ("human", "{question}"), ]) self.question_router = route_prompt | structured_llm # Set up generation chain for synthesizing answers generation_prompt = ChatPromptTemplate.from_messages([ ("system", """You are a helpful AI assistant specialized in providing accurate, informative answers. Use the following retrieved context to answer the user's question. If the context doesn't contain relevant information, say so and provide general guidance. Be concise but comprehensive. Use bullet points for clarity when appropriate. Context: {context}"""), ("human", "{question}") ]) self.generation_chain = generation_prompt | self.llm | StrOutputParser() def route(self, question: str) -> str: """Route question to appropriate data source""" result = self.question_router.invoke({"question": question}) return result.datasource def generate_response(self, question: str, documents: List[Document]) -> str: """Generate a coherent response from retrieved documents""" # Format documents into context string if isinstance(documents, list): context = "\n\n".join([ f"Document {i+1}:\n{doc.page_content}" for i, doc in enumerate(documents[:5]) ]) else: context = documents.page_content if hasattr(documents, 'page_content') else str(documents) response = self.generation_chain.invoke({ "context": context, "question": question }) return response class AdaptiveRAGWorkflow: """LangGraph-based adaptive retrieval workflow""" def __init__(self, vector_store, router: IntelligentRouter): self.vector_store = vector_store self.router = router self.retriever = vector_store.as_retriever(search_kwargs={"k": 4}) self.wiki = self._setup_wikipedia() self.workflow = None self.app = None def _setup_wikipedia(self): """Initialize Wikipedia search tool""" api_wrapper = WikipediaAPIWrapper( top_k_results=2, doc_content_chars_max=1000 ) return WikipediaQueryRun(api_wrapper=api_wrapper) def retrieve(self, state: Dict) -> Dict: """Retrieve from vector store""" question = state["question"] documents = self.retriever.invoke(question) return {"documents": documents, "question": question, "route": "vectorstore"} def wiki_search(self, state: Dict) -> Dict: """Search Wikipedia""" question = state["question"] try: docs = self.wiki.invoke({"query": question}) wiki_results = Document(page_content=docs) except Exception as e: wiki_results = Document(page_content=f"Wikipedia search returned no results for this query. Error: {str(e)}") return {"documents": [wiki_results], "question": question, "route": "wikipedia"} def generate(self, state: Dict) -> Dict: """Generate response from retrieved documents""" question = state["question"] documents = state["documents"] # Use the router's generation chain to create a response generation = self.router.generate_response(question, documents) return { "question": question, "documents": documents, "generation": generation, "route": state.get("route", "unknown") } def route_question(self, state: Dict) -> str: """Route based on question type""" question = state["question"] source = self.router.route(question) if source == "wiki_search": return "wiki_search" else: return "vectorstore" def build_graph(self): """Construct LangGraph workflow""" workflow = StateGraph(GraphState) # Add nodes workflow.add_node("wiki_search", self.wiki_search) workflow.add_node("retrieve", self.retrieve) workflow.add_node("generate", self.generate) # Add conditional edges from START workflow.add_conditional_edges( START, self.route_question, { "wiki_search": "wiki_search", "vectorstore": "retrieve", }, ) # Both retrieval paths lead to generation workflow.add_edge("retrieve", "generate") workflow.add_edge("wiki_search", "generate") # Generation leads to END workflow.add_edge("generate", END) self.app = workflow.compile() def execute(self, question: str) -> Dict[str, Any]: """Execute workflow and return results""" inputs = {"question": question} result = { "route": None, "documents": [], "generation": "", "execution_time": 0 } start_time = time.time() try: for output in self.app.stream(inputs): for key, value in output.items(): if key == "generate": result["generation"] = value.get("generation", "") result["route"] = value.get("route", "unknown") result["documents"] = value.get("documents", []) elif key in ["retrieve", "wiki_search"]: result["route"] = value.get("route", key) result["documents"] = value.get("documents", []) except Exception as e: result["generation"] = f"Error executing query: {str(e)}" result["route"] = "error" result["execution_time"] = time.time() - start_time return result # ==================== Streamlit UI ==================== def render_header(): """Render application header""" st.markdown('
Intelligent Multi-Source Knowledge Orchestration System
', unsafe_allow_html=True ) st.markdown("---") def render_sidebar(): """Render sidebar with configuration and info""" with st.sidebar: st.image("https://img.icons8.com/fluency/96/000000/artificial-intelligence.png", width=100) st.title("⚙️ System Configuration") st.markdown("### 🔧 Core Technologies") st.markdown(""" - **LangGraph**: Stateful workflow orchestration - **Astra DB**: Distributed vector storage - **Groq**: High-performance LLM inference - **HuggingFace**: Semantic embeddings """) st.markdown("---") st.markdown("### 📊 System Capabilities") st.markdown(""" ✅ Adaptive query routing ✅ Real-time semantic search ✅ Multi-source knowledge fusion ✅ Scalable vector operations ✅ Enterprise-grade architecture """) st.markdown("---") st.markdown("### 🎯 Use Cases") st.markdown(""" - AI/ML Research Assistance - Technical Documentation Q&A - Competitive Intelligence - Knowledge Base Management """) return st.button("🔄 Reset System", use_container_width=True) def initialize_system(): """Initialize all system components""" if 'initialized' not in st.session_state: with st.spinner("🚀 Initializing Intelligent Knowledge Orchestration System..."): try: # Load configuration config = Config.load_env_variables() # Initialize Knowledge Base Manager kb_manager = KnowledgeBaseManager( config["ASTRA_DB_APPLICATION_TOKEN"], config["ASTRA_DB_ID"] ) kb_manager.initialize_cassandra() kb_manager.setup_embeddings() # Initialize Router router = IntelligentRouter(config["GROQ_API_KEY"]) router.initialize() # Store in session state st.session_state.kb_manager = kb_manager st.session_state.router = router st.session_state.initialized = True st.session_state.documents_indexed = False st.success("✅ System initialized successfully!") except Exception as e: st.error(f"❌ Initialization failed: {str(e)}") st.stop() def render_indexing_tab(): """Render document indexing interface""" st.header("📚 Knowledge Base Indexing") st.markdown("""IMSKOS v1.0 | Built with LangGraph, Astra DB, and Groq
Enterprise-Grade Intelligent Knowledge Orchestration