File size: 5,142 Bytes
35c0d38 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 | """Gradio entry point.
Single app that runs both locally (`python app.py`) and on Hugging Face Spaces.
Builds a gr.ChatInterface with a radio toggle between the OSS (Qwen) and frontier
(Claude) assistants, wired through the full Phase 3 pipeline:
input guardrail -> memory-backed assistant (+ tools) -> output guardrail
A small status footer under each reply shows which assistant answered, which
tools fired, and whether either guardrail triggered.
"""
from __future__ import annotations
import uuid
import gradio as gr
from langchain_core.messages import AIMessage
from src.guardrails import (
INPUT_REFUSAL,
OUTPUT_REFUSAL,
check_input,
moderate_output,
)
from src.memory import build_conversational, get_session_history
from src.observability import flush, observe, trace_attributes
# Labels shown in the radio toggle.
FRONTIER = "Claude (frontier)"
OSS = "Qwen2.5-1.5B (open-source)"
# One session id per app launch -> a fresh conversation each run. (For a
# multi-user deployment we'd mint a per-browser-session id instead; fine for
# this single-user demo / eval harness.)
SESSION_ID = uuid.uuid4().hex
# Memory-wrapped assistants, built on first use and cached. Building Qwen here
# triggers its ~3 GB load, so we only do it when that assistant is first picked.
_conversationals: dict = {}
def _get_conversational(choice: str):
if choice not in _conversationals:
if choice == FRONTIER:
from src.assistants.frontier import ClaudeAssistant
_conversationals[choice] = build_conversational(ClaudeAssistant())
else:
from src.assistants.oss import QwenAssistant
_conversationals[choice] = build_conversational(QwenAssistant())
return _conversationals[choice]
def _footer(assistant: str, tools_used: list[str], in_blocked: bool, out_blocked: bool) -> str:
"""Build the small status line shown under each reply."""
tools = ", ".join(dict.fromkeys(tools_used)) if tools_used else "none"
input_status = "BLOCKED" if in_blocked else "ok"
output_status = "BLOCKED" if out_blocked else "ok"
return (
f"\n\n---\n"
f"*assistant: {assistant} | tools: {tools} | "
f"guardrails -- input: {input_status}, output: {output_status}*"
)
@observe(name="chat_turn")
def respond(message: str, history: list[dict], assistant_choice: str) -> str:
"""ChatInterface callback running the full guardrail + memory + tools pipeline.
Note: conversation context comes from persistent memory (SQLite via
RunnableWithMessageHistory), not from Gradio's `history` arg, so we ignore it.
The whole turn is one Langfuse trace, tagged with the session id and which
assistant answered; the model/tool/moderation spans nest underneath.
"""
with trace_attributes(
session_id=SESSION_ID,
tags=[assistant_choice],
metadata={"assistant_type": assistant_choice},
):
# --- Layer 1: input guardrail (before the model sees anything) ---
in_check = check_input(message)
if in_check.blocked:
flush()
return INPUT_REFUSAL + _footer(assistant_choice, [], True, False)
# --- Generate with memory + tools ---
conv = _get_conversational(assistant_choice)
result: AIMessage = conv.invoke(
{"input": message},
config={"configurable": {"session_id": SESSION_ID}},
)
text = result.content
tools_used = result.additional_kwargs.get("tools_used", [])
# --- Layer 2: output moderation ---
out_check = moderate_output(text)
if out_check.blocked:
# Replace what the user sees AND what gets remembered, so the unsafe
# text doesn't leak into future context. (Rare path; rewrite is cheap.)
history_store = get_session_history(SESSION_ID)
msgs = history_store.messages
history_store.clear()
history_store.add_messages(msgs[:-1] + [AIMessage(content=OUTPUT_REFUSAL)])
text = OUTPUT_REFUSAL
# Send buffered traces to Langfuse at the end of each turn.
flush()
return text + _footer(assistant_choice, tools_used, False, out_check.blocked)
def build_demo() -> gr.ChatInterface:
assistant_picker = gr.Radio(
choices=[FRONTIER, OSS],
value=FRONTIER,
label="Assistant",
info="Switch between the frontier (Claude) and open-source (Qwen) models.",
)
# Gradio 6 uses the "messages" history format by default (no `type=` arg).
return gr.ChatInterface(
fn=respond,
additional_inputs=[assistant_picker],
title="OSS vs. Frontier Assistant",
description=(
"Compare an open-source assistant (Qwen2.5-1.5B) against a frontier "
"assistant (Claude Sonnet 4.5). Both have short-term memory, a "
"calculator + web-search tool, and input/output guardrails. Pick one "
"below and chat — the status line under each reply shows what fired."
),
)
demo = build_demo()
if __name__ == "__main__":
demo.launch()
|