import os
import re
from typing import List, Dict, Any, Tuple, Optional

import requests
import gradio as gr
from openai import OpenAI

# Firecrawl SDK (used for scraping URLs into markdown)
try:
    from firecrawl import Firecrawl
except ImportError:
    Firecrawl = None  # handled gracefully below


# -------------------- CONFIG --------------------

CHAT_MODEL = "gpt-5"  # main chat model

DEFAULT_SYSTEM_PROMPT = """You are a Retrieval-Augmented Generation (RAG) assistant.

Rules:
- Answer ONLY using the provided knowledge base context and system instructions.
- If the answer is not clearly supported by the context, say "I don’t know based on the current knowledge base."
- Do not invent sources, statistics, or facts that are not present in the context.
- When applicable, cite which source you used (e.g., "According to the uploaded file" or "Based on zenai.world").
- Be clear, concise, and structured.
"""

# Named bundles of (system prompt, seed URLs, seed text) selectable from the UI.
PRESET_CONFIGS = {
    "None (manual setup)": {
        "system": DEFAULT_SYSTEM_PROMPT,
        "urls": "",
        "text": "",
    },
    "ZEN Sites Deep QA (zenai.world + AI Arena)": {
        "system": DEFAULT_SYSTEM_PROMPT
        + "\n\nYou specialize in answering questions about ZEN AI’s mission, programs, AI Pioneer, and ZEN AI Arena.",
        "urls": "https://zenai.world\nhttps://us.zenai.biz",
        "text": (
            "ZEN AI is building the first global AI × Web3 literacy and automation movement, "
            "with youth, homeschool, and professional tracks and blockchain-verified credentials."
        ),
    },
    "AI Policy & Governance Starter": {
        "system": DEFAULT_SYSTEM_PROMPT
        + "\n\nYou act as a neutral policy explainer. Summarize clearly, highlight key risks, opportunities, and practical implications.",
        "urls": "https://oecd.ai/en/ai-principles",
        "text": "Use this preset for high-level AI policy, governance, and principles exploration.",
    },
    "Research Notebook / Personal RAG Sandbox": {
        "system": DEFAULT_SYSTEM_PROMPT
        + "\n\nYou help the user explore, connect, and synthesize insights from their personal notes and documents.",
        "urls": "",
        "text": "Use this as a sandbox for notebooks, transcripts, and long-form notes.",
    },
}


# -------------------- TEXT HELPERS --------------------

def chunk_text(text: str, max_chars: int = 2000, overlap: int = 200) -> List[str]:
    """Simple character-based chunking with overlap.

    Splits ``text`` into pieces of at most ``max_chars`` characters; each
    chunk after the first starts ``overlap`` characters before the previous
    chunk's end so content cut at a boundary survives intact in one chunk.

    Returns an empty list for empty/whitespace-only input.
    """
    text = (text or "").strip()
    if not text:
        return []
    chunks: List[str] = []
    start = 0
    length = len(text)
    while start < length:
        end = min(start + max_chars, length)
        chunks.append(text[start:end])
        if end >= length:
            break
        # Step back by `overlap` so the next chunk re-includes the tail.
        start = max(0, end - overlap)
    return chunks


def tokenize(text: str) -> List[str]:
    """Very simple tokenizer: lowercase, keep alphanumerics, split on spaces."""
    cleaned = []
    for ch in text.lower():
        if ch.isalnum():
            cleaned.append(ch)
        else:
            cleaned.append(" ")
    return [tok for tok in "".join(cleaned).split() if tok]


# -------------------- DATA SOURCE HELPERS --------------------

def fetch_url_text(url: str) -> str:
    """Fallback: fetch text from a URL via simple HTTP.

    Performs a plain GET (12s timeout) and crudely strips HTML: script/style
    blocks are removed entirely, then all remaining tags are replaced by
    spaces. On any error a bracketed error string is returned instead of
    raising, so one bad URL never aborts knowledge-base construction.
    """
    try:
        resp = requests.get(url, timeout=12)
        resp.raise_for_status()
        text = resp.text
        # Crude HTML stripping: drop script/style blocks, then remove tags.
        text = re.sub(r"(?is)<(script|style)\b.*?>.*?</\1>", " ", text)
        text = re.sub(r"(?s)<[^>]+>", " ", text)
        return text
    except Exception as e:
        return f"[Error fetching {url}: {e}]"


def read_file_text(path: str) -> str:
    """Read text from simple text-based files; skip others safely.

    Only .txt/.md/.csv/.json are read (UTF-8, decode errors ignored); any
    other extension or I/O failure yields a bracketed placeholder string.
    """
    if not path:
        return ""
    path_lower = path.lower()
    try:
        if any(path_lower.endswith(ext) for ext in [".txt", ".md", ".csv", ".json"]):
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                return f.read()
        return f"[Unsupported file type for RAG content: {os.path.basename(path)}]"
    except Exception as e:
        return f"[Error reading file {os.path.basename(path)}: {e}]"


# -------------------- FIRECRAWL HELPERS --------------------

def extract_markdown_from_firecrawl_result(result: Any) -> str:
    """
    Firecrawl scrape(...) can return Document-like objects or dicts.
    We try to collect all markdown text into one big string.
    """
    texts: List[str] = []

    def _collect(obj: Any):
        if obj is None:
            return
        # Document-like object with attribute markdown
        md = getattr(obj, "markdown", None)
        if isinstance(md, str) and md.strip():
            texts.append(md)
            return
        # Dict-shaped
        if isinstance(obj, dict):
            if isinstance(obj.get("markdown"), str):
                texts.append(obj["markdown"])
            data = obj.get("data")
            if data is not None:
                _collect(data)
            return
        # Iterable (list/tuple of docs)
        if isinstance(obj, (list, tuple)):
            for item in obj:
                _collect(item)
            return

    _collect(result)
    if texts:
        return "\n\n".join(texts)
    # Fallback: string representation if nothing else worked
    return str(result)


def firecrawl_scrape_url(firecrawl_api_key: str, url: str) -> str:
    """
    Use Firecrawl to scrape a single URL and return markdown.
    This is intentionally *not* a full crawl to keep it fast.

    Any failure (missing key, missing SDK, API error) is reported as a
    string starting with "[Firecrawl error" so the caller can detect it
    and fall back to a plain HTTP fetch.
    """
    firecrawl_api_key = (firecrawl_api_key or "").strip()
    if not firecrawl_api_key:
        return "[Firecrawl error: no Firecrawl API key provided.]"
    if Firecrawl is None:
        return "[Firecrawl error: firecrawl-py is not installed. Add it to requirements.txt.]"
    try:
        fc = Firecrawl(api_key=firecrawl_api_key)
        # Fast single-page scrape → markdown suitable for RAG
        doc = fc.scrape(url, formats=["markdown"])
        markdown = extract_markdown_from_firecrawl_result(doc)
        return markdown
    except Exception as e:
        return f"[Firecrawl error for {url}: {e}]"


# -------------------- LOCAL KB BUILD (NO OPENAI EMBEDDINGS) --------------------

def build_local_kb(docs: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], str]:
    """
    Build a local KB with lexical features only (no OpenAI embeddings).
    Each KB entry: {id, source, text, tokens}

    Returns (kb_chunks, human-readable status string).
    """
    kb_chunks: List[Dict[str, Any]] = []
    total_chunks = 0
    for d in docs:
        source = d.get("source", "unknown")
        text = d.get("text", "")
        chunks = chunk_text(text, max_chars=2000, overlap=200)
        for idx, ch in enumerate(chunks):
            tokens = tokenize(ch)
            kb_chunks.append(
                {
                    "id": f"{source}_{idx}",
                    "source": source,
                    "text": ch,
                    "tokens": tokens,
                }
            )
            total_chunks += 1
    status = f"✅ Knowledge base built with {len(docs)} documents and {total_chunks} chunks (lexical retrieval)."
    return kb_chunks, status


def retrieve_context_local(
    kb: List[Dict[str, Any]],
    query: str,
    top_k: int = 5,
) -> Tuple[str, str]:
    """
    Retrieve top-k relevant chunks from KB for the query using simple
    lexical matching: score = number of overlapping tokens between query
    and chunk.

    Returns (context string for the prompt, debug/status message).
    """
    if not kb:
        return "", "ℹ️ No knowledge base yet. The model will answer from instructions only."
    q_tokens = tokenize(query)
    if not q_tokens:
        return "", "ℹ️ Query has no meaningful tokens; answering from instructions only."
    q_set = set(q_tokens)
    scored: List[Tuple[int, Dict[str, Any]]] = []
    for d in kb:
        tokens = d.get("tokens") or []
        if not tokens:
            continue
        t_set = set(tokens)
        overlap = len(q_set & t_set)
        if overlap > 0:
            scored.append((overlap, d))
    if not scored:
        return "", "ℹ️ No lexical overlap with knowledge base; answering from instructions only."
    scored.sort(key=lambda x: x[0], reverse=True)
    top = [d for (score, d) in scored[:top_k]]
    context_parts = []
    for idx, d in enumerate(top, start=1):
        src = d.get("source", "unknown")
        txt = d.get("text", "")
        context_parts.append(
            f"[Chunk {idx} | Source: {src}]\n{txt}\n"
        )
    context = "\n\n---\n\n".join(context_parts)
    debug = f"📚 Retrieved {len(top)} chunks from KB via lexical retrieval (no embeddings)."
    return context, debug


# -------------------- GRADIO CALLBACKS --------------------

def save_api_key(api_key: str):
    """Store the OpenAI key in session state; returns (status markdown, key)."""
    api_key = (api_key or "").strip()
    if not api_key:
        return "❌ No API key provided.", ""
    # Mask all but the edges so the UI never shows the full secret.
    masked = f"{api_key[:4]}...{api_key[-4:]}" if len(api_key) >= 8 else "******"
    status = f"✅ OpenAI key saved for this session: `{masked}`"
    return status, api_key


def save_firecrawl_key(fc_key: str):
    """Store the Firecrawl key in session state; returns (status markdown, key)."""
    fc_key = (fc_key or "").strip()
    if not fc_key:
        return "⚠️ No Firecrawl API key provided.", ""
    masked = f"{fc_key[:3]}...{fc_key[-4:]}" if len(fc_key) >= 8 else "******"
    status = f"✅ Firecrawl key saved for this session: `{masked}`"
    return status, fc_key


def apply_preset(preset_name: str):
    """Resolve a preset name to its (system prompt, urls, text) triple."""
    cfg = PRESET_CONFIGS.get(preset_name) or PRESET_CONFIGS["None (manual setup)"]
    return cfg["system"], cfg["urls"], cfg["text"]


def build_knowledge_base(
    api_key: str,
    firecrawl_api_key: str,
    urls_text: str,
    raw_text: str,
    file_paths: Optional[List[str]],
):
    """
    Build knowledge base using:
      - Firecrawl scrape for URLs (if Firecrawl key provided and SDK available)
      - Fallback to simple HTTP fetch if Firecrawl not available
      - Raw text
      - Files

    Note: api_key is kept in the signature for symmetry and potential
    future use, but not required for lexical-only indexing.

    Returns (status markdown, kb list).
    """
    api_key = (api_key or "").strip()
    if not api_key:
        return "❌ Please save your OpenAI API key first.", []
    firecrawl_api_key = (firecrawl_api_key or "").strip()
    docs: List[Dict[str, Any]] = []

    # URLs
    urls = [u.strip() for u in (urls_text or "").splitlines() if u.strip()]
    for u in urls:
        text_from_url = ""
        if firecrawl_api_key:
            # Try Firecrawl first (single-page scrape)
            fc_text = firecrawl_scrape_url(firecrawl_api_key, u)
            if not fc_text.startswith("[Firecrawl error"):
                text_from_url = fc_text
            else:
                # Firecrawl failed; fallback to simple fetch
                text_from_url = fetch_url_text(u)
        else:
            # No Firecrawl key → simple fetch
            text_from_url = fetch_url_text(u)
        docs.append({"source": u, "text": text_from_url})

    # Raw text
    if raw_text and raw_text.strip():
        docs.append({"source": "Raw Text Block", "text": raw_text})

    # Files
    if file_paths:
        for p in file_paths:
            if not p:
                continue
            txt = read_file_text(p)
            src_name = os.path.basename(p)
            docs.append({"source": f"File: {src_name}", "text": txt})

    if not docs:
        return "⚠️ No knowledge sources provided (URLs, text, or files).", []
    kb, status = build_local_kb(docs)
    return status, kb


def extract_text_from_response(resp: Any) -> str:
    """
    Extract plain text from the Responses API result.
    We assume structure like:
      resp.output -> list of output items
      each item.content -> list of content parts with .text or ['text']
    """
    if resp is None:
        return ""
    texts: List[str] = []

    # New Responses API usually has resp.output
    output = getattr(resp, "output", None) or getattr(resp, "data", None)
    if output is None:
        # Fallback to just stringifying
        return str(resp)
    if not isinstance(output, (list, tuple)):
        output = [output]

    for item in output:
        content = getattr(item, "content", None)
        if content is None and isinstance(item, dict):
            content = item.get("content")
        if content is None:
            continue
        if not isinstance(content, (list, tuple)):
            content = [content]
        for part in content:
            # Part might be object with .text
            txt = getattr(part, "text", None)
            if isinstance(txt, str) and txt.strip():
                texts.append(txt)
                continue
            # Or dict-like
            if isinstance(part, dict):
                t = part.get("text")
                if isinstance(t, str) and t.strip():
                    texts.append(t)
                    continue
            # Fallback, stringify
            texts.append(str(part))
    return "\n".join(texts).strip()


def chat_with_rag(
    user_message: str,
    api_key: str,
    kb: List[Dict[str, Any]],
    system_prompt: str,
    history_pairs: List[List[str]],
):
    """
    history_pairs: list of [user_str, assistant_str] pairs for the UI Chatbot.
    We'll rebuild conversation history for the Responses API each time.

    Returns (new_history for chatbot, new_history for state, retrieval debug).
    """
    user_message = (user_message or "").strip()
    api_key = (api_key or "").strip()
    system_prompt = (system_prompt or "").strip()
    if not user_message:
        return history_pairs, history_pairs, "❌ Please enter a question."
    if not api_key:
        return history_pairs, history_pairs, "❌ Please save your OpenAI API key first."
    if not system_prompt:
        system_prompt = DEFAULT_SYSTEM_PROMPT

    # Retrieve context from KB (local lexical retrieval)
    context, debug_retrieval = retrieve_context_local(kb, user_message)
    client = OpenAI(api_key=api_key)

    # Build input for Responses API
    input_messages: List[Dict[str, Any]] = []
    combined_system = (
        DEFAULT_SYSTEM_PROMPT.strip()
        + "\n\n---\n\nUser System Instructions:\n"
        + system_prompt.strip()
    )
    input_messages.append(
        {
            "role": "system",
            "content": [{"type": "input_text", "text": combined_system}],
        }
    )
    if context:
        context_block = (
            "You have access to the following knowledge base context.\n"
            "You MUST base your answer ONLY on this context and the system instructions.\n"
            "If the answer is not supported by the context, say you don’t know.\n\n"
            f"{context}"
        )
        input_messages.append(
            {
                "role": "system",
                "content": [{"type": "input_text", "text": context_block}],
            }
        )

    # Rebuild conversation history from pairs (last few turns)
    recent_pairs = history_pairs[-5:] if history_pairs else []
    for u, a in recent_pairs:
        input_messages.append(
            {
                "role": "user",
                "content": [{"type": "input_text", "text": u}],
            }
        )
        input_messages.append(
            {
                "role": "assistant",
                "content": [{"type": "output_text", "text": a}],
            }
        )

    # Current user message
    input_messages.append(
        {
            "role": "user",
            "content": [{"type": "input_text", "text": user_message}],
        }
    )

    # Call OpenAI GPT-5 via Responses API
    try:
        resp = client.responses.create(
            model=CHAT_MODEL,
            input=input_messages,
            # no temperature, no token params -> avoid unsupported parameter errors
        )
        answer = extract_text_from_response(resp)
        if not answer.strip():
            answer = "⚠️ Model returned an empty response object. This may be an API issue."
    except Exception as e:
        answer = f"⚠️ OpenAI API error: {e}"

    # Update UI history as list of [user, assistant] pairs
    new_history = history_pairs + [[user_message, answer]]
    return new_history, new_history, debug_retrieval


def clear_chat():
    """Reset chatbot display, history state, and debug panel."""
    return [], [], "Chat cleared."
# -------------------- UI LAYOUT --------------------

with gr.Blocks(title="RAG Chatbot — GPT-5 + URLs / Files / Text + Firecrawl") as demo:
    gr.Markdown(
        """
# 🔍 RAG Chatbot — GPT-5 + URLs / Files / Text + Firecrawl

1. Enter your **OpenAI API key** and click **Save**.
2. (Optional) Enter your **Firecrawl API key** and save it.
3. Choose a preset (e.g. **ZEN Sites Deep QA**) — this auto-loads URLs like `https://zenai.world`.
4. Click **Grab / Retrieve Knowledge (Firecrawl + Lexical Index)** to scrape URLs + index everything.
5. Ask questions — the bot will answer **only** from your knowledge and system instructions.
"""
    )

    # Session-scoped state: secrets, knowledge base, and chat transcript.
    api_key_state = gr.State("")
    firecrawl_key_state = gr.State("")
    kb_state = gr.State([])
    chat_state = gr.State([])  # list of [user, assistant] pairs

    # default preset on load -> ZEN
    default_preset_name = "ZEN Sites Deep QA (zenai.world + AI Arena)"
    default_preset_cfg = PRESET_CONFIGS[default_preset_name]

    with gr.Row():
        # Left column: credentials, preset selection, and knowledge sources.
        with gr.Column(scale=1):
            gr.Markdown("### 🔑 API & System")
            api_key_box = gr.Textbox(
                label="OpenAI API Key",
                placeholder="sk-...",
                type="password",
            )
            save_api_btn = gr.Button("Save OpenAI API Key", variant="primary")
            save_status = gr.Markdown("OpenAI API key not set.")

            firecrawl_key_box = gr.Textbox(
                label="Firecrawl API Key (optional)",
                placeholder="fc-...",
                type="password",
            )
            save_firecrawl_btn = gr.Button("Save Firecrawl Key")
            firecrawl_status = gr.Markdown(
                "Firecrawl key not set (will fall back to simple URL fetch)."
            )

            preset_dropdown = gr.Dropdown(
                label="Presets",
                choices=list(PRESET_CONFIGS.keys()),
                value=default_preset_name,
            )
            system_box = gr.Textbox(
                label="System Instructions",
                lines=8,
                value=default_preset_cfg["system"],
            )

            gr.Markdown("### 📚 Knowledge Sources")
            urls_box = gr.Textbox(
                label="Knowledge URLs (one per line)",
                lines=4,
                value=default_preset_cfg["urls"],
                placeholder="https://zenai.world\nhttps://us.zenai.biz",
            )
            raw_text_box = gr.Textbox(
                label="Additional Knowledge Text",
                lines=6,
                value=default_preset_cfg["text"],
                placeholder="Paste any notes, docs, or reference text here...",
            )
            files_input = gr.File(
                label="Upload Knowledge Files (.txt, .md, .csv, .json)",
                file_count="multiple",
                type="filepath",
            )
            grab_kb_btn = gr.Button(
                "Grab / Retrieve Knowledge (Firecrawl + Lexical Index)",
                variant="secondary",
            )
            kb_status_md = gr.Markdown("ℹ️ No knowledge base built yet.")

        # Right column: the chat surface itself.
        with gr.Column(scale=2):
            gr.Markdown("### 💬 RAG Chat")
            # Classic Chatbot format: list of [user, assistant] pairs
            chatbot = gr.Chatbot(
                label="RAG Chatbot (GPT-5)",
                height=450,
            )
            user_input = gr.Textbox(
                label="Ask a question",
                lines=3,
                placeholder="Ask about zenai.world, AI Arena, or your uploaded docs...",
            )
            with gr.Row():
                send_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear Chat")
            debug_md = gr.Markdown(
                "ℹ️ Retrieval debug info will appear here after each answer."
            )

    # Wiring: save OpenAI API key
    save_api_btn.click(
        fn=save_api_key,
        inputs=[api_key_box],
        outputs=[save_status, api_key_state],
    )

    # Wiring: save Firecrawl API key
    save_firecrawl_btn.click(
        fn=save_firecrawl_key,
        inputs=[firecrawl_key_box],
        outputs=[firecrawl_status, firecrawl_key_state],
    )

    # Wiring: presets
    preset_dropdown.change(
        fn=apply_preset,
        inputs=[preset_dropdown],
        outputs=[system_box, urls_box, raw_text_box],
    )

    # Wiring: build knowledge base (Firecrawl + lexical index)
    grab_kb_btn.click(
        fn=build_knowledge_base,
        inputs=[api_key_state, firecrawl_key_state, urls_box, raw_text_box, files_input],
        outputs=[kb_status_md, kb_state],
    )

    # Wiring: chat send (button)
    send_btn.click(
        fn=chat_with_rag,
        inputs=[user_input, api_key_state, kb_state, system_box, chat_state],
        outputs=[chatbot, chat_state, debug_md],
    )

    # Wiring: chat send (Enter key)
    user_input.submit(
        fn=chat_with_rag,
        inputs=[user_input, api_key_state, kb_state, system_box, chat_state],
        outputs=[chatbot, chat_state, debug_md],
    )

    # Wiring: clear chat
    clear_btn.click(
        fn=clear_chat,
        inputs=[],
        outputs=[chatbot, chat_state, debug_md],
    )


if __name__ == "__main__":
    demo.launch()