File size: 3,170 Bytes
a1231c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import gradio as gr
from open_cortex.ui.gradio_history import history_to_chat_messages
from open_cortex.runtime.client import stream_chat_events


def format_runtime(event) -> str:
    """Render a streaming event as the text shown in the Runtime panel.

    Args:
        event: Object with a ``kind`` attribute. For ``"first_token"`` the
            ``snapshot`` and ``ttft_ms`` attributes are read; for
            ``"request_completed"`` the token counts and throughput
            attributes are read.

    Returns:
        A newline-separated status string. Any event that is not handled
        explicitly (including a ``"first_token"`` event whose snapshot is
        ``None``) is reported as ``"Phase: decoding"``.
    """
    kind = event.kind

    if kind == "request_started":
        return "Phase: request started\nWaiting for first token..."

    if kind == "request_completed":
        return (
            "Phase: completed\n"
            f"Prompt tokens: {event.prompt_tokens}\n"
            f"Output tokens: {event.completion_tokens}\n"
            f"Prefill: {event.prompt_tps:.1f} tok/s\n"
            f"Decode: {event.decode_tps:.1f} tok/s"
        )

    # Only first-token events carry a snapshot worth rendering.
    stats = event.snapshot if kind == "first_token" else None
    if stats is None:
        return "Phase: decoding"

    # Show the first slot's token count, or None when no slot is reported.
    slot_tokens = stats.slot_context_tokens
    used = slot_tokens[0] if slot_tokens else None

    engine_line = (
        f"Engine: processing={stats.requests_processing} "
        f"deferred={stats.requests_deferred}"
    )
    return (
        "Phase: first token\n"
        f"TTFT: {event.ttft_ms:.1f} ms\n"
        f"Context: {used} / {stats.slot_context_size}\n"
        f"Token Stream: {stats.decode_tps} tok/s\n"
        f"{engine_line}"
    )

def user(user_message: str, history: list[dict]) -> tuple[str, list[dict]]:
    """Add the submitted text to the chat history as a user turn.

    Args:
        user_message: Raw text from the input box.
        history: Current chat history as a list of message dicts.

    Returns:
        A tuple of an empty string and the history to display. A message
        that is empty or whitespace-only leaves the history untouched (the
        same list object is returned); otherwise a new list with the user
        message appended is returned and the input list is not mutated.
    """
    updated = history

    if user_message.strip():
        entry = {"role": "user", "content": user_message}
        updated = history + [entry]

    return "", updated

def bot(history: list):
    """Stream the assistant's reply into the chat history.

    The history is converted with ``history_to_chat_messages``, a single
    empty assistant message is appended as a placeholder, and every event
    from ``stream_chat_events`` is folded into that placeholder.

    Args:
        history: Chat history as a list of message dicts. It is mutated in
            place: one assistant message is appended and then grown as
            text deltas arrive.

    Yields:
        ``(history, runtime_text)`` after each event, where ``runtime_text``
        is ``format_runtime(event)``. Nothing is yielded when the stream
        produces no events.
    """
    # Convert the history before the placeholder is appended.
    messages = history_to_chat_messages(history)

    # Append exactly one placeholder. Text deltas are written to
    # history[-1], so a second placeholder would stay empty forever and
    # show up as a blank assistant turn.
    history.append(
        {
            "role": "assistant",
            "content": "",
        }
    )

    for event in stream_chat_events(messages):
        runtime_text = format_runtime(event)

        if event.text_delta:
            history[-1]["content"] += event.text_delta

        yield history, runtime_text


# Page layout: a chat column next to a narrower, read-only runtime panel.
with gr.Blocks() as demo:
    gr.Markdown("# OpenCortex Minimal Chat")

    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(height=480)
            msg = gr.Textbox(
                show_label=False,
                placeholder="Ask the local model...",
            )
            clear = gr.Button("Clear")

        with gr.Column(scale=1):
            runtime = gr.Textbox(
                value="Phase: idle",
                label="Runtime",
                interactive=False,
                lines=10,
            )

    # On submit, `user` runs first (unqueued) to record the message and
    # empty the input box; `bot` then streams the reply and runtime text.
    msg.submit(
        fn=user,
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=False,
    ).then(
        fn=bot,
        inputs=chatbot,
        outputs=[chatbot, runtime],
    )

    # Clearing resets both the conversation and the runtime panel.
    clear.click(
        fn=lambda: ([], "Phase: idle"),
        inputs=None,
        outputs=[chatbot, runtime],
        queue=False,
    )

# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.queue()
    demo.launch()