File size: 5,584 Bytes
7e77ec2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import spaces
from transformers import TextIteratorStreamer, AutoModelForCausalLM, AutoTokenizer
from threading import Thread
import gradio as gr
import re
from openai_harmony import (
    load_harmony_encoding,
    HarmonyEncodingName,
    Role,
    Message,
    Conversation,
    SystemContent,
    DeveloperContent,
    ReasoningEffort,
)

# Matches a "Reasoning: low|medium|high" directive anywhere in a string
# (case-insensitive); group 1 captures the effort level.
RE_REASONING = re.compile(r'(?i)Reasoning:\s*(low|medium|high)')
# Case-insensitive literal "assistantfinal"; generate_response splits the
# streamed text on it to separate the reasoning from the final answer.
RE_FINAL_MARKER = re.compile(r'(?i)assistantfinal')
# Case-insensitive leading "analysis" plus any whitespace after it; removed
# from the start of the reasoning text before it is displayed.
RE_ANALYSIS_PREFIX = re.compile(r'(?i)^analysis\s*')

def parse_reasoning_and_instructions(system_prompt: str):
    """Split a system prompt into a reasoning effort and plain instructions.

    The prompt may embed a ``Reasoning: low|medium|high`` directive
    (case-insensitive). The directive selects the reasoning effort and is
    removed from the text that is returned as instructions.

    Args:
        system_prompt: Raw system prompt text. A falsy value (``None`` or
            ``""``) is replaced by a default instruction.

    Returns:
        A ``(effort, instructions)`` tuple where ``effort`` is a
        ``ReasoningEffort`` member (``MEDIUM`` when no directive is present)
        and ``instructions`` is the prompt with every directive removed and
        surrounding whitespace stripped. The instructions are never empty:
        if nothing is left after cleaning, the default instruction is used.
    """
    default_instructions = "You are a helpful assistant."
    instructions = system_prompt or default_instructions
    match = RE_REASONING.search(instructions)
    effort_key = match.group(1).lower() if match else 'medium'
    effort = {
        'low': ReasoningEffort.LOW,
        'medium': ReasoningEffort.MEDIUM,
        'high': ReasoningEffort.HIGH,
    }.get(effort_key, ReasoningEffort.MEDIUM)
    cleaned_instructions = RE_REASONING.sub('', instructions).strip()
    # A prompt that held only the directive (or only whitespace) would
    # otherwise produce empty instructions; treat it like an empty prompt.
    return effort, cleaned_instructions or default_instructions

# Hugging Face repository id used for both the tokenizer and the weights.
model_id = "ArliAI/gpt-oss-20b-Derestricted"

# NOTE(review): trust_remote_code=True allows code shipped in the model
# repository to run during loading — confirm the repository is trusted.
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
# Loaded once at import time with no device map; generate_response moves the
# model to CUDA on each call.
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype="auto",
    trust_remote_code=True,
    device_map=None,
)

# Harmony encoding used by generate_response to render the conversation into
# prompt tokens.
enc = load_harmony_encoding(HarmonyEncodingName.HARMONY_GPT_OSS)

def format_conversation_history(chat_history):
    """Normalize chat history into a flat list of role/content dicts.

    Two history layouts are accepted and may be mixed:

    * "messages" layout: each item is a dict with ``role`` and ``content``
      keys. ``role`` defaults to ``"user"``. ``content`` may be a string or
      a list of parts; for a list only the first part is used (its
      ``"text"`` value when it is a dict, the string itself when it is a
      string, otherwise the stringified list). ``None`` becomes ``""``.
    * legacy pair layout: each item is a list/tuple ``(user, assistant)``.
      Falsy or missing sides are skipped, so empty and one-element pairs are
      tolerated.

    Args:
        chat_history: Iterable of history items, or ``None`` for no history.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts in conversation
        order. Items of any other type are ignored.
    """
    messages = []
    for item in chat_history or ():
        if isinstance(item, dict):
            role = item.get("role", "user")
            content = item.get("content", "")
            if isinstance(content, list):
                first = content[0] if content else ""
                if isinstance(first, dict):
                    content = first.get("text", str(content))
                elif isinstance(first, str):
                    content = first
                else:
                    # Unknown part type: fall back to a printable form
                    # instead of raising AttributeError on ``.get``.
                    content = str(content)
            elif content is None:
                content = ""
            messages.append({"role": role, "content": content})
        elif isinstance(item, (list, tuple)):
            # Guard both positions so an empty or one-element pair does not
            # raise IndexError.
            user_text = item[0] if len(item) > 0 else None
            assistant_text = item[1] if len(item) > 1 else None
            if user_text:
                messages.append({"role": "user", "content": user_text})
            if assistant_text:
                messages.append({"role": "assistant", "content": assistant_text})
    return messages

@spaces.GPU(duration=120)
def generate_response(input_data, chat_history, max_new_tokens, system_prompt, temperature, top_p, top_k, repetition_penalty):
    """Stream a chat reply, showing the model's reasoning apart from its answer.

    The conversation (system prompt, prior turns and the new user message) is
    rendered with the Harmony encoding, generation runs on a background
    thread, and the decoded text is consumed from a ``TextIteratorStreamer``.
    Text before the ``RE_FINAL_MARKER`` boundary is treated as reasoning and
    text after it as the final answer.

    Args:
        input_data: The new user message.
        chat_history: Prior turns, in any layout accepted by
            ``format_conversation_history``.
        max_new_tokens: Upper bound on generated tokens.
        system_prompt: System prompt; may embed a ``Reasoning: <level>``
            directive handled by ``parse_reasoning_and_instructions``.
        temperature: Sampling temperature passed to ``model.generate``.
        top_p: Nucleus sampling threshold passed to ``model.generate``.
        top_k: Top-k sampling cutoff passed to ``model.generate``.
        repetition_penalty: Repetition penalty passed to ``model.generate``.

    Yields:
        After every streamed chunk, the full message so far: a ``<details>``
        section holding the reasoning followed by the answer text.
    """
    model.to('cuda')

    new_message = {"role": "user", "content": input_data}
    processed_history = format_conversation_history(chat_history)
    effort, instructions = parse_reasoning_and_instructions(system_prompt)
    system_content = SystemContent.new().with_reasoning_effort(effort)
    developer_content = DeveloperContent.new().with_instructions(instructions)
    harmony_messages = [
        Message.from_role_and_content(Role.SYSTEM, system_content),
        Message.from_role_and_content(Role.DEVELOPER, developer_content),
    ]

    # Any role other than "user" is sent as an assistant turn.
    for m in processed_history + [new_message]:
        role = Role.USER if m["role"] == "user" else Role.ASSISTANT
        harmony_messages.append(Message.from_role_and_content(role, m["content"]))
    conversation = Conversation.from_messages(harmony_messages)
    prompt_tokens = enc.render_conversation_for_completion(conversation, Role.ASSISTANT)
    # NOTE(review): the rendered tokens are decoded to text and re-tokenized
    # below; confirm this round trip reproduces the same token ids.
    prompt_text = tokenizer.decode(prompt_tokens, skip_special_tokens=False)

    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)

    inputs = tokenizer(prompt_text, return_tensors="pt").to('cuda')

    generation_kwargs = {
        "input_ids": inputs["input_ids"],
        "attention_mask": inputs["attention_mask"],
        "max_new_tokens": max_new_tokens,
        "do_sample": True,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
        "repetition_penalty": repetition_penalty,
        "streamer": streamer,
    }

    # Generation runs on a worker thread so this generator can consume the
    # streamer and yield partial output while tokens are still being produced.
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    thinking = ""
    final = ""
    started_final = False
    for chunk in streamer:
        if started_final:
            final += chunk
        else:
            # Search the accumulated text rather than the single chunk so a
            # marker split across two chunks is still recognised. Presumably
            # the marker is what remains of the channel-switch tokens once
            # special tokens are skipped — TODO confirm.
            thinking += chunk
            marker = RE_FINAL_MARKER.search(thinking)
            if marker:
                final = thinking[marker.end():]
                thinking = thinking[:marker.start()]
                started_final = True
        clean_thinking = RE_ANALYSIS_PREFIX.sub('', thinking).strip()
        clean_final = final.strip()
        formatted = f"<details open><summary>Click to view Thinking Process</summary>\n\n{clean_thinking}\n\n</details>\n\n{clean_final}"
        yield formatted

    # NOTE(review): this join is only reached when the stream is fully
    # consumed; if the caller stops iterating early the worker thread keeps
    # generating in the background.
    thread.join()

# Chat UI wired to generate_response. The additional inputs are listed in the
# same order as the extra parameters of generate_response: max_new_tokens,
# system_prompt, temperature, top_p, top_k, repetition_penalty.
demo = gr.ChatInterface(
    fn=generate_response,
    additional_inputs=[
        gr.Slider(label="Max new tokens", minimum=64, maximum=4096, step=1, value=2048),
        # The default prompt carries a "Reasoning: medium" directive, which
        # parse_reasoning_and_instructions extracts and strips.
        gr.Textbox(
            label="System Prompt",
            value="You are a helpful assistant. Reasoning: medium",
            lines=4,
            placeholder="Change system prompt"
        ),
        gr.Slider(label="Temperature", minimum=0.1, maximum=2.0, step=0.1, value=0.7),
        gr.Slider(label="Top-p", minimum=0.05, maximum=1.0, step=0.05, value=0.9),
        gr.Slider(label="Top-k", minimum=1, maximum=100, step=1, value=50),
        gr.Slider(label="Repetition Penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.0)
    ],
    examples=[
        ["Explain Newton's laws clearly and concisely"],
        ["What are the benefits of open weight AI models"],
        ["Write a Python function to calculate the Fibonacci sequence"],
    ],
    cache_examples=False,
    description="""# GPT-OSS 20B Derestricted.""",
    fill_height=True,
    stop_btn="Stop Generation",
)

# Start the Gradio server only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()