Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import time | |
| import requests | |
| import pandas as pd | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| import gradio as gr | |
| from bs4 import BeautifulSoup | |
| from langchain_core.documents import Document | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import Chroma | |
| from rank_bm25 import BM25Okapi | |
| from sentence_transformers import CrossEncoder | |
| PERSIST_DIR = "k8s_chroma_db" | |
| URLS = { | |
| "pods": "https://kubernetes.io/docs/concepts/workloads/pods/", | |
| "deployments": "https://kubernetes.io/docs/concepts/workloads/controllers/deployment/", | |
| "services": "https://kubernetes.io/docs/concepts/services-networking/service/", | |
| "namespaces": "https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/", | |
| "nodes": "https://kubernetes.io/docs/concepts/architecture/nodes/", | |
| "statefulsets": "https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/", | |
| "rbac": "https://kubernetes.io/docs/reference/access-authn-authz/rbac/", | |
| "persistent-volumes": "https://kubernetes.io/docs/concepts/storage/persistent-volumes/", | |
| "ingress": "https://kubernetes.io/docs/concepts/services-networking/ingress/", | |
| "autoscaling": "https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/", | |
| } | |
| # ----------------- SCRAPING + KB ----------------- # | |
| def scrape_page(name, url): | |
| try: | |
| r = requests.get(url, timeout=20) | |
| r.raise_for_status() | |
| soup = BeautifulSoup(r.text, "html.parser") | |
| content = soup.find("div", class_="td-content") | |
| if not content: | |
| return None | |
| text = content.get_text(separator="\n").strip() | |
| return Document(page_content=text, metadata={"doc_id": name, "url": url}) | |
| except Exception as e: | |
| print(f"[ERROR] scraping {url}: {e}") | |
| return None | |
| def build_or_load_kb(): | |
| embedding_model = HuggingFaceEmbeddings( | |
| model_name="sentence-transformers/all-MiniLM-L6-v2" | |
| ) | |
| # If DB exists, load it | |
| if os.path.isdir(PERSIST_DIR): | |
| print("[INFO] Loading existing Chroma DB") | |
| vectordb = Chroma( | |
| embedding_function=embedding_model, | |
| persist_directory=PERSIST_DIR, | |
| ) | |
| raw = vectordb._collection.get(include=["documents", "metadatas"]) | |
| chunks = [ | |
| Document(page_content=doc, metadata=meta) | |
| for doc, meta in zip(raw["documents"], raw["metadatas"]) | |
| ] | |
| return vectordb, chunks | |
| # Else: scrape + build | |
| print("[INFO] No DB found, scraping docs...") | |
| docs = [] | |
| for name, url in URLS.items(): | |
| d = scrape_page(name, url) | |
| if d: | |
| docs.append(d) | |
| print(f"[INFO] Scraped {len(docs)} docs") | |
| splitter = RecursiveCharacterTextSplitter(chunk_size=900, chunk_overlap=200) | |
| chunks = splitter.split_documents(docs) | |
| vectordb = Chroma.from_documents( | |
| chunks, embedding_model, persist_directory=PERSIST_DIR | |
| ) | |
| return vectordb, chunks | |
| vectordb, chunks = build_or_load_kb() | |
| # ----------------- HYBRID SEARCH ----------------- # | |
| bm25_corpus = [doc.page_content.split() for doc in chunks] | |
| bm25 = BM25Okapi(bm25_corpus) | |
| reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2") | |
| retriever = vectordb.as_retriever( | |
| search_type="similarity_score_threshold", | |
| search_kwargs={"k": 8, "score_threshold": 0.35}, | |
| ) | |
| def hybrid_search(query, top_k=5): | |
| vector_results = retriever.invoke(query) | |
| tokenized_query = query.lower().split() | |
| bm25_scores = bm25.get_scores(tokenized_query) | |
| bm25_ranked = sorted(zip(bm25_scores, chunks), key=lambda x: x[0], reverse=True) | |
| bm25_results = [d for _, d in bm25_ranked[:top_k]] | |
| combined = vector_results + bm25_results | |
| seen = set() | |
| unique = [] | |
| for d in combined: | |
| key = (d.metadata.get("doc_id"), d.page_content[:80]) | |
| if key not in seen: | |
| seen.add(key) | |
| unique.append(d) | |
| if not unique: | |
| return [] | |
| pairs = [(query, doc.page_content) for doc in unique] | |
| scores = reranker.predict(pairs) | |
| ranked = sorted(zip(scores, unique), key=lambda x: x[0], reverse=True)[:top_k] | |
| for s, doc in ranked: | |
| doc.metadata["rerank_score"] = float(s) | |
| return [doc for _, doc in ranked] | |
| # ----------------- LLM CALL ----------------- # | |
| def call_llm(prompt: str) -> str: | |
| api_key = os.getenv("OPENROUTER_API_KEY") | |
| if not api_key: | |
| return "⚠️ Missing OPENROUTER_API_KEY in Space secrets.\nGroundedness: 0%" | |
| try: | |
| r = requests.post( | |
| "https://openrouter.ai/api/v1/chat/completions", | |
| headers={ | |
| "Authorization": f"Bearer {api_key}", | |
| "HTTP-Referer": "https://huggingface.co/", | |
| "X-Title": "Kubernetes RAG Assistant", | |
| }, | |
| json={ | |
| "model": "meta-llama/llama-3.1-8b-instruct", | |
| "messages": [{"role": "user", "content": prompt}], | |
| "max_tokens": 400, | |
| "temperature": 0.0, | |
| }, | |
| timeout=60, | |
| ) | |
| r.raise_for_status() | |
| data = r.json() | |
| return data["choices"][0]["message"]["content"] | |
| except Exception as e: | |
| print("[ERROR] LLM:", e) | |
| return f"⚠️ LLM error: {e}\nGroundedness: 0%" | |
| # ----------------- CONTEXT + METRICS ----------------- # | |
| def build_context(query: str): | |
| docs = hybrid_search(query) | |
| if not docs: | |
| return "", [], [] | |
| context, sources, scores = "", [], [] | |
| for i, d in enumerate(docs, start=1): | |
| label = f"[{i}]" | |
| context += f"{label} {d.page_content[:900]}\nSource: {d.metadata['url']}\n\n" | |
| sources.append(f"{label} → {d.metadata['url']}") | |
| scores.append(d.metadata["rerank_score"]) | |
| return context, sources, scores | |
| def classify_query(q: str) -> str: | |
| q = q.lower() | |
| if "how" in q: | |
| return "how-to" | |
| if "error" in q or "fail" in q: | |
| return "debug" | |
| return "general" | |
| def init_metrics(): | |
| return {"q": [], "lat": [], "tok": [], "g": [], "r": [], "c": [], "t": []} | |
| # global analytics, no gr.State | |
| METRICS = init_metrics() | |
| # ----------------- CHAT HANDLER ----------------- # | |
| def answer_question(query, history): | |
| global METRICS | |
| if METRICS is None: | |
| METRICS = init_metrics() | |
| start = time.time() | |
| ctx, sources, scores = build_context(query) | |
| if not ctx: | |
| reply = "Not in docs or insufficient context.\nGroundedness: 0%" | |
| history.append((query, reply)) | |
| return history, "" | |
| prompt = f""" | |
| Use ONLY the context below to answer. | |
| Every sentence MUST end with a citation like [1]. | |
| Question: {query} | |
| Context: | |
| {ctx} | |
| At the end add a line: Groundedness: XX% | |
| """ | |
| answer = call_llm(prompt) | |
| latency = time.time() - start | |
| # robust groundedness parsing | |
| grounded = 0 | |
| m = re.search(r"Groundedness:\s*(\d+)%", answer) | |
| if m: | |
| try: | |
| grounded = int(m.group(1)) | |
| except ValueError: | |
| grounded = 0 | |
| cites = len(set(re.findall(r"\[(\d+)\]", answer))) | |
| avg_score = sum(scores) / len(scores) if scores else 0.0 | |
| tokens = len(answer.split()) + len(prompt.split()) | |
| alert = "" | |
| if grounded < 70 or cites == 0: | |
| alert = "⚠️ Low support from docs; please verify in official Kubernetes docs.\n\n" | |
| final = alert + answer + "\n\n---\nSources:\n" + "\n".join(sources) | |
| history.append((query, final)) | |
| METRICS["q"].append(query) | |
| METRICS["lat"].append(latency) | |
| METRICS["tok"].append(tokens) | |
| METRICS["g"].append(grounded) | |
| METRICS["r"].append(avg_score) | |
| METRICS["c"].append(cites) | |
| METRICS["t"].append(classify_query(query)) | |
| return history, "" | |
| # ----------------- ANALYTICS HELPERS ----------------- # | |
| def render_metrics(): | |
| if len(METRICS["q"]) == 0: | |
| return [], 0.0, 0.0, 0.0 | |
| rows = [] | |
| for i, q in enumerate(METRICS["q"]): | |
| rows.append([ | |
| i + 1, | |
| q, | |
| round(METRICS["lat"][i], 3), | |
| METRICS["tok"][i], | |
| METRICS["g"][i], | |
| round(METRICS["r"][i], 3), | |
| METRICS["c"][i], | |
| METRICS["t"][i], | |
| ]) | |
| avg_ground = sum(METRICS["g"]) / len(METRICS["g"]) | |
| avg_lat = sum(METRICS["lat"]) / len(METRICS["lat"]) | |
| avg_tok = sum(METRICS["tok"]) / len(METRICS["tok"]) | |
| return rows, avg_ground, avg_lat, avg_tok | |
| # ----------------- GRADIO UI ----------------- # | |
| with gr.Blocks(title="Kubernetes RAG Assistant") as app: | |
| gr.Markdown("# ☸ Kubernetes RAG Assistant") | |
| with gr.Tab("Chat"): | |
| chat = gr.Chatbot(height=450) | |
| inp = gr.Textbox(label="Ask anything about Kubernetes") | |
| clear_btn = gr.Button("Reset Conversation") | |
| inp.submit(answer_question, [inp, chat], [chat, inp]) | |
| clear_btn.click(lambda: ([], ""), None, [chat, inp]) | |
| with gr.Tab("Analytics"): | |
| gr.Markdown("### 📊 Query Analytics (this session)") | |
| table = gr.DataFrame( | |
| headers=[ | |
| "ID", | |
| "Query", | |
| "Latency (s)", | |
| "Tokens", | |
| "Groundedness (%)", | |
| "Avg Rerank Score", | |
| "Citations", | |
| "Type", | |
| ], | |
| interactive=False, | |
| ) | |
| avgG = gr.Number(label="Avg Groundedness (%)") | |
| avgL = gr.Number(label="Avg Latency (s)") | |
| avgT = gr.Number(label="Avg Tokens") | |
| refresh = gr.Button("Update Dashboard") | |
| refresh.click(render_metrics, None, [table, avgG, avgL, avgT]) | |
| app.launch() | |