Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import asyncio | |
| import json | |
| import subprocess | |
| import sys | |
| from typing import AsyncGenerator | |
# Predefined pool of common English words served by this app.
# Despite the "200" quoted elsewhere in this file, the pool holds 139 entries,
# and several words (e.g. "after", "over", "take") appear more than once.
# Order and repeats are kept exactly as authored; the literal is split on
# whitespace, so every entry must be a single token.
ENGLISH_WORDS = """
    the be to of and a in that have I
    it for not on with he as you do at
    this but his by from they we say her she
    or an will my one all would there their what
    so up out if about who get which go me
    when make can like time no just him know take
    people into year your good some could them see other
    than then now look only come its over think also
    back after use two how our work first well way
    even new want because any these give day most us
    very life after call world over still take every through
    before long where much should well people down own work
    first good new write our used me man too any
    day same right look think also around another came
""".split()
async def generate_words_stream(query: str) -> AsyncGenerator[str, None]:
    """Stream the predefined English words one at a time (Gradio version).

    Yields a start banner, then every entry of ``ENGLISH_WORDS`` in order with
    a 0.05 s pause before each word, then a completion banner.

    Args:
        query: The user's query. Accepted for interface compatibility; the
            streamed output does not depend on it.

    Yields:
        The start banner, each word, and finally the completion banner.
    """
    # Report the real size of the word list. The banners used to hard-code
    # "200" although the list length is what actually gets streamed.
    total = len(ENGLISH_WORDS)

    # Start marker.
    yield f"🚀 开始生成{total}个英文单词...\n"

    for word in ENGLISH_WORDS:
        # Simulated processing time so the streaming effect is visible.
        await asyncio.sleep(0.05)
        yield word

    # Completion marker.
    yield f"\n✅ {total}个英文单词生成完成!"
def generate_words_mcp(query: str = "give me some english words") -> str:
    """
    Return the whole predefined English word list in a single message.

    Args:
        query (str): The input query. Accepted but not used to select words. Default is "give me some english words".

    Returns:
        str: A message with the number of words followed by the space-separated words, or an error message if the list could not be rendered.
    """
    try:
        # Simplified MCP function: the full list is returned in one piece.
        joined_words = " ".join(ENGLISH_WORDS)
        word_total = len(ENGLISH_WORDS)
    except Exception as exc:
        # Failures are reported as text rather than raised.
        return f"生成单词时出错: {exc!s}"
    else:
        return f"成功生成 {word_total} 个英文单词: {joined_words}"
def get_word_count(query: str = "count words") -> str:
    """
    Report how many entries the predefined English word list holds.

    Args:
        query (str): The input query. Accepted but not used. Default is "count words".

    Returns:
        str: A message stating the total number of English words available.
    """
    available = len(ENGLISH_WORDS)
    return f"总共有 {available} 个英文单词可用"
# Build the chat interface.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🧠 English Words Generator")
    # Advertise the real size of the word list instead of a hard-coded "200".
    gr.Markdown(f"输入你的查询,AI 将为你生成 {len(ENGLISH_WORDS)} 个英文单词(流式输出)")

    chatbot = gr.Chatbot(
        height=500,
        show_label=False,
        container=True,
        bubble_full_width=False,
    )

    with gr.Row():
        with gr.Column(scale=8):
            query_input = gr.Textbox(
                placeholder="输入你的查询,比如:'给我一些常用英文单词'",
                label="",
                show_label=False,
                lines=2,
            )
        with gr.Column(scale=1):
            send_btn = gr.Button("发送", variant="primary", size="lg")
        with gr.Column(scale=1):
            clear_btn = gr.Button("清空", variant="secondary")
        with gr.Column(scale=1):
            download_btn = gr.Button("下载", variant="secondary")

    def handle_user_input(query, messages):
        """Append the user's message to the chat history.

        Args:
            query: Text currently in the input box.
            messages: Chat history as a list of ``[user_message, bot_message]``
                pairs; ``None`` is treated as an empty history.

        Returns:
            ``("", new_history)`` when the query has non-blank content, where
            the new entry's bot slot is ``None``; otherwise ``(query, messages)``
            unchanged.
        """
        if not query.strip():
            return query, messages
        # A new list is built so the incoming history is not mutated.
        return "", (messages or []) + [[query, None]]

    def clear_chat():
        """Return an empty chat history."""
        return []

    def download_chat(messages):
        """Write the chat history to a temporary text file.

        Args:
            messages: Chat history as a list of ``[user_message, bot_message]``
                pairs.

        Returns:
            The path of the written file, or ``None`` when the history is
            empty.
        """
        if not messages:
            return None

        # Imported locally, as in the original, so this handler does not rely
        # on module-level imports.
        import tempfile
        from datetime import datetime

        exported_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        parts = [
            "# 英文单词生成对话记录\n\n",
            f"导出时间: {exported_at}\n",
            f"对话条数: {len(messages)}\n\n",
            "=" * 50 + "\n\n",
        ]
        for index, message in enumerate(messages, 1):
            # Entries that are not [user, bot] pairs are skipped.
            if len(message) != 2:
                continue
            user_msg, bot_msg = message
            parts.append(f"## 对话 {index}\n\n")
            parts.append(f"**用户**: {user_msg}\n\n")
            if bot_msg:
                parts.append(f"**AI**: {bot_msg}\n\n")
            else:
                # The bot slot is still empty while a reply is being streamed.
                parts.append("**AI**: 正在生成中...\n\n")
            parts.append("-" * 30 + "\n\n")

        # delete=False: the path is handed to the gr.File output after this
        # function returns, so the file must outlive the ``with`` block.
        # The encoding is explicit because the content is non-ASCII and the
        # platform default encoding may be unable to represent it.
        with tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            delete=False,
            suffix=".txt",
            prefix="chat_",
        ) as export_file:
            export_file.write("".join(parts))
        return export_file.name

    async def generate_response(messages):
        """Stream the reply for the last user message into the chat history.

        Args:
            messages: Chat history as a list of ``[user_message, bot_message]``
                pairs. The last pair's bot slot is updated in place.

        Yields:
            The history after each streamed token. If the history is empty or
            its last entry is not a pair, it is yielded once, unchanged.
        """
        if not messages or not messages[-1] or len(messages[-1]) != 2:
            yield messages
            return

        last_user_msg = messages[-1][0]

        # Collect tokens and re-join them with single spaces on every update.
        tokens = []
        async for token in generate_words_stream(last_user_msg):
            if token.strip():
                tokens.append(token)
                # Update the bot half of the last pair with the text so far.
                messages[-1][1] = " ".join(tokens).strip()
                yield messages

    # On submit or click: first append the user message, then stream the reply.
    query_input.submit(
        handle_user_input,
        inputs=[query_input, chatbot],
        outputs=[query_input, chatbot],
    ).then(generate_response, inputs=chatbot, outputs=chatbot)
    send_btn.click(
        handle_user_input,
        inputs=[query_input, chatbot],
        outputs=[query_input, chatbot],
    ).then(generate_response, inputs=chatbot, outputs=chatbot)

    clear_btn.click(clear_chat, outputs=chatbot)

    # Download button: the File component receiving the export is created here.
    download_btn.click(download_chat, inputs=chatbot, outputs=gr.File(label="下载对话记录"))
# Launch the app when this file is run as a script; mcp_server=True is passed
# straight through to demo.launch().
# NOTE(review): generate_words_mcp and get_word_count are not attached to any
# event listener in the visible source — confirm they are actually exposed
# through the MCP server requested here.
if __name__ == "__main__":
    demo.launch(mcp_server=True)