# Bio-QA-Agent / app.py
# (repository page header: author "jackkuo", commit message "update", revision f7d9ae0)
import gradio as gr
import asyncio
import json
import subprocess
import sys
from typing import AsyncGenerator
# Predefined pool of common English words returned/streamed by the functions below.
# NOTE: the list holds 139 entries, not 200, and several words (e.g. "after",
# "over", "take") appear more than once.
ENGLISH_WORDS = [
"the", "be", "to", "of", "and", "a", "in", "that", "have", "I",
"it", "for", "not", "on", "with", "he", "as", "you", "do", "at",
"this", "but", "his", "by", "from", "they", "we", "say", "her", "she",
"or", "an", "will", "my", "one", "all", "would", "there", "their", "what",
"so", "up", "out", "if", "about", "who", "get", "which", "go", "me",
"when", "make", "can", "like", "time", "no", "just", "him", "know", "take",
"people", "into", "year", "your", "good", "some", "could", "them", "see", "other",
"than", "then", "now", "look", "only", "come", "its", "over", "think", "also",
"back", "after", "use", "two", "how", "our", "work", "first", "well", "way",
"even", "new", "want", "because", "any", "these", "give", "day", "most", "us",
"very", "life", "after", "call", "world", "over", "still", "take", "every", "through",
"before", "long", "where", "much", "should", "well", "people", "down", "own", "work",
"first", "good", "new", "write", "our", "used", "me", "man", "too", "any",
"day", "same", "right", "look", "think", "also", "around", "another", "came"
]
async def generate_words_stream(query: str) -> AsyncGenerator[str, None]:
    """Stream every entry of ENGLISH_WORDS, one word per yielded item (Gradio version).

    The first and last yielded items are human-readable start/finish markers.
    They report ``len(ENGLISH_WORDS)`` instead of a hard-coded 200, so the
    announced count always matches the number of words actually streamed.

    Args:
        query: The user's query. It is currently unused; the same words are
            streamed for every query.

    Yields:
        A start marker, then each word in list order, then a completion marker.
    """
    total = len(ENGLISH_WORDS)

    # Start marker.
    yield f"🚀 开始生成{total}个英文单词...\n"

    for word in ENGLISH_WORDS:
        # Short pause before each word so the streaming effect is visible.
        await asyncio.sleep(0.05)
        yield word

    # Completion marker.
    yield f"\n✅ {total}个英文单词生成完成!"
def generate_words_mcp(query: str = "give me some english words") -> str:
    """
    Return all predefined English words in a single status message.

    Args:
        query (str): The input query for generating words. It is accepted but not
            used to select words. Default is "give me some english words".

    Returns:
        str: A success message with the number of words and the space-separated
            word list, or an error message if building the text fails.
    """
    try:
        # The whole predefined list is returned as-is, joined by single spaces.
        joined = " ".join(ENGLISH_WORDS)
        total = len(ENGLISH_WORDS)
    except Exception as exc:
        return f"生成单词时出错: {str(exc)}"
    return f"成功生成 {total} 个英文单词: {joined}"
def get_word_count(query: str = "count words") -> str:
    """
    Report how many English words are available.

    Args:
        query (str): The input query. It does not affect the result.
            Default is "count words".

    Returns:
        str: A message containing the size of the predefined word list.
    """
    available = len(ENGLISH_WORDS)
    return f"总共有 {available} 个英文单词可用"
# Build the chat interface.
# NOTE(review): indentation was lost in this copy of the file, so the nesting of
# the `with` blocks below (presumably Blocks > Row > Column > component) is not
# visible here — restore and confirm it before running.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
# Page heading and a one-line usage hint.
gr.Markdown("# 🧠 English Words Generator")
gr.Markdown("输入你的查询,AI 将为你生成 200 个英文单词(流式输出)")
# Conversation display; the handlers in this file read and write its value as a
# list of [user_message, bot_message] pairs.
chatbot = gr.Chatbot(
height=500,
show_label=False,
container=True,
bubble_full_width=False
)
# Input area: a two-line query box followed by the send / clear / download buttons.
with gr.Row():
with gr.Column(scale=8):
query_input = gr.Textbox(
placeholder="输入你的查询,比如:'给我一些常用英文单词'",
label="",
show_label=False,
lines=2
)
with gr.Column(scale=1):
send_btn = gr.Button("发送", variant="primary", size="lg")
with gr.Column(scale=1):
clear_btn = gr.Button("清空", variant="secondary")
with gr.Column(scale=1):
download_btn = gr.Button("下载", variant="secondary")
def handle_user_input(query, messages):
    """Append the user's message to the chat history.

    Args:
        query: Text from the input box. ``None`` is treated like blank text.
        messages: Chat history as a list of ``[user_message, bot_message]``
            pairs. ``None`` is treated as an empty history.

    Returns:
        A ``(textbox_value, history)`` tuple. For a non-blank query the textbox
        value is ``""`` (clearing the box) and the history is a new list ending
        in ``[query, None]``, where ``None`` is the placeholder for the bot
        reply. For a blank query both inputs are returned unchanged.
    """
    if not query or not query.strip():
        # Nothing to send: leave the textbox and the history as they are.
        return query, messages
    # Concatenation builds a new list, so the caller's history is not mutated.
    history = messages if messages else []
    return "", history + [[query, None]]
def clear_chat():
    """Reset the conversation by returning a fresh, empty history list."""
    empty_history = []
    return empty_history
def download_chat(messages):
    """Export the chat history to a temporary UTF-8 text file.

    Args:
        messages: Chat history as a list of ``[user_message, bot_message]``
            pairs. Entries whose length is not 2 are skipped.

    Returns:
        The path of the written ``chat_*.txt`` file, or ``None`` when the
        history is empty. The file is created with ``delete=False``, so it is
        not removed automatically when closed.
    """
    if not messages:
        return None

    from datetime import datetime
    import tempfile

    exported_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Collect the pieces and join once instead of growing a string with +=.
    parts = [
        "# 英文单词生成对话记录\n\n",
        f"导出时间: {exported_at}\n",
        f"对话条数: {len(messages)}\n\n",
        "=" * 50 + "\n\n",
    ]
    for index, message in enumerate(messages, 1):
        if len(message) != 2:
            continue
        user_msg, bot_msg = message
        # An empty/None bot reply means the answer had not been produced yet.
        reply = bot_msg if bot_msg else "正在生成中..."
        parts.append(f"## 对话 {index}\n\n")
        parts.append(f"**用户**: {user_msg}\n\n")
        parts.append(f"**AI**: {reply}\n\n")
        parts.append("-" * 30 + "\n\n")

    # Explicit UTF-8: the content contains Chinese text (and possibly emoji),
    # which the platform default encoding may be unable to represent.
    # The context manager closes the file even if the write fails.
    with tempfile.NamedTemporaryFile(
        mode="w",
        delete=False,
        suffix=".txt",
        prefix="chat_",
        encoding="utf-8",
    ) as temp_file:
        temp_file.write("".join(parts))
    return temp_file.name
async def generate_response(messages):
    """Stream the AI reply for the most recent user message.

    Args:
        messages: Chat history as a list of ``[user_message, bot_message]``
            pairs. The last pair's bot slot is filled in place as tokens arrive.

    Yields:
        The (mutated) ``messages`` list after each non-blank token, so the
        reply grows on screen. If the history is empty or its last entry is
        empty or not a pair, ``messages`` is yielded once unchanged.
    """
    latest = messages[-1] if messages else None
    if not latest or len(latest) != 2:
        yield messages
        return

    user_text = latest[0]  # the user's half of the last pair

    # Accumulate tokens and re-join them with single spaces for every update.
    pieces = []
    async for token in generate_words_stream(user_text):
        if not token.strip():
            continue
        pieces.append(token)
        # Write the partial reply into the bot slot of the last pair.
        messages[-1][1] = " ".join(pieces).strip()
        yield messages
# On submit: first append the user's message, then stream the reply.
# NOTE(review): Gradio event listeners are expected to be registered inside the
# `with gr.Blocks() as demo:` context; indentation is missing in this copy of the
# file — confirm these statements sit inside that block.
query_input.submit(handle_user_input, inputs=[query_input, chatbot], outputs=[query_input, chatbot])\
.then(generate_response, inputs=chatbot, outputs=chatbot)
# The send button runs the same two-step chain as the textbox submit event.
send_btn.click(handle_user_input, inputs=[query_input, chatbot], outputs=[query_input, chatbot])\
.then(generate_response, inputs=chatbot, outputs=chatbot)
# The clear button replaces the chatbot value with an empty history.
clear_btn.click(clear_chat, outputs=chatbot)
# Download button: write the history to a file and hand its path to a File
# component that is created inline here.
download_btn.click(download_chat, inputs=chatbot, outputs=gr.File(label="下载对话记录"))
# Start the application when this file is executed as a script.
if __name__ == "__main__":
    # mcp_server=True is passed straight through to Gradio's launch().
    demo.launch(mcp_server=True)