Spaces:
Sleeping
Sleeping
| # app.py - Hugging Face 多模态 AI 智能体 (终极精简修复版) | |
| # 你只需要在 Settings -> Secrets 中设置 HF_TOKEN | |
| import os | |
| import time | |
| import json | |
| import re | |
| import base64 | |
| import tempfile | |
| import gradio as gr | |
# =======================
# Configuration: Hugging Face token (read from Space Settings -> Secrets)
# =======================
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    # Fail fast at import time: nothing in this app works without a token.
    raise ValueError("请设置 HF_TOKEN 环境变量(Settings → Secrets)")
# Use the official Hugging Face Inference API directly (OpenAI-compatible
# /v1 route) — no dedicated Inference Endpoint deployment is needed.
INFERENCE_API_BASE_URL = "https://api-inference.huggingface.co/v1"
# Verbose diagnostics are enabled by setting the env var DEBUG_MODE=true
# (case-insensitive); any other value leaves debug output off.
DEBUG_MODE = os.getenv("DEBUG_MODE", "false").lower() == "true"
def debug_print(message):
    """Emit *message* with a [DEBUG] prefix when DEBUG_MODE is on; otherwise do nothing."""
    if not DEBUG_MODE:
        return
    print(f"[DEBUG] {message}")
| # 添加网络连接测试函数 | |
def test_hf_connection():
    """Probe the Hugging Face Inference API once and report reachability.

    Returns True when the API answered (200, or any status other than 401 —
    those may still work), False on a 401 (bad token) or a network failure.
    """
    try:
        import requests
        debug_print("测试Hugging Face API连接...")
        probe = requests.get(
            "https://api-inference.huggingface.co/v1/models",
            headers={"Authorization": f"Bearer {HF_TOKEN}"},
            timeout=10,
        )
    except Exception as e:
        print(f"警告: 无法连接到Hugging Face API: {e}")
        return False
    status = probe.status_code
    debug_print(f"连接测试响应状态: {status}")
    if status == 200:
        debug_print("Hugging Face API连接成功")
        return True
    if status == 401:
        print("错误: Hugging Face API密钥无效")
        return False
    # Any other status code might still be usable, so treat it as a pass.
    print(f"警告: Hugging Face API连接测试返回状态码 {status}")
    return True
def extract_json_from_response(response_text):
    """
    Extract and parse the first JSON object found in a model response.

    :param response_text: raw text returned by the model; may be wrapped in
        Markdown code fences or surrounded by prose
    :return: the parsed JSON value (normally a dict)
    :raises ValueError: if the text is empty or no valid JSON can be found
    """
    if not response_text:
        raise ValueError("响应文本为空")
    # Step 1: some transport layers mis-decode UTF-8 payloads as latin-1;
    # try to undo that. If the text contains non-latin-1 characters the
    # round-trip fails and the text is left untouched.
    try:
        response_text = response_text.encode('latin1').decode('utf-8')
    except Exception:
        try:
            response_text = response_text.encode('utf-8').decode('utf-8')
        except Exception:
            pass
    response_text = response_text.strip()
    # Strip Markdown code-fence markers (```json ... ```).
    if response_text.startswith("```json"):
        response_text = response_text[7:]
    if response_text.startswith("```"):
        response_text = response_text[3:]
    if response_text.endswith("```"):
        response_text = response_text[:-3]
    response_text = response_text.strip()
    # Method 1: the whole (cleaned) text is valid JSON.
    try:
        return json.loads(response_text)
    except json.JSONDecodeError as e:
        debug_print(f"直接解析失败: {e}")
    # Method 2: scan for balanced {...} spans and parse the first one that
    # decodes successfully.
    stack = []
    start = -1
    for i, char in enumerate(response_text):
        if char == '{':
            if not stack:
                start = i
            stack.append(char)
        elif char == '}':
            if stack:
                stack.pop()
                if not stack and start != -1:
                    json_str = response_text[start:i + 1]
                    try:
                        return json.loads(json_str)
                    except json.JSONDecodeError:
                        continue
    # Method 3: regex fallback. BUG FIX: the original pattern used the
    # recursive extension (?R), which Python's `re` module does not support,
    # so this branch raised re.error instead of falling through. A
    # non-recursive pattern matching flat (non-nested) objects is used here.
    json_matches = re.findall(r'\{[^{}]*\}', response_text)
    if json_matches:
        clean_json = max(json_matches, key=len).strip()
        # Normalize "smart" double quotes some models emit. (The original
        # re.sub here replaced '"' with '"' and was a no-op.)
        clean_json = clean_json.replace('\u201c', '"').replace('\u201d', '"')
        try:
            return json.loads(clean_json)
        except json.JSONDecodeError as e:
            debug_print(f"清理后的JSON解析失败: {e}")
    # Everything failed: raise with enough context to debug the response.
    raise ValueError(f"无法从响应中提取有效的JSON。响应长度: {len(response_text)}, 响应前缀: {response_text[:100] if len(response_text) > 100 else response_text}")
| # ======================= | |
| # 工具函数:调用模型 | |
| # ======================= | |
def query_model(prompt, model_name="Qwen/Qwen3-4B-Thinking-2507"):
    """
    Generic chat-completion call against the Hugging Face Inference API.

    :param prompt: text sent as a single user chat message
    :param model_name: full model ID to call, e.g. "Qwen/Qwen3-4B-Thinking-2507";
        IDs not in the vetted list are replaced with the default model
    :return: the generated text, or None on empty output / any API error
    :raises ImportError: if the `openai` package is not installed
    """
    # Imported lazily so the module can be loaded without the package.
    try:
        from openai import OpenAI
    except ImportError:
        raise ImportError("请安装 openai: pip install openai")
    client = OpenAI(
        base_url=INFERENCE_API_BASE_URL,  # official HF Inference API endpoint
        api_key=HF_TOKEN  # the HF token doubles as the OpenAI-style API key
    )
    # Models known to work through this endpoint; anything else is coerced
    # to the default to avoid 404s on unavailable models.
    available_models = [
        "Qwen/Qwen3-4B-Thinking-2507",
        "Qwen/Qwen2.5-7B-Instruct",
        "meta-llama/Llama-3.1-8B-Instruct",
        "mistralai/Mistral-Nemo-Instruct-2407"
    ]
    if model_name not in available_models:
        print(f"警告: 模型 {model_name} 不在推荐列表中,将使用默认模型")
        model_name = "Qwen/Qwen3-4B-Thinking-2507"
    try:
        debug_print(f"正在调用模型: {model_name}")
        debug_print(f"提示词长度: {len(prompt)} 字符")
        response = client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "user", "content": prompt}
            ],
            max_tokens=1000,
            temperature=0.7
        )
        debug_print(f"API响应状态: 成功")
        if response.choices:
            content = response.choices[0].message.content.strip()
            debug_print(f"响应内容长度: {len(content) if content else 0} 字符")
            # Treat an empty completion as a failure.
            if not content:
                print("警告: 模型返回空内容")
                return None
            # Defensive UTF-8 round-trip; failures are ignored on purpose.
            try:
                content = content.encode('utf-8').decode('utf-8')
            except Exception:
                pass
            return content
        else:
            print("警告: API响应中没有choices字段")
            return None
    except Exception as e:
        error_msg = f"Error calling model: {e}"
        print(error_msg)
        debug_print(f"错误详情: {type(e).__name__}: {str(e)}")
        # Best-effort diagnosis based on the error message text.
        if "401" in str(e):
            print("错误:API密钥无效。请检查HF_TOKEN环境变量。")
        elif "404" in str(e):
            print(f"错误:模型 {model_name} 未找到。尝试使用备用模型...")
            # On a 404, walk the vetted list and return the first backup
            # model that yields non-empty content.
            for backup_model in available_models:
                if backup_model != model_name:
                    print(f"尝试备用模型: {backup_model}")
                    try:
                        response = client.chat.completions.create(
                            model=backup_model,
                            messages=[
                                {"role": "user", "content": prompt}
                            ],
                            max_tokens=1000,
                            temperature=0.7
                        )
                        if response.choices:
                            content = response.choices[0].message.content.strip()
                            # Skip backups that return nothing.
                            if not content:
                                print(f"备用模型 {backup_model} 返回空内容")
                                continue
                            try:
                                content = content.encode('utf-8').decode('utf-8')
                            except Exception:
                                pass
                            print(f"备用模型 {backup_model} 调用成功")
                            return content
                    except Exception as backup_e:
                        print(f"备用模型 {backup_model} 调用失败: {backup_e}")
                        continue
            print("所有模型都调用失败")
        elif "timeout" in str(e).lower() or "time out" in str(e).lower():
            print("错误:API调用超时,请检查网络连接。")
        elif "connection" in str(e).lower():
            print("错误:网络连接问题,请检查网络设置。")
        return None
| # ======================= | |
| # 核心组件:任务规划器 (Planner) | |
| # ======================= | |
def plan_tasks(user_request, filename=None):
    """
    Ask the LLM to break the user's request into concrete, executable steps.

    :param user_request: the user's free-form request text
    :param filename: name of the uploaded file, or None if nothing was uploaded
    :return: the parsed plan dict ({"thought": ..., "steps": [...]})
    :raises Exception: if the model returns an empty response or the response
        cannot be parsed as JSON
    """
    # BUG FIX: the original f-string contained no placeholder, so the planner
    # was shown literal placeholder text instead of the real file name.
    file_info = f"用户上传了文件: {filename}" if filename else "用户没有上传文件。"
    # Prompt is assembled from adjacent literals to keep the JSON template
    # readable and avoid encoding drift.
    planning_prompt = (
        "你是一个AI任务规划专家。请根据以下信息,将用户的复杂需求分解为一系列具体的、可执行的原子步骤。\n\n"
        f"{file_info}\n"
        f"用户需求: {user_request}\n\n"
        "请严格按照以下JSON格式输出你的计划,不要包含任何其他文字,只输出JSON:\n\n"
        "{\n"
        '  "thought": "你的整体思考过程",\n'
        '  "steps": [\n'
        '    {\n'
        '      "step_number": 1,\n'
        '      "description": "第一步要做什么的详细描述",\n'
        '      "action": "需要执行的动作类型,例如: \'call_model\'",\n'
        '      "model_id": "Hugging Face上要调用的模型ID,根据文件类型和需求选择最合适的模型。",\n'
        '      "input": "该步骤需要的输入参数(可以是字符串或对象)"\n'
        "    }\n"
        "  ]\n"
        "}"
    )
    debug_print("开始调用模型进行任务规划...")
    planning_response = query_model(planning_prompt)
    debug_print(f"模型响应: {planning_response[:200] if planning_response else '空响应'}")
    # An empty response is unrecoverable here; explain the likely causes.
    if not planning_response:
        print("任务规划失败: 模型返回空响应")
        print("可能的原因:")
        print("1. Hugging Face API密钥无效")
        print("2. 网络连接问题")
        print("3. 模型当前不可用")
        print("4. 请求超时")
        raise Exception("任务规划失败: 模型返回空响应")
    try:
        plan = extract_json_from_response(planning_response)
        debug_print("JSON解析成功")
        return plan
    except Exception as e:
        error_msg = f"规划解析失败: {e}\n模型响应内容: {planning_response[:500] if planning_response else '空响应'}"
        print(error_msg)
        # Extra context for debugging malformed plans.
        if planning_response:
            print(f"响应长度: {len(planning_response)} 字符")
            print(f"响应前100字符: {planning_response[:100] if len(planning_response) > 100 else planning_response}")
        raise Exception("任务规划失败,请重新描述您的需求。")
| # ======================= | |
| # 核心组件:任务验证器 (Validator) | |
| # ======================= | |
def validate_step(step, step_result, original_request):
    """
    Have the LLM act as a QA reviewer for one executed step.

    :param step: plan step dict; only its "description" field is shown
    :param step_result: textual result produced by executing the step
    :param original_request: the user's original request (the quality bar)
    :return: (is_valid, feedback) tuple; (False, <message>) on any failure
    """
    prompt = (
        "你是一个严格的AI任务质量检查员。请根据原始用户需求,判断当前步骤的执行结果是否合格。\n\n"
        f"原始用户需求: {original_request}\n"
        f"当前步骤描述: {step.get('description', '无描述')}\n"
        f"当前步骤执行结果: {step_result}\n\n"
        "请严格按照以下JSON格式输出你的验证结果,不要包含任何其他文字,只输出JSON:\n\n"
        "{\n"
        '  "is_valid": true 或 false,\n'
        '  "feedback": "你的详细反馈,如果失败请说明原因"\n'
        "}"
    )
    debug_print("开始调用模型进行步骤验证...")
    reply = query_model(prompt)
    debug_print(f"验证响应: {reply[:200] if reply else '空响应'}")
    try:
        if not reply:
            raise ValueError("模型返回空响应")
        parsed = extract_json_from_response(reply)
    except Exception as e:
        print(f"验证解析失败: {e}\n模型响应内容: {reply[:500] if reply else '空响应'}")
        return False, "验证过程出现异常,请检查。"
    verdict = parsed.get("is_valid", False)
    note = parsed.get("feedback", "验证无反馈")
    debug_print(f"验证结果: 通过={verdict}, 反馈={note}")
    return verdict, note
| # ======================= | |
| # 核心组件:任务执行器 (Executor) | |
| # ======================= | |
def execute_step(step, uploaded_file):
    """
    Execute one plan step by calling the specified Hugging Face model.

    :param step: plan step dict with "action", "model_id", "input" and
        "description" keys (missing keys fall back to defaults)
    :param uploaded_file: Gradio file object or None; if present its raw
        bytes are base64-encoded and prepended as a chat message
    :return: (result text, path of a generated file or None)
    :raises Exception: wrapping any error raised while calling the model
    """
    action_type = step.get("action")
    model_id = step.get("model_id")
    tool_input = step.get("input", {})
    description = step.get("description", "")
    result_text = "执行成功。"
    generated_file = None
    try:
        from openai import OpenAI
        client = OpenAI(
            base_url=INFERENCE_API_BASE_URL,
            api_key=HF_TOKEN
        )
        messages = []
        if uploaded_file:
            # Inline the uploaded file as base64 text. NOTE(review): for
            # large files this can exceed the model's context window —
            # confirm expected file sizes.
            with open(uploaded_file.name, "rb") as file:
                file_bytes = file.read()
            file_base64 = base64.b64encode(file_bytes).decode('utf-8')
            file_ext = uploaded_file.name.split('.')[-1].lower()
            file_context = f"[用户上传的文件 (类型: {file_ext}) 已转换为Base64编码,内容如下]:\n{file_base64}\n\n"
            messages.append({"role": "user", "content": file_context})
        # Step input may be a plain string or a structured object; objects
        # are serialized to JSON (keeping non-ASCII characters readable).
        if isinstance(tool_input, str):
            user_instruction = tool_input
        else:
            user_instruction = json.dumps(tool_input, ensure_ascii=False)
        messages.append({"role": "user", "content": user_instruction})
        response = client.chat.completions.create(
            model=model_id,
            messages=messages,
            max_tokens=1500,
            temperature=0.7
        )
        result_text = response.choices[0].message.content.strip()
        # Heuristic: if the reply looks like a data URI, decode the payload
        # and save it to a temp file so the UI can offer it for download.
        if "data:image/" in result_text or "application/" in result_text:
            try:
                header, encoded = result_text.split(",", 1)
                file_data = base64.b64decode(encoded)
                # Pick a file extension from the MIME type in the header.
                if "image/png" in header:
                    ext = "png"
                elif "image/jpeg" in header:
                    ext = "jpg"
                elif "application/pdf" in header:
                    ext = "pdf"
                else:
                    ext = "bin"
                temp_dir = tempfile.mkdtemp()
                file_path = os.path.join(temp_dir, f"generated_output.{ext}")
                with open(file_path, "wb") as f:
                    f.write(file_data)
                generated_file = file_path
                result_text = f"已生成文件: generated_output.{ext}"
            except Exception as decode_error:
                # Decoding failures are non-fatal: keep the raw text result.
                print(f"Base64解码失败: {decode_error}")
    except Exception as e:
        raise Exception(f"执行动作 '{action_type}' 时出错: {str(e)}")
    return result_text, generated_file
| # ======================= | |
| # 主控函数:AI 智能体 (Agent) | |
| # ======================= | |
def ai_agent_master(uploaded_file, user_request):
    """
    Main agent loop: plan, execute and validate steps, streaming progress.

    :param uploaded_file: Gradio file object or None
    :param user_request: the user's request text
    :yield: (chat history in Gradio "messages" format,
             downloadable file path or None)
    """
    chat_history = []
    chat_history.append({"role": "user", "content": f"需求: {user_request}"})
    if uploaded_file:
        chat_history.append({"role": "user", "content": f"已上传文件: {uploaded_file.name}"})
    yield chat_history, None
    try:
        chat_history.append({"role": "assistant", "content": "正在分析需求并分解任务..."})
        yield chat_history, None
        # Planning with retries and a linearly growing back-off.
        max_retries = 3
        retry_count = 0
        task_plan = None
        while retry_count < max_retries and task_plan is None:
            try:
                task_plan = plan_tasks(user_request, uploaded_file.name if uploaded_file else None)
            except Exception as e:
                retry_count += 1
                if retry_count < max_retries:
                    print(f"任务规划失败,{retry_count}秒后重试... (第{retry_count}次)")
                    time.sleep(retry_count)  # back-off grows with each retry
                else:
                    raise e  # out of retries: surface the planning error
        if not task_plan or "steps" not in task_plan:
            raise Exception("任务规划返回无效格式,请重试。")
        plan_summary = "\n".join([f"{s['step_number']}. [{s['model_id']}] {s['description']}" for s in task_plan.get("steps", [])])
        chat_history.append({"role": "assistant", "content": f"任务分解完成:\n{plan_summary}"})
        yield chat_history, None
        final_generated_file = None
        for step in task_plan.get("steps", []):
            step_num = step.get("step_number", "未知")
            step_desc = step.get("description", "无描述")
            step_model = step.get("model_id", "未知")
            chat_history.append({"role": "assistant", "content": f"正在调用模型 {step_model} 执行步骤 {step_num}: {step_desc}..."})
            yield chat_history, None
            # Execute the step, retrying on exceptions.
            step_retry_count = 0
            step_result = None
            step_file = None
            while step_retry_count < max_retries and step_result is None:
                try:
                    step_result, step_file = execute_step(step, uploaded_file)
                except Exception as e:
                    step_retry_count += 1
                    if step_retry_count < max_retries:
                        print(f"步骤执行失败,{step_retry_count}秒后重试... (第{step_retry_count}次)")
                        time.sleep(step_retry_count)
                    else:
                        raise e  # out of retries: abort the whole run
            if step_file:
                final_generated_file = step_file
            if not step_result:
                raise Exception(f"步骤 {step_num} 执行返回空结果")
            display_result = step_result if len(step_result) < 500 else step_result[:500] + "... (结果过长已截断)"
            chat_history.append({"role": "assistant", "content": f"步骤 {step_num} 结果: {display_result}"})
            yield chat_history, None
            chat_history.append({"role": "assistant", "content": f"正在验证步骤 {step_num}..."})
            yield chat_history, None
            # Validate with retries. BUG FIX: the original loop condition was
            # `while count < max_retries and not is_valid`, but the counter
            # only advanced on exceptions — a *successful* validate_step call
            # returning is_valid=False looped forever. Break as soon as any
            # verdict (pass or fail) is obtained.
            validation_retry_count = 0
            is_valid = False
            validation_feedback = ""
            while validation_retry_count < max_retries:
                try:
                    is_valid, validation_feedback = validate_step(step, step_result, user_request)
                    break
                except Exception as e:
                    validation_retry_count += 1
                    if validation_retry_count < max_retries:
                        print(f"步骤验证失败,{validation_retry_count}秒后重试... (第{validation_retry_count}次)")
                        time.sleep(validation_retry_count)
                    else:
                        # Validation trouble is non-fatal: log and move on.
                        print(f"步骤 {step_num} 验证失败: {e}")
                        is_valid = True
                        validation_feedback = "验证过程出现异常,但继续执行下一步"
            if not is_valid:
                chat_history.append({"role": "assistant", "content": f"步骤 {step_num} 未通过验证: {validation_feedback}"})
                yield chat_history, None
                # A failed verdict does not abort the run; just warn.
                print(f"警告: 步骤 {step_num} 验证失败,但继续执行下一步")
            else:
                # BUG FIX: the success message was previously emitted even
                # after a failed verdict; it now only appears on a pass.
                chat_history.append({"role": "assistant", "content": f"步骤 {step_num} 验证通过!反馈: {validation_feedback}"})
                yield chat_history, None
            time.sleep(1)
        chat_history.append({"role": "assistant", "content": "所有任务步骤已成功完成!"})
        yield chat_history, final_generated_file
    except Exception as e:
        error_msg = f"执行过程中出现错误: {str(e)}"
        print(f"详细错误信息: {type(e).__name__}: {str(e)}")
        chat_history.append({"role": "assistant", "content": error_msg})
        yield chat_history, None
| # ======================= | |
| # Gradio 界面 | |
| # ======================= | |
# Build the Gradio UI: file upload + request box on the left, streaming
# chat log and result-file download on the right.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🤖 多模态 AI 智能体 (终极精简版)")
    gr.Markdown("上传任意文件,AI将自动判断类型并调用合适的模型完成您的任务。")
    with gr.Row():
        with gr.Column(scale=2):
            file_upload = gr.File(label="📂 上传文件 (支持: .docx, .xlsx, .pptx, .pdf, .jpg, .png, .txt 等)")
            user_input = gr.Textbox(
                label="你的复杂需求",
                placeholder="例如:\n1. 总结我上传的PDF。\n2. 分析这个Excel表格,告诉我销售额最高的产品。\n3. 把这张图片里的文字提取出来。\n4. 根据我的Word大纲,写一篇完整的文章。",
                lines=4
            )
            submit_btn = gr.Button("🚀 开始执行", variant="primary")
        with gr.Column(scale=3):
            # type='messages' matches the dict-based history that
            # ai_agent_master yields.
            output = gr.Chatbot(label="执行日志与结果", height=600, type='messages')
            download_file = gr.File(label="📥 下载结果文件")
    # ai_agent_master is a generator, so the chat log streams as it yields.
    submit_btn.click(
        fn=ai_agent_master,
        inputs=[file_upload, user_input],
        outputs=[output, download_file]
    )
# Script entry point: sanity-check the HF connection, then launch the UI.
if __name__ == "__main__":
    print("正在初始化AI智能体...")
    if DEBUG_MODE:
        print("调试模式已启用")
    # A failed connectivity test is non-fatal; the app still starts so the
    # user can see error messages in the UI.
    if test_hf_connection():
        print("Hugging Face API连接测试通过")
    else:
        print("警告: Hugging Face API连接测试失败,但仍将继续启动...")
    print("启动Gradio界面...")
    demo.launch()