| import base64 |
| import os |
| import uuid |
| import time |
| import gradio as gr |
| from gradio_client import utils as client_utils |
| from config import DEFAULT_SYS_PROMPT, MODEL, api_key, base_url |
|
|
| from openai import OpenAI |
|
|
| |
# OpenAI-compatible client; credentials and endpoint come from config
# (imported above as api_key / base_url).
client = OpenAI(
    api_key=api_key,
    base_url=base_url,
)
|
|
|
|
def encode_file_to_base64(file_path):
    """Read *file_path* and return its contents as a base64 ``data:`` URL."""
    mime_type = client_utils.get_mimetype(file_path)
    with open(file_path, "rb") as fh:
        payload = base64.b64encode(fh.read()).decode("utf-8")
    return f"data:{mime_type};base64,{payload}"
|
|
|
|
def is_video_file(file_path):
    """Return True when *file_path* carries a known video extension (case-insensitive)."""
    return os.path.splitext(file_path)[1].lower() in {
        '.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv',
    }
|
|
|
|
def is_image_file(file_path):
    """Return True when *file_path* carries a known image extension (case-insensitive)."""
    return os.path.splitext(file_path)[1].lower() in {
        '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp',
    }
|
|
|
|
def format_messages(history, sys_prompt=None):
    """Convert the internal chat history into OpenAI chat-completion messages.

    Args:
        history: List of {"role", "content", ...} dicts; user entries may
            also carry a "files" list of local paths or http(s) URLs.
        sys_prompt: Optional system prompt; falls back to DEFAULT_SYS_PROMPT.

    Returns:
        A list of messages starting with the system prompt. User turns with
        attachments use the multi-part content format; plain-text user turns
        and assistant turns use simple strings.
    """
    messages = [{
        "role": "system",
        "content": sys_prompt or DEFAULT_SYS_PROMPT,
    }]

    for item in history:
        if item["role"] == "user":
            content_parts = []

            # Attachments: remote URLs pass through untouched, local files
            # are inlined as base64 data URLs; missing paths are skipped.
            for file_path in item.get("files") or []:
                if file_path.startswith("http"):
                    url = file_path
                elif os.path.exists(file_path):
                    url = encode_file_to_base64(file_path)
                else:
                    continue
                if is_video_file(file_path):
                    content_parts.append({
                        "type": "video_url",
                        "video_url": {"url": url},
                    })
                else:
                    content_parts.append({
                        "type": "image_url",
                        "image_url": {"url": url},
                    })

            if item.get("content"):
                content_parts.append({
                    "type": "text",
                    "text": item.get("content", ""),
                })

            # Bug fix: the previous code collapsed ANY single-part message to
            # the raw text (len(content_parts) > 1 test), which silently
            # dropped a lone image/video sent without accompanying text.
            # Collapse to a plain string only when the single part IS text.
            if len(content_parts) == 1 and content_parts[0]["type"] == "text":
                messages.append({
                    "role": "user",
                    "content": item.get("content", ""),
                })
            else:
                messages.append({
                    "role": "user",
                    "content": content_parts or item.get("content", ""),
                })

        elif item["role"] == "assistant":
            # Skip empty assistant placeholders (e.g. an aborted stream).
            content = item.get("content", "").strip()
            if content:
                messages.append({
                    "role": "assistant",
                    "content": content,
                })

    return messages
|
|
|
|
def chat_stream(messages, model, enable_thinking=True):
    """Yield (content, reasoning) pairs streamed from the chat-completions API.

    Exactly one element of each pair is non-None. On any failure a single
    markdown-formatted error string is yielded as the content element.
    """
    try:
        request = dict(
            model=model,
            messages=messages,
            stream=True,
            top_p=0.95,
        )
        # Thinking mode uses a hotter temperature and asks the backend to
        # emit reasoning_content deltas alongside the answer.
        if enable_thinking:
            request.update(temperature=1, extra_body={"thinking": {"type": "enabled"}})
        else:
            request.update(temperature=0.6, extra_body={"thinking": {"type": "disabled"}})

        print(f"[DEBUG] enable_thinking={enable_thinking}, model={model}")

        for chunk in client.chat.completions.create(**request):
            if not (chunk and chunk.choices):
                continue
            delta = chunk.choices[0].delta
            if getattr(delta, 'content', None):
                yield delta.content, None
            elif getattr(delta, 'reasoning_content', None):
                yield None, delta.reasoning_content

    except Exception as e:
        yield f"\n\n**Error:** {str(e)}", None
|
|
|
|
| |
# ---------------------------------------------------------------------------
# Module-level session state.
# NOTE(review): these globals are shared by every Gradio session — fine for a
# single-user demo, but concurrent users would clobber each other's state.
# ---------------------------------------------------------------------------
chat_history = []  # turns of the active conversation (user/assistant dicts)
conversations = {}  # conversation id -> {"history": [...], "enable_thinking": bool}
current_conversation_id = None  # uuid4 string of the active conversation
enable_thinking = True  # whether the API is asked to emit reasoning content
uploaded_files = []  # file paths staged for the next user message
|
|
|
|
def create_new_conversation():
    """Start a fresh conversation, resetting all per-conversation state.

    Returns the (chatbot, file_input, thinking_btn) updates expected by the
    Gradio click handler.
    """
    global current_conversation_id, chat_history, enable_thinking, uploaded_files

    current_conversation_id = str(uuid.uuid4())
    chat_history = []
    uploaded_files = []
    enable_thinking = True
    conversations[current_conversation_id] = {
        "history": chat_history,
        "enable_thinking": True,
    }
    return [], None, gr.update(value="⚡ Thinking: ON")
|
|
|
|
def toggle_thinking():
    """Flip the global thinking flag and return the updated button label."""
    global enable_thinking
    enable_thinking = not enable_thinking

    # Persist the preference on the active conversation, if one exists.
    if current_conversation_id and current_conversation_id in conversations:
        conversations[current_conversation_id]["enable_thinking"] = enable_thinking

    label = "⚡ Thinking: ON" if enable_thinking else "💤 Thinking: OFF"
    return gr.update(value=label)
|
|
|
|
def format_chat_display(history):
    """Translate internal history into Gradio Chatbot "messages" dicts.

    Images become inline image parts; other files are shown as a 🎥 label.
    A user turn whose only part is plain text is flattened to a string.
    """
    display = []
    for entry in history:
        role = entry["role"]
        if role == "assistant":
            display.append({"role": "assistant", "content": entry.get("content", "")})
            continue
        if role != "user":
            continue

        parts = []
        for path in entry.get("files") or []:
            if is_image_file(path):
                parts.append({"type": "image", "path": path})
            else:
                parts.append({"type": "text", "text": f"🎥 {os.path.basename(path)}"})

        if entry.get("content"):
            parts.append({"type": "text", "text": entry["content"]})

        if len(parts) == 1 and parts[0].get("type") == "text":
            display.append({"role": "user", "content": parts[0]["text"]})
        else:
            display.append({"role": "user", "content": parts})

    return display
|
|
|
|
def handle_upload(files):
    """Stage newly uploaded file paths and return a short preview string."""
    global uploaded_files

    def _path_of(f):
        # Gradio may hand back plain path strings or dicts with a 'name' key.
        if isinstance(f, dict) and 'name' in f:
            return f['name']
        if isinstance(f, str):
            return f
        return None

    if files:
        candidates = files if isinstance(files, list) else [files]
        for f in candidates:
            path = _path_of(f)
            if path is not None:
                uploaded_files.append(path)

    if not uploaded_files:
        return ""
    preview = "📎 " + ", ".join(os.path.basename(p) for p in uploaded_files[-3:])
    if len(uploaded_files) > 3:
        preview += f" (+{len(uploaded_files) - 3} more)"
    return preview
|
|
|
|
def user_message(message, files):
    """Append a user turn (text plus staged files) to the active conversation.

    Returns the refreshed chat display plus empty values to clear the
    message box and file input.
    """
    global chat_history, current_conversation_id, uploaded_files

    if not current_conversation_id:
        create_new_conversation()

    # Merge files passed directly with the staged uploads, skipping dupes.
    if files:
        for f in files:
            if isinstance(f, str) and f not in uploaded_files:
                uploaded_files.append(f)

    chat_history.append({
        "role": "user",
        "content": message,
        "files": list(uploaded_files),
    })

    conversations[current_conversation_id]["history"] = chat_history
    conversations[current_conversation_id]["enable_thinking"] = enable_thinking

    # Staged uploads are consumed by this message.
    uploaded_files = []

    return format_chat_display(chat_history), "", None
|
|
|
|
def bot_response():
    """Stream the assistant's reply for the latest user turn.

    Generator used as a Gradio event handler: appends an assistant
    placeholder to ``chat_history``, then repeatedly yields the re-rendered
    chat display as tokens stream in. Reasoning text is shown inline while
    thinking and collapsed into a <details> block once answer tokens start.
    """
    global chat_history

    # Nothing to answer if history is empty or the last turn isn't the user's.
    # NOTE(review): this function is a generator (it yields below), so this
    # early `return` ends iteration without emitting any chat update —
    # confirm the Gradio caller tolerates the swallowed return value.
    if not chat_history or chat_history[-1]["role"] != "user":
        return format_chat_display(chat_history)

    messages = format_messages(chat_history)
    model = MODEL

    reasoning_text = ""   # accumulated reasoning_content chunks
    answer_text = ""      # accumulated visible answer chunks
    is_thinking = False   # set once any reasoning chunk arrives

    # Placeholder appended up front; mutated in place as chunks arrive.
    assistant_msg = {"role": "assistant", "content": ""}
    chat_history.append(assistant_msg)

    try:
        for content, reasoning in chat_stream(messages, model, enable_thinking):
            if reasoning:
                reasoning_text += reasoning
                is_thinking = True

            if content:
                answer_text += content

            # Rebuild the full message text from scratch on every chunk.
            final_response = ""
            if is_thinking and reasoning_text.strip():
                if content:
                    # Answer tokens have started: collapse the reasoning
                    # into a <details> block shown above the answer.
                    final_response += f"""<details style="margin-bottom: 12px;">
<summary style="font-size: 13px; cursor: pointer;">🧠 Thought for a moment...</summary>
<div style="font-size: 12px; font-style: italic; padding: 8px 0; line-height: 1.6;">

{reasoning_text}

</div>
</details>

"""
                else:
                    # Still thinking: render the reasoning inline in italics.
                    final_response += f'<div style="font-size: 12px; font-style: italic; margin-bottom: 12px; line-height: 1.6;">{reasoning_text}</div>\n\n'
            final_response += answer_text

            assistant_msg["content"] = final_response
            yield format_chat_display(chat_history)

        conversations[current_conversation_id]["history"] = chat_history
    except Exception as e:
        # Drop the empty placeholder so a failed turn leaves no blank bubble.
        if chat_history and chat_history[-1]["role"] == "assistant" and not chat_history[-1]["content"].strip():
            chat_history.pop()
        raise e
|
|
|
|
| |
# Curated one-click example prompts for the "Quick Load" panel.
# Each entry: button label, prompt text, and an optional local asset path
# resolved by load_example().
# NOTE(review): the referenced image files ("butterfly.jpg",
# "benchmark_thinking.png") must exist next to this script — confirm.
QUICK_EXAMPLES = [
    {"label": "📝 Longform Creative Writing", "prompt": "Write a 2000-word story about a cat who learns to fly. Make it whimsical, heartwarming, yet enough creative.", "file": None},
    {"label": "💻 Python Concept Explanation", "prompt": "Explain how Python decorators work with simple examples.", "file": None},
    {"label": "👀🦋 Recognize Butterfly Species", "prompt": "What species of butterfly is this? Describe its key characteristics.", "file": "butterfly.jpg"},
    {"label": "🧠 Brainstorm Ideas", "prompt": "Brainstorm 10 creative marketing campaign ideas for a new eco-friendly water bottle.", "file": None},
    {"label": "🌐 Translation and Analysis", "prompt": "Translate the following text to Chinese: 'Gatsby believed in the green light, the orgastic future that year by year recedes before us. It eluded us then, but that's no matter--tomorrow we will run faster, stretch out our arms farther.... And one fine morning-- So we beat on, boats against the current, borne back ceaselessly into the past.' and make a bilingual & deep explanation. Your explanation shall be in one English paragraph followed by one Chinese paragraph, interleaved.", "file": None},
    {"label": "👀📊 Analyze Benchmarks", "prompt": "List all benchmarks where K2.5 reaches SOTA and make it a new barchart; In the barchart, take Opus and Gemini as comparison; Finally write a summary on K2.5's ability.", "file": "benchmark_thinking.png"},
]
|
|
|
|
def load_example(prompt, file_path=None):
    """Return (prompt, files) values ready to drop into the input widgets.

    Args:
        prompt: Example prompt text for the message box.
        file_path: Optional path to a bundled asset; resolved to an absolute
            path so Gradio's File component can locate it reliably.

    Returns:
        Tuple of the prompt string and either a one-element list containing
        the absolute file path, or None when the example has no attachment.
    """
    # Fix: removed the redundant function-local `import os` — the module is
    # already imported at file level.
    if file_path:
        return prompt, [os.path.abspath(file_path)]
    return prompt, None
|
|
|
|
| |
# CSS injected into gr.Blocks: Roboto font, taller input/buttons, gradient
# header title, sidebar info card, and "quick load" panel/button styling.
custom_css = """
@import url('https://fonts.googleapis.com/css2?family=Roboto:wght@200;300;400;500&display=swap');

.gradio-container {
    font-family: 'Roboto', -apple-system, BlinkMacSystemFont, sans-serif !important;
    font-weight: 200 !important;
}

/* Make input and button taller */
.gr-text-input textarea {
    padding: 16px !important;
    min-height: 56px !important;
}

.gr-button {
    padding: 12px 24px !important;
    min-height: 56px !important;
}

.header {
    text-align: center;
    padding: 20px;
    margin-bottom: 10px;
}
.header h1 {
    font-size: 32px;
    font-weight: 500;
    margin: 0;
}
.header .highlight {
    background: linear-gradient(135deg, #0f0b05, #054e0b);
    -webkit-background-clip: text;
    -webkit-text-fill-color: transparent;
}
.sidebar-info {
    padding: 15px;
    background: #f9fafb;
    border-radius: 8px;
    font-size: 14px;
    color: #666;
}

.quick-load-title {
    margin-top: 16px !important;
    margin-bottom: 8px !important;
    color: #92400e !important;
}

.quick-load-title p {
    margin: 0 !important;
}

/* Quick load container styling - lower saturation background */
.quick-load-container {
    background: #f5f5f4 !important;
    border: 1px solid #e7e5e4 !important;
    border-radius: 8px !important;
    padding: 12px !important;
}

/* Quick load button styling */
.quick-load-btn {
    margin-bottom: 6px !important;
    font-size: 13px !important;
}
"""
|
|
| |
# ---------------------------------------------------------------------------
# Gradio UI layout and event wiring.
# ---------------------------------------------------------------------------
with gr.Blocks(title="Chat with K2.5", css=custom_css, theme=gr.themes.Soft(primary_hue="blue", secondary_hue="blue", neutral_hue="blue")) as demo:

    # Header: brand SVG logo plus gradient-highlighted title.
    gr.HTML("""
    <div class="header">
        <h1 style="display: inline-flex; align-items: center; justify-content: center; gap: 12px; vertical-align: middle;">
            <svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 66 26" fill="none" style="height: 58px; width: auto; flex-shrink: 0;">
                <rect width="66" height="26" fill="#000" rx="13"></rect>
                <path fill="#fff" d="M27.65 19.706c0 .163.132.294.294.294h1.888a.294.294 0 0 0 .294-.294V6.294A.294.294 0 0 0 29.832 6h-1.888a.294.294 0 0 0-.294.294zM51.525 19.706c0 .163.131.294.293.294h1.888a.294.294 0 0 0 .294-.294V6.294A.294.294 0 0 0 53.706 6h-1.888a.294.294 0 0 0-.293.294zM44.058 6a.294.294 0 0 0-.285.224l-2.79 11.419c-.047.187-.267.187-.312 0L37.88 6.223A.29.29 0 0 0 37.596 6h-4.73a.29.29 0 0 0-.291.294v13.412c0 .163.13.294.293.294h2.097a.29.29 0 0 0 .293-.29V8.43c0-.226.265-.272.319-.056l2.787 11.402c.032.131.15.224.285.224h4.355a.294.294 0 0 0 .285-.224l2.786-11.4c.053-.217.318-.17.318.055v11.275c0 .163.132.294.294.294h2.096a.294.294 0 0 0 .294-.294V6.294A.294.294 0 0 0 48.783 6zM18.697 12.491l5.692-5.995A.294.294 0 0 0 24.176 6h-2.65a.3.3 0 0 0-.208.087l-6.524 6.573c-.101.102-.251.011-.251-.153V6.294A.294.294 0 0 0 14.249 6h-1.955a.294.294 0 0 0-.294.294v13.412c0 .163.132.294.294.294h1.955a.294.294 0 0 0 .294-.294v-2.674a.22.22 0 0 1 .057-.153l2.013-2.052c.049-.05.116-.057.17-.02l5.385 4.07c.788.546 1.79.902 2.709 1.052a.282.282 0 0 0 .325-.284v-2.423a.306.306 0 0 0-.246-.296c-.532-.114-1.125-.331-1.574-.642l-4.662-3.467c-.096-.067-.109-.238-.023-.326"></path>
            </svg>
            <span style="font-size: 1.8em;">Chat with <span class="highlight">K2.5</span></span>
        </h1>
    </div>
    """)

    with gr.Row():
        # Left column: chat transcript, message input, and file uploader.
        with gr.Column(scale=4):
            chatbot = gr.Chatbot(
                label="Chat",
                height=700,
                avatar_images=("https://api.dicebear.com/7.x/initials/svg?seed=User&backgroundColor=e0f2fe&textColor=0ea5e9", "https://upload.wikimedia.org/wikipedia/en/8/87/Kimi-logo-2025.png")
            )

            with gr.Row():
                msg_input = gr.Textbox(
                    placeholder="Type your message here...",
                    show_label=False,
                    scale=4
                )
                send_btn = gr.Button("Send", variant="primary", scale=1)

            # NOTE(review): handle_upload() is defined above but never wired
            # to this component — confirm whether a .change() handler was
            # intended for upload previews.
            file_input = gr.File(
                label="Upload Images/Videos (optional)",
                file_types=["image", "video"],
                file_count="multiple"
            )

        # Right column: conversation controls and informational cards.
        with gr.Column(scale=1):
            new_chat_btn = gr.Button("✨ New Chat", variant="primary")
            thinking_btn = gr.Button("⚡ Thinking: ON", variant="secondary")

            gr.HTML(f"""
            <div class="sidebar-info">
                Model: {MODEL}<br>
                API: {base_url}
            </div>
            <div style="margin-top: 16px; padding: 12px; background: linear-gradient(135deg, #e0f2fe 0%, #dbeafe 100%); border-radius: 8px; border-left: 3px solid #3b82f6; font-size: 14px; color: #1e40af;">
                <strong>📖 How to Use</strong><br><br>
                • <strong>Chat:</strong> Type your message and press Enter or click Send<br>
                • <strong>Upload Image:</strong> Click "Upload Images/Videos" to add images; Support JPG, PNG, etc.<br>
                • <strong>Upload Video:</strong> Support MP4, AVI, MOV, etc.<br>
                • <strong>⚠️ Note:</strong> Video size should not exceed <strong>100MB</strong><br>
                • <strong>Thinking Mode:</strong> Toggle "Thinking" button to enable/disable reasoning
            </div>
            <div style="margin-top: 16px; padding: 12px; background: linear-gradient(135deg, #e0f2fe 0%, #dbeafe 100%); border-radius: 8px; border-left: 3px solid #3b82f6; font-size: 14px; color: #1e40af;">
                <strong>✨ Hungry for More?</strong><br><br>
                <strong>💬 Chat Users:</strong> Head to <a href="https://kimi.com" target="_blank" style="color: #2563eb; text-decoration: underline; font-weight: bold;">kimi.com</a> for the full experience—complete with agentic magic that'll make your workflow dance.<br><br>
                <strong>🚀 Pro Users:</strong> For API enthusiasts, fuel your creations at <a href="https://platform.moonshot.ai" target="_blank" style="color: #2563eb; text-decoration: underline; font-weight: bold;">platform.moonshot.ai</a>. Your wallet will thank you, and your projects will too.
            </div>
            """)

    # Quick-load example panel: one button per QUICK_EXAMPLES entry.
    gr.Markdown(f"""<div style="margin-top: 16px; padding: 12px; background: linear-gradient(135deg, #d1d5db 0%, #d1d5db 100%); border-radius: 8px; border-left: 3px solid #6b7280; font-size: 14px; color: #374151;">
    <strong>⚡ Quick Load Examples</strong>
    </div>""")
    with gr.Column(variant="panel"):
        quick_load_buttons = []
        for example in QUICK_EXAMPLES:
            btn = gr.Button(
                example["label"],
                size="sm",
                variant="secondary",
                elem_classes="quick-load-btn"
            )
            quick_load_buttons.append((btn, example["prompt"], example.get("file")))

    # New chat resets history, clears the file input, and resets the toggle.
    new_chat_btn.click(
        create_new_conversation,
        outputs=[chatbot, file_input, thinking_btn]
    )

    thinking_btn.click(
        toggle_thinking,
        outputs=[thinking_btn]
    )

    # Bind each quick-load button; default args freeze the loop variables
    # (avoids the late-binding-closure pitfall).
    for btn, prompt, file_path in quick_load_buttons:
        btn.click(
            fn=lambda p=prompt, f=file_path: load_example(p, f),
            outputs=[msg_input, file_input]
        )

    # Send button: record the user turn, then stream the bot reply.
    send_btn.click(
        user_message,
        inputs=[msg_input, file_input],
        outputs=[chatbot, msg_input, file_input]
    ).then(
        bot_response,
        outputs=[chatbot]
    )

    # Enter key in the textbox mirrors the Send button.
    msg_input.submit(
        user_message,
        inputs=[msg_input, file_input],
        outputs=[chatbot, msg_input, file_input]
    ).then(
        bot_response,
        outputs=[chatbot]
    )
|
|
if __name__ == "__main__":
    # queue() is required for generator handlers (streaming bot_response);
    # binds on all interfaces, port 7860, without a public share link.
    demo.queue().launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )
|
|