Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| import re | |
| import time | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import gradio as gr | |
| import modelscope_studio.components.antd as antd | |
| import modelscope_studio.components.antdx as antdx | |
| import modelscope_studio.components.base as ms | |
| import modelscope_studio.components.pro as pro | |
| from mem0 import Memory | |
| from modelscope_studio.components.pro.chatbot import (ChatbotBotConfig, | |
| ChatbotPromptsConfig, | |
| ChatbotUserConfig, | |
| ChatbotWelcomeConfig) | |
| from openai import OpenAI | |
| # 配置日志 | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler('xinyuan_chat.log'), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class AppConfig: | |
| """应用配置类""" | |
| users_file: str = "users.txt" | |
| memory_path: str = "./faiss_memories" | |
| model_name: str = "xinyuan-32b-v0609" | |
| max_tokens: int = 32768 | |
| temperature: float = 0.6 | |
| top_p: float = 0.95 | |
| chatbot_height: int = 1000 | |
| min_username_length: int = 3 | |
| max_memory_results: int = 5 | |
| class MemoryManager: | |
| """记忆管理器""" | |
| def __init__(self, config_path: str): | |
| self.config = { | |
| "vector_store": { | |
| "provider": "faiss", | |
| "config": { | |
| "collection_name": "xinyuan_memories", | |
| "path": config_path, | |
| "distance_strategy": "euclidean" | |
| } | |
| } | |
| } | |
| try: | |
| self.memory = Memory.from_config(self.config) | |
| logger.info(f"Memory manager initialized with path: {config_path}") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize memory manager: {e}") | |
| raise | |
| def search_memories(self, query: str, user_id: str, limit: int = 5) -> List[Dict[str, Any]]: | |
| """搜索相关记忆""" | |
| try: | |
| if not query or not user_id: | |
| return [] | |
| results = self.memory.search(query=query, user_id=user_id, limit=limit) | |
| if results and 'results' in results: | |
| return sorted(results['results'], key=lambda x: x.get('score', 0), reverse=True) | |
| return [] | |
| except Exception as e: | |
| logger.error(f"Error searching memories for user {user_id}: {e}") | |
| return [] | |
| def add_memory(self, messages: List[Dict[str, str]], user_id: str) -> bool: | |
| """添加记忆""" | |
| try: | |
| if not messages or not user_id: | |
| return False | |
| self.memory.add(messages, user_id=user_id) | |
| logger.info(f"Memory added for user {user_id}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error adding memory for user {user_id}: {e}") | |
| return False | |
| class UserManager: | |
| """用户管理器""" | |
| def __init__(self, users_file: str, min_username_length: int = 3): | |
| self.users_file = Path(users_file) | |
| self.min_username_length = min_username_length | |
| self._ensure_users_file_exists() | |
| def _ensure_users_file_exists(self): | |
| """确保用户文件存在""" | |
| if not self.users_file.exists(): | |
| self.users_file.touch() | |
| logger.info(f"Created users file: {self.users_file}") | |
| def load_users(self) -> set: | |
| """加载已注册用户列表""" | |
| try: | |
| with open(self.users_file, 'r', encoding='utf-8') as f: | |
| users = {line.strip() for line in f if line.strip()} | |
| logger.debug(f"Loaded {len(users)} users") | |
| return users | |
| except Exception as e: | |
| logger.error(f"Error loading users: {e}") | |
| return set() | |
| def save_user(self, username: str) -> bool: | |
| """保存新用户到文件""" | |
| try: | |
| with open(self.users_file, 'a', encoding='utf-8') as f: | |
| f.write(f"{username}\n") | |
| logger.info(f"User {username} saved to file") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error saving user {username}: {e}") | |
| return False | |
| def is_valid_username(self, username: str) -> bool: | |
| """验证用户名是否有效""" | |
| if not username or not isinstance(username, str): | |
| return False | |
| # 检查长度 | |
| if len(username) < self.min_username_length: | |
| return False | |
| # 检查格式:以字母开头,只包含字母、数字和下划线 | |
| return bool(re.match(r'^[a-zA-Z][a-zA-Z0-9_]*$', username)) | |
| def login_user(self, username: str) -> Tuple[bool, str]: | |
| """用户登录验证""" | |
| if not self.is_valid_username(username): | |
| return False, "用户名无效!用户名必须以英文字母开头,只能包含英文字母、数字和下划线,且长度至少3位。" | |
| users = self.load_users() | |
| if username in users: | |
| logger.info(f"User {username} logged in successfully") | |
| return True, f"欢迎回来,{username}!" | |
| else: | |
| logger.warning(f"Login attempt for unregistered user: {username}") | |
| return False, f"用户 {username} 未注册,请先注册。" | |
| def register_user(self, username: str) -> Tuple[bool, str]: | |
| """用户注册""" | |
| if not self.is_valid_username(username): | |
| return False, "用户名无效!用户名必须以英文字母开头,只能包含英文字母、数字和下划线,且长度至少3位。" | |
| users = self.load_users() | |
| if username in users: | |
| logger.warning(f"Registration attempt for existing user: {username}") | |
| return False, f"用户名 {username} 已存在,请直接登录。" | |
| if self.save_user(username): | |
| logger.info(f"User {username} registered successfully") | |
| return True, f"注册成功!欢迎,{username}!" | |
| else: | |
| return False, "注册失败,请稍后重试。" | |
| class ChatManager: | |
| """聊天管理器""" | |
| def __init__(self, config: AppConfig, memory_manager: MemoryManager): | |
| self.config = config | |
| self.memory_manager = memory_manager | |
| self.client = self._initialize_openai_client() | |
| def _initialize_openai_client(self) -> OpenAI: | |
| """初始化OpenAI客户端""" | |
| try: | |
| # 可以根据需要配置API密钥和基础URL | |
| gw_api_key = os.getenv("GW_API_KEY") | |
| client = OpenAI( | |
| base_url='https://api.geniuworks.com/v2', | |
| api_key=gw_api_key, | |
| ) | |
| logger.info("OpenAI client initialized successfully") | |
| return client | |
| except Exception as e: | |
| logger.error(f"Failed to initialize OpenAI client: {e}") | |
| raise | |
| def format_history(self, sender_value: str, history: List[Dict], username: Optional[str] = None) -> List[Dict[str, str]]: | |
| """格式化聊天历史""" | |
| messages = [] | |
| # 添加系统提示 | |
| if username: | |
| system_prompt = f"""You are Xinyuan, a large language model trained by Cylingo Group. You are a helpful assistant. 目前和你聊天的用户是{username}.""" | |
| # 搜索相关记忆 | |
| if sender_value: | |
| related_memories = self.memory_manager.search_memories( | |
| query=sender_value, | |
| user_id=username, | |
| limit=self.config.max_memory_results | |
| ) | |
| if related_memories: | |
| memory_content = "\n相关记忆:\n" | |
| for idx, memory in enumerate(related_memories): | |
| memory_content += f"记忆{idx + 1}:{memory.get('memory', '')} (相关度: {memory.get('score', 0):.3f})\n" | |
| system_prompt += memory_content | |
| messages.append({"role": "system", "content": system_prompt}) | |
| # 添加历史对话 | |
| for item in history: | |
| if item.get("role") == "user": | |
| messages.append({"role": "user", "content": item.get("content", "")}) | |
| elif item.get("role") == "assistant" and item.get("content"): | |
| # 提取助手回复的文本内容 | |
| content_list = item.get("content", []) | |
| if content_list and len(content_list) > 1: | |
| assistant_content = content_list[-1].get("content", "") | |
| if assistant_content: | |
| messages.append({"role": "assistant", "content": assistant_content}) | |
| return messages | |
| def create_chat_completion(self, messages: List[Dict[str, str]]) -> Any: | |
| """创建聊天完成请求""" | |
| try: | |
| return self.client.chat.completions.create( | |
| model=self.config.model_name, | |
| messages=messages, | |
| stream=True, | |
| max_tokens=self.config.max_tokens, | |
| temperature=self.config.temperature, | |
| top_p=self.config.top_p, | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error creating chat completion: {e}") | |
| raise | |
| # 全局配置和管理器实例 | |
| config = AppConfig() | |
| memory_manager = MemoryManager(config.memory_path) | |
| user_manager = UserManager(config.users_file, config.min_username_length) | |
| chat_manager = ChatManager(config, memory_manager) | |
| # Gradio界面函数 | |
| def handle_auth(username: str, is_register: bool) -> Tuple: | |
| """处理认证逻辑""" | |
| try: | |
| if is_register: | |
| success, message = user_manager.register_user(username) | |
| else: | |
| success, message = user_manager.login_user(username) | |
| if success: | |
| return ( | |
| gr.update(visible=False), # 隐藏登录界面 | |
| gr.update(visible=True), # 显示聊天界面 | |
| gr.update(message=message, type="success", visible=True), | |
| username | |
| ) | |
| else: | |
| return ( | |
| gr.update(visible=True), # 保持登录界面可见 | |
| gr.update(visible=False), # 隐藏聊天界面 | |
| gr.update(message=message, type="error", visible=True), | |
| "" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error in handle_auth: {e}") | |
| return ( | |
| gr.update(visible=True), | |
| gr.update(visible=False), | |
| gr.update(message="系统错误,请稍后重试。", type="error", visible=True), | |
| "" | |
| ) | |
| def prompt_select(e: gr.EventData) -> gr.update: | |
| """处理提示选择""" | |
| try: | |
| return gr.update(value=e._data["payload"][0]["value"]["description"]) | |
| except Exception as e: | |
| logger.error(f"Error in prompt_select: {e}") | |
| return gr.update(value="") | |
| def clear() -> gr.update: | |
| """清空聊天记录""" | |
| return gr.update(value=None) | |
| def retry(chatbot_value: List, e: gr.EventData, username: Optional[str] = None): | |
| """重试功能""" | |
| try: | |
| index = e._data["payload"][0]["index"] | |
| chatbot_value = chatbot_value[:index] | |
| yield gr.update(value=None, loading=True), gr.update(value=chatbot_value), gr.update(disabled=True) | |
| for chunk in submit(None, chatbot_value, username): | |
| yield chunk | |
| except Exception as e: | |
| logger.error(f"Error in retry: {e}") | |
| yield gr.update(value=None, loading=False), gr.update(value=chatbot_value), gr.update(disabled=False) | |
| def cancel(chatbot_value: List) -> Tuple: | |
| """取消当前对话""" | |
| try: | |
| if chatbot_value: | |
| chatbot_value[-1]["loading"] = False | |
| chatbot_value[-1]["status"] = "done" | |
| chatbot_value[-1]["footer"] = "Chat completion paused" | |
| return ( | |
| gr.update(value=chatbot_value), | |
| gr.update(loading=False), | |
| gr.update(disabled=False) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error in cancel: {e}") | |
| return ( | |
| gr.update(value=chatbot_value), | |
| gr.update(loading=False), | |
| gr.update(disabled=False) | |
| ) | |
| def submit(sender_value: Optional[str], chatbot_value: List, username: Optional[str] = None): | |
| """提交聊天消息""" | |
| start_time = time.time() | |
| try: | |
| # 添加用户消息 | |
| if sender_value is not None: | |
| chatbot_value.append({ | |
| "role": "user", | |
| "content": sender_value, | |
| }) | |
| # 格式化历史消息 | |
| history_messages = chat_manager.format_history(sender_value, chatbot_value, username) | |
| # 添加助手消息占位符 | |
| chatbot_value.append({ | |
| "role": "assistant", | |
| "content": [], | |
| "loading": True, | |
| "status": "pending" | |
| }) | |
| # 更新UI状态 | |
| yield ( | |
| gr.update(value=None, loading=True), # sender | |
| gr.update(value=chatbot_value), # chatbot | |
| gr.update(disabled=True) # clear_btn | |
| ) | |
| # 创建聊天完成请求 | |
| response = chat_manager.create_chat_completion(history_messages) | |
| # 处理流式响应 | |
| thought_done = False | |
| message_content = chatbot_value[-1]["content"] | |
| # 初始化消息内容结构 | |
| message_content.append({ | |
| "copyable": False, | |
| "editable": False, | |
| "type": "tool", | |
| "content": "", | |
| "options": {"title": "Thinking..."} | |
| }) | |
| message_content.append({ | |
| "type": "text", | |
| "content": "", | |
| }) | |
| full_assistant_content = "" | |
| # 处理流式响应 | |
| for chunk in response: | |
| try: | |
| reasoning_content = getattr(chunk.choices[0].delta, 'reasoning_content', None) or "" | |
| content = getattr(chunk.choices[0].delta, 'content', None) or "" | |
| chatbot_value[-1]["loading"] = False | |
| message_content[-2]["content"] += reasoning_content | |
| message_content[-1]["content"] += content | |
| if content: | |
| full_assistant_content += content | |
| if content and not thought_done: | |
| thought_done = True | |
| thought_cost_time = f"{time.time() - start_time:.2f}" | |
| message_content[-2]["options"]["title"] = f"End of Thought ({thought_cost_time}s)" | |
| message_content[-2]["options"]["status"] = "done" | |
| yield ( | |
| gr.update(), # sender | |
| gr.update(value=chatbot_value), # chatbot | |
| gr.update() # clear_btn | |
| ) | |
| except Exception as chunk_error: | |
| logger.error(f"Error processing chunk: {chunk_error}") | |
| continue | |
| # 保存到记忆 | |
| if username and sender_value and full_assistant_content: | |
| memory_messages = [ | |
| {'role': 'user', 'content': sender_value}, | |
| {'role': 'assistant', 'content': full_assistant_content} | |
| ] | |
| memory_manager.add_memory(memory_messages, username) | |
| # 完成响应 | |
| total_time = f"{time.time() - start_time:.2f}s" | |
| chatbot_value[-1]["footer"] = total_time | |
| chatbot_value[-1]["status"] = "done" | |
| yield ( | |
| gr.update(loading=False), # sender | |
| gr.update(value=chatbot_value), # chatbot | |
| gr.update(disabled=False) # clear_btn | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error in submit: {e}") | |
| # 错误处理 | |
| if chatbot_value: | |
| chatbot_value[-1]["loading"] = False | |
| chatbot_value[-1]["status"] = "done" | |
| chatbot_value[-1]["content"] = "抱歉,处理您的请求时出现错误,请稍后重试。" | |
| yield ( | |
| gr.update(loading=False), # sender | |
| gr.update(value=chatbot_value), # chatbot | |
| gr.update(disabled=False) # clear_btn | |
| ) | |
| # 创建Gradio界面 | |
| def create_interface(): | |
| """创建Gradio界面""" | |
| with gr.Blocks(title="Xinyuan 聊天助手") as demo, ms.Application(), antdx.XProvider(): | |
| # 状态变量 | |
| current_user = gr.State("") | |
| # 登录界面 | |
| with antd.Flex(vertical=True, gap="large", elem_id="login_container") as login_container: | |
| with antd.Card(title="欢迎使用 Xinyuan 聊天助手"): | |
| with antd.Flex(vertical=True, gap="middle"): | |
| antd.Typography.Title("用户登录/注册", level=3) | |
| antd.Typography.Text("请输入您的英文用户名(3位以上,仅支持英文字母、数字和下划线)") | |
| username_input = antd.Input( | |
| placeholder="请输入用户名(如:john_doe)", | |
| size="large" | |
| ) | |
| with antd.Flex(gap="small"): | |
| login_btn = antd.Button("登录", type="primary", size="large") | |
| register_btn = antd.Button("注册", size="large") | |
| auth_message = antd.Alert( | |
| message="请输入用户名", | |
| type="info", | |
| visible=False | |
| ) | |
| # 聊天界面 | |
| with antd.Flex(vertical=True, gap="middle", visible=False) as chat_container: | |
| # 用户信息栏 | |
| with antd.Flex(justify="space-between", align="center"): | |
| user_info = gr.Markdown("") | |
| logout_btn = antd.Button("退出登录", size="small") | |
| # 聊天机器人组件 | |
| chatbot = pro.Chatbot( | |
| height=config.chatbot_height, | |
| welcome_config=ChatbotWelcomeConfig( | |
| variant="borderless", | |
| icon="./xinyuan.png", | |
| title="Hello, I'm Xinyuan👋", | |
| description="You can input text to get started.", | |
| prompts=ChatbotPromptsConfig( | |
| title="How can I help you today?", | |
| styles={ | |
| "list": {"width": '100%'}, | |
| "item": {"flex": 1}, | |
| }, | |
| items=[ | |
| { | |
| "label": "💝 心理学与实际应用", | |
| "children": [ | |
| {"description": "课题分离是什么意思?"}, | |
| {"description": "回避型依恋和焦虑型依恋有什么区别?还有其他依恋类型吗?"}, | |
| {"description": "为什么我背单词的时候总是只记得开头和结尾,中间全忘了?"} | |
| ] | |
| }, | |
| { | |
| "label": "👪 儿童教育与发展", | |
| "children": [ | |
| {"description": "什么是正念养育?"}, | |
| {"description": "2岁孩子分离焦虑严重,送托育中心天天哭闹怎么办?"}, | |
| {"description": "4岁娃说话不清还爱打人,是心理问题还是欠管教?"} | |
| ] | |
| } | |
| ] | |
| ) | |
| ), | |
| user_config=ChatbotUserConfig( | |
| avatar="https://api.dicebear.com/7.x/miniavs/svg?seed=3", | |
| variant="shadow" | |
| ), | |
| bot_config=ChatbotBotConfig( | |
| header='Xinyuan', | |
| avatar="./xinyuan.png", | |
| actions=["copy", "retry"], | |
| variant="shadow" | |
| ), | |
| ) | |
| # 发送器组件 | |
| with antdx.Sender() as sender: | |
| with ms.Slot("prefix"): | |
| with antd.Button(value=None, color="default", variant="text") as clear_btn: | |
| with ms.Slot("icon"): | |
| antd.Icon("ClearOutlined") | |
| # 事件处理函数 | |
| def handle_login(username: str): | |
| return handle_auth(username, False) | |
| def handle_register(username: str): | |
| return handle_auth(username, True) | |
| def handle_logout(): | |
| return ( | |
| gr.update(visible=True), # 显示登录界面 | |
| gr.update(visible=False), # 隐藏聊天界面 | |
| gr.update(message="已退出登录", type="info", visible=True), | |
| gr.update(value=""), # 清空用户名输入 | |
| "", # 清空用户信息显示 | |
| "" # 清空当前用户状态 | |
| ) | |
| def update_user_info(username: str) -> str: | |
| return f"**当前用户: {username}**" if username else "" | |
| # 绑定事件 | |
| login_btn.click( | |
| fn=handle_login, | |
| inputs=[username_input], | |
| outputs=[login_container, chat_container, auth_message, current_user] | |
| ).then( | |
| fn=update_user_info, | |
| inputs=[current_user], | |
| outputs=[user_info] | |
| ) | |
| register_btn.click( | |
| fn=handle_register, | |
| inputs=[username_input], | |
| outputs=[login_container, chat_container, auth_message, current_user] | |
| ).then( | |
| fn=update_user_info, | |
| inputs=[current_user], | |
| outputs=[user_info] | |
| ) | |
| logout_btn.click( | |
| fn=handle_logout, | |
| outputs=[login_container, chat_container, auth_message, username_input, user_info, current_user] | |
| ) | |
| # 聊天功能事件绑定 | |
| clear_btn.click(fn=clear, outputs=[chatbot]) | |
| submit_event = sender.submit( | |
| fn=submit, | |
| inputs=[sender, chatbot, current_user], | |
| outputs=[sender, chatbot, clear_btn] | |
| ) | |
| sender.cancel( | |
| fn=cancel, | |
| inputs=[chatbot], | |
| outputs=[chatbot, sender, clear_btn], | |
| cancels=[submit_event], | |
| queue=False | |
| ) | |
| chatbot.retry( | |
| fn=retry, | |
| inputs=[chatbot, current_user], | |
| outputs=[sender, chatbot, clear_btn] | |
| ) | |
| chatbot.welcome_prompt_select(fn=prompt_select, outputs=[sender]) | |
| return demo | |
| def main(): | |
| """主函数""" | |
| try: | |
| logger.info("Starting Xinyuan Chat Application") | |
| demo = create_interface() | |
| demo.queue().launch() | |
| except Exception as e: | |
| logger.error(f"Failed to start application: {e}") | |
| raise | |
| if __name__ == "__main__": | |
| main() | |