Spaces:
Runtime error
Runtime error
| import time | |
| import gradio as gr | |
| from openai import OpenAI | |
| import re | |
# --- Configuration ---
# OpenAI-compatible client aimed at a locally hosted LLM server
# (e.g. Llama.cpp, vLLM, Ollama). When using Ollama the base URL is
# usually http://localhost:11434/v1.
client = OpenAI(
    base_url="http://localhost:8080/v1",
    api_key="no-key-required",
)

# Stylesheet handed to gr.Blocks: spinner animation plus the look of the
# collapsible <details> panel that shows the model's thinking.
CSS = """
.spinner { animation: spin 1s linear infinite; display: inline-block; margin-right: 8px; }
@keyframes spin { from { transform: rotate(0deg); } to { transform: rotate(360deg); } }
.thinking-summary { cursor: pointer; padding: 8px; background: #f5f5f5; border-radius: 4px; margin: 4px 0; }
.thinking-container { border-left: 3px solid #facc15; padding-left: 10px; margin: 8px 0; background: #210c29; }
details:not([open]) .thinking-container { border-left-color: #290c15; }
details { border: 1px solid #e0e0e0 !important; border-radius: 8px !important; padding: 12px !important; margin: 8px 0 !important; }
"""
def format_time(seconds_float):
    """Render a duration in seconds as a compact "1h 2m 3s" style string.

    The value is rounded to whole seconds first. Hours are shown only when
    positive; minutes are shown when hours are shown or when minutes are
    positive; seconds are always shown.
    """
    whole_seconds = int(round(seconds_float))
    whole_minutes, seconds = divmod(whole_seconds, 60)
    hours, minutes = divmod(whole_minutes, 60)

    if hours > 0:
        return f"{hours}h {minutes}m {seconds}s"
    if minutes > 0:
        return f"{minutes}m {seconds}s"
    return f"{seconds}s"
| # --- Web UI Logic (HTML/Streaming) --- | |
class ParserState:
    """Mutable bookkeeping for incrementally splitting a reply into answer and thought text."""

    __slots__ = [
        'answer',
        'thought',
        'in_think',
        'start_time',
        'last_pos',
        'total_think_time',
    ]

    def __init__(self):
        # Text accumulated outside and inside <think> blocks, respectively.
        self.answer = self.thought = ""
        # Whether the parser is currently inside an unclosed <think> block.
        self.in_think = False
        # perf_counter reading taken when the current <think> block opened.
        self.start_time = 0
        # Number of characters of the full response already consumed.
        self.last_pos = 0
        # Seconds spent in <think> blocks that have already closed.
        self.total_think_time = 0.0
def parse_response(text, state):
    """Incrementally split a streamed response into answer text and thought text.

    Only the part of ``text`` that has not been consumed yet (from
    ``state.last_pos`` onwards) is scanned. Text outside ``<think>...</think>``
    is appended to ``state.answer``; text inside is appended to
    ``state.thought``. Time spent inside closed think blocks is accumulated
    in ``state.total_think_time``.

    A tag that is split across two calls (e.g. ``"<thi"`` then ``"nk>"``) is
    still recognised: when new text arrives, a trailing partial tag from the
    previous call is taken back out of ``state.answer`` / ``state.thought``
    and re-scanned together with the new text.

    Args:
        text: The full response received so far.
        state: Parser state object; it is mutated in place.

    Returns:
        A ``(state, elapsed)`` tuple, where ``elapsed`` is the number of
        seconds the currently open think block has been running, or 0 when
        no think block is open.
    """
    open_tag = '<think>'
    close_tag = '</think>'
    start = state.last_pos

    if len(text) > start:
        # The tag we are currently waiting for, and the field that received
        # the text of the previous call.
        wanted_tag = close_tag if state.in_think else open_tag
        field = 'thought' if state.in_think else 'answer'

        # A partial tag contains exactly one '<' (its first character), so
        # the last '<' within the final len(tag) - 1 consumed characters is
        # the only possible start of one.
        tail = text[max(0, start - (len(wanted_tag) - 1)):start]
        lt_index = tail.rfind('<')
        pending = tail[lt_index:] if lt_index != -1 else ''

        collected = getattr(state, field)
        if pending and wanted_tag.startswith(pending) and collected.endswith(pending):
            # Take the fragment back and rescan it with the new text. If it
            # turns out not to be a tag it is simply appended again below.
            setattr(state, field, collected[:-len(pending)])
            start -= len(pending)

    buffer = text[start:]
    state.last_pos = len(text)

    while buffer:
        if not state.in_think:
            think_start = buffer.find(open_tag)
            if think_start == -1:
                state.answer += buffer
                break
            state.answer += buffer[:think_start]
            state.in_think = True
            state.start_time = time.perf_counter()
            buffer = buffer[think_start + len(open_tag):]
        else:
            think_end = buffer.find(close_tag)
            if think_end == -1:
                state.thought += buffer
                break
            state.thought += buffer[:think_end]
            state.total_think_time += time.perf_counter() - state.start_time
            state.in_think = False
            buffer = buffer[think_end + len(close_tag):]

    elapsed = time.perf_counter() - state.start_time if state.in_think else 0
    return state, elapsed
def format_ui_response(state, elapsed):
    """Build the display pieces for the chat window from the parser state.

    Args:
        state: Parser state holding ``answer``, ``thought``, ``in_think`` and
            ``total_think_time``.
        elapsed: Seconds the currently open think block has been running.

    Returns:
        A ``(collapsible, answer_part)`` tuple. ``collapsible`` is a list that
        is empty when there is no thinking to show, or otherwise holds one
        ``<details>`` HTML snippet (expanded while thinking is in progress,
        collapsed once it has finished). ``answer_part`` is the answer text
        with any literal think tags removed.
    """
    visible_answer = state.answer.replace('<think>', '').replace('</think>', '')

    # Nothing to show in the thinking panel.
    if not (state.thought or state.in_think):
        return [], visible_answer

    if state.in_think:
        opening_tag = "<details open>"
        running_total = state.total_think_time + elapsed
        status = f"🌀 Thinking for {format_time(running_total)}"
    else:
        opening_tag = "<details>"
        status = f"✅ Thought for {format_time(state.total_think_time)}"

    panel = (
        f"{opening_tag}<summary>{status}</summary>\n\n<div class='thinking-container'>\n{state.thought}\n</div>\n</details>"
    )
    return [panel], visible_answer
def generate_web_response(history, temperature, top_p, max_tokens, active_gen):
    """Stream a model reply into the last chat turn, yielding the updated history.

    Only the most recent user message (``history[-1][0]``) is sent to the
    model. As content arrives, the reply is split into thinking and answer
    parts and ``history[-1][1]`` is rewritten with the rendered result.

    Args:
        history: Chat history as a list of ``[user, bot]`` pairs; the last
            pair is updated in place.
        temperature: Sampling temperature forwarded to the completion call.
        top_p: Nucleus sampling value forwarded to the completion call.
        max_tokens: Token limit forwarded to the completion call.
        active_gen: One-element list used as a run flag; streaming stops when
            ``active_gen[0]`` is falsy, and it is set to ``False`` on exit.

    Yields:
        The history list after each update. If the request fails, the last
        turn's reply is replaced by an ``"Error: ..."`` message.
    """
    messages = [{"role": "user", "content": history[-1][0]}]
    # Add history context if needed for Web UI (optional, usually handled by Chatbot component)
    full_response = ""
    state = ParserState()
    try:
        stream = client.chat.completions.create(
            model="local-model",  # Model name is ignored by most local servers
            messages=messages,
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            stream=True
        )
        for chunk in stream:
            if not active_gen[0]:
                break
            # A stream chunk may carry an empty `choices` list (for example a
            # usage-only chunk). Indexing it would raise IndexError and the
            # handler below would discard the whole streamed answer.
            if not chunk.choices:
                continue
            piece = chunk.choices[0].delta.content
            if piece:
                full_response += piece
                state, elapsed = parse_response(full_response, state)
                collapsible, answer_part = format_ui_response(state, elapsed)
                history[-1][1] = "\n\n".join(collapsible + [answer_part])
                yield history
        # Final pass
        state, elapsed = parse_response(full_response, state)
        collapsible, answer_part = format_ui_response(state, elapsed)
        history[-1][1] = "\n\n".join(collapsible + [answer_part])
        yield history
    except Exception as e:
        history[-1][1] = f"Error: {str(e)}"
        yield history
    finally:
        active_gen[0] = False
def user(message, history):
    """Queue the user's message as a new chat turn and clear the input box.

    Returns:
        A tuple of an empty string (the new textbox value) and a new history
        list with ``[message, None]`` appended; the bot reply is filled in
        later.
    """
    pending_turn = [message, None]
    extended_history = history + [pending_turn]
    return "", extended_history
| # --- API Logic (Discord Bot) --- | |
def discord_api_endpoint(prompt, history_json):
    """
    API Endpoint for Discord.

    Sends the conversation to the model in a single non-streaming request
    and rewrites any ``<think>...</think>`` section of the reply as a
    Discord blockquote.

    Args:
        prompt: The user's message.
        history_json: List of [user, bot] lists from previous context.
            ``None`` or an empty list means there is no previous context.
    Returns:
        String containing the formatted response, or an error message
        starting with "❌" if the backend request fails.
    """
    # 1. Reconstruct messages for OpenAI Client
    messages = []
    # History comes in as [[user, bot], [user, bot]]; a missing history
    # (None) is treated as empty instead of crashing before the try block.
    for pair in history_json or []:
        if pair[0]:
            messages.append({"role": "user", "content": pair[0]})
        if pair[1]:
            messages.append({"role": "assistant", "content": pair[1]})
    messages.append({"role": "user", "content": prompt})

    def replace_think(match):
        thought_content = match.group(1).strip()
        # An empty think block carries no information; drop it entirely.
        if not thought_content:
            return ""
        # A Discord ">" quote covers a single line, so every line of the
        # thought needs its own prefix to stay inside the quote.
        quoted = "\n".join(f"> {line}" for line in thought_content.splitlines())
        return f"> *Thinking Process:*\n{quoted}\n\n"

    try:
        # Non-streaming request for the bot to ensure we get full completion before sending
        response = client.chat.completions.create(
            model="local-model",
            messages=messages,
            temperature=0.7,
            max_tokens=4096
        )
        # The content field can be None; treat that as an empty reply.
        raw_content = response.choices[0].message.content or ""
        # Regex to find <think>...</think> (dotall to match newlines)
        return re.sub(r'<think>(.*?)</think>', replace_think, raw_content, flags=re.DOTALL)
    except Exception as e:
        return f"❌ **Error from backend:** {str(e)}"
| # --- Interface Setup --- | |
with gr.Blocks(css=CSS) as demo:
    gr.Markdown("## Qwen/Reasoning Model Host")
    # One-element list used as a run flag by generate_web_response.
    active_gen = gr.State([False])
    chatbot = gr.Chatbot(elem_id="chatbot", height=500, show_label=False, render_markdown=True)
    with gr.Row():
        msg = gr.Textbox(label="Message", placeholder="Type message...", scale=4)
        submit_btn = gr.Button("Send", variant='primary', scale=1)
    with gr.Accordion("Parameters", open=False):
        temperature = gr.Slider(0.1, 1.5, 0.6, label="Temperature")
        top_p = gr.Slider(0.1, 1.0, 0.95, label="Top-p")
        max_tokens = gr.Slider(2048, 32768, 4096, step=64, label="Max Tokens")

    # UI Events: the Send button and pressing Enter in the textbox run the
    # same chain: record the user turn, raise the run flag, stream the reply.
    submit_event = submit_btn.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        lambda: [True], outputs=active_gen).then(
        generate_web_response, [chatbot, temperature, top_p, max_tokens, active_gen], chatbot
    )
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        lambda: [True], outputs=active_gen).then(
        generate_web_response, [chatbot, temperature, top_p, max_tokens, active_gen], chatbot
    )

    # --- HIDDEN API COMPONENT ---
    # Hidden components whose only purpose is to expose discord_api_endpoint
    # as a named API route. The history input is a JSON component rather
    # than gr.State: gr.State(label=...) fails on Gradio versions whose State
    # constructor takes no `label`, and State values are not exposed as API
    # parameters.
    api_prompt = gr.Textbox(label="Prompt", visible=False)
    api_history = gr.JSON(label="History", visible=False)
    api_response = gr.Textbox(label="Response", visible=False)
    api_trigger = gr.Button("API Trigger", visible=False)
    api_trigger.click(
        fn=discord_api_endpoint,
        inputs=[api_prompt, api_history],
        outputs=[api_response],
        api_name="discord_chat"  # <--- THIS IS THE ENDPOINT NAME
    )

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)