Trae Assistant
Initial commit: Enhanced Spatial Yield Agent
bfdc641
import os
import json
import time
import requests
import markdown
from flask import Flask, render_template, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
app = Flask(__name__)
# 数据库配置,使用 SQLite 进行数据持久化
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///spatial_yield.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = 'spatial-yield-secret-key'
db = SQLAlchemy(app)
# --- 数据库模型 ---
class SpatialAsset(db.Model):
"""空间资产模型:管理低空航线、算力节点等资产"""
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
type = db.Column(db.String(50)) # 航线, 算力, 频谱, 基础设施
status = db.Column(db.String(50), default='闲置') # 闲置, 占用, 维护
yield_rate = db.Column(db.Float, default=0.0) # 收益率 (%)
value = db.Column(db.Float, default=0.0) # 当前价值 (CNY)
description = db.Column(db.Text, default="")
last_updated = db.Column(db.DateTime, default=datetime.utcnow)
class AgentLog(db.Model):
"""Agent 推理与决策日志"""
id = db.Column(db.Integer, primary_key=True)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
query = db.Column(db.Text)
thought = db.Column(db.Text)
action = db.Column(db.Text)
result = db.Column(db.Text)
status = db.Column(db.String(50)) # 成功, 失败
class UploadedFile(db.Model):
"""文件上传记录"""
id = db.Column(db.Integer, primary_key=True)
filename = db.Column(db.String(255), nullable=False)
filepath = db.Column(db.String(512), nullable=False)
file_type = db.Column(db.String(50))
upload_time = db.Column(db.DateTime, default=datetime.utcnow)
# 初始化数据库
with app.app_context():
db.create_all()
# 预置一些初始资产
if SpatialAsset.query.count() == 0:
assets = [
SpatialAsset(name="深港低空物流航线 A1", type="航线", status="闲置", yield_rate=12.5, value=150000, description="连接深圳与香港的关键物流低空通道"),
SpatialAsset(name="轨道边缘算力节点-Alpha", type="算力", status="闲置", yield_rate=25.0, value=500000, description="位于近地轨道的边缘计算节点"),
SpatialAsset(name="5G-G低空专用频谱-1", type="频谱", status="占用", yield_rate=8.0, value=1200000, description="低空无人机专用通信频谱资源"),
SpatialAsset(name="上海临港无人机货运港", type="基础设施", status="维护", yield_rate=5.5, value=3500000, description="大型全自动无人机起降与货运中心"),
SpatialAsset(name="北京大兴低空巡检线路", type="航线", status="闲置", yield_rate=15.2, value=220000, description="覆盖大兴机场周边区域的巡检航线"),
SpatialAsset(name="粤港澳大湾区算力网关", type="算力", status="占用", yield_rate=18.5, value=850000, description="大湾区低空经济核心调度网关")
]
db.session.bulk_save_objects(assets)
db.session.commit()
# --- Agent 核心逻辑 ---
SILICON_FLOW_API_KEY = os.getenv("SILICON_FLOW_API_KEY", "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi")
SILICON_FLOW_URL = "https://api.siliconflow.cn/v1/chat/completions"
def call_llm(messages):
"""调用 LLM API"""
headers = {
"Authorization": f"Bearer {SILICON_FLOW_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": "deepseek-ai/DeepSeek-V3",
"messages": messages,
"temperature": 0.7,
"max_tokens": 2048
}
try:
response = requests.post(SILICON_FLOW_URL, json=payload, headers=headers, timeout=60)
response.raise_for_status()
return response.json()['choices'][0]['message']['content']
except Exception as e:
return f"LLM 调用失败: {str(e)}"
# 工具定义
def tool_list_assets():
"""列出所有空间资产"""
assets = SpatialAsset.query.all()
return json.dumps([{"id": a.id, "name": a.name, "type": a.type, "status": a.status, "yield": a.yield_rate, "value": a.value} for a in assets], ensure_ascii=False)
def tool_optimize_asset(asset_id, action):
"""优化资产状态以提高收益。action 可选: 'activate' (激活), 'maintenance' (维护), 'release' (释放)"""
try:
asset = SpatialAsset.query.get(int(asset_id))
if not asset:
return "资产未找到"
if action == "activate":
asset.status = "占用"
asset.yield_rate += 2.0
elif action == "maintenance":
asset.status = "维护"
asset.yield_rate = 0.0
elif action == "release":
asset.status = "闲置"
asset.yield_rate = max(0, asset.yield_rate - 1.0)
db.session.commit()
return f"资产 {asset.name} 状态已更新为 {asset.status}, 收益率更新为 {asset.yield_rate}%"
except Exception as e:
return f"优化失败: {str(e)}"
def tool_market_analysis(asset_type):
"""分析特定类型资产的市场趋势"""
market_data = {
"航线": "低空物流需求激增,大湾区区域航线价值上升 15%,建议加大投入。",
"算力": "由于边缘计算需求,轨道节点租赁价格上涨,建议保持高负荷运行。",
"频谱": "政策调整,低空频谱准入收紧,资产稀缺性增加,长期看好。",
"基础设施": "各地政府加大补贴力度,基础设施建设回报周期缩短。"
}
return market_data.get(asset_type, "暂无该类型市场数据,建议进行广泛市场调研。")
def tool_add_asset(name, type, value, description=""):
"""新增一项空间资产。参数:名称, 类型, 初始价值, 描述"""
try:
new_asset = SpatialAsset(name=name, type=type, value=float(value), description=description, status="闲置", yield_rate=5.0)
db.session.add(new_asset)
db.session.commit()
return f"成功添加资产: {name} ({type})"
except Exception as e:
return f"添加资产失败: {str(e)}"
def tool_forecast_yield(asset_id):
"""预测特定资产在未来 30 天的收益走势"""
asset = SpatialAsset.query.get(int(asset_id))
if not asset:
return "资产不存在"
import random
trend = "上涨" if random.random() > 0.4 else "平稳"
predicted_yield = asset.yield_rate * (1 + random.uniform(0.05, 0.2))
return f"资产 {asset.name} 预测走势:{trend}。预计收益率将达到 {predicted_yield:.2f}%。"
AVAILABLE_TOOLS = {
"list_assets": tool_list_assets,
"optimize_asset": tool_optimize_asset,
"market_analysis": tool_market_analysis,
"add_asset": tool_add_asset,
"forecast_yield": tool_forecast_yield
}
def run_agent(user_input):
"""ReAct Agent 运行循环"""
system_prompt = """你是一个专业的‘空间收益智能代理’ (Spatial Yield Agent)。
你的目标是管理和优化低空经济与空间计算相关的数字资产。
你有以下工具可以使用:
1. list_assets(): 列出当前所有资产。
2. optimize_asset(asset_id, action): 优化特定资产。action 可选: 'activate', 'maintenance', 'release'。
3. market_analysis(asset_type): 分析特定类型资产的市场趋势。
4. add_asset(name, type, value, description): 添加新资产。
5. forecast_yield(asset_id): 预测资产收益走势。
请按照以下格式思考和行动:
Thought: 思考当前状态和下一步计划。
Action: 工具名称(参数)
Observation: 工具返回的结果。
... (重复上述步骤)
Final Answer: 最终给用户的回复(必须使用中文,提供专业的建议和分析)。
注意:
- 当你输出 Action 后,请立即停止,等待 Observation。
- 最终答案要详尽,结合 Observation 给出具体的管理建议。
- 如果用户只是打招呼,直接 Final Answer 即可。"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_input}
]
max_steps = 6
agent_thought_process = []
for step in range(max_steps):
llm_output = call_llm(messages)
if not llm_output or "LLM 调用失败" in llm_output:
return f"Agent 遇到错误: {llm_output}", agent_thought_process
# 检查是否包含 Action
if "Action:" in llm_output:
# 提取 Thought 和 Action
parts = llm_output.split("Action:")
thought_part = parts[0]
action_line = parts[1].split("\n")[0].strip()
llm_output = f"{thought_part}Action: {action_line}"
agent_thought_process.append(llm_output)
if "Final Answer:" in llm_output:
final_answer = llm_output.split("Final Answer:")[-1].strip()
# 保存日志
log = AgentLog(
query=user_input,
thought="\n".join(agent_thought_process),
action="Multi-Step",
result=final_answer,
status="成功"
)
db.session.add(log)
db.session.commit()
return final_answer, agent_thought_process
if "Action:" in llm_output:
try:
action_line = [l for l in llm_output.split("\n") if "Action:" in l][0]
action_content = action_line.replace("Action:", "").strip()
import re
match = re.match(r"(\w+)\((.*)\)", action_content)
if match:
func_name = match.group(1)
args_str = match.group(2)
if func_name in AVAILABLE_TOOLS:
# 解析参数,处理逗号和引号
import ast
# 尝试更安全的参数解析
try:
# 包装成元组解析
args = ast.literal_eval(f"({args_str})")
if not isinstance(args, tuple):
args = (args,)
except:
# 回退到简单的分割
args = [a.strip().strip("'").strip('"') for a in args_str.split(",") if a.strip()]
observation = AVAILABLE_TOOLS[func_name](*args)
messages.append({"role": "assistant", "content": llm_output})
messages.append({"role": "user", "content": f"Observation: {observation}"})
agent_thought_process.append(f"Observation: {observation}")
continue
observation = "工具调用格式错误,请确保使用 '工具名(参数1, 参数2)' 格式。"
messages.append({"role": "user", "content": f"Observation: {observation}"})
agent_thought_process.append(f"Observation: {observation}")
except Exception as e:
observation = f"工具执行出错: {str(e)}"
messages.append({"role": "user", "content": f"Observation: {observation}"})
agent_thought_process.append(f"Observation: {observation}")
continue
return "Agent 思考时间过长,请尝试简化您的指令。", agent_thought_process
# --- 路由设置 ---
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/chat', methods=['POST'])
def chat():
data = request.json
user_input = data.get('message')
if not user_input:
return jsonify({"error": "消息不能为空"}), 400
answer, process = run_agent(user_input)
# 解析 Markdown
html_answer = markdown.markdown(answer)
html_process = [markdown.markdown(p) for p in process]
return jsonify({
"answer": html_answer,
"process": html_process
})
@app.route('/api/assets')
def get_assets():
assets = SpatialAsset.query.all()
return jsonify([{
"id": a.id,
"name": a.name,
"type": a.type,
"status": a.status,
"yield": a.yield_rate,
"value": a.value,
"description": a.description
} for a in assets])
@app.route('/api/logs')
def get_logs():
logs = AgentLog.query.order_by(AgentLog.timestamp.desc()).limit(10).all()
return jsonify([{
"timestamp": l.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
"query": l.query,
"result": l.result
} for l in logs])
# 文件上传配置
UPLOAD_FOLDER = 'uploads'
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
@app.route('/api/upload', methods=['POST'])
def upload_file():
"""文件上传接口,支持大文件模拟与记录"""
if 'file' not in request.files:
return jsonify({"error": "未发现文件"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "未选择文件"}), 400
try:
filename = file.filename
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
# 记录到数据库
new_file = UploadedFile(
filename=filename,
filepath=filepath,
file_type=filename.split('.')[-1] if '.' in filename else 'unknown'
)
db.session.add(new_file)
db.session.commit()
return jsonify({
"message": "文件上传成功",
"filename": filename,
"size": os.path.getsize(filepath)
})
except Exception as e:
return jsonify({"error": f"上传失败: {str(e)}"}), 500
@app.route('/api/stats')
def get_stats():
"""获取系统统计数据"""
total_assets = SpatialAsset.query.count()
total_value = db.session.query(db.func.sum(SpatialAsset.value)).scalar() or 0
avg_yield = db.session.query(db.func.avg(SpatialAsset.yield_rate)).scalar() or 0
return jsonify({
"total_assets": total_assets,
"total_value": total_value,
"avg_yield": round(avg_yield, 2)
})
if __name__ == '__main__':
# 确保在 0.0.0.0 上运行以便外部访问(如 HF Spaces)
port = int(os.environ.get("PORT", 5001))
app.run(host='0.0.0.0', port=port, debug=True)