"""Gradio chat demo that streams a predefined list of common English words.

The app is launched with ``mcp_server=True``; two plain helper functions
(``generate_words_mcp`` and ``get_word_count``) are also defined here.
"""

import asyncio
import json
import subprocess
import sys
import tempfile
from datetime import datetime
from typing import AsyncGenerator, Optional

import gradio as gr

# Predefined common English words. The list contains duplicates and fewer
# than 200 entries, so every user-facing count is derived from
# len(ENGLISH_WORDS) instead of a hard-coded number.
ENGLISH_WORDS = [
    "the", "be", "to", "of", "and", "a", "in", "that", "have", "I",
    "it", "for", "not", "on", "with", "he", "as", "you", "do", "at",
    "this", "but", "his", "by", "from", "they", "we", "say", "her", "she",
    "or", "an", "will", "my", "one", "all", "would", "there", "their", "what",
    "so", "up", "out", "if", "about", "who", "get", "which", "go", "me",
    "when", "make", "can", "like", "time", "no", "just", "him", "know", "take",
    "people", "into", "year", "your", "good", "some", "could", "them", "see", "other",
    "than", "then", "now", "look", "only", "come", "its", "over", "think", "also",
    "back", "after", "use", "two", "how", "our", "work", "first", "well", "way",
    "even", "new", "want", "because", "any", "these", "give", "day", "most", "us",
    "very", "life", "after", "call", "world", "over", "still", "take", "every", "through",
    "before", "long", "where", "much", "should", "well", "people", "down", "own", "work",
    "first", "good", "new", "write", "our", "used", "me", "man", "too", "any",
    "day", "same", "right", "look", "think", "also", "around", "another", "came",
]

# Pause between streamed words so the streaming effect is visible in the UI.
STREAM_DELAY_SECONDS = 0.05


async def generate_words_stream(query: str) -> AsyncGenerator[str, None]:
    """Stream the predefined English words one token at a time.

    Yields a start marker, then each entry of ``ENGLISH_WORDS`` (with a short
    pause before each one), then a completion marker.

    Args:
        query: The user's query. It is accepted for interface compatibility
            but does not influence the output.

    Yields:
        The start marker, each word, and finally the completion marker.
    """
    total = len(ENGLISH_WORDS)

    # Start marker (reports the real list size rather than a fixed number).
    yield f"🚀 开始生成{total}个英文单词...\n"

    for word in ENGLISH_WORDS:
        # Simulated processing time.
        await asyncio.sleep(STREAM_DELAY_SECONDS)
        yield word

    # Completion marker.
    yield f"\n✅ {total}个英文单词生成完成!"


# NOTE(review): generate_words_mcp and get_word_count are not attached to any
# event listener or API endpoint in this file — confirm whether they are
# actually exposed as MCP tools by the installed Gradio version.
def generate_words_mcp(query: str = "give me some english words") -> str:
    """
    Return the predefined English words as a single message.

    Args:
        query (str): The input query for generating words. It is not used to
            select words. Default is "give me some english words".

    Returns:
        str: A message indicating the number of words generated and the
        complete, space-separated list of words, or an error message if the
        list could not be joined.
    """
    try:
        words_text = " ".join(ENGLISH_WORDS)
    except TypeError as e:
        # str.join raises TypeError only if a non-string entry is in the list.
        return f"生成单词时出错: {str(e)}"
    return f"成功生成 {len(ENGLISH_WORDS)} 个英文单词: {words_text}"


def get_word_count(query: str = "count words") -> str:
    """
    Get the count of English words available.

    Args:
        query (str): The input query (unused). Default is "count words".

    Returns:
        str: A message stating the total number of English words available.
    """
    return f"总共有 {len(ENGLISH_WORDS)} 个英文单词可用"


def handle_user_input(query, messages):
    """Append the user's message to the chat history.

    Args:
        query: Text from the input box.
        messages: Chat history as a list of ``[user_message, bot_message]``
            pairs.

    Returns:
        A ``(textbox_value, history)`` tuple. For a non-blank query the
        textbox is cleared and a new ``[query, None]`` pair is appended;
        otherwise both inputs are returned unchanged.
    """
    if query and query.strip():
        # Pair format: [user_message, bot_message]; the reply is filled later.
        return "", messages + [[query, None]]
    return query, messages


def clear_chat():
    """Return an empty history to reset the chatbot."""
    return []


def download_chat(messages) -> Optional[str]:
    """Export the chat history to a temporary text file.

    Args:
        messages: Chat history as a list of ``[user_message, bot_message]``
            pairs. Entries that are not of length 2 are skipped.

    Returns:
        The path of the written ``.txt`` file, or ``None`` when the history
        is empty.
    """
    if not messages:
        return None

    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    chat_content = "# 英文单词生成对话记录\n\n"
    chat_content += f"导出时间: {current_time}\n"
    chat_content += f"对话条数: {len(messages)}\n\n"
    chat_content += "=" * 50 + "\n\n"

    for i, message in enumerate(messages, 1):
        if len(message) != 2:
            continue
        user_msg, bot_msg = message
        chat_content += f"## 对话 {i}\n\n"
        chat_content += f"**用户**: {user_msg}\n\n"
        if bot_msg:
            chat_content += f"**AI**: {bot_msg}\n\n"
        else:
            # The reply had not been produced yet when the export ran.
            chat_content += "**AI**: 正在生成中...\n\n"
        chat_content += "-" * 30 + "\n\n"

    # delete=False keeps the file on disk after closing so the caller can
    # serve it. UTF-8 is set explicitly because the content is non-ASCII and
    # the platform default encoding may not be able to represent it.
    with tempfile.NamedTemporaryFile(
        mode="w",
        delete=False,
        suffix=".txt",
        prefix="chat_",
        encoding="utf-8",
    ) as temp_file:
        temp_file.write(chat_content)

    return temp_file.name


async def generate_response(messages):
    """Stream the reply for the most recent user message.

    Args:
        messages: Chat history as a list of ``[user_message, bot_message]``
            pairs. The last pair's reply slot is updated in place.

    Yields:
        The history after each streamed token, so the UI can refresh.
    """
    if not messages or not messages[-1] or len(messages[-1]) != 2:
        yield messages
        return

    last_user_msg = messages[-1][0]

    full_response = ""
    async for token in generate_words_stream(last_user_msg):
        if token.strip():
            full_response += " " + token
            # Update the reply part of the last pair with the text so far.
            messages[-1][1] = full_response.strip()
            yield messages


# Build the chat interface.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🧠 English Words Generator")
    gr.Markdown(
        f"输入你的查询,AI 将为你生成 {len(ENGLISH_WORDS)} 个英文单词(流式输出)"
    )

    chatbot = gr.Chatbot(
        height=500,
        show_label=False,
        container=True,
        bubble_full_width=False,
    )

    with gr.Row():
        with gr.Column(scale=8):
            query_input = gr.Textbox(
                placeholder="输入你的查询,比如:'给我一些常用英文单词'",
                label="",
                show_label=False,
                lines=2,
            )
        with gr.Column(scale=1):
            send_btn = gr.Button("发送", variant="primary", size="lg")
        with gr.Column(scale=1):
            clear_btn = gr.Button("清空", variant="secondary")
        with gr.Column(scale=1):
            download_btn = gr.Button("下载", variant="secondary")

    # On submit or click: first append the user message, then stream the reply.
    query_input.submit(
        handle_user_input,
        inputs=[query_input, chatbot],
        outputs=[query_input, chatbot],
    ).then(generate_response, inputs=chatbot, outputs=chatbot)

    send_btn.click(
        handle_user_input,
        inputs=[query_input, chatbot],
        outputs=[query_input, chatbot],
    ).then(generate_response, inputs=chatbot, outputs=chatbot)

    clear_btn.click(clear_chat, outputs=chatbot)

    # Download button: the exported file path is sent to a File component.
    download_btn.click(
        download_chat,
        inputs=chatbot,
        outputs=gr.File(label="下载对话记录"),
    )

# Launch the app.
if __name__ == "__main__":
    demo.launch(mcp_server=True)