| |
| from flask import Blueprint, request, jsonify |
| import os |
| import json |
| import time |
| import uuid |
| import requests |
| from config import STREAM_API_KEY, STREAM_BASE_URL, DEFAULT_MODEL |
|
|
# Blueprint that groups every Agent-builder endpoint defined in this module.
agent_builder_bp = Blueprint('agent_builder', __name__)

# Directory holding one "<agent_id>.json" file per Agent. The path is relative,
# so it resolves against the working directory of the running process.
AGENTS_DIR = 'agents'
# Created at import time so the request handlers can assume it exists.
os.makedirs(AGENTS_DIR, exist_ok=True)
|
|
@agent_builder_bp.route('/create', methods=['POST'])
def create_agent():
    """Create a new Agent and persist its configuration as a JSON file.

    Reads a JSON object from the request body. ``name`` is required; the keys
    ``description``, ``subject`` (defaults to ``name``), ``instructor``,
    ``plugins``, ``knowledge_bases`` and ``workflow`` are optional.

    Returns:
        A JSON response with ``success``, ``agent_id`` and ``message`` on
        success; HTTP 400 when ``name`` is missing or empty (including when
        the body is absent or is not a JSON object); HTTP 500 carrying the
        error text on any other failure.
    """
    try:
        # silent=True returns None instead of raising on a missing or
        # malformed body, so such requests hit the 400 branch below rather
        # than surfacing as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        name = data.get('name')
        if not name:
            return jsonify({
                "success": False,
                "message": "Agent名称不能为空"
            }), 400

        # One timestamp for both the id suffix and created_at keeps them in sync.
        now = int(time.time())
        agent_id = f"agent_{uuid.uuid4().hex[:8]}_{now}"

        agent_config = {
            "id": agent_id,
            "name": name,
            "description": data.get('description', ''),
            "subject": data.get('subject', name),
            "instructor": data.get('instructor', '教师'),
            "created_at": now,
            "plugins": data.get('plugins', []),
            "knowledge_bases": data.get('knowledge_bases', []),
            # A missing or empty workflow falls back to an empty graph.
            "workflow": data.get('workflow') or {
                "nodes": [],
                "edges": []
            },
            "distributions": [],
            "stats": {
                "usage_count": 0,
                "last_used": None
            }
        }

        agent_path = os.path.join(AGENTS_DIR, f"{agent_id}.json")
        with open(agent_path, 'w', encoding='utf-8') as f:
            json.dump(agent_config, f, ensure_ascii=False, indent=2)

        return jsonify({
            "success": True,
            "agent_id": agent_id,
            "message": f"Agent '{name}' 创建成功"
        })

    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({
            "success": False,
            "message": str(e)
        }), 500
|
|
@agent_builder_bp.route('/list', methods=['GET'])
def list_agents():
    """Return a summary of every Agent stored in ``AGENTS_DIR``.

    Each ``*.json`` file in the directory is loaded and reduced to a summary
    dict. Files that cannot be read, are not valid JSON, or do not contain a
    JSON object are skipped (with a traceback printed for read/parse errors)
    so one bad file does not break the whole listing.

    Returns:
        A JSON response with ``success`` and ``agents`` (newest first by
        ``created_at``); HTTP 500 carrying the error text on failure.
    """
    import traceback

    try:
        agents = []

        for filename in os.listdir(AGENTS_DIR):
            if not filename.endswith('.json'):
                continue

            try:
                with open(os.path.join(AGENTS_DIR, filename), 'r', encoding='utf-8') as f:
                    agent_config = json.load(f)
            except (OSError, ValueError):
                # ValueError covers both JSON and text-decoding errors.
                traceback.print_exc()
                continue

            if not isinstance(agent_config, dict):
                continue

            # "or" guards against keys that are present but stored as null.
            stats = agent_config.get("stats") or {}
            distributions = agent_config.get("distributions") or []

            agents.append({
                "id": agent_config.get("id"),
                "name": agent_config.get("name"),
                "description": agent_config.get("description"),
                "subject": agent_config.get("subject", agent_config.get("name")),
                "instructor": agent_config.get("instructor", "教师"),
                "created_at": agent_config.get("created_at"),
                "plugins": agent_config.get("plugins", []),
                "knowledge_bases": agent_config.get("knowledge_bases", []),
                "usage_count": stats.get("usage_count", 0),
                "distribution_count": len(distributions)
            })

        # "created_at" is always present in the summary (possibly as None), so
        # a .get() default would never apply; "or 0" keeps None comparable.
        agents.sort(key=lambda agent: agent.get("created_at") or 0, reverse=True)

        return jsonify({
            "success": True,
            "agents": agents
        })

    except Exception as e:
        traceback.print_exc()
        return jsonify({
            "success": False,
            "message": str(e)
        }), 500
|
|
@agent_builder_bp.route('/<agent_id>', methods=['GET'])
def get_agent(agent_id):
    """Return the full stored configuration of one Agent.

    Args:
        agent_id: Identifier taken from the URL; the configuration is read
            from ``<AGENTS_DIR>/<agent_id>.json``.

    Returns:
        A JSON response with ``success`` and ``agent``; HTTP 404 when no such
        file exists (or the id contains path components); HTTP 500 carrying
        the error text on any other failure.
    """
    try:
        # The id is used to build a file name, so refuse anything that
        # os.path treats as more than a bare name (e.g. backslashes or a
        # drive prefix on Windows).
        if os.path.basename(agent_id) != agent_id:
            return jsonify({
                "success": False,
                "message": "Agent不存在"
            }), 404

        agent_path = os.path.join(AGENTS_DIR, f"{agent_id}.json")

        # Open directly instead of checking exists() first: the file could
        # disappear between the check and the open.
        try:
            with open(agent_path, 'r', encoding='utf-8') as f:
                agent_config = json.load(f)
        except FileNotFoundError:
            return jsonify({
                "success": False,
                "message": "Agent不存在"
            }), 404

        return jsonify({
            "success": True,
            "agent": agent_config
        })

    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({
            "success": False,
            "message": str(e)
        }), 500
|
|
@agent_builder_bp.route('/<agent_id>', methods=['PUT'])
def update_agent(agent_id):
    """Update selected fields of an existing Agent configuration.

    Only keys present in the JSON request body are overwritten. The updatable
    keys are ``name``, ``description``, ``subject``, ``instructor``,
    ``plugins``, ``knowledge_bases`` and ``workflow``; ``updated_at`` is set
    to the current time on every successful update.

    Args:
        agent_id: Identifier taken from the URL; the configuration lives in
            ``<AGENTS_DIR>/<agent_id>.json``.

    Returns:
        A JSON response with ``success`` and ``message``; HTTP 400 when the
        body is not a JSON object; HTTP 404 when the Agent does not exist;
        HTTP 500 carrying the error text on any other failure.
    """
    # subject/instructor are stored by the create endpoint, so they are
    # updatable here as well.
    updatable_fields = (
        'name', 'description', 'subject', 'instructor',
        'plugins', 'knowledge_bases', 'workflow',
    )

    try:
        # The id is used to build a file name, so refuse anything that
        # os.path treats as more than a bare name.
        if os.path.basename(agent_id) != agent_id:
            return jsonify({
                "success": False,
                "message": "Agent不存在"
            }), 404

        # silent=True returns None instead of raising on a missing or
        # malformed body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                "success": False,
                "message": "请求体必须是JSON对象"
            }), 400

        agent_path = os.path.join(AGENTS_DIR, f"{agent_id}.json")

        # Open directly instead of checking exists() first: the file could
        # disappear between the check and the open.
        try:
            with open(agent_path, 'r', encoding='utf-8') as f:
                agent_config = json.load(f)
        except FileNotFoundError:
            return jsonify({
                "success": False,
                "message": "Agent不存在"
            }), 404

        for field in updatable_fields:
            if field in data:
                agent_config[field] = data[field]

        agent_config['updated_at'] = int(time.time())

        with open(agent_path, 'w', encoding='utf-8') as f:
            json.dump(agent_config, f, ensure_ascii=False, indent=2)

        return jsonify({
            "success": True,
            "message": "Agent更新成功"
        })

    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({
            "success": False,
            "message": str(e)
        }), 500
|
|
@agent_builder_bp.route('/<agent_id>', methods=['DELETE'])
def delete_agent(agent_id):
    """Delete the stored configuration file of one Agent.

    Args:
        agent_id: Identifier taken from the URL; the file removed is
            ``<AGENTS_DIR>/<agent_id>.json``.

    Returns:
        A JSON response with ``success`` and ``message``; HTTP 404 when the
        Agent does not exist (or the id contains path components); HTTP 500
        carrying the error text on any other failure.
    """
    try:
        # The id is used to build a file name, so refuse anything that
        # os.path treats as more than a bare name.
        if os.path.basename(agent_id) != agent_id:
            return jsonify({
                "success": False,
                "message": "Agent不存在"
            }), 404

        agent_path = os.path.join(AGENTS_DIR, f"{agent_id}.json")

        # Remove directly instead of checking exists() first, so a concurrent
        # delete yields 404 rather than an unexpected 500.
        try:
            os.remove(agent_path)
        except FileNotFoundError:
            return jsonify({
                "success": False,
                "message": "Agent不存在"
            }), 404

        return jsonify({
            "success": True,
            "message": "Agent删除成功"
        })

    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({
            "success": False,
            "message": str(e)
        }), 500
|
|
@agent_builder_bp.route('/<agent_id>/distribute', methods=['POST'])
def distribute_agent(agent_id):
    """Create a distribution (share) link for an Agent.

    A random token is generated, recorded in the Agent's ``distributions``
    list together with its creation and expiry times, and written back to the
    Agent's JSON file.

    Request body (JSON object, optional):
        expires_in: Lifetime in seconds. Missing, null, zero or negative
            values produce ``expires_at == 0``.

    Args:
        agent_id: Identifier taken from the URL; the configuration lives in
            ``<AGENTS_DIR>/<agent_id>.json``.

    Returns:
        A JSON response with ``success``, ``distribution`` (``id``, ``link``,
        ``token``, ``expires_at``) and ``message``; HTTP 400 when
        ``expires_in`` is not a usable number; HTTP 404 when the Agent does
        not exist; HTTP 500 carrying the error text on any other failure.
    """
    try:
        # The body is optional: silent=True returns None instead of raising
        # when it is missing or malformed.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        now = time.time()
        try:
            # "or 0" folds null / empty values into "no expiry".
            expires_in = float(data.get('expires_in') or 0)
            # OverflowError: int() of an infinite value.
            expiry = int(now + expires_in) if expires_in > 0 else 0
        except (TypeError, ValueError, OverflowError):
            return jsonify({
                "success": False,
                "message": "expires_in 必须是数字"
            }), 400

        # The id is used to build a file name, so refuse anything that
        # os.path treats as more than a bare name.
        if os.path.basename(agent_id) != agent_id:
            return jsonify({
                "success": False,
                "message": "Agent不存在"
            }), 404

        agent_path = os.path.join(AGENTS_DIR, f"{agent_id}.json")

        # Open directly instead of checking exists() first: the file could
        # disappear between the check and the open.
        try:
            with open(agent_path, 'r', encoding='utf-8') as f:
                agent_config = json.load(f)
        except FileNotFoundError:
            return jsonify({
                "success": False,
                "message": "Agent不存在"
            }), 404

        token = uuid.uuid4().hex

        distribution = {
            "id": f"dist_{uuid.uuid4().hex[:6]}",
            "created_at": int(now),
            "token": token,
            "expires_at": expiry,
            "usage_count": 0
        }

        # Also covers a "distributions" key that is present but not a list.
        if not isinstance(agent_config.get("distributions"), list):
            agent_config["distributions"] = []
        agent_config["distributions"].append(distribution)

        with open(agent_path, 'w', encoding='utf-8') as f:
            json.dump(agent_config, f, ensure_ascii=False, indent=2)

        access_link = f"/student/{agent_id}?token={token}"

        return jsonify({
            "success": True,
            "distribution": {
                "id": distribution["id"],
                "link": access_link,
                "token": token,
                "expires_at": expiry
            },
            "message": "分发链接创建成功"
        })

    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({
            "success": False,
            "message": str(e)
        }), 500
|
|
# Endpoint queried for the list of knowledge bases that really exist.
_KNOWLEDGE_API_URL = "http://localhost:7860/api/knowledge"
# Timeouts (seconds) so an unresponsive service cannot hang the request.
_KB_LOOKUP_TIMEOUT = 5
_AI_REQUEST_TIMEOUT = 120

# System prompt template; filled via str.format, so literal braces are doubled.
_WORKFLOW_SYSTEM_PROMPT = """你是一个专业的AI工作流设计师。你需要根据用户描述,设计一个适合教育场景的Agent工作流。
你不仅要设计工作流结构,还要推荐合适的知识库和插件。请确保工作流逻辑合理,能够满足用户的需求。

工作流应包含以下类型的节点:
1. 意图识别:识别用户输入的意图
2. 知识库查询:从指定知识库中检索信息
3. 插件调用:调用特定插件(如代码执行、3D可视化或思维导图)
4. 回复生成:生成最终回复

用户当前选择的知识库:{knowledge_bases}
系统中可用的知识库有:{available_knowledge_bases}

用户当前选择的插件:{plugins}
系统中可用的插件有:{available_plugins}

这个Agent的主题领域是:{subject}

重要:根据Agent描述,推荐最合适的知识库和插件。在推荐时,只能使用系统中真实可用的知识库和插件。

请返回三部分内容:
1. 推荐的知识库列表(只能从可用知识库中选择)
2. 推荐的插件列表(只能从可用插件中选择)
3. 完整的工作流JSON结构

JSON格式示例:
{{
  "recommended_knowledge_bases": ["rag_knowledge1", "rag_knowledge2"],
  "recommended_plugins": ["code", "visualization"],
  "workflow": {{
    "nodes": [
      {{ "id": "node1", "type": "intent_recognition", "data": {{ "name": "意图识别" }} }},
      {{ "id": "node2", "type": "knowledge_query", "data": {{ "name": "知识库查询", "knowledge_base_id": "rag_knowledge1" }} }},
      {{ "id": "node3", "type": "plugin_call", "data": {{ "name": "调用代码执行插件", "plugin_id": "code" }} }},
      {{ "id": "node4", "type": "generate_response", "data": {{ "name": "生成回复" }} }}
    ],
    "edges": [
      {{ "id": "edge1", "source": "node1", "target": "node2", "condition": "需要知识" }},
      {{ "id": "edge2", "source": "node1", "target": "node3", "condition": "需要代码执行" }},
      {{ "id": "edge3", "source": "node2", "target": "node4" }},
      {{ "id": "edge4", "source": "node3", "target": "node4" }}
    ]
  }}
}}
"""


def _join_names(values, empty_label):
    """Join a list of names with ", ", or return ``empty_label`` if unusable."""
    if not isinstance(values, (list, tuple)) or not values:
        return empty_label
    # str() keeps non-string items (e.g. numeric ids) from breaking join().
    return ", ".join(str(value) for value in values)


def _extract_json_object(content):
    """Return the first JSON object found in a model reply, or None.

    Tries, in order: the body of a fenced code block, the whole reply, and
    finally every ``{`` position in the reply as the start of an object.
    """
    import re

    if not isinstance(content, str):
        return None

    candidates = []
    fenced = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', content)
    if fenced:
        candidates.append(fenced.group(1))
    candidates.append(content)

    for text in candidates:
        try:
            parsed = json.loads(text)
        except ValueError:
            continue
        if isinstance(parsed, dict):
            return parsed

    # raw_decode parses one complete value starting at the given index and
    # ignores trailing text, so nested objects are handled correctly.
    decoder = json.JSONDecoder()
    start = content.find('{')
    while start != -1:
        try:
            parsed, _ = decoder.raw_decode(content, start)
        except ValueError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed
        start = content.find('{', start + 1)
    return None


def _fallback_workflow_response(success, message):
    """Build a response carrying the default workflow and no recommendations."""
    return jsonify({
        "success": success,
        "message": message,
        "workflow": create_default_workflow(),
        "recommended_knowledge_bases": [],
        "recommended_plugins": []
    })


@agent_builder_bp.route('/ai-assist', methods=['POST'])
def ai_assisted_workflow():
    """Ask the AI model to design a workflow and recommend resources.

    Request body (JSON object):
        description: Required free-text description of the Agent.
        subject: Optional subject area (defaults to "通用学科").
        knowledge_bases / plugins: Optional lists of the user's current
            selections; they are only quoted in the prompt.

    The model reply is searched for a JSON object. Recommended knowledge
    bases and plugins are filtered down to the ones that are actually
    available. When the model cannot be reached or returns unusable output,
    the response still has HTTP 200 but carries the default workflow.

    Returns:
        A JSON response with ``success``, ``workflow``,
        ``recommended_knowledge_bases``, ``recommended_plugins`` and
        ``message``; HTTP 400 when ``description`` is missing; HTTP 500
        carrying the error text on unexpected failures.
    """
    try:
        # silent=True returns None instead of raising on a missing or
        # malformed body; such requests then fail the description check.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        description = data.get('description', '')
        if not description:
            return jsonify({
                "success": False,
                "message": "请提供Agent描述"
            }), 400

        subject = data.get('subject', '通用学科')
        knowledge_bases = data.get('knowledge_bases', [])
        plugins = data.get('plugins', [])

        # Best effort: if the knowledge service is down or answers with an
        # unexpected shape, continue with whatever ids were collected.
        available_knowledge_bases = []
        try:
            kb_response = requests.get(_KNOWLEDGE_API_URL, timeout=_KB_LOOKUP_TIMEOUT)
            if kb_response.status_code == 200:
                kb_data = kb_response.json()
                if kb_data.get("success"):
                    for kb in kb_data.get("data", []):
                        available_knowledge_bases.append(kb["id"])
        except Exception:
            pass

        available_plugins = ["code", "visualization", "mindmap"]

        system_prompt = _WORKFLOW_SYSTEM_PROMPT.format(
            knowledge_bases=_join_names(knowledge_bases, "无"),
            available_knowledge_bases=_join_names(available_knowledge_bases, "无可用知识库"),
            plugins=_join_names(plugins, "无"),
            available_plugins=", ".join(available_plugins),
            subject=subject
        )

        try:
            headers = {
                "Authorization": f"Bearer {STREAM_API_KEY}",
                "Content-Type": "application/json"
            }

            response = requests.post(
                f"{STREAM_BASE_URL}/chat/completions",
                headers=headers,
                json={
                    "model": DEFAULT_MODEL,
                    "messages": [
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": f"请为以下描述的教育Agent设计工作流并推荐知识库和插件:\n\n{description}"}
                    ]
                },
                timeout=_AI_REQUEST_TIMEOUT
            )

            if response.status_code != 200:
                return _fallback_workflow_response(
                    False,
                    f"Error code: {response.status_code} - {response.text}"
                ), 200

            result = response.json()
            content = result['choices'][0]['message']['content']

        except Exception as api_error:
            # Covers network errors, timeouts and an unexpected reply layout.
            return _fallback_workflow_response(
                False,
                f"无法连接到AI模型服务: {str(api_error)}"
            ), 200

        result_data = _extract_json_object(content)
        if result_data is None:
            has_braces = isinstance(content, str) and '{' in content and '}' in content
            if has_braces:
                return _fallback_workflow_response(True, "使用默认工作流(AI生成的JSON无效)")
            return _fallback_workflow_response(True, "使用默认工作流(未找到JSON结构)")

        recommended_knowledge_bases = result_data.get("recommended_knowledge_bases")
        recommended_plugins = result_data.get("recommended_plugins")
        workflow = result_data.get("workflow")

        # The model output is untrusted: tolerate missing or wrongly typed fields.
        if not isinstance(recommended_knowledge_bases, list):
            recommended_knowledge_bases = []
        if not isinstance(recommended_plugins, list):
            recommended_plugins = []
        if not isinstance(workflow, dict):
            workflow = create_default_workflow()

        # Keep only recommendations that really exist in the system.
        valid_knowledge_bases = [
            kb for kb in recommended_knowledge_bases if kb in available_knowledge_bases
        ]
        valid_plugins = [
            plugin for plugin in recommended_plugins if plugin in available_plugins
        ]

        return jsonify({
            "success": True,
            "workflow": workflow,
            "recommended_knowledge_bases": valid_knowledge_bases,
            "recommended_plugins": valid_plugins,
            "message": "已成功创建工作流"
        })

    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({
            "success": False,
            "message": str(e)
        }), 500
|
|
def create_default_workflow() -> dict:
    """Return a new, empty workflow graph.

    Returns:
        A dict with empty ``nodes`` and ``edges`` lists. A fresh dict (and
        fresh lists) is built on every call, so callers may mutate the result
        without affecting later calls.
    """
    return {
        "nodes": [],
        "edges": []
    }