|
|
import os |
|
|
from typing import List, Dict, Any, Tuple, Optional |
|
|
|
|
|
import requests |
|
|
import gradio as gr |
|
|
from openai import OpenAI |
|
|
|
|
|
|
|
|
try: |
|
|
from firecrawl import Firecrawl |
|
|
except ImportError: |
|
|
Firecrawl = None |
|
|
|
|
|
|
|
|
|
|
|
# Model identifier sent with every chat request.
CHAT_MODEL = "gpt-5"

# Baseline grounding rules for the assistant. Presets extend this text and the
# chat handler falls back to it when the user leaves the system box empty.
DEFAULT_SYSTEM_PROMPT = (
    "You are a Retrieval-Augmented Generation (RAG) assistant.\n"
    "\n"
    "Rules:\n"
    "- Answer ONLY using the provided knowledge base context and system instructions.\n"
    '- If the answer is not clearly supported by the context, say "I don’t know based on the current knowledge base."\n'
    "- Do not invent sources, statistics, or facts that are not present in the context.\n"
    '- When applicable, cite which source you used (e.g., "According to the uploaded file" or "Based on zenai.world").\n'
    "- Be clear, concise, and structured.\n"
)
|
|
|
|
|
# Named starting configurations offered in the "Presets" dropdown.
# Each entry supplies the system prompt, newline-separated seed URLs and a seed
# text block. Insertion order is the order shown in the dropdown.
PRESET_CONFIGS = {
    "None (manual setup)": {
        "system": DEFAULT_SYSTEM_PROMPT,
        "urls": "",
        "text": "",
    },
    "ZEN Sites Deep QA (zenai.world + AI Arena)": {
        "system": (
            f"{DEFAULT_SYSTEM_PROMPT}"
            "\n\nYou specialize in answering questions about ZEN AI’s mission, programs, AI Pioneer, and ZEN AI Arena."
        ),
        "urls": "\n".join(["https://zenai.world", "https://us.zenai.biz"]),
        "text": (
            "ZEN AI is building the first global AI × Web3 literacy and automation movement, "
            "with youth, homeschool, and professional tracks and blockchain-verified credentials."
        ),
    },
    "AI Policy & Governance Starter": {
        "system": (
            f"{DEFAULT_SYSTEM_PROMPT}"
            "\n\nYou act as a neutral policy explainer. Summarize clearly, highlight key risks, opportunities, and practical implications."
        ),
        "urls": "https://oecd.ai/en/ai-principles",
        "text": "Use this preset for high-level AI policy, governance, and principles exploration.",
    },
    "Research Notebook / Personal RAG Sandbox": {
        "system": (
            f"{DEFAULT_SYSTEM_PROMPT}"
            "\n\nYou help the user explore, connect, and synthesize insights from their personal notes and documents."
        ),
        "urls": "",
        "text": "Use this as a sandbox for notebooks, transcripts, and long-form notes.",
    },
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def chunk_text(text: str, max_chars: int = 2000, overlap: int = 200) -> List[str]:
    """Split *text* into character-based chunks that overlap their neighbour.

    The text is stripped first; empty or ``None`` input yields ``[]``.
    Consecutive chunks share ``overlap`` characters, so each chunk starts
    ``max_chars - overlap`` characters after the previous one.

    Args:
        text: Text to split. ``None`` is treated as an empty string.
        max_chars: Maximum length of each chunk; must be positive.
        overlap: Number of characters repeated between consecutive chunks.
            Values outside ``0 .. max_chars - 1`` are clamped into that range.

    Returns:
        The chunks in document order.

    Raises:
        ValueError: If ``max_chars`` is not positive.
    """
    text = (text or "").strip()
    if not text:
        return []
    if max_chars <= 0:
        raise ValueError("max_chars must be a positive integer")

    # An overlap >= max_chars would leave the window in place forever, and a
    # negative overlap would skip text between chunks. Clamp so that every
    # step advances by at least one character and never leaves a gap.
    overlap = min(max(overlap, 0), max_chars - 1)
    step = max_chars - overlap

    chunks: List[str] = []
    length = len(text)
    start = 0
    while start < length:
        end = min(start + max_chars, length)
        chunks.append(text[start:end])
        if end >= length:
            break
        start += step
    return chunks
|
|
|
|
|
|
|
|
def tokenize(text: str) -> List[str]:
    """Break *text* into lowercase alphanumeric tokens.

    The text is lowercased, every character for which ``str.isalnum`` is
    false becomes a separator, and the result is split on whitespace.

    Args:
        text: The string to tokenize.

    Returns:
        The tokens in order of appearance; empty when nothing alphanumeric
        is present.
    """
    normalized = "".join(
        char if char.isalnum() else " " for char in text.lower()
    )
    # str.split() with no argument never yields empty strings.
    return normalized.split()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def fetch_url_text(url: str) -> str:
    """Fetch *url* over plain HTTP and return its text content (fallback scraper).

    ``<script>``/``<style>`` elements are removed together with their
    contents, HTML comments and remaining tags are dropped, character
    references are decoded and runs of whitespace are collapsed.

    Args:
        url: Address to download.

    Returns:
        The extracted text, or a bracketed ``[Error fetching ...]`` message
        when the request or processing fails (this function does not raise).
    """
    # Function-scope imports keep this block self-contained; both are stdlib.
    import html
    import re

    try:
        resp = requests.get(url, timeout=12)
        resp.raise_for_status()
        text = resp.text

        # Cut out each script/style element individually. Truncating the page
        # at the first such tag would discard the whole body, because these
        # tags normally sit in <head>.
        text = re.sub(
            r"<(script|style)\b[^>]*>.*?</\1\s*>",
            " ",
            text,
            flags=re.IGNORECASE | re.DOTALL,
        )
        text = re.sub(r"<!--.*?-->", " ", text, flags=re.DOTALL)
        # Only strip things that look like tags, so "a < b" in plain text survives.
        text = re.sub(r"<[A-Za-z/!?][^<>]*>", " ", text)
        text = html.unescape(text)
        return re.sub(r"\s+", " ", text).strip()
    except Exception as e:
        # Deliberately best-effort: the caller expects a string, not an exception.
        return f"[Error fetching {url}: {e}]"
|
|
|
|
|
|
|
|
def read_file_text(path: str) -> str:
    """Return the contents of a plain-text knowledge file.

    Only ``.txt``, ``.md``, ``.csv`` and ``.json`` files (matched
    case-insensitively) are read, as UTF-8 with undecodable bytes ignored.

    Args:
        path: Filesystem path of the file; a falsy value yields ``""``.

    Returns:
        The file text, or a bracketed message naming the file when its type
        is unsupported or reading fails (this function does not raise for
        I/O errors).
    """
    if not path:
        return ""

    supported_suffixes = (".txt", ".md", ".csv", ".json")
    lowered = path.lower()
    try:
        if not lowered.endswith(supported_suffixes):
            return f"[Unsupported file type for RAG content: {os.path.basename(path)}]"
        with open(path, "r", encoding="utf-8", errors="ignore") as handle:
            return handle.read()
    except Exception as e:
        return f"[Error reading file {os.path.basename(path)}: {e}]"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_markdown_from_firecrawl_result(result: Any) -> str:
    """Collect every markdown string found in a Firecrawl result.

    The result may be an object exposing a ``markdown`` attribute, a dict
    with ``"markdown"`` and/or nested ``"data"``, or a list/tuple of those.
    Blank markdown values are ignored in every shape.

    Args:
        result: Value returned by the Firecrawl client.

    Returns:
        All markdown pieces joined by blank lines. ``""`` when *result* is
        ``None``. Otherwise, when no markdown is found, ``str(result)`` as a
        last-resort fallback.
    """
    if result is None:
        # Avoid returning the literal string "None" as scraped content.
        return ""

    texts: List[str] = []

    def _has_text(value: Any) -> bool:
        return isinstance(value, str) and bool(value.strip())

    def _collect(obj: Any) -> None:
        if obj is None:
            return

        # Document-like object: its markdown attribute is the whole payload.
        md = getattr(obj, "markdown", None)
        if _has_text(md):
            texts.append(md)
            return

        if isinstance(obj, dict):
            md = obj.get("markdown")
            # Same blank check as the attribute branch, so an empty page does
            # not add an empty piece ahead of real content.
            if _has_text(md):
                texts.append(md)
            _collect(obj.get("data"))
            return

        if isinstance(obj, (list, tuple)):
            for item in obj:
                _collect(item)

    _collect(result)
    if texts:
        return "\n\n".join(texts)
    return str(result)
|
|
|
|
|
|
|
|
def firecrawl_scrape_url(firecrawl_api_key: str, url: str) -> str:
    """Scrape one URL through Firecrawl and return its markdown.

    Only a single-page scrape is requested (no crawl) to keep it fast.

    Args:
        firecrawl_api_key: Firecrawl API key; surrounding whitespace is ignored.
        url: Page to scrape.

    Returns:
        The markdown extracted from the scrape result, or a string starting
        with ``[Firecrawl error`` when the key is missing, the SDK is not
        importable, or the scrape raises.
    """
    key = (firecrawl_api_key or "").strip()
    if key == "":
        return "[Firecrawl error: no Firecrawl API key provided.]"

    # Firecrawl is None when the optional import at the top of the file failed.
    if Firecrawl is None:
        return "[Firecrawl error: firecrawl-py is not installed. Add it to requirements.txt.]"

    try:
        client = Firecrawl(api_key=key)
        scraped = client.scrape(url, formats=["markdown"])
        return extract_markdown_from_firecrawl_result(scraped)
    except Exception as e:
        return f"[Firecrawl error for {url}: {e}]"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_local_kb(docs: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], str]:
    """Turn source documents into a lexical (token-based) knowledge base.

    Each document is split with ``chunk_text`` (2000 characters, 200 overlap)
    and every chunk is tokenized with ``tokenize``. No embeddings are used.

    Args:
        docs: Dicts with optional ``"source"`` (default ``"unknown"``) and
            ``"text"`` (default ``""``) keys.

    Returns:
        ``(entries, status)`` where each entry is
        ``{"id", "source", "text", "tokens"}`` and *status* is a
        human-readable summary of document and chunk counts.
    """
    kb_chunks: List[Dict[str, Any]] = []

    for doc in docs:
        origin = doc.get("source", "unknown")
        pieces = chunk_text(doc.get("text", ""), max_chars=2000, overlap=200)
        kb_chunks.extend(
            {
                "id": f"{origin}_{position}",
                "source": origin,
                "text": piece,
                "tokens": tokenize(piece),
            }
            for position, piece in enumerate(pieces)
        )

    status = (
        f"✅ Knowledge base built with {len(docs)} documents "
        f"and {len(kb_chunks)} chunks (lexical retrieval)."
    )
    return kb_chunks, status
|
|
|
|
|
|
|
|
def retrieve_context_local(
    kb: List[Dict[str, Any]],
    query: str,
    top_k: int = 5,
) -> Tuple[str, str]:
    """Pick the KB chunks that share the most distinct tokens with *query*.

    A chunk's score is the number of distinct tokens it has in common with
    the query. Chunks scoring zero are ignored; ties keep knowledge-base order.

    Args:
        kb: Entries carrying ``"tokens"``, ``"source"`` and ``"text"``.
        query: The user's question.
        top_k: Maximum number of chunks to return.

    Returns:
        ``(context, debug)``. *context* is the formatted chunk text, or ``""``
        when nothing can be retrieved; *debug* explains the outcome.
    """
    if not kb:
        return "", "ℹ️ No knowledge base yet. The model will answer from instructions only."

    query_terms = set(tokenize(query))
    if not query_terms:
        return "", "ℹ️ Query has no meaningful tokens; answering from instructions only."

    ranked: List[Tuple[int, Dict[str, Any]]] = []
    for entry in kb:
        shared = len(query_terms.intersection(entry.get("tokens") or []))
        if shared:
            ranked.append((shared, entry))

    if not ranked:
        return "", "ℹ️ No lexical overlap with knowledge base; answering from instructions only."

    # list.sort is stable, also with reverse=True, so equal scores keep KB order.
    ranked.sort(key=lambda pair: pair[0], reverse=True)
    best = [entry for _, entry in ranked[:top_k]]

    sections = []
    for number, entry in enumerate(best, start=1):
        origin = entry.get("source", "unknown")
        body = entry.get("text", "")
        sections.append(f"[Chunk {number} | Source: {origin}]\n{body}\n")

    debug = f"📚 Retrieved {len(best)} chunks from KB via lexical retrieval (no embeddings)."
    return "\n\n---\n\n".join(sections), debug
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_api_key(api_key: str) -> Tuple[str, str]:
    """Validate the OpenAI key typed by the user and store it for the session.

    Args:
        api_key: Raw textbox value; surrounding whitespace is stripped.

    Returns:
        ``(status_markdown, key)``. *key* is the stripped key, or ``""`` when
        nothing was provided. The status shows only a masked form of the key.
    """
    api_key = (api_key or "").strip()
    if not api_key:
        return "❌ No API key provided.", ""

    # Show the 4+4 edge characters only when at least as many stay hidden.
    # With the previous threshold of 8 an 8-character key was shown in full.
    if len(api_key) >= 16:
        masked = f"{api_key[:4]}...{api_key[-4:]}"
    else:
        masked = "******"

    status = f"✅ OpenAI key saved for this session: `{masked}`"
    return status, api_key
|
|
|
|
|
|
|
|
def save_firecrawl_key(fc_key: str) -> Tuple[str, str]:
    """Validate the Firecrawl key typed by the user and store it for the session.

    Args:
        fc_key: Raw textbox value; surrounding whitespace is stripped.

    Returns:
        ``(status_markdown, key)``. *key* is the stripped key, or ``""`` when
        nothing was provided. The status shows only a masked form of the key.
    """
    fc_key = (fc_key or "").strip()
    if not fc_key:
        return "⚠️ No Firecrawl API key provided.", ""

    # Show the 3+4 edge characters only for keys long enough that most of the
    # secret stays hidden. With the previous threshold of 8, seven of eight
    # characters were displayed.
    if len(fc_key) >= 16:
        masked = f"{fc_key[:3]}...{fc_key[-4:]}"
    else:
        masked = "******"

    status = f"✅ Firecrawl key saved for this session: `{masked}`"
    return status, fc_key
|
|
|
|
|
|
|
|
def apply_preset(preset_name: str):
    """Look up a preset and return the values for the three editable boxes.

    Args:
        preset_name: Key into ``PRESET_CONFIGS``. Unknown names fall back to
            the ``"None (manual setup)"`` preset.

    Returns:
        ``(system_prompt, urls, text)`` taken from the selected preset.
    """
    chosen = PRESET_CONFIGS.get(preset_name)
    if not chosen:
        chosen = PRESET_CONFIGS["None (manual setup)"]
    return chosen["system"], chosen["urls"], chosen["text"]
|
|
|
|
|
|
|
|
def build_knowledge_base(
    api_key: str,
    firecrawl_api_key: str,
    urls_text: str,
    raw_text: str,
    file_paths: Optional[List[str]],
):
    """Gather every knowledge source and index it lexically.

    Sources, in the order they are added:
      - URLs (one per line): scraped with Firecrawl when a Firecrawl key is
        given and the scrape does not report an error, otherwise fetched
        with the plain HTTP fallback.
      - The raw text block, when it is not blank.
      - Uploaded files, read with ``read_file_text``.

    Args:
        api_key: OpenAI key. It is not used for indexing, but a non-empty
            value is required before anything is built.
        firecrawl_api_key: Optional Firecrawl key.
        urls_text: Newline-separated URLs.
        raw_text: Free-form text to include.
        file_paths: Paths of uploaded files, or ``None``.

    Returns:
        ``(status_message, kb_entries)``; the list is empty when the key is
        missing or no source was provided.
    """
    if not (api_key or "").strip():
        return "❌ Please save your OpenAI API key first.", []

    fc_key = (firecrawl_api_key or "").strip()
    docs: List[Dict[str, Any]] = []

    # --- URLs ---
    for line in (urls_text or "").splitlines():
        url = line.strip()
        if not url:
            continue

        page_text = None
        if fc_key:
            candidate = firecrawl_scrape_url(fc_key, url)
            if not candidate.startswith("[Firecrawl error"):
                page_text = candidate
        if page_text is None:
            # No Firecrawl key, or Firecrawl reported an error: plain HTTP fetch.
            page_text = fetch_url_text(url)

        docs.append({"source": url, "text": page_text})

    # --- Raw text ---
    if raw_text and raw_text.strip():
        docs.append({"source": "Raw Text Block", "text": raw_text})

    # --- Files ---
    for file_path in file_paths or []:
        if not file_path:
            continue
        contents = read_file_text(file_path)
        docs.append({"source": f"File: {os.path.basename(file_path)}", "text": contents})

    if not docs:
        return "⚠️ No knowledge sources provided (URLs, text, or files).", []

    kb, status = build_local_kb(docs)
    return status, kb
|
|
|
|
|
|
|
|
def extract_text_from_response(resp: Any) -> str:
    """Extract the plain answer text from a Responses API result.

    Resolution order:
      1. ``resp.output_text`` when it is a non-blank string.
      2. Otherwise walk ``resp.output`` (or ``resp.data`` when ``output`` is
         missing or empty): each item's ``content`` is a part or list of
         parts exposing ``.text`` or ``["text"]``.

    Args:
        resp: Response object, or ``None``.

    Returns:
        The text parts joined by newlines and stripped. ``""`` for ``None``
        or a response without text. ``str(resp)`` only when the object has
        neither ``output`` nor ``data``.
    """
    if resp is None:
        return ""

    aggregated = getattr(resp, "output_text", None)
    if isinstance(aggregated, str) and aggregated.strip():
        return aggregated.strip()

    output = getattr(resp, "output", None)
    if not output:
        alternative = getattr(resp, "data", None)
        if alternative is not None:
            output = alternative
    if output is None:
        # Unknown shape: fall back to the repr so the caller sees something.
        return str(resp)
    # An empty output list falls through to "" here, not to the repr of the
    # whole response, so the caller's empty-answer check can fire.

    if not isinstance(output, (list, tuple)):
        output = [output]

    texts: List[str] = []
    for item in output:
        content = getattr(item, "content", None)
        if content is None and isinstance(item, dict):
            content = item.get("content")
        if content is None:
            continue
        if not isinstance(content, (list, tuple)):
            content = [content]

        for part in content:
            if isinstance(part, dict):
                txt = part.get("text")
            else:
                txt = getattr(part, "text", None)

            if isinstance(txt, str):
                # A part with a blank text field adds nothing; do not dump
                # its repr into the answer.
                if txt.strip():
                    texts.append(txt)
                continue

            # Part without a text field: keep its string form as a fallback.
            texts.append(str(part))

    return "\n".join(texts).strip()
|
|
|
|
|
|
|
|
def chat_with_rag(
    user_message: str,
    api_key: str,
    kb: List[Dict[str, Any]],
    system_prompt: str,
    history_pairs: List[List[str]],
) -> Tuple[List[List[str]], List[List[str]], str]:
    """Answer one user turn with retrieval-augmented context.

    The request sent to the Responses API is rebuilt on every call from:
    the system prompt, the retrieved context (if any), the last five
    ``[user, assistant]`` pairs, and the new user message.

    Args:
        user_message: The question typed by the user.
        api_key: OpenAI API key saved for the session.
        kb: Lexical knowledge base entries used for retrieval.
        system_prompt: Text of the system-instructions box; may be empty.
        history_pairs: Previous ``[user, assistant]`` pairs; ``None`` is
            treated as an empty history.

    Returns:
        ``(chatbot_value, chat_state, debug_text)``. On validation failure
        the history is returned unchanged with an error message. API errors
        are reported as the assistant's answer, not raised.
    """
    user_message = (user_message or "").strip()
    api_key = (api_key or "").strip()
    system_prompt = (system_prompt or "").strip()
    # Normalise a missing history so the concatenation at the end cannot fail.
    history_pairs = list(history_pairs or [])

    if not user_message:
        return history_pairs, history_pairs, "❌ Please enter a question."
    if not api_key:
        return history_pairs, history_pairs, "❌ Please save your OpenAI API key first."

    def _message(role: str, text: str) -> Dict[str, Any]:
        """Build one Responses API message; assistant turns use output_text parts."""
        part_type = "output_text" if role == "assistant" else "input_text"
        return {"role": role, "content": [{"type": part_type, "text": text}]}

    # The presets, and the empty-box fallback, already contain the default
    # prompt. Strip that prefix so the default rules are sent once, not twice.
    base_prompt = DEFAULT_SYSTEM_PROMPT.strip()
    if system_prompt.startswith(base_prompt):
        extra_instructions = system_prompt[len(base_prompt):].strip()
    else:
        extra_instructions = system_prompt
    combined_system = base_prompt
    if extra_instructions:
        combined_system += "\n\n---\n\nUser System Instructions:\n" + extra_instructions

    # Retrieve context from the knowledge base (lexical matching).
    context, debug_retrieval = retrieve_context_local(kb, user_message)

    client = OpenAI(api_key=api_key)

    input_messages: List[Dict[str, Any]] = [_message("system", combined_system)]

    if context:
        context_block = (
            "You have access to the following knowledge base context.\n"
            "You MUST base your answer ONLY on this context and the system instructions.\n"
            "If the answer is not supported by the context, say you don’t know.\n\n"
            f"{context}"
        )
        input_messages.append(_message("system", context_block))

    # Only the five most recent exchanges are replayed to bound prompt size.
    for past_user, past_assistant in history_pairs[-5:]:
        input_messages.append(_message("user", past_user))
        input_messages.append(_message("assistant", past_assistant))

    input_messages.append(_message("user", user_message))

    try:
        resp = client.responses.create(
            model=CHAT_MODEL,
            input=input_messages,
        )
        answer = extract_text_from_response(resp)
        if not answer.strip():
            answer = "⚠️ Model returned an empty response object. This may be an API issue."
    except Exception as e:
        # UI boundary: surface the failure in the chat, do not raise.
        answer = f"⚠️ OpenAI API error: {e}"

    new_history = history_pairs + [[user_message, answer]]
    return new_history, new_history, debug_retrieval
|
|
|
|
|
|
|
|
def clear_chat():
    """Reset the conversation.

    Returns:
        ``(chatbot_value, chat_state, debug_text)``: two fresh empty
        histories and a confirmation message.
    """
    empty_display = []
    empty_state = []
    return empty_display, empty_state, "Chat cleared."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Gradio UI. Components are created in display order, so statement order
# inside the layout contexts determines the page layout.
with gr.Blocks(title="RAG Chatbot — GPT-5 + URLs / Files / Text + Firecrawl") as demo:
    gr.Markdown(
        """
# 🔍 RAG Chatbot — GPT-5 + URLs / Files / Text + Firecrawl

1. Enter your **OpenAI API key** and click **Save**.
2. (Optional) Enter your **Firecrawl API key** and save it.
3. Choose a preset (e.g. **ZEN Sites Deep QA**) — this auto-loads URLs like `https://zenai.world`.
4. Click **Grab / Retrieve Knowledge (Firecrawl + Lexical Index)** to scrape URLs + index everything.
5. Ask questions — the bot will answer **only** from your knowledge and system instructions.
"""
    )

    # State holders: saved keys, the lexical KB and the chat history.
    api_key_state = gr.State("")
    firecrawl_key_state = gr.State("")
    kb_state = gr.State([])
    chat_state = gr.State([])

    # The ZEN preset pre-fills the form on first load.
    default_preset_name = "ZEN Sites Deep QA (zenai.world + AI Arena)"
    default_preset_cfg = PRESET_CONFIGS[default_preset_name]

    with gr.Row():
        # Left column: credentials, preset, system prompt and knowledge sources.
        with gr.Column(scale=1):
            gr.Markdown("### 🔑 API & System")

            api_key_box = gr.Textbox(label="OpenAI API Key", placeholder="sk-...", type="password")
            save_api_btn = gr.Button("Save OpenAI API Key", variant="primary")
            save_status = gr.Markdown("OpenAI API key not set.")

            firecrawl_key_box = gr.Textbox(
                label="Firecrawl API Key (optional)", placeholder="fc-...", type="password"
            )
            save_firecrawl_btn = gr.Button("Save Firecrawl Key")
            firecrawl_status = gr.Markdown("Firecrawl key not set (will fall back to simple URL fetch).")

            preset_dropdown = gr.Dropdown(
                label="Presets", choices=list(PRESET_CONFIGS), value=default_preset_name
            )

            system_box = gr.Textbox(
                label="System Instructions", lines=8, value=default_preset_cfg["system"]
            )

            gr.Markdown("### 📚 Knowledge Sources")

            urls_box = gr.Textbox(
                label="Knowledge URLs (one per line)",
                lines=4,
                value=default_preset_cfg["urls"],
                placeholder="https://zenai.world\nhttps://us.zenai.biz",
            )

            raw_text_box = gr.Textbox(
                label="Additional Knowledge Text",
                lines=6,
                value=default_preset_cfg["text"],
                placeholder="Paste any notes, docs, or reference text here...",
            )

            files_input = gr.File(
                label="Upload Knowledge Files (.txt, .md, .csv, .json)",
                file_count="multiple",
                type="filepath",
            )

            grab_kb_btn = gr.Button(
                "Grab / Retrieve Knowledge (Firecrawl + Lexical Index)", variant="secondary"
            )
            kb_status_md = gr.Markdown("ℹ️ No knowledge base built yet.")

        # Right column: the conversation itself.
        with gr.Column(scale=2):
            gr.Markdown("### 💬 RAG Chat")

            chatbot = gr.Chatbot(label="RAG Chatbot (GPT-5)", height=450)

            user_input = gr.Textbox(
                label="Ask a question",
                lines=3,
                placeholder="Ask about zenai.world, AI Arena, or your uploaded docs...",
            )

            with gr.Row():
                send_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear Chat")

            debug_md = gr.Markdown("ℹ️ Retrieval debug info will appear here after each answer.")

    # --- Event wiring ---

    # Saving a key updates its status line and the matching session state.
    save_api_btn.click(
        fn=save_api_key,
        inputs=[api_key_box],
        outputs=[save_status, api_key_state],
    )
    save_firecrawl_btn.click(
        fn=save_firecrawl_key,
        inputs=[firecrawl_key_box],
        outputs=[firecrawl_status, firecrawl_key_state],
    )

    # Choosing a preset overwrites the three editable source boxes.
    preset_dropdown.change(
        fn=apply_preset,
        inputs=[preset_dropdown],
        outputs=[system_box, urls_box, raw_text_box],
    )

    # Build (or rebuild) the knowledge base from every configured source.
    grab_kb_btn.click(
        fn=build_knowledge_base,
        inputs=[api_key_state, firecrawl_key_state, urls_box, raw_text_box, files_input],
        outputs=[kb_status_md, kb_state],
    )

    # The Send button and submitting the textbox trigger the same chat handler.
    chat_event_kwargs = dict(
        fn=chat_with_rag,
        inputs=[user_input, api_key_state, kb_state, system_box, chat_state],
        outputs=[chatbot, chat_state, debug_md],
    )
    send_btn.click(**chat_event_kwargs)
    user_input.submit(**chat_event_kwargs)

    clear_btn.click(
        fn=clear_chat,
        inputs=[],
        outputs=[chatbot, chat_state, debug_md],
    )

# Launch the app only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|
|
|