import os import json import time import requests import markdown from flask import Flask, render_template, request, jsonify from flask_sqlalchemy import SQLAlchemy from datetime import datetime from dotenv import load_dotenv # 加载环境变量 load_dotenv() app = Flask(__name__) # 数据库配置,使用 SQLite 进行数据持久化 app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///spatial_yield.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SECRET_KEY'] = 'spatial-yield-secret-key' db = SQLAlchemy(app) # --- 数据库模型 --- class SpatialAsset(db.Model): """空间资产模型:管理低空航线、算力节点等资产""" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(100), nullable=False) type = db.Column(db.String(50)) # 航线, 算力, 频谱, 基础设施 status = db.Column(db.String(50), default='闲置') # 闲置, 占用, 维护 yield_rate = db.Column(db.Float, default=0.0) # 收益率 (%) value = db.Column(db.Float, default=0.0) # 当前价值 (CNY) description = db.Column(db.Text, default="") last_updated = db.Column(db.DateTime, default=datetime.utcnow) class AgentLog(db.Model): """Agent 推理与决策日志""" id = db.Column(db.Integer, primary_key=True) timestamp = db.Column(db.DateTime, default=datetime.utcnow) query = db.Column(db.Text) thought = db.Column(db.Text) action = db.Column(db.Text) result = db.Column(db.Text) status = db.Column(db.String(50)) # 成功, 失败 class UploadedFile(db.Model): """文件上传记录""" id = db.Column(db.Integer, primary_key=True) filename = db.Column(db.String(255), nullable=False) filepath = db.Column(db.String(512), nullable=False) file_type = db.Column(db.String(50)) upload_time = db.Column(db.DateTime, default=datetime.utcnow) # 初始化数据库 with app.app_context(): db.create_all() # 预置一些初始资产 if SpatialAsset.query.count() == 0: assets = [ SpatialAsset(name="深港低空物流航线 A1", type="航线", status="闲置", yield_rate=12.5, value=150000, description="连接深圳与香港的关键物流低空通道"), SpatialAsset(name="轨道边缘算力节点-Alpha", type="算力", status="闲置", yield_rate=25.0, value=500000, description="位于近地轨道的边缘计算节点"), SpatialAsset(name="5G-G低空专用频谱-1", type="频谱", status="占用", yield_rate=8.0, value=1200000, description="低空无人机专用通信频谱资源"), SpatialAsset(name="上海临港无人机货运港", type="基础设施", status="维护", yield_rate=5.5, value=3500000, description="大型全自动无人机起降与货运中心"), SpatialAsset(name="北京大兴低空巡检线路", type="航线", status="闲置", yield_rate=15.2, value=220000, description="覆盖大兴机场周边区域的巡检航线"), SpatialAsset(name="粤港澳大湾区算力网关", type="算力", status="占用", yield_rate=18.5, value=850000, description="大湾区低空经济核心调度网关") ] db.session.bulk_save_objects(assets) db.session.commit() # --- Agent 核心逻辑 --- SILICON_FLOW_API_KEY = os.getenv("SILICON_FLOW_API_KEY", "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi") SILICON_FLOW_URL = "https://api.siliconflow.cn/v1/chat/completions" def call_llm(messages): """调用 LLM API""" headers = { "Authorization": f"Bearer {SILICON_FLOW_API_KEY}", "Content-Type": "application/json" } payload = { "model": "deepseek-ai/DeepSeek-V3", "messages": messages, "temperature": 0.7, "max_tokens": 2048 } try: response = requests.post(SILICON_FLOW_URL, json=payload, headers=headers, timeout=60) response.raise_for_status() return response.json()['choices'][0]['message']['content'] except Exception as e: return f"LLM 调用失败: {str(e)}" # 工具定义 def tool_list_assets(): """列出所有空间资产""" assets = SpatialAsset.query.all() return json.dumps([{"id": a.id, "name": a.name, "type": a.type, "status": a.status, "yield": a.yield_rate, "value": a.value} for a in assets], ensure_ascii=False) def tool_optimize_asset(asset_id, action): """优化资产状态以提高收益。action 可选: 'activate' (激活), 'maintenance' (维护), 'release' (释放)""" try: asset = SpatialAsset.query.get(int(asset_id)) if not asset: return "资产未找到" if action == "activate": asset.status = "占用" asset.yield_rate += 2.0 elif action == "maintenance": asset.status = "维护" asset.yield_rate = 0.0 elif action == "release": asset.status = "闲置" asset.yield_rate = max(0, asset.yield_rate - 1.0) db.session.commit() return f"资产 {asset.name} 状态已更新为 {asset.status}, 收益率更新为 {asset.yield_rate}%" except Exception as e: return f"优化失败: {str(e)}" def tool_market_analysis(asset_type): """分析特定类型资产的市场趋势""" market_data = { "航线": "低空物流需求激增,大湾区区域航线价值上升 15%,建议加大投入。", "算力": "由于边缘计算需求,轨道节点租赁价格上涨,建议保持高负荷运行。", "频谱": "政策调整,低空频谱准入收紧,资产稀缺性增加,长期看好。", "基础设施": "各地政府加大补贴力度,基础设施建设回报周期缩短。" } return market_data.get(asset_type, "暂无该类型市场数据,建议进行广泛市场调研。") def tool_add_asset(name, type, value, description=""): """新增一项空间资产。参数:名称, 类型, 初始价值, 描述""" try: new_asset = SpatialAsset(name=name, type=type, value=float(value), description=description, status="闲置", yield_rate=5.0) db.session.add(new_asset) db.session.commit() return f"成功添加资产: {name} ({type})" except Exception as e: return f"添加资产失败: {str(e)}" def tool_forecast_yield(asset_id): """预测特定资产在未来 30 天的收益走势""" asset = SpatialAsset.query.get(int(asset_id)) if not asset: return "资产不存在" import random trend = "上涨" if random.random() > 0.4 else "平稳" predicted_yield = asset.yield_rate * (1 + random.uniform(0.05, 0.2)) return f"资产 {asset.name} 预测走势:{trend}。预计收益率将达到 {predicted_yield:.2f}%。" AVAILABLE_TOOLS = { "list_assets": tool_list_assets, "optimize_asset": tool_optimize_asset, "market_analysis": tool_market_analysis, "add_asset": tool_add_asset, "forecast_yield": tool_forecast_yield } def run_agent(user_input): """ReAct Agent 运行循环""" system_prompt = """你是一个专业的‘空间收益智能代理’ (Spatial Yield Agent)。 你的目标是管理和优化低空经济与空间计算相关的数字资产。 你有以下工具可以使用: 1. list_assets(): 列出当前所有资产。 2. optimize_asset(asset_id, action): 优化特定资产。action 可选: 'activate', 'maintenance', 'release'。 3. market_analysis(asset_type): 分析特定类型资产的市场趋势。 4. add_asset(name, type, value, description): 添加新资产。 5. forecast_yield(asset_id): 预测资产收益走势。 请按照以下格式思考和行动: Thought: 思考当前状态和下一步计划。 Action: 工具名称(参数) Observation: 工具返回的结果。 ... (重复上述步骤) Final Answer: 最终给用户的回复(必须使用中文,提供专业的建议和分析)。 注意: - 当你输出 Action 后,请立即停止,等待 Observation。 - 最终答案要详尽,结合 Observation 给出具体的管理建议。 - 如果用户只是打招呼,直接 Final Answer 即可。""" messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_input} ] max_steps = 6 agent_thought_process = [] for step in range(max_steps): llm_output = call_llm(messages) if not llm_output or "LLM 调用失败" in llm_output: return f"Agent 遇到错误: {llm_output}", agent_thought_process # 检查是否包含 Action if "Action:" in llm_output: # 提取 Thought 和 Action parts = llm_output.split("Action:") thought_part = parts[0] action_line = parts[1].split("\n")[0].strip() llm_output = f"{thought_part}Action: {action_line}" agent_thought_process.append(llm_output) if "Final Answer:" in llm_output: final_answer = llm_output.split("Final Answer:")[-1].strip() # 保存日志 log = AgentLog( query=user_input, thought="\n".join(agent_thought_process), action="Multi-Step", result=final_answer, status="成功" ) db.session.add(log) db.session.commit() return final_answer, agent_thought_process if "Action:" in llm_output: try: action_line = [l for l in llm_output.split("\n") if "Action:" in l][0] action_content = action_line.replace("Action:", "").strip() import re match = re.match(r"(\w+)\((.*)\)", action_content) if match: func_name = match.group(1) args_str = match.group(2) if func_name in AVAILABLE_TOOLS: # 解析参数,处理逗号和引号 import ast # 尝试更安全的参数解析 try: # 包装成元组解析 args = ast.literal_eval(f"({args_str})") if not isinstance(args, tuple): args = (args,) except: # 回退到简单的分割 args = [a.strip().strip("'").strip('"') for a in args_str.split(",") if a.strip()] observation = AVAILABLE_TOOLS[func_name](*args) messages.append({"role": "assistant", "content": llm_output}) messages.append({"role": "user", "content": f"Observation: {observation}"}) agent_thought_process.append(f"Observation: {observation}") continue observation = "工具调用格式错误,请确保使用 '工具名(参数1, 参数2)' 格式。" messages.append({"role": "user", "content": f"Observation: {observation}"}) agent_thought_process.append(f"Observation: {observation}") except Exception as e: observation = f"工具执行出错: {str(e)}" messages.append({"role": "user", "content": f"Observation: {observation}"}) agent_thought_process.append(f"Observation: {observation}") continue return "Agent 思考时间过长,请尝试简化您的指令。", agent_thought_process # --- 路由设置 --- @app.route('/') def index(): return render_template('index.html') @app.route('/api/chat', methods=['POST']) def chat(): data = request.json user_input = data.get('message') if not user_input: return jsonify({"error": "消息不能为空"}), 400 answer, process = run_agent(user_input) # 解析 Markdown html_answer = markdown.markdown(answer) html_process = [markdown.markdown(p) for p in process] return jsonify({ "answer": html_answer, "process": html_process }) @app.route('/api/assets') def get_assets(): assets = SpatialAsset.query.all() return jsonify([{ "id": a.id, "name": a.name, "type": a.type, "status": a.status, "yield": a.yield_rate, "value": a.value, "description": a.description } for a in assets]) @app.route('/api/logs') def get_logs(): logs = AgentLog.query.order_by(AgentLog.timestamp.desc()).limit(10).all() return jsonify([{ "timestamp": l.timestamp.strftime("%Y-%m-%d %H:%M:%S"), "query": l.query, "result": l.result } for l in logs]) # 文件上传配置 UPLOAD_FOLDER = 'uploads' if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER @app.route('/api/upload', methods=['POST']) def upload_file(): """文件上传接口,支持大文件模拟与记录""" if 'file' not in request.files: return jsonify({"error": "未发现文件"}), 400 file = request.files['file'] if file.filename == '': return jsonify({"error": "未选择文件"}), 400 try: filename = file.filename filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(filepath) # 记录到数据库 new_file = UploadedFile( filename=filename, filepath=filepath, file_type=filename.split('.')[-1] if '.' in filename else 'unknown' ) db.session.add(new_file) db.session.commit() return jsonify({ "message": "文件上传成功", "filename": filename, "size": os.path.getsize(filepath) }) except Exception as e: return jsonify({"error": f"上传失败: {str(e)}"}), 500 @app.route('/api/stats') def get_stats(): """获取系统统计数据""" total_assets = SpatialAsset.query.count() total_value = db.session.query(db.func.sum(SpatialAsset.value)).scalar() or 0 avg_yield = db.session.query(db.func.avg(SpatialAsset.yield_rate)).scalar() or 0 return jsonify({ "total_assets": total_assets, "total_value": total_value, "avg_yield": round(avg_yield, 2) }) if __name__ == '__main__': # 确保在 0.0.0.0 上运行以便外部访问(如 HF Spaces) port = int(os.environ.get("PORT", 5001)) app.run(host='0.0.0.0', port=port, debug=True)