Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import time | |
| import base64 | |
| from openai import OpenAI | |
| import os | |
| from io import BytesIO | |
| from PIL import Image | |
# Configuration: base URL handed to the OpenAI client.
BASE_URL = "https://api.stepfun.com/v1"

# API key read from the environment; empty string when the variable is unset.
STEP_API_KEY = os.getenv("STEP_API_KEY", "")

# Models offered in the UI dropdown (the first entry is the default).
MODELS = ["step-3", "step-r1-v-mini"]
def image_to_base64(image):
    """Encode a PIL image as a base64 string of its PNG serialization.

    Args:
        image: A ``PIL.Image.Image`` instance, or ``None``.

    Returns:
        The base64 text of the PNG bytes, or ``None`` when ``image`` is
        ``None`` or is not a PIL image.
    """
    if image is None:
        return None
    if not isinstance(image, Image.Image):
        return None

    # Serialize into an in-memory buffer so nothing touches the disk.
    png_buffer = BytesIO()
    image.save(png_buffer, format="PNG")
    png_bytes = png_buffer.getvalue()
    return base64.b64encode(png_bytes).decode('utf-8')
_REASONING_OPEN = "<reasoning>"
_REASONING_CLOSE = "</reasoning>"


def _split_reasoning(text, final=False):
    """Split accumulated model output into ``(reasoning, answer)`` text.

    Text between ``<reasoning>`` and ``</reasoning>`` goes to the reasoning
    part; everything else goes to the answer part. An unclosed
    ``<reasoning>`` section is treated as reasoning up to the end of ``text``.

    Args:
        text: Everything received from the stream so far.
        final: When False, a trailing fragment that could be the beginning of
            the next tag (e.g. ``"<reas"``) is withheld so a tag split across
            two chunks never leaks into the output. When True the stream is
            over, so the trailing text is emitted as-is.

    Returns:
        A ``(reasoning, answer)`` tuple of strings.
    """
    reasoning_parts = []
    answer_parts = []
    pos = 0
    inside = False
    while True:
        tag = _REASONING_CLOSE if inside else _REASONING_OPEN
        idx = text.find(tag, pos)
        if idx == -1:
            break
        (reasoning_parts if inside else answer_parts).append(text[pos:idx])
        pos = idx + len(tag)
        inside = not inside

    tail = text[pos:]
    if not final:
        # Withhold the longest suffix that is a proper prefix of the tag we
        # are currently waiting for; the next chunk will resolve it.
        tag = _REASONING_CLOSE if inside else _REASONING_OPEN
        for size in range(min(len(tag) - 1, len(tail)), 0, -1):
            if tail.endswith(tag[:size]):
                tail = tail[:-size]
                break
    (reasoning_parts if inside else answer_parts).append(tail)
    return "".join(reasoning_parts), "".join(answer_parts)


def call_step_api(image, prompt, model, temperature=0.7, max_tokens=2000):
    """Stream a Step API chat completion for a text or image+text prompt.

    This is a generator: every yielded item is a ``(reasoning, answer)``
    tuple holding the text accumulated so far. Failures are never raised;
    they are reported as an answer string starting with "❌".

    Args:
        image: Optional PIL image to send with the prompt; ``None`` for a
            text-only request.
        prompt: The user's prompt. Empty or whitespace-only prompts are
            rejected.
        model: Model name forwarded to the API.
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Maximum completion length forwarded to the API.

    Yields:
        ``(reasoning_content, final_answer)`` tuples. The last tuple has the
        elapsed generation time appended to the answer.
    """
    if not prompt or not prompt.strip():
        yield "", "❌ 请输入提示词"
        return
    if not STEP_API_KEY:
        yield "", "❌ API密钥未配置。请在 Hugging Face Space 的 Settings 中添加 STEP_API_KEY 环境变量。"
        return

    # Build the message payload: multimodal content when an image is given,
    # a plain string otherwise.
    if image is not None:
        try:
            base64_image = image_to_base64(image)
        except Exception as e:
            yield "", f"❌ 图片处理错误: {str(e)}"
            return
        if base64_image is None:
            yield "", "❌ 图片处理失败"
            return
        messages = [
            {"role": "user", "content": [
                # image_to_base64 encodes PNG, so the data URL must say so.
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}", "detail": "high"}},
                {"type": "text", "text": prompt},
            ]}
        ]
    else:
        messages = [
            {"role": "user", "content": prompt}
        ]

    try:
        client = OpenAI(api_key=STEP_API_KEY, base_url=BASE_URL)
    except Exception as e:
        yield "", f"❌ 客户端初始化失败: {str(e)}"
        return

    # Monotonic clock: the elapsed time cannot be skewed by wall-clock changes.
    start_time = time.monotonic()
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            # Forward the sampling settings chosen by the caller; the slider
            # values may arrive as floats, hence the explicit conversions.
            temperature=float(temperature),
            max_tokens=int(max_tokens),
            stream=True,
        )
    except Exception as e:
        yield "", f"❌ API请求失败: {str(e)}"
        return

    full_response = ""
    reasoning_content = ""
    final_answer = ""
    try:
        for chunk in response:
            if not chunk.choices:
                continue
            content = getattr(chunk.choices[0].delta, "content", None)
            if not content:
                continue
            full_response += content
            # Re-split the whole text each time so tags that straddle chunk
            # boundaries, or several tags in one chunk, are handled correctly.
            reasoning_content, final_answer = _split_reasoning(full_response)
            yield reasoning_content, final_answer
    except Exception as e:
        yield reasoning_content, final_answer + f"\n\n❌ 流处理错误: {str(e)}"
        return

    # Stream finished: flush any text that was withheld as a possible tag.
    reasoning_content, final_answer = _split_reasoning(full_response, final=True)
    elapsed_time = time.monotonic() - start_time
    final_answer += f"\n\n⏱️ 生成用时: {elapsed_time:.2f}秒"
    yield reasoning_content, final_answer
def _reset_fields():
    """Return cleared values for the image, prompt, reasoning and answer widgets."""
    return (None, "", "", "")


# Build the Gradio interface.
with gr.Blocks(title="Step-3", theme=gr.themes.Soft()) as demo:
    gr.Markdown("\n# 🤖 Step-3\n")

    with gr.Row():
        # Left column: inputs and controls.
        with gr.Column(scale=1):
            image_input = gr.Image(label="上传图片(可选)", type="pil", height=300)
            prompt_input = gr.Textbox(
                label="提示词",
                placeholder="输入你的问题或描述...",
                lines=3,
                value="",
            )

            # Advanced settings, collapsed by default.
            with gr.Accordion("高级设置", open=False):
                model_select = gr.Dropdown(choices=MODELS, value=MODELS[0], label="选择模型")
                temperature_slider = gr.Slider(
                    minimum=0, maximum=1, value=0.7, step=0.1, label="Temperature"
                )
                max_tokens_slider = gr.Slider(
                    minimum=100, maximum=4000, value=2000, step=100, label="最大输出长度"
                )

            submit_btn = gr.Button("🚀 开始分析", variant="primary")
            clear_btn = gr.Button("🗑️ 清空", variant="secondary")

        # Right column: streamed outputs.
        with gr.Column(scale=1):
            # Reasoning (chain-of-thought) display.
            with gr.Accordion("💭 推理过程 (CoT)", open=True):
                reasoning_output = gr.Textbox(
                    label="思考过程",
                    lines=10,
                    max_lines=15,
                    show_copy_button=True,
                    interactive=False,
                )

            # Final answer display.
            answer_output = gr.Textbox(
                label="📝 分析结果",
                lines=15,
                max_lines=25,
                show_copy_button=True,
                interactive=False,
            )

    # Event wiring: the generator streams into both output text boxes.
    api_inputs = [
        image_input,
        prompt_input,
        model_select,
        temperature_slider,
        max_tokens_slider,
    ]
    api_outputs = [reasoning_output, answer_output]
    submit_btn.click(
        fn=call_step_api,
        inputs=api_inputs,
        outputs=api_outputs,
        show_progress=True,
    )

    # The clear button resets the inputs and both outputs.
    clear_btn.click(
        fn=_reset_fields,
        inputs=[],
        outputs=[image_input, prompt_input, reasoning_output, answer_output],
    )

    # Footer.
    gr.Markdown("\n---\nPowered by [Step-3](https://www.stepfun.com/)\n")

# Launch the app when run as a script.
if __name__ == "__main__":
    demo.launch()