Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import time | |
| import base64 | |
| from openai import OpenAI | |
| import os | |
| from io import BytesIO | |
| from PIL import Image | |
| import re | |
| # 配置 | |
| BASE_URL = "https://api.stepfun.com/v1" | |
| # 从环境变量获取API密钥 | |
| STEP_API_KEY = os.environ.get("STEP_API_KEY", "") | |
| def image_to_base64(image): | |
| """将PIL图像转换为base64字符串""" | |
| if image is None: | |
| return None | |
| if isinstance(image, Image.Image): | |
| buffered = BytesIO() | |
| image.save(buffered, format="PNG") | |
| img_str = base64.b64encode(buffered.getvalue()).decode('utf-8') | |
| return img_str | |
| return None | |
| def extract_cot_and_answer(text): | |
| """从响应中提取CoT推理过程和最终答案""" | |
| # 匹配<reasoning>标签内的内容 | |
| reasoning_pattern = re.compile(r'<reasoning>(.*?)</reasoning>', re.DOTALL) | |
| match = reasoning_pattern.search(text) | |
| if match: | |
| cot = match.group(1).strip() | |
| # 移除reasoning标签及其内容,得到最终答案 | |
| answer = reasoning_pattern.sub('', text).strip() | |
| return cot, answer | |
| else: | |
| # 如果没有reasoning标签,整个响应就是答案 | |
| return "", text | |
| def format_message_with_image(message_text, image_path=None): | |
| """格式化包含图片的消息""" | |
| if image_path: | |
| # 创建包含图片和文本的消息 | |
| return f'<img src="{image_path}" style="max-width: 200px; max-height: 200px; border-radius: 8px; margin-bottom: 10px;"><br>{message_text}' | |
| return message_text | |
| def call_step_api_stream(message, history): | |
| """调用Step API进行流式对话,支持多模态输入""" | |
| print(f"[DEBUG] Starting API call - Message type: {type(message)}") | |
| if not message: | |
| print("[DEBUG] No message provided") | |
| yield history, "", "" | |
| return | |
| if not STEP_API_KEY: | |
| print("[DEBUG] API key not configured") | |
| error_msg = "❌ API key not configured. Please add STEP_API_KEY in Settings." | |
| history.append([message if isinstance(message, str) else "Message", error_msg]) | |
| yield history, "", "" | |
| return | |
| print(f"[DEBUG] API Key exists: {bool(STEP_API_KEY)}") | |
| # 处理多模态输入 | |
| text_content = "" | |
| image_content = None | |
| display_message = "" | |
| # Gradio MultimodalTextbox 返回一个字典 | |
| if isinstance(message, dict): | |
| text_content = message.get("text", "") | |
| files = message.get("files", []) | |
| # 处理图片文件 | |
| if files and len(files) > 0: | |
| image_path = files[0] # 取第一张图片 | |
| try: | |
| img = Image.open(image_path) | |
| image_content = image_to_base64(img) | |
| # 创建显示消息,包含图片缩略图 | |
| display_message = format_message_with_image(text_content, image_path) | |
| print(f"[DEBUG] Image processed successfully") | |
| except Exception as e: | |
| print(f"[DEBUG] Failed to process image: {e}") | |
| display_message = text_content | |
| else: | |
| display_message = text_content | |
| else: | |
| # 纯文本消息 | |
| text_content = str(message) | |
| display_message = text_content | |
| # 添加用户消息到历史 | |
| history.append([display_message, ""]) | |
| yield history, "", "" | |
| # 构造API消息 | |
| messages = [] | |
| # 添加历史对话(只提取文本部分,不包含HTML) | |
| for h in history[:-1]: # 不包含当前消息 | |
| if h[0]: # 用户消息 | |
| # 从HTML中提取纯文本 | |
| user_text = re.sub(r'<[^>]+>', '', h[0]) if '<' in h[0] else h[0] | |
| messages.append({"role": "user", "content": user_text}) | |
| if h[1]: # 助手回复 | |
| messages.append({"role": "assistant", "content": h[1]}) | |
| # 构造当前消息 | |
| if image_content: | |
| # 有图片的情况 | |
| current_content = [ | |
| {"type": "image_url", "image_url": {"url": f"data:image/jpg;base64,{image_content}", "detail": "high"}} | |
| ] | |
| if text_content: | |
| current_content.append({"type": "text", "text": text_content}) | |
| messages.append({"role": "user", "content": current_content}) | |
| else: | |
| # 纯文本 | |
| messages.append({"role": "user", "content": text_content}) | |
| print(f"[DEBUG] Messages count: {len(messages)}") | |
| # 创建客户端 | |
| try: | |
| client = OpenAI(api_key=STEP_API_KEY, base_url=BASE_URL) | |
| print("[DEBUG] Client created successfully") | |
| except Exception as e: | |
| print(f"[DEBUG] Client initialization failed: {e}") | |
| history[-1][1] = f"❌ Client initialization failed: {str(e)}" | |
| yield history, "", "" | |
| return | |
| # 调用API | |
| try: | |
| print("[DEBUG] Calling API...") | |
| response = client.chat.completions.create( | |
| model="step-3", | |
| messages=messages, | |
| temperature=0.7, | |
| max_tokens=2000, | |
| stream=True | |
| ) | |
| print("[DEBUG] API call successful, processing stream...") | |
| # 处理流式响应 | |
| full_response = "" | |
| current_cot = "" | |
| current_answer = "" | |
| chunk_count = 0 | |
| for chunk in response: | |
| chunk_count += 1 | |
| if chunk.choices and len(chunk.choices) > 0: | |
| delta = chunk.choices[0].delta | |
| if hasattr(delta, 'content') and delta.content: | |
| full_response += delta.content | |
| # 实时提取CoT和答案 | |
| current_cot, current_answer = extract_cot_and_answer(full_response) | |
| # 更新历史中的回复 | |
| if current_cot and current_answer: | |
| # 如果有CoT,显示完整格式 | |
| history[-1][1] = f"💭 **Reasoning Process:**\n\n{current_cot}\n\n---\n\n📝 **Answer:**\n\n{current_answer}" | |
| elif current_cot: | |
| # 只有CoT,还没有答案 | |
| history[-1][1] = f"💭 **Reasoning Process:**\n\n{current_cot}\n\n---\n\n📝 **Answer:**\n\n*Generating...*" | |
| else: | |
| # 没有CoT,直接显示答案 | |
| history[-1][1] = current_answer | |
| print(f"[DEBUG] Chunk {chunk_count}: processed") | |
| yield history, current_cot, current_answer | |
| if not full_response: | |
| print("[DEBUG] No response content received") | |
| history[-1][1] = "⚠️ No response received from API" | |
| yield history, "", "" | |
| else: | |
| print(f"[DEBUG] Final response length: {len(full_response)} chars") | |
| except Exception as e: | |
| print(f"[DEBUG] API request failed: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| history[-1][1] = f"❌ API request failed: {str(e)}" | |
| yield history, "", "" | |
| def clear_history(): | |
| """Clear conversation history""" | |
| return [], None | |
| # 创建Gradio界面 | |
| with gr.Blocks(title="Step-3", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown(""" | |
| # 🤖 Step-3 | |
| Hello, I am Step-3! | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| # 对话界面 | |
| chatbot = gr.Chatbot( | |
| height=600, | |
| show_label=False, | |
| elem_id="chatbot", | |
| bubble_full_width=False, | |
| render_markdown=True | |
| ) | |
| with gr.Row(): | |
| # 多模态输入框 - 支持文本和图片 | |
| msg = gr.MultimodalTextbox( | |
| placeholder="Type your message here... (You can paste images directly)", | |
| show_label=False, | |
| file_types=["image"], | |
| container=False, | |
| submit_btn="Send" | |
| ) | |
| clear_btn = gr.Button("Clear", scale=0) | |
| with gr.Column(scale=1): | |
| # CoT推理过程展示 | |
| gr.Markdown("### 💭 Chain of Thought") | |
| cot_display = gr.Textbox( | |
| label="Reasoning Process", | |
| lines=10, | |
| max_lines=15, | |
| show_label=False, | |
| interactive=False, | |
| show_copy_button=True | |
| ) | |
| gr.Markdown("### 📝 Final Answer") | |
| answer_display = gr.Textbox( | |
| label="Answer", | |
| lines=10, | |
| max_lines=15, | |
| show_label=False, | |
| interactive=False, | |
| show_copy_button=True | |
| ) | |
| # 事件处理 | |
| msg.submit( | |
| call_step_api_stream, | |
| [msg, chatbot], | |
| [chatbot, cot_display, answer_display] | |
| ) | |
| clear_btn.click( | |
| clear_history, | |
| None, | |
| [chatbot, msg] | |
| ) | |
| # 页脚 | |
| gr.Markdown(""" | |
| --- | |
| <div style="text-align: center;"> | |
| <img src="https://huggingface.co/stepfun-ai/step3/resolve/main/figures/stepfun-logo.png" alt="StepFun Logo" style="height: 40px; margin: 10px;"> | |
| <br> | |
| Powered by <a href="https://www.stepfun.com/" target="_blank">StepFun</a> | |
| </div> | |
| """) | |
| # 启动应用 | |
| if __name__ == "__main__": | |
| print(f"[DEBUG] Starting app with API key: {'Set' if STEP_API_KEY else 'Not set'}") | |
| print(f"[DEBUG] Base URL: {BASE_URL}") | |
| demo.queue(max_size=10) | |
| demo.launch( | |
| share=False, | |
| debug=True | |
| ) |