Spaces:
Running
Running
| """AppSimple assistant — a curated demo of the LLM harness. | |
| Pre-loaded workspace, global daily question limit, themed to match appsimple.io. | |
| """ | |
| from __future__ import annotations | |
| import hmac | |
| import json | |
| import os | |
| import tempfile | |
| import time | |
| from collections.abc import Generator | |
| from dataclasses import asdict | |
| from datetime import datetime, timezone, date | |
| from pathlib import Path | |
| import gradio as gr | |
| import litellm | |
| from dotenv import load_dotenv | |
| from huggingface_hub import HfApi | |
| from llm_harness.agent import run_agent_loop | |
| from llm_harness.citations import process_citations, superscript | |
| from llm_harness.prompt import build_system_prompt | |
| from llm_harness.tools import TOOL_DEFINITIONS | |
| from llm_harness.trace_viewer import render_trace | |
| from llm_harness.types import Message, TextDeltaEvent, ToolCallEvent, ToolResultEvent | |
| from sandbox_e2b import run_python as e2b_run_python | |
# Load .env and silence litellm's verbose debug output.
load_dotenv()
litellm.suppress_debug_info = True

# Runtime configuration — everything comes from environment variables.
MODEL = os.environ.get("LH_MODEL", "")  # litellm model id; handlers error out if unset
ADMIN_TOKEN = os.environ.get("LH_ADMIN_TOKEN", "")  # guards admin endpoints and counter reset
MAX_SESSION_COST = float(os.environ.get("LH_MAX_SESSION_COST", "0.50"))  # USD cap per chat session
DAILY_LIMIT = int(os.environ.get("LH_DAILY_LIMIT", "10"))  # global questions per day
NOTIFY_EMAIL = os.environ.get("NOTIFY_EMAIL", "")  # recipient (and sender) of limit-reached email
SMTP_APP_PASSWORD = os.environ.get("SMTP_APP_PASSWORD", "")  # app password for smtp.gmail.com
HF_TRACES_REPO = os.environ.get("HF_TRACES_REPO", "")  # dataset repo where traces are uploaded
HF_DOCS_REPO = os.environ.get("HF_DOCS_REPO", "")  # dataset repo holding workspace documents
HF_TOKEN = os.environ.get("HF_TOKEN", "")
DOCUMENT_EXPLORER_URL = os.environ.get(
    "DOCUMENT_EXPLORER_URL",
    "https://huggingface.co/spaces/chuckfinca/document-explorer",
)

# Hub client is optional: when no token is set, all HF-backed features
# (workspace download, trace upload, counter init) degrade to no-ops.
hf_api = HfApi(token=HF_TOKEN) if HF_TOKEN else None
# SPACE_ID is injected by the Hugging Face Spaces runtime, so its presence
# distinguishes a deployed Space ("prod") from a local run ("dev").
SOURCE = "prod" if os.environ.get("SPACE_ID") else "dev"

# Persona/base instructions prepended to every system prompt.
BASE_PROMPT = (
    "You represent Charles Feinn and AppSimple. You have documents about his "
    "professional background, services, projects, capabilities, and website content. "
    "Use third person. Refer to him as 'Charles' if possible, "
    "'Charles Feinn' if appropriate.\n\n"
    "Write for potential clients who are exploring whether AppSimple can help them. "
    "Your response should stand on its own.\n\n"
    "Do not speculate, manufacture connections to make a question fit, or answer "
    "off-topic questions."
)
| # --------------------------------------------------------------------------- | |
| # Global daily counter (initialized from trace repo on startup) | |
| # --------------------------------------------------------------------------- | |
def _count_traces_uploaded_today() -> int:
    """Initialize the daily counter from trace files already uploaded today."""
    if hf_api is None or not HF_TRACES_REPO:
        return 0
    # Trace filenames start with a UTC YYYYMMDD timestamp (see upload_trace).
    prefix = datetime.now(timezone.utc).strftime("%Y%m%d")
    try:
        repo_files = hf_api.list_repo_files(repo_id=HF_TRACES_REPO, repo_type="dataset")
    except Exception as exc:
        # Best effort: a failed listing just means we start the day at zero.
        print(f"WARNING: could not read trace count: {exc}")
        return 0
    return sum(1 for name in repo_files if name.startswith(prefix))
# Global because Gradio runs handlers in threads sharing one process.
# Survives Space sleep/wake cycles by re-counting traces on startup.
# NOTE(review): the startup count keys on the UTC date while the rolling
# reset below uses local date.today() — equivalent only if the server runs
# in UTC; confirm the deployment timezone.
_daily_count = _count_traces_uploaded_today()  # questions answered so far today
_daily_date = date.today()  # the day _daily_count refers to
def _notify_limit_reached(label: str, limit: int) -> None:
    """Send a one-time daily email when a question limit is reached."""
    if not (NOTIFY_EMAIL and SMTP_APP_PASSWORD):
        return
    try:
        # Imported lazily so the app works without email configured.
        import smtplib
        from email.message import EmailMessage

        mail = EmailMessage()
        mail["Subject"] = f"{label}: daily limit reached"
        mail["From"] = NOTIFY_EMAIL
        mail["To"] = NOTIFY_EMAIL
        mail.set_content(
            f"The {label} daily question limit ({limit}) "
            f"was reached on {date.today()}."
        )
        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
            server.login(NOTIFY_EMAIL, SMTP_APP_PASSWORD)
            server.send_message(mail)
        print(f"Notification sent to {NOTIFY_EMAIL}")
    except Exception as exc:
        # Notification is best effort; never let it break the request path.
        print(f"WARNING: notification failed: {exc}")
def _is_daily_question_allowed() -> bool:
    """Check whether the daily question limit has been reached, and if not, count this question."""
    global _daily_count, _daily_date
    current = date.today()
    # A new day resets the rolling counter lazily on first use.
    if current != _daily_date:
        _daily_date = current
        _daily_count = 0
    if _daily_count >= DAILY_LIMIT:
        return False
    _daily_count += 1
    # Fire the one-time notification exactly when the last slot is consumed.
    if _daily_count == DAILY_LIMIT:
        _notify_limit_reached("AppSimple Assistant", DAILY_LIMIT)
    return True
| def _reset_counter(): | |
| global _daily_count, _daily_date | |
| _daily_count = 0 | |
| _daily_date = date.today() | |
def _daily_questions_remaining() -> int:
    """Return how many questions are left today without consuming one.

    Purely a read: if the stored date is stale (a new day has started),
    report the full limit instead of mutating state here — the actual
    reset happens lazily in _is_daily_question_allowed().
    """
    # Fix: the original declared `global _daily_count, _daily_date` but
    # never assigns either — `global` is only needed for assignment.
    if date.today() != _daily_date:
        return DAILY_LIMIT
    return max(0, DAILY_LIMIT - _daily_count)
| # --------------------------------------------------------------------------- | |
| # Workspace — download from private HF dataset repo on startup | |
| # --------------------------------------------------------------------------- | |
| # Set once at startup by load_workspace(), then treated as a constant | |
# Set once at startup by load_workspace(), then treated as a constant.
WORKSPACE_DIR: Path | None = None
def load_workspace() -> Path | None:
    """Locate the document workspace, preferring a local checkout.

    Returns the workspace directory, or None when no documents are
    available (the assistant then runs without a document corpus).
    """
    # 1) A bundled workspace/ directory next to this file wins (dev mode).
    local_workspace = Path(__file__).parent / "workspace"
    if local_workspace.is_dir() and any(local_workspace.iterdir()):
        doc_count = sum(
            1 for f in local_workspace.iterdir()
            if f.is_file() and not f.name.startswith(".")
        )
        print(f"Loaded {doc_count} workspace files from local workspace/")
        return local_workspace
    # 2) Otherwise mirror the private HF dataset repo into a temp dir.
    if not hf_api or not HF_DOCS_REPO:
        return None
    try:
        local_dir = Path(tempfile.mkdtemp(prefix="lh-workspace-"))
        files = hf_api.list_repo_files(HF_DOCS_REPO, repo_type="dataset")
        doc_files = [f for f in files if not f.startswith(".")]
        for filename in doc_files:
            path = hf_api.hf_hub_download(
                HF_DOCS_REPO, filename, repo_type="dataset"
            )
            dest = local_dir / filename
            # BUG FIX: repo filenames may contain subdirectories; create
            # them first so write_bytes() does not raise FileNotFoundError.
            dest.parent.mkdir(parents=True, exist_ok=True)
            dest.write_bytes(Path(path).read_bytes())
        print(f"Loaded {len(doc_files)} workspace files from {HF_DOCS_REPO}")
        return local_dir
    except Exception as exc:
        # Best effort: run without documents rather than crash at startup.
        print(f"WARNING: workspace load failed: {exc}")
        return None
| # --------------------------------------------------------------------------- | |
| # Trace upload | |
| # --------------------------------------------------------------------------- | |
| def _slugify(text: str, max_len: int = 50) -> str: | |
| slug = text.lower().replace(" ", "-") | |
| slug = "".join(c for c in slug if c.isalnum() or c == "-") | |
| return slug[:max_len].rstrip("-") | |
def upload_trace(result: dict) -> None:
    """Serialize *result* and push it to the traces dataset repo (best effort)."""
    if not hf_api or not HF_TRACES_REPO:
        return
    # Filename: UTC timestamp plus a slug of the question, when one exists.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S-%f")
    slug = _slugify(result.get("question", ""))
    name = f"{stamp}_{slug}.json" if slug else f"{stamp}.json"
    payload = json.dumps(result, indent=2, default=str).encode()
    try:
        hf_api.upload_file(
            path_or_fileobj=payload,
            path_in_repo=name,
            repo_id=HF_TRACES_REPO,
            repo_type="dataset",
        )
    except Exception as exc:
        # Never let telemetry failures surface to the user.
        print(f"WARNING: trace upload failed: {exc}")
| # --------------------------------------------------------------------------- | |
| # Stats formatting | |
| # --------------------------------------------------------------------------- | |
def format_stats(trace) -> str:
    """Format trace stats for display. Accepts a Trace object or dict."""
    # Normalize both input shapes to a plain mapping of the fields we show.
    if isinstance(trace, dict):
        fields = trace
    else:
        fields = {
            "cached_tokens": trace.cached_tokens,
            "model": trace.model,
            "prompt_tokens": trace.prompt_tokens,
            "completion_tokens": trace.completion_tokens,
            "tool_calls": trace.tool_calls,
            "wall_time_s": trace.wall_time_s,
            "cost": trace.cost,
        }
    cached = fields.get("cached_tokens", 0)
    model = fields.get("model", "")
    prompt = fields.get("prompt_tokens", 0)
    completion = fields.get("completion_tokens", 0)
    tool_calls = fields.get("tool_calls", [])
    wall = fields.get("wall_time_s", 0)
    cost = fields.get("cost")
    suffix = f" ({cached} cached)" if cached else ""
    # Show only the model name, not the provider prefix ("org/name" -> "name").
    short_model = model.split("/")[-1] if model else ""
    parts = [
        short_model,
        f"{prompt + completion:,} tokens{suffix}",
        f"{len(tool_calls)} tool calls",
        f"{wall:.1f}s",
    ]
    # Cost is omitted when missing or zero (free/untracked runs).
    if cost:
        parts.append(f"${cost:.4f}")
    return " · ".join(parts)
| # --------------------------------------------------------------------------- | |
| # Post-processing (shared between chat and stream_question) | |
| # --------------------------------------------------------------------------- | |
def _process_completed_trace(question: str, trace, start_time: float) -> dict:
    """Process a completed agent trace: citations, upload, render.

    Returns a dict with answer, sources, stats, trace_html, and remaining.
    """
    # Stamp the wall time on the trace before it is serialized and uploaded.
    trace.wall_time_s = round(time.monotonic() - start_time, 2)
    # Split the raw answer into clean text plus extracted citation sources.
    clean_answer, sources = process_citations(trace.answer or "", WORKSPACE_DIR)
    result = {
        "question": question,
        "source": SOURCE,  # "prod" on a HF Space, "dev" locally
        "passed": True,
        "assertions": {},
        "trace": asdict(trace),
        "citations": sources,
    }
    # Best-effort telemetry; failures are swallowed inside upload_trace().
    upload_trace(result)
    return {
        "answer": clean_answer,
        "sources": sources,
        "stats": format_stats(trace),
        "trace_html": render_trace(result, max_chars=2000),
        "remaining": _daily_questions_remaining(),
    }
| # --------------------------------------------------------------------------- | |
| # Chat (Gradio chatbot interface) | |
| # --------------------------------------------------------------------------- | |
def chat(message: str, scratch_path: str, session_cost: float):
    """Gradio chat handler: stream the agent's answer for one user message.

    Yields 4-tuples of (answer_markdown, trace_html, scratch_path,
    session_cost) that respond() in build_app() re-packs for the UI.
    """
    # Template for guard-clause yields: empty answer/trace, state unchanged.
    no_answer = ("", "", scratch_path, session_cost)
    # NOTE(review): the quota check below consumes a daily slot before the
    # MODEL and session-cost guards run, so a misconfigured model still
    # costs a question — consider reordering.
    if not _is_daily_question_allowed():
        yield (
            "The daily question limit has been reached. "
            "Check back tomorrow, or try it with your own documents on the "
            f"[Document Explorer]({DOCUMENT_EXPLORER_URL}).",
            *no_answer[1:],
        )
        return
    if not MODEL:
        yield ("Error: LH_MODEL not set.", *no_answer[1:])
        return
    if session_cost >= MAX_SESSION_COST:
        yield (
            f"Session cost limit reached (${session_cost:.2f} / "
            f"${MAX_SESSION_COST:.2f}). Start a new session.",
            *no_answer[1:],
        )
        return
    # Lazily create a per-session scratch directory for agent tool output.
    if not scratch_path:
        scratch_path = tempfile.mkdtemp(prefix="lh-scratch-")
    scratch_dir = Path(scratch_path)
    system_prompt = build_system_prompt(base_prompt=BASE_PROMPT, workspace=WORKSPACE_DIR)
    messages: list[Message] = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": message},
    ]
    start = time.monotonic()
    agent_run = run_agent_loop(
        model=MODEL,
        messages=messages,
        tools=TOOL_DEFINITIONS,
        completion=litellm.completion,
        workspace=WORKSPACE_DIR,
        scratch_dir=scratch_dir,
        sandbox_fn=e2b_run_python,
        stream=True,
    )
    tool_call_count = 0
    accumulated_answer = ""
    try:
        for event in agent_run:
            if isinstance(event, TextDeltaEvent):
                # Stream partial answer text to the UI as it arrives.
                accumulated_answer += event.content
                yield accumulated_answer, "", scratch_path, session_cost
            elif isinstance(event, ToolCallEvent):
                tool_call_count += 1
                status = f"*Exploring documents ({tool_call_count} tool calls)...*"
                yield status, "", scratch_path, session_cost
                # Text streamed before a tool call was interim narration,
                # not the final answer — discard and accumulate fresh.
                accumulated_answer = ""
            elif isinstance(event, ToolResultEvent):
                continue
            else:
                # Presumably a terminal/usage event — fold its cost into the
                # session total. TODO confirm which event types reach here.
                cost = agent_run.trace.cost or 0
                session_cost += cost
    except Exception as exc:
        yield f"Error: {exc}", "", scratch_path, session_cost
        return
    # Finalize: citations, trace upload, stats and remaining-quota footer.
    processed = _process_completed_trace(message, agent_run.trace, start)
    answer = processed["answer"]
    if processed["sources"]:
        source_lines = "\n".join(
            f"{superscript(s['id'])} {s['doc']}: \"{s['quote']}\""
            for s in processed["sources"]
        )
        answer += f"\n\n---\n{source_lines}"
    remaining = processed["remaining"]
    answer += f"\n\n---\n*{processed['stats']}*\n\n*{remaining} question{'s' if remaining != 1 else ''} remaining today*"
    yield (
        answer,
        processed["trace_html"],
        scratch_path,
        session_cost,
    )
| # --------------------------------------------------------------------------- | |
| # Theme | |
| # --------------------------------------------------------------------------- | |
# Gradio theme matching appsimple.io: Steel Blue primary, amber secondary,
# neutral grays, Inter font, and all fills/borders flattened so the app
# blends into the embedding page.
appsimple_theme = gr.themes.Base(
    primary_hue=gr.themes.Color(  # Steel Blue ramp (#4682B4 at 400/500)
        c50="#E5F0FF", c100="#CCE0FF", c200="#99C2FF",
        c300="#66A3FF", c400="#4682B4", c500="#4682B4",
        c600="#336699", c700="#2B5580", c800="#1F3D5C", c900="#142638",
        c950="#0A1A2E",
    ),
    secondary_hue=gr.themes.Color(  # amber ramp (#F59E0B at 400/500)
        c50="#FEF3C7", c100="#FDE68A", c200="#FCD34D",
        c300="#FBBF24", c400="#F59E0B", c500="#F59E0B",
        c600="#D97706", c700="#B45309", c800="#92400E", c900="#78350F",
        c950="#451A03",
    ),
    neutral_hue=gr.themes.Color(  # Tailwind-style gray ramp
        c50="#F9FAFB", c100="#F3F4F6", c200="#E5E7EB",
        c300="#D1D5DB", c400="#9CA3AF", c500="#6B7280",
        c600="#4B5563", c700="#374151", c800="#1F2937", c900="#111827",
        c950="#030712",
    ),
    radius_size=gr.themes.Size(
        lg="12px", md="8px", sm="4px", xl="16px", xxl="24px", xs="2px", xxs="1px",
    ),
    font=("Inter", "system-ui", "sans-serif"),
).set(
    # Kill the blue focus indicator — use Steel Blue or transparent
    color_accent="#4682B4",
    color_accent_soft="transparent",
    input_background_fill="transparent",
    input_background_fill_dark="transparent",
    input_border_color="transparent",
    input_border_color_focus="#E5E7EB",
    input_shadow="none",
    input_shadow_focus="none",
    block_background_fill="transparent",
    block_border_width="0px",
    block_shadow="none",
    panel_background_fill="transparent",
    panel_border_width="0px",
    body_background_fill="transparent",
    background_fill_primary="transparent",
    background_fill_secondary="transparent",
)
# Page-level CSS injected into gr.Blocks: strips Gradio's default chrome
# (shadows, borders, focus bars, footer) so the embedded widget matches the
# minimalist appsimple.io look. The string itself must stay untouched — it
# is passed verbatim to the browser.
CUSTOM_CSS = """
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600&display=swap');
/* Reset Gradio container */
footer { display: none !important; }
.gradio-container {
max-width: 100% !important;
padding: 0 !important;
background: transparent !important;
}
/* Nuke ALL shadows globally */
.gradio-container, .gradio-container * {
box-shadow: none !important;
}
/* Input — clean bottom border only */
#question-input, #question-input > * {
background: transparent !important;
border: none !important;
padding: 0 !important;
}
#question-input textarea {
background: transparent !important;
border: none !important;
border-bottom: 1px solid #E5E7EB !important;
border-radius: 0 !important;
padding: 12px 0 !important;
font-size: 16px !important;
color: #1F2937 !important;
}
#question-input textarea:focus {
border-bottom-color: #4682B4 !important;
outline: none !important;
}
#question-input textarea::placeholder { color: #9CA3AF !important; }
/* Kill ALL focus indicators, bars, underlines — nuclear option */
.gradio-container [class*="focus"],
.gradio-container [class*="indicator"],
.gradio-container [class*="progress"],
.gradio-container [class*="generating"],
.gradio-container [class*="eta"],
.gradio-container svg.feather-loader {
display: none !important;
height: 0 !important;
opacity: 0 !important;
}
/* Chat output — subtle left border for definition */
#chat-output {
border: none !important;
background: transparent !important;
padding: 0 !important;
}
#chat-output [class*="message"],
#chat-output [class*="bubble"],
#chat-output [class*="row"] {
background: transparent !important;
border: none !important;
}
/* User question — left-aligned, subtle styling */
#chat-output [class*="user"] [class*="bubble"],
#chat-output [class*="user"] [class*="message-content"] {
color: #374151 !important;
font-weight: 500 !important;
}
/* Bot response — slight left border for visual anchoring */
#chat-output [class*="bot"] [class*="bubble"],
#chat-output [class*="bot"] [class*="message-content"] {
border-left: 2px solid #E5E7EB !important;
padding-left: 16px !important;
color: #4B5563 !important;
}
/* Hide ALL buttons inside chatbot */
#chat-output button { display: none !important; }
/* Accordion (Trace) */
.accordion { border-color: #E5E7EB !important; }
/* Disclaimer — quiet footnote */
.disclaimer-text, .disclaimer-text * {
font-size: 12px !important;
color: #9CA3AF !important;
line-height: 1.6 !important;
}
.disclaimer-text a { color: #D97706 !important; }
"""
| # --------------------------------------------------------------------------- | |
| # App | |
| # --------------------------------------------------------------------------- | |
def build_app() -> gr.Blocks:
    """Assemble the Gradio app: the chat UI plus hidden API endpoints.

    The hidden Textbox/Button pairs exist only to expose named API routes
    (api_name=...) that the custom front-end calls; they are never shown.
    """
    with gr.Blocks(title="AppSimple Assistant", css=CUSTOM_CSS, theme=appsimple_theme) as demo:
        # Per-browser-session state: scratch dir path and accumulated cost.
        scratch_state = gr.State("")
        cost_state = gr.State(0.0)
        msg = gr.Textbox(
            placeholder="Ask a question...",
            label="",
            show_label=False,
            interactive=True,
            elem_id="question-input",
        )
        chatbot = gr.Chatbot(
            height=None,
            label="",
            show_label=False,
            show_copy_button=False,
            elem_id="chat-output",
        )
        # Hidden until a trace exists for the latest answer.
        with gr.Accordion("Trace", open=False, visible=False) as trace_accordion:
            trace_display = gr.HTML("")
        gr.Markdown(
            "LLMs can make mistakes. "
            f"Try it with your own documents — "
            f"[Open the Document Explorer]({DOCUMENT_EXPLORER_URL})",
            elem_classes=["disclaimer-text"],
        )

        def respond(message, history, scratch_path, session_cost):
            # Append the user turn, then re-yield the growing assistant turn
            # for every partial answer chat() streams back.
            history = history or []
            history.append({"role": "user", "content": message})
            for response, trace_html, sp, sc in chat(
                message, scratch_path, session_cost
            ):
                history_with_response = [
                    *history,
                    {"role": "assistant", "content": response},
                ]
                # Reveal the accordion only once trace HTML is available.
                accordion = gr.Accordion(visible=bool(trace_html))
                yield history_with_response, "", trace_html, accordion, sp, sc

        def check_admin_reset(request: gr.Request):
            # URL-driven admin action: ?admin=<token>&reset=1 clears the
            # daily counter. compare_digest avoids timing side channels.
            token = request.query_params.get("admin", "")
            reset = request.query_params.get("reset", "")
            if ADMIN_TOKEN and hmac.compare_digest(token, ADMIN_TOKEN) and reset:
                _reset_counter()
                print("Admin reset: daily counter cleared")
            return ""

        admin_hidden = gr.State("")
        demo.load(check_admin_reset, outputs=[admin_hidden])
        msg.submit(
            respond,
            inputs=[msg, chatbot, scratch_state, cost_state],
            outputs=[
                chatbot, msg, trace_display, trace_accordion,
                scratch_state, cost_state,
            ],
        )

        # Streaming API endpoint for custom chat UI
        api_input = gr.Textbox(visible=False)
        api_output = gr.Textbox(visible=False)

        def api_ask_stream(question):
            # Thin pass-through to stream_question (defined below at module
            # level; resolved at call time).
            for event_json in stream_question(question):
                yield event_json

        api_btn = gr.Button(visible=False)
        api_btn.click(api_ask_stream, inputs=api_input, outputs=api_output, api_name="ask")

        # Status endpoint (remaining questions)
        status_output = gr.Textbox(visible=False)

        def api_status():
            return json.dumps({"remaining": _daily_questions_remaining()})

        status_btn = gr.Button(visible=False)
        status_btn.click(api_status, inputs=[], outputs=status_output, api_name="status")

        # Document viewer endpoint
        doc_input = gr.Textbox(visible=False)
        doc_output = gr.Textbox(visible=False)

        def api_get_doc(filename):
            # Serve a workspace markdown file by name; Path(...).name strips
            # any directory components to prevent path traversal.
            if not WORKSPACE_DIR or not filename:
                return json.dumps({"error": "not found"})
            safe_name = Path(filename).name
            if not safe_name.endswith(".md"):
                safe_name += ".md"
            filepath = WORKSPACE_DIR / safe_name
            if not filepath.is_file():
                return json.dumps({"error": "not found"})
            return json.dumps({"filename": safe_name, "content": filepath.read_text()})

        doc_btn = gr.Button(visible=False)
        doc_btn.click(api_get_doc, inputs=doc_input, outputs=doc_output, api_name="doc")

        # Trace list endpoint (admin-only)
        traces_token_input = gr.Textbox(visible=False)
        traces_query_input = gr.Textbox(visible=False)
        traces_output = gr.Textbox(visible=False)

        def api_list_traces(token, query):
            # Newest-first listing of trace files, optionally filtered by a
            # case-insensitive substring; capped at 100 entries.
            if not ADMIN_TOKEN or not hmac.compare_digest(token, ADMIN_TOKEN):
                return json.dumps({"error": "unauthorized"})
            if not hf_api or not HF_TRACES_REPO:
                return json.dumps({"error": "traces not configured"})
            try:
                files = hf_api.list_repo_files(
                    repo_id=HF_TRACES_REPO, repo_type="dataset"
                )
                traces = sorted(
                    [f for f in files if f.endswith(".json")], reverse=True
                )
                if query:
                    traces = [f for f in traces if query.lower() in f.lower()]
                return json.dumps({"traces": traces[:100]})
            except Exception as exc:
                return json.dumps({"error": str(exc)})

        traces_btn = gr.Button(visible=False)
        traces_btn.click(api_list_traces, inputs=[traces_token_input, traces_query_input], outputs=traces_output, api_name="traces")

        # Trace replay endpoint (admin-only)
        replay_token_input = gr.Textbox(visible=False)
        replay_filename_input = gr.Textbox(visible=False)
        replay_output = gr.Textbox(visible=False)

        def api_get_trace(token, filename):
            # Download one stored trace and re-run citation processing and
            # HTML rendering so the admin UI can replay it.
            if not ADMIN_TOKEN or not hmac.compare_digest(token, ADMIN_TOKEN):
                return json.dumps({"error": "unauthorized"})
            if not hf_api or not HF_TRACES_REPO or not filename:
                return json.dumps({"error": "not found"})
            safe_name = Path(filename).name  # strip directories (traversal guard)
            try:
                path = hf_api.hf_hub_download(
                    HF_TRACES_REPO, safe_name, repo_type="dataset"
                )
                data = json.loads(Path(path).read_text())
                trace = data.get("trace", {})
                raw_answer = trace.get("answer", "")
                clean_answer, sources = process_citations(raw_answer, WORKSPACE_DIR)
                trace_html = render_trace(data, max_chars=2000)
                return json.dumps({
                    "question": data.get("question", ""),
                    "answer": clean_answer,
                    "sources": sources,
                    "stats": format_stats(trace),
                    "source_tag": data.get("source", ""),
                    "trace_html": trace_html,
                    "filename": safe_name,
                })
            except Exception as exc:
                return json.dumps({"error": str(exc)})

        replay_btn = gr.Button(visible=False)
        replay_btn.click(api_get_trace, inputs=[replay_token_input, replay_filename_input], outputs=replay_output, api_name="replay")
    return demo
def stream_question(question: str) -> Generator[str, None, None]:
    """Streaming API — yields JSON event strings for the custom chat UI.

    Events: {"type": "delta"} partial answer text, {"type": "tool_call"}
    progress, {"type": "error"}, and a final {"type": "done"} carrying the
    processed answer, sources, stats, trace HTML, and remaining quota.
    """
    # FIX: check configuration BEFORE consuming a quota slot. Previously the
    # daily-limit check ran first, so a missing LH_MODEL still burned one of
    # the day's questions on every failed attempt.
    if not MODEL:
        yield json.dumps({"type": "error", "error": "LH_MODEL not set"})
        return
    if not _is_daily_question_allowed():
        yield json.dumps({"type": "error", "error": "daily_limit", "remaining": 0})
        return
    # Fresh scratch dir per question — this endpoint is stateless.
    scratch_dir = Path(tempfile.mkdtemp(prefix="lh-scratch-"))
    system_prompt = build_system_prompt(base_prompt=BASE_PROMPT, workspace=WORKSPACE_DIR)
    messages: list[Message] = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": question},
    ]
    start = time.monotonic()
    agent_run = run_agent_loop(
        model=MODEL,
        messages=messages,
        tools=TOOL_DEFINITIONS,
        completion=litellm.completion,
        workspace=WORKSPACE_DIR,
        scratch_dir=scratch_dir,
        sandbox_fn=e2b_run_python,
        stream=True,
    )
    tool_call_count = 0
    try:
        for event in agent_run:
            if isinstance(event, TextDeltaEvent):
                yield json.dumps({"type": "delta", "content": event.content})
            elif isinstance(event, ToolCallEvent):
                tool_call_count += 1
                yield json.dumps({"type": "tool_call", "count": tool_call_count, "name": event.name})
    except Exception as exc:
        # Log details server-side; clients get a generic message.
        print(f"ERROR in stream_question: {exc}")
        yield json.dumps({"type": "error", "error": "An error occurred during processing."})
        return
    try:
        processed = _process_completed_trace(question, agent_run.trace, start)
    except Exception as exc:
        print(f"ERROR in post-processing: {exc}")
        yield json.dumps({"type": "error", "error": "An error occurred during processing."})
        return
    yield json.dumps({
        "type": "done",
        "answer": processed["answer"],
        "sources": processed["sources"],
        "stats": processed["stats"],
        "trace_html": processed["trace_html"],
        "remaining": processed["remaining"],
    })
# Resolve the workspace once at import time; handlers treat it as constant.
WORKSPACE_DIR = load_workspace()

if __name__ == "__main__":
    demo = build_app()
    # 0.0.0.0 so the server is reachable from outside the Space container.
    demo.launch(server_name="0.0.0.0", server_port=7860)