import os
import re
import time

import requests
import gradio as gr
from bs4 import BeautifulSoup
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder

PERSIST_DIR = "k8s_chroma_db"

URLS = {
    "pods": "https://kubernetes.io/docs/concepts/workloads/pods/",
    "deployments": "https://kubernetes.io/docs/concepts/workloads/controllers/deployment/",
    "services": "https://kubernetes.io/docs/concepts/services-networking/service/",
    "namespaces": "https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/",
    "nodes": "https://kubernetes.io/docs/concepts/architecture/nodes/",
    "statefulsets": "https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/",
    "rbac": "https://kubernetes.io/docs/reference/access-authn-authz/rbac/",
    "persistent-volumes": "https://kubernetes.io/docs/concepts/storage/persistent-volumes/",
    "ingress": "https://kubernetes.io/docs/concepts/services-networking/ingress/",
    "autoscaling": "https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/",
}

# ================= Knowledge Base ================= #

def scrape_page(name, url):
    """Fetch one Kubernetes docs page and return its main content as a Document."""
    try:
        response = requests.get(url, timeout=20)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        content = soup.find("div", class_="td-content")  # main-article container on kubernetes.io
        if not content:
            return None
        text = content.get_text(separator="\n").strip()
        return Document(page_content=text, metadata={"doc_id": name, "url": url})
    except Exception as e:
        print(f"[ERROR] scraping {url}: {e}")
        return None


def build_or_load_kb():
    """Load the persisted Chroma DB if present; otherwise scrape, chunk, embed, and persist."""
    print("[INFO] Loading embedding model...")
    embedding_model = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )

    if os.path.isdir(PERSIST_DIR):
        print("[INFO] Loading existing vector DB...")
        vectordb = Chroma(
            embedding_function=embedding_model,
            persist_directory=PERSIST_DIR,
        )
        # Pull every stored chunk back out so BM25 can index the same corpus.
        # _collection is the wrapper's private Chroma handle; the LangChain
        # vector store exposes no public "get all documents" call.
        raw = vectordb._collection.get(include=["documents", "metadatas"])
        chunks = [
            Document(page_content=d, metadata=m)
            for d, m in zip(raw["documents"], raw["metadatas"])
        ]
        return vectordb, chunks

    print("[INFO] No DB found — scraping docs...")
    docs = []
    for name, url in URLS.items():
        doc = scrape_page(name, url)
        if doc:
            docs.append(doc)
    print(f"[INFO] Scraped {len(docs)} docs")

    splitter = RecursiveCharacterTextSplitter(chunk_size=900, chunk_overlap=200)
    chunks = splitter.split_documents(docs)

    vectordb = Chroma.from_documents(chunks, embedding_model, persist_directory=PERSIST_DIR)
    vectordb.persist()  # kept for older chromadb versions; newer ones persist automatically
    print("[INFO] Vector DB built & saved.")
    return vectordb, chunks


vectordb, chunks = build_or_load_kb()

# ================= Search & Reranker ================= #

bm25 = BM25Okapi([c.page_content.split() for c in chunks])
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2")

retriever = vectordb.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"k": 8, "score_threshold": 0.35},
)


def hybrid_search(query, top_k=5):
    """Combine dense (Chroma) and sparse (BM25) retrieval, dedupe, then rerank."""
    vector_results = retriever.invoke(query)

    bm_scores = bm25.get_scores(query.lower().split())
    # Sort on the score alone: Documents are not orderable, so a bare tuple
    # sort would raise TypeError whenever two scores tie.
    bm_ranked = sorted(zip(bm_scores, chunks), key=lambda x: x[0], reverse=True)
    bm_results = [doc for _, doc in bm_ranked[:top_k]]

    # Dedupe on (doc_id, content prefix) so a chunk surfaced by both
    # retrievers is only reranked once.
    unique_docs = []
    seen = set()
    for doc in vector_results + bm_results:
        key = (doc.metadata.get("doc_id"), doc.page_content[:50])
        if key not in seen:
            seen.add(key)
            unique_docs.append(doc)

    if not unique_docs:
        return []

    rerank_pairs = [(query, doc.page_content) for doc in unique_docs]
    scores = reranker.predict(rerank_pairs)
    ranked = sorted(zip(scores, unique_docs), key=lambda x: x[0], reverse=True)[:top_k]
    for s, doc in ranked:
        doc.metadata["rerank_score"] = float(s)
    return [doc for _, doc in ranked]
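
# A minimal retrieval sanity check — a sketch, not part of the original app.
# The env-var name K8S_RAG_SMOKE_TEST and the sample query are illustrative
# assumptions. Set the variable to "1" before running this module to print
# which chunks the hybrid retriever returns, with their reranker scores,
# without starting the UI or calling the LLM.
def _smoke_test_retrieval(query="How does a Deployment roll out updates?"):
    for doc in hybrid_search(query):
        print(doc.metadata["doc_id"], round(doc.metadata["rerank_score"], 3))


if os.getenv("K8S_RAG_SMOKE_TEST") == "1":
    _smoke_test_retrieval()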
# ================= LLM ================= #

def call_llm(prompt):
    """Send the prompt to OpenRouter and return the completion text."""
    api_key = os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        return "⚠️ Missing API key.\nGroundedness: 0%"
    try:
        res = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {api_key}",
                "HTTP-Referer": "https://huggingface.co/",
                "X-Title": "Kubernetes RAG Assistant",
            },
            json={
                "model": "meta-llama/llama-3.1-8b-instruct",
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.0,
                "max_tokens": 400,
            },
            timeout=60,  # avoid hanging the UI on a stalled request
        )
        res.raise_for_status()
        data = res.json()
        return data["choices"][0]["message"]["content"]
    except Exception as e:
        return f"⚠️ LLM Error: {e}\nGroundedness: 0%"

# ================= Analytics ================= #

def classify_query(q):
    """Rough query-type bucketing for the analytics tab."""
    q = q.lower()
    if "how" in q:
        return "how-to"
    if "error" in q or "fail" in q:
        return "debug"
    return "general"


# Per-session metrics: query text, latency, answer tokens, groundedness %,
# mean rerank score, distinct citations, and query type.
METRICS = {"q": [], "lat": [], "tok": [], "g": [], "r": [], "c": [], "t": []}
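
# Optional resilience sketch — an assumption, not part of the original app:
# retry call_llm on transient OpenRouter errors with a linear backoff. The
# attempt count and delay are illustrative. A missing API key is returned
# immediately, since its error string differs and retrying cannot fix it.
def call_llm_with_retries(prompt, attempts=3, delay=2.0):
    reply = call_llm(prompt)
    for attempt in range(1, attempts):
        if not reply.startswith("⚠️ LLM Error"):
            return reply
        time.sleep(delay * attempt)
        reply = call_llm(prompt)
    return reply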
# ================= Chat Handler ================= #

def answer_question(query, history):
    """Retrieve context, query the LLM, log metrics, and append to chat history."""
    start = time.time()
    docs = hybrid_search(query)
    if not docs:
        reply = "Not found in docs.\nGroundedness: 0%"
        return history + [
            {"role": "user", "content": query},
            {"role": "assistant", "content": reply},
        ], ""

    ctx = ""
    sources = []
    scores = []
    for i, d in enumerate(docs, 1):
        label = f"[{i}]"
        ctx += f"{label} {d.page_content[:900]}\nSource: {d.metadata['url']}\n\n"
        sources.append(f"{label} → {d.metadata['url']}")
        scores.append(d.metadata["rerank_score"])

    prompt = f"""
Answer the question ONLY using the context below.
Each sentence MUST end with a citation like [1].

Question: {query}

Context:
{ctx}

End with:
Groundedness: XX%
"""

    answer = call_llm(prompt)
    latency = time.time() - start

    grounded = 0
    m = re.search(r"Groundedness:\s*(\d+)%", answer)
    if m:
        grounded = int(m.group(1))

    cites = len(set(re.findall(r"\[(\d+)\]", answer)))
    avg_score = sum(scores) / len(scores) if scores else 0

    final = answer + "\n\n---\nSources:\n" + "\n".join(sources)

    METRICS["q"].append(query)
    METRICS["lat"].append(latency)
    METRICS["tok"].append(len(answer.split()))
    METRICS["g"].append(grounded)
    METRICS["r"].append(avg_score)
    METRICS["c"].append(cites)
    METRICS["t"].append(classify_query(query))

    history.append({"role": "user", "content": query})
    history.append({"role": "assistant", "content": final})
    return history, ""


def update_dashboard():
    """Return per-query rows plus session averages for the Analytics tab."""
    if not METRICS["q"]:
        return [], 0, 0, 0  # nothing asked yet; avoid dividing by zero
    rows = list(zip(
        range(1, len(METRICS["q"]) + 1),
        METRICS["q"],
        METRICS["lat"],
        METRICS["tok"],
        METRICS["g"],
        METRICS["r"],
        METRICS["c"],
        METRICS["t"],
    ))
    avgG = round(sum(METRICS["g"]) / len(METRICS["g"]), 2)
    avgL = round(sum(METRICS["lat"]) / len(METRICS["lat"]), 2)
    avgT = round(sum(METRICS["tok"]) / len(METRICS["tok"]), 2)
    return rows, avgG, avgL, avgT

# ================= UI ================= #

with gr.Blocks(title="Kubernetes RAG Assistant") as app:
    gr.Markdown("# ☸ Kubernetes RAG Assistant")

    with gr.Tab("Chat"):
        # type="messages" matches the {"role": ..., "content": ...} dicts the
        # handler returns; the default tuple format would not render them.
        chat = gr.Chatbot(height=450, type="messages")
        user_in = gr.Textbox(label="Ask anything about Kubernetes")
        reset = gr.Button("Reset")
        user_in.submit(answer_question, [user_in, chat], [chat, user_in])
        reset.click(lambda: ([], ""), None, [chat, user_in])

    with gr.Tab("Analytics"):
        gr.Markdown("### 📊 Analytics This Session")
        table = gr.DataFrame(headers=[
            "ID", "Query", "Latency", "Tokens", "Grounded", "Rerank", "Citations", "Type"
        ], interactive=False)
        avgG = gr.Number(label="Avg Groundedness")
        avgL = gr.Number(label="Avg Latency")
        avgT = gr.Number(label="Avg Tokens")
        refresh = gr.Button("Refresh")
        refresh.click(update_dashboard, None, [table, avgG, avgL, avgT])

app.launch()
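
# Example session (illustrative; the filename app.py is an assumption): export
# OPENROUTER_API_KEY, run `python app.py`, and ask "What is a StatefulSet?" in
# the Chat tab. The answer should end each sentence with an [n] citation and a
# trailing Sources list; pressing Refresh on the Analytics tab then shows that
# query's latency, token count, groundedness, mean rerank score, and citation
# count for the current session.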