File size: 5,142 Bytes
35c0d38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""Gradio entry point.

Single app that runs both locally (`python app.py`) and on Hugging Face Spaces.
Builds a gr.ChatInterface with a radio toggle between the OSS (Qwen) and frontier
(Claude) assistants, wired through the full Phase 3 pipeline:

    input guardrail  ->  memory-backed assistant (+ tools)  ->  output guardrail

A small status footer under each reply shows which assistant answered, which
tools fired, and whether either guardrail triggered.
"""

from __future__ import annotations

import uuid

import gradio as gr
from langchain_core.messages import AIMessage

from src.guardrails import (
    INPUT_REFUSAL,
    OUTPUT_REFUSAL,
    check_input,
    moderate_output,
)
from src.memory import build_conversational, get_session_history
from src.observability import flush, observe, trace_attributes

# Labels shown in the radio toggle. The selected label is what the chat
# callback receives as `assistant_choice`, and it is also the key used in the
# assistant cache below.
FRONTIER = "Claude (frontier)"
OSS = "Qwen2.5-1.5B (open-source)"

# One session id per app launch -> a fresh conversation each run. Every chat
# turn in this process uses it, both as the memory session id and as the
# trace session id. (For a multi-user deployment we'd mint a
# per-browser-session id instead; fine for this single-user demo / eval
# harness.)
SESSION_ID = uuid.uuid4().hex

# Memory-wrapped assistants, keyed by radio label, built on first use and
# cached. Building Qwen here triggers its ~3 GB load, so we only do it when
# that assistant is first picked.
_conversationals: dict = {}


def _get_conversational(choice: str):
    """Return the memory-wrapped assistant for ``choice``, building it once.

    The first request for a given label constructs the assistant, wraps it
    with ``build_conversational`` and stores it in ``_conversationals``; later
    requests return the stored object. ``FRONTIER`` maps to the Claude
    assistant; any other label maps to the Qwen assistant.
    """
    # Fast path: this label has already been built during this process.
    if choice in _conversationals:
        return _conversationals[choice]

    # Assistant classes are imported lazily, so a backend's module is only
    # loaded once that backend is actually requested.
    if choice == FRONTIER:
        from src.assistants.frontier import ClaudeAssistant

        backend = ClaudeAssistant()
    else:
        from src.assistants.oss import QwenAssistant

        backend = QwenAssistant()

    wrapped = build_conversational(backend)
    _conversationals[choice] = wrapped
    return wrapped


def _footer(assistant: str, tools_used: list[str], in_blocked: bool, out_blocked: bool) -> str:
    """Build the small status line shown under each reply."""
    tools = ", ".join(dict.fromkeys(tools_used)) if tools_used else "none"
    input_status = "BLOCKED" if in_blocked else "ok"
    output_status = "BLOCKED" if out_blocked else "ok"
    return (
        f"\n\n---\n"
        f"*assistant: {assistant} | tools: {tools} | "
        f"guardrails -- input: {input_status}, output: {output_status}*"
    )


@observe(name="chat_turn")
def respond(message: str, history: list[dict], assistant_choice: str) -> str:
    """ChatInterface callback running the full guardrail + memory + tools pipeline.

    Note: conversation context comes from persistent memory (SQLite via
    RunnableWithMessageHistory), not from Gradio's `history` arg, so we ignore it.
    The whole turn is one Langfuse trace, tagged with the session id and which
    assistant answered; the model/tool/moderation spans nest underneath.

    Args:
        message: The user's latest chat message.
        history: Gradio's view of the conversation; accepted to satisfy the
            callback signature but not used.
        assistant_choice: The radio label of the assistant that should answer.

    Returns:
        The text shown to the user: the assistant's reply, or a refusal when a
        guardrail blocks, followed by the status footer.
    """
    with trace_attributes(
        session_id=SESSION_ID,
        tags=[assistant_choice],
        metadata={"assistant_type": assistant_choice},
    ):
        try:
            # --- Layer 1: input guardrail (before the model sees anything) ---
            in_check = check_input(message)
            if in_check.blocked:
                return INPUT_REFUSAL + _footer(assistant_choice, [], True, False)

            # --- Generate with memory + tools ---
            conv = _get_conversational(assistant_choice)
            result: AIMessage = conv.invoke(
                {"input": message},
                config={"configurable": {"session_id": SESSION_ID}},
            )
            text = result.content
            tools_used = result.additional_kwargs.get("tools_used", [])

            # --- Layer 2: output moderation ---
            out_check = moderate_output(text)
            if out_check.blocked:
                # Replace what the user sees AND what gets remembered, so the unsafe
                # text doesn't leak into future context. (Rare path; rewrite is cheap.)
                history_store = get_session_history(SESSION_ID)
                # Snapshot before clearing: if the store hands back a live list,
                # clear() must not be able to empty the copy we re-add below.
                msgs = list(history_store.messages)
                history_store.clear()
                history_store.add_messages(msgs[:-1] + [AIMessage(content=OUTPUT_REFUSAL)])
                text = OUTPUT_REFUSAL

            return text + _footer(assistant_choice, tools_used, False, out_check.blocked)
        finally:
            # Send buffered traces to Langfuse at the end of each turn -- including
            # turns where a guardrail, the model, or moderation raised, so failed
            # turns are not left sitting in the buffer.
            flush()


def build_demo() -> gr.ChatInterface:
    """Assemble the chat UI: a ChatInterface plus the assistant radio picker.

    The radio's current value is forwarded to ``respond`` as its extra
    argument via ``additional_inputs``.
    """
    blurb = (
        "Compare an open-source assistant (Qwen2.5-1.5B) against a frontier "
        "assistant (Claude Sonnet 4.5). Both have short-term memory, a "
        "calculator + web-search tool, and input/output guardrails. Pick one "
        "below and chat — the status line under each reply shows what fired."
    )
    picker_options = {
        "choices": [FRONTIER, OSS],
        "value": FRONTIER,
        "label": "Assistant",
        "info": "Switch between the frontier (Claude) and open-source (Qwen) models.",
    }
    model_toggle = gr.Radio(**picker_options)

    # Gradio 6 uses the "messages" history format by default (no `type=` arg).
    chat_ui = gr.ChatInterface(
        fn=respond,
        additional_inputs=[model_toggle],
        title="OSS vs. Frontier Assistant",
        description=blurb,
    )
    return chat_ui


# Built at import time so the interface is available as a module-level
# attribute. NOTE(review): presumably this is the object the Hugging Face
# Spaces runtime picks up -- confirm against the Space configuration.
demo = build_demo()

# Local entry point: `python app.py` starts the Gradio server.
if __name__ == "__main__":
    demo.launch()