""" app.py — Enterprise Document Q&A (RAG) Powered by Llama 3 + FAISS + Sentence Transformers A Demo Product by Kerdos Infrasoft Private Limited Website: https://kerdos.in New features in this version: • Model selector dropdown (switch LLM without restart) • Indexing progress indicator (gr.Progress) • MAX_NEW_TOKENS slider exposed in UI • Retrieved sources panel with cosine scores (accordion) • Chat export — download conversation as Markdown • .dockerignore added for security """ import os import datetime import tempfile from pathlib import Path from dotenv import load_dotenv import gradio as gr from rag.document_loader import load_documents from rag.embedder import build_index, add_to_index from rag.retriever import retrieve from rag.chain import answer_stream import rag.chain as _chain_module load_dotenv() # ───────────────────────────────────────────────────────────────────────────── # Available models (HF Inference API — free tier) # ───────────────────────────────────────────────────────────────────────────── AVAILABLE_MODELS = { "Llama 3.1 8B Instruct ⚡ (default)": "meta-llama/Llama-3.1-8B-Instruct", "Mistral 7B Instruct v0.3": "mistralai/Mistral-7B-Instruct-v0.3", "Mixtral 8×7B Instruct v0.1": "mistralai/Mixtral-8x7B-Instruct-v0.1", "Qwen2.5 72B Instruct": "Qwen/Qwen2.5-72B-Instruct", } DEFAULT_MODEL_LABEL = list(AVAILABLE_MODELS.keys())[0] # ───────────────────────────────────────────────────────────────────────────── # State helpers # ───────────────────────────────────────────────────────────────────────────── def get_hf_token(user_token: str) -> str: t = user_token.strip() if user_token else "" return t or os.environ.get("HF_TOKEN", "") # ───────────────────────────────────────────────────────────────────────────── # Gradio handlers # ───────────────────────────────────────────────────────────────────────────── def process_files(files, current_index, indexed_sources, progress=gr.Progress()): """Parse uploaded files and build / extend the FAISS index with live progress.""" if not files: return current_index, indexed_sources, "⚠️ No files uploaded." file_paths = [f.name for f in files] if hasattr(files[0], "name") else files # ── Duplicate guard ────────────────────────────────────────────────────── new_paths, skipped = [], [] for p in file_paths: name = Path(p).name if name in indexed_sources: skipped.append(name) else: new_paths.append(p) if skipped and not new_paths: return current_index, indexed_sources, ( f"⚠️ Already indexed: {', '.join(skipped)}. No new documents added." ) # ── Load ───────────────────────────────────────────────────────────────── progress(0.10, desc="📄 Parsing documents…") docs = load_documents(new_paths) if not docs: return current_index, indexed_sources, ( "❌ Could not extract text. Please upload PDF, DOCX, TXT, MD, or CSV." ) # ── Embed & index ───────────────────────────────────────────────────────── progress(0.40, desc="🧠 Embedding chunks…") try: if current_index is None: idx = build_index(docs) else: idx = add_to_index(current_index, docs) except Exception as e: return current_index, indexed_sources, f"❌ Failed to build index: {e}" progress(1.0, desc="✅ Done!") new_sources = {d["source"] for d in docs} updated_sources = indexed_sources | new_sources total_chunks = idx.index.ntotal skip_note = f" (skipped duplicates: {', '.join(skipped)})" if skipped else "" msg = ( f"✅ Indexed {len(new_sources)} new file(s): {', '.join(new_sources)}{skip_note}\n" f"📦 Total chunks in knowledge base: {total_chunks}" ) return idx, updated_sources, msg def chat(user_message, history, vector_index, hf_token_input, top_k, model_label, max_tokens): """Streaming chat handler — yields progressively-updated history + sources panel.""" if not user_message.strip(): yield history, "", "" return hf_token = get_hf_token(hf_token_input) if not hf_token: history = history + [ {"role": "user", "content": user_message}, {"role": "assistant", "content": "⚠️ Please provide a Hugging Face API token."}, ] yield history, "", "" return if vector_index is None: history = history + [ {"role": "user", "content": user_message}, {"role": "assistant", "content": "⚠️ Please upload at least one document first."}, ] yield history, "", "" return # Apply model + token settings from UI for this request selected_model = AVAILABLE_MODELS.get(model_label, _chain_module.LLM_MODEL) _chain_module.LLM_MODEL = selected_model _chain_module.MAX_NEW_TOKENS = int(max_tokens) try: chunks = retrieve(user_message, vector_index, top_k=int(top_k)) # Build sources panel text if chunks: sources_lines = ["**🔍 Retrieved Chunks:**\n"] for i, c in enumerate(chunks, 1): score_bar = "█" * int(c["score"] * 10) + "░" * (10 - int(c["score"] * 10)) sources_lines.append( f"**[{i}] {c['source']}** — score: `{c['score']:.3f}` `{score_bar}`\n" f"> {c['text'][:220].strip()}{'…' if len(c['text']) > 220 else ''}\n" ) sources_md = "\n".join(sources_lines) else: sources_md = "_(No relevant chunks above score threshold)_" # Append placeholder for streaming history = history + [ {"role": "user", "content": user_message}, {"role": "assistant", "content": ""}, ] for partial in answer_stream(user_message, chunks, hf_token, chat_history=history[:-2]): history[-1]["content"] = partial yield history, "", sources_md yield history, "", sources_md except Exception as e: history[-1]["content"] = f"❌ Error: {e}" yield history, "", "" def export_chat(history) -> str | None: """Export the current chat history to a Markdown file for download.""" if not history: return None lines = [ f"# Kerdos AI — Chat Export", f"_Exported: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}_\n", "---\n", ] for msg in history: role = "👤 **User**" if msg["role"] == "user" else "🤖 **Assistant**" lines.append(f"{role}\n\n{msg['content']}\n\n---\n") tmp = tempfile.NamedTemporaryFile( mode="w", suffix=".md", prefix="kerdos_chat_", delete=False, encoding="utf-8" ) tmp.write("\n".join(lines)) tmp.close() return tmp.name def reset_all(): """Clear index, chat, sources panel, and the indexed-sources tracker.""" return None, set(), [], "🗑️ Knowledge base and chat cleared.", "", "" # ───────────────────────────────────────────────────────────────────────────── # CSS # ───────────────────────────────────────────────────────────────────────────── CSS = """ /* ── Kerdos Brand Theme ── */ :root { --kerdos-primary: #0055FF; --kerdos-accent: #00C2FF; --kerdos-dark: #0A0F2C; --kerdos-light: #E8F0FF; } body { font-family: 'Segoe UI', Arial, sans-serif; } #kerdos-header { background: linear-gradient(135deg, #0A0F2C 0%, #0B2C6E 60%, #0044CC 100%); border-radius: 16px; padding: 24px 32px 20px; margin-bottom: 12px; border: 1px solid rgba(0,194,255,0.25); box-shadow: 0 4px 24px rgba(0,85,255,0.18); } #kerdos-logo-line { display: flex; align-items: center; justify-content: center; gap: 10px; flex-wrap: wrap; } #kerdos-badge { display: inline-block; background: rgba(0,194,255,0.15); border: 1px solid rgba(0,194,255,0.4); border-radius: 20px; padding: 3px 14px; font-size: 0.75em; color: #00C2FF; letter-spacing: 0.08em; text-transform: uppercase; font-weight: 600; } #kerdos-demo-banner { background: linear-gradient(90deg, rgba(255,160,0,0.15), rgba(255,100,0,0.15)); border: 1px solid rgba(255,160,0,0.4); border-radius: 10px; padding: 10px 18px; margin: 10px 0 6px; text-align: center; font-size: 0.88em; } #kerdos-fund-banner { background: linear-gradient(90deg, rgba(0,85,255,0.12), rgba(0,194,255,0.12)); border: 1px solid rgba(0,194,255,0.3); border-radius: 10px; padding: 10px 18px; margin: 6px 0 0; text-align: center; font-size: 0.85em; } #kerdos-footer { text-align: center; margin-top: 18px; padding: 12px; border-top: 1px solid rgba(0,194,255,0.15); font-size: 0.82em; color: #888; } #subtitle { text-align: center; color: #6B8CFF; margin-bottom: 8px; } .upload-box { border: 2px dashed #0055FF !important; border-radius: 12px !important; } #status-box { font-size: 0.9em; } footer { display: none !important; } """ # ───────────────────────────────────────────────────────────────────────────── # UI # ───────────────────────────────────────────────────────────────────────────── with gr.Blocks(title="Kerdos AI — Custom LLM Chat | Document Q&A Demo") as demo: # ── Kerdos Header ──────────────────────────────────────────────────────── gr.HTML("""