Trae Assistant
init microgrid dispatch agent
971cb65
import os
import json
import uuid
import math
import random
import sqlite3
import datetime
import hashlib
import requests
from flask import Flask, render_template, request, jsonify, g, send_file
from dotenv import load_dotenv
load_dotenv()
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INSTANCE_DIR = os.path.join(BASE_DIR, "instance")
os.makedirs(INSTANCE_DIR, exist_ok=True)
app = Flask(__name__)
app.config["DATABASE"] = os.path.join(INSTANCE_DIR, "microgrid.db")
app.config["SILICONFLOW_API_KEY"] = os.getenv("SILICONFLOW_API_KEY", "").strip()
app.config["SILICONFLOW_BASE_URL"] = os.getenv("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1").strip()
app.config["SILICONFLOW_MODEL"] = os.getenv("SILICONFLOW_MODEL", "deepseek-ai/DeepSeek-V3").strip()
def get_db():
db = getattr(g, "_database", None)
if db is None:
db = g._database = sqlite3.connect(app.config["DATABASE"])
db.row_factory = sqlite3.Row
return db
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, "_database", None)
if db is not None:
db.close()
def now_ts():
return datetime.datetime.now().isoformat()
def init_db():
db = sqlite3.connect(app.config["DATABASE"])
cursor = db.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS cycles (
id TEXT PRIMARY KEY,
title TEXT,
scenario TEXT,
result TEXT,
status TEXT,
created_at TEXT
)
"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cycle_id TEXT,
role TEXT,
content TEXT,
created_at TEXT
)
"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS memories (
key TEXT PRIMARY KEY,
value TEXT,
updated_at TEXT
)
"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS validations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cycle_id TEXT,
passed INTEGER,
issues TEXT,
created_at TEXT
)
"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS files (
id TEXT PRIMARY KEY,
filename TEXT,
size_bytes INTEGER,
sha256 TEXT,
content_type TEXT,
is_binary INTEGER,
storage_path TEXT,
created_at TEXT
)
"""
)
db.commit()
cursor.execute("SELECT COUNT(1) FROM cycles")
count = cursor.fetchone()[0]
if count == 0:
sample_id = str(uuid.uuid4())
scenario = {
"title": "示例:园区A峰值调度",
"demand_mw": 120,
"solar_mw": 42,
"wind_mw": 28,
"storage_mwh": 80,
"storage_soc": 0.45,
"priority": "成本优先",
"notes": "示例备注:保障冷链负荷,允许需求响应10%。",
}
db.execute(
"INSERT INTO cycles (id, title, scenario, result, status, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(
sample_id,
scenario["title"],
json.dumps(scenario, ensure_ascii=False),
json.dumps(
{
"scenario": scenario,
"decision": {
"dispatch": {
"solar_mw": 42,
"wind_mw": 28,
"storage_discharge_mw": 18,
"storage_charge_mw": 0,
"grid_import_mw": 32,
"grid_export_mw": 0,
"demand_response_mw": 0,
"storage_soc_next": 0.22,
},
"kpi": {"cost": 27.5, "carbon": 16.8, "reliability": 0.99},
"notes": "示例调度已完成",
},
"validation": {"passed": True, "issues": ["校验通过"]},
"memory": {
"last_priority": "成本优先",
"last_cost": 27.5,
"last_carbon": 16.8,
"last_reliability": 0.99,
"iterations": 0,
},
"iterations": 0,
},
ensure_ascii=False,
),
"completed",
now_ts(),
),
)
db.commit()
db.close()
init_db()
def log_event(cycle_id, role, content):
db = get_db()
db.execute(
"INSERT INTO logs (cycle_id, role, content, created_at) VALUES (?, ?, ?, ?)",
(cycle_id, role, content, now_ts()),
)
db.commit()
def set_memory(key, value):
db = get_db()
row = db.execute("SELECT key FROM memories WHERE key = ?", (key,)).fetchone()
payload = json.dumps(value, ensure_ascii=False)
if row:
db.execute("UPDATE memories SET value = ?, updated_at = ? WHERE key = ?", (payload, now_ts(), key))
else:
db.execute("INSERT INTO memories (key, value, updated_at) VALUES (?, ?, ?)", (key, payload, now_ts()))
db.commit()
def get_memory(key, default=None):
db = get_db()
row = db.execute("SELECT value FROM memories WHERE key = ?", (key,)).fetchone()
if not row:
return default
try:
return json.loads(row["value"])
except json.JSONDecodeError:
return default
def call_llm_json(system_prompt, user_prompt):
api_key = app.config["SILICONFLOW_API_KEY"]
if not api_key:
return None
url = f"{app.config['SILICONFLOW_BASE_URL']}/chat/completions"
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
payload = {
"model": app.config["SILICONFLOW_MODEL"],
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"temperature": 0.2,
"max_tokens": 900,
"response_format": {"type": "json_object"},
}
try:
response = requests.post(url, headers=headers, json=payload, timeout=25)
response.raise_for_status()
data = response.json()
content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
return json.loads(content)
except Exception:
return None
def tool_forecast(scenario):
demand = scenario["demand_mw"]
solar = scenario["solar_mw"]
wind = scenario["wind_mw"]
forecast = []
for hour in range(6):
factor = 0.92 + random.random() * 0.16
forecast.append(
{
"hour": hour + 1,
"demand_mw": round(demand * factor, 2),
"solar_mw": round(solar * (0.7 + random.random() * 0.4), 2),
"wind_mw": round(wind * (0.6 + random.random() * 0.5), 2),
}
)
return forecast
def tool_price():
schedule = []
base = 0.52 + random.random() * 0.12
for hour in range(6):
peak = 1.25 if hour in (2, 3, 4) else 1.0
schedule.append(round(base * peak + random.random() * 0.08, 3))
return {
"energy_price": round(base, 3),
"carbon_price": round(0.08 + random.random() * 0.05, 3),
"schedule": schedule,
}
def tool_grid_constraints():
return {"max_import_mw": 80.0, "max_export_mw": 40.0}
def tool_demand_response():
return {"max_response_mw": 12.0, "response_cost": 0.15}
def planner_agent(scenario, tools):
system_prompt = "你是微电网调度规划智能体,输出JSON,包含目标、优先级、步骤。"
user_prompt = f"场景: {json.dumps(scenario, ensure_ascii=False)}\n工具: {json.dumps(tools, ensure_ascii=False)}"
result = call_llm_json(system_prompt, user_prompt)
if result:
return result
priority = scenario.get("priority", "成本优先")
return {
"目标": "在满足负荷的前提下优化成本与碳排",
"优先级": priority,
"步骤": [
"读取需求与可再生出力",
"评估储能SOC与可调度能力",
"结合电价与碳价决定充放电与并网",
"生成调度方案并进入校验",
],
}
def risk_agent(scenario, tools, plan):
system_prompt = "你是电力系统风险评估智能体,输出JSON,包含风险列表与缓释建议。"
user_prompt = f"计划: {json.dumps(plan, ensure_ascii=False)}\n场景: {json.dumps(scenario, ensure_ascii=False)}"
result = call_llm_json(system_prompt, user_prompt)
if result:
return result
risks = []
if scenario["storage_soc"] < 0.2:
risks.append("储能SOC偏低,抗波动能力下降")
if scenario["demand_mw"] > 120:
risks.append("负荷偏高,可能逼近并网上限")
return {
"风险": risks or ["暂无明显风险"],
"缓释": "必要时启用需求响应与负荷切分策略",
}
def optimizer_agent(scenario, tools, plan, risk):
demand = scenario["demand_mw"]
solar = scenario["solar_mw"]
wind = scenario["wind_mw"]
storage_mwh = scenario["storage_mwh"]
storage_soc = scenario["storage_soc"]
grid_limit = tools["grid"]["max_import_mw"]
export_limit = tools["grid"]["max_export_mw"]
response_limit = tools["response"]["max_response_mw"]
max_rate = storage_mwh * 0.5
available_discharge = storage_mwh * storage_soc
forecast = tools["forecast"]
schedule_price = tools["price"]["schedule"]
carbon_price = tools["price"]["carbon_price"]
grid_carbon = 0.72
priority = scenario.get("priority", "成本优先")
weight_cost = 0.5
weight_carbon = 0.3
weight_reliability = 0.2
if priority == "碳优先":
weight_cost, weight_carbon, weight_reliability = 0.2, 0.6, 0.2
elif priority == "可靠性优先":
weight_cost, weight_carbon, weight_reliability = 0.2, 0.2, 0.6
hourly = []
total_cost = 0.0
total_carbon = 0.0
total_demand = 0.0
total_unserved = 0.0
soc = storage_soc
for idx, point in enumerate(forecast):
hour_price = schedule_price[idx]
demand_h = point["demand_mw"]
solar_h = point["solar_mw"]
wind_h = point["wind_mw"]
total_demand += demand_h
max_rate_h = storage_mwh * 0.5
available_discharge_h = storage_mwh * soc
net = demand_h - solar_h - wind_h
if priority == "碳优先":
discharge = min(max_rate_h, available_discharge_h, max(0.0, net * 0.8))
elif priority == "成本优先" and hour_price < 0.6:
discharge = min(max_rate_h * 0.4, available_discharge_h, max(0.0, net))
else:
discharge = min(max_rate_h, available_discharge_h, max(0.0, net))
remaining = net - discharge
grid_import = min(max(0.0, remaining), grid_limit)
demand_response = min(response_limit, max(0.0, remaining - grid_import))
unserved = max(0.0, remaining - grid_import - demand_response)
surplus = max(0.0, solar_h + wind_h + discharge - demand_h)
charge = min(max_rate_h, storage_mwh * (1 - soc), surplus)
export = max(0.0, min(export_limit, surplus - charge))
soc = soc + (charge - discharge) / storage_mwh if storage_mwh > 0 else soc
soc = max(0.0, min(1.0, soc))
cost = grid_import * hour_price + demand_response * tools["response"]["response_cost"]
carbon = grid_import * grid_carbon - (solar_h + wind_h) * 0.18
total_cost += cost
total_carbon += carbon
total_unserved += unserved
hourly.append(
{
"hour": point["hour"],
"demand_mw": round(demand_h, 2),
"solar_mw": round(solar_h, 2),
"wind_mw": round(wind_h, 2),
"storage_discharge_mw": round(discharge, 2),
"storage_charge_mw": round(charge, 2),
"grid_import_mw": round(grid_import, 2),
"grid_export_mw": round(export, 2),
"demand_response_mw": round(demand_response, 2),
"unserved_mw": round(unserved, 2),
"soc": round(soc, 3),
"price": hour_price,
}
)
reliability = round(1 - (total_unserved / total_demand if total_demand > 0 else 0), 3)
cost = round(total_cost, 3)
carbon = round(total_carbon, 3)
score = round((100 - cost) * weight_cost + (100 - abs(carbon)) * weight_carbon + reliability * 100 * weight_reliability, 2)
return {
"dispatch": {
"solar_mw": round(solar, 2),
"wind_mw": round(wind, 2),
"storage_soc_next": round(soc, 3),
"hourly": hourly,
},
"kpi": {"cost": cost, "carbon": carbon, "reliability": reliability, "score": score},
"notes": "已综合考虑电价、碳价、需求响应与并网约束",
"plan": plan,
"risk": risk,
}
def validate_dispatch(scenario, decision, tools):
issues = []
hourly = decision["dispatch"].get("hourly", [])
grid_limit = tools["grid"]["max_import_mw"]
for point in hourly:
supplied = (
point["solar_mw"]
+ point["wind_mw"]
+ point["storage_discharge_mw"]
+ point["grid_import_mw"]
- point["storage_charge_mw"]
- point["grid_export_mw"]
)
if supplied + 0.01 < point["demand_mw"]:
issues.append(f"小时{point['hour']}供给不足")
if point["grid_import_mw"] > grid_limit + 0.01:
issues.append(f"小时{point['hour']}并网功率超限")
if not (0 <= point["soc"] <= 1):
issues.append(f"小时{point['hour']}储能SOC越界")
passed = len(issues) == 0
return {"passed": passed, "issues": issues or ["校验通过"]}
def iterate_adjustment(scenario, decision, tools):
dispatch = decision["dispatch"].copy()
hourly = []
grid_limit = tools["grid"]["max_import_mw"]
for point in dispatch.get("hourly", []):
if point["unserved_mw"] > 0:
extra = min(point["unserved_mw"], grid_limit - point["grid_import_mw"])
point["grid_import_mw"] = round(point["grid_import_mw"] + extra, 2)
point["unserved_mw"] = round(max(0.0, point["unserved_mw"] - extra), 2)
hourly.append(point)
dispatch["hourly"] = hourly
decision["dispatch"] = dispatch
return decision
def run_cycle(payload):
cycle_id = str(uuid.uuid4())
title = payload.get("title", "未命名调度")
scenario = {
"title": title,
"demand_mw": float(payload.get("demand_mw", 120)),
"solar_mw": float(payload.get("solar_mw", 40)),
"wind_mw": float(payload.get("wind_mw", 30)),
"storage_mwh": float(payload.get("storage_mwh", 80)),
"storage_soc": float(payload.get("storage_soc", 0.45)),
"priority": payload.get("priority", "成本优先"),
"notes": payload.get("notes", ""),
}
tools = {
"forecast": tool_forecast(scenario),
"price": tool_price(),
"grid": tool_grid_constraints(),
"response": tool_demand_response(),
}
db = get_db()
db.execute(
"INSERT INTO cycles (id, title, scenario, result, status, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(cycle_id, title, json.dumps(scenario, ensure_ascii=False), "{}", "running", now_ts()),
)
db.commit()
log_event(cycle_id, "system", "进入闭环:推理/决策/工具行动/状态记忆/校验/迭代")
log_event(cycle_id, "tool_action", f"工具预测: {json.dumps(tools, ensure_ascii=False)}")
plan = planner_agent(scenario, tools)
log_event(cycle_id, "reasoning", json.dumps(plan, ensure_ascii=False))
risk = risk_agent(scenario, tools, plan)
log_event(cycle_id, "reasoning", json.dumps(risk, ensure_ascii=False))
decision = optimizer_agent(scenario, tools, plan, risk)
log_event(cycle_id, "decision", json.dumps(decision, ensure_ascii=False))
validation = validate_dispatch(scenario, decision, tools)
log_event(cycle_id, "validation", json.dumps(validation, ensure_ascii=False))
iteration_count = 0
while not validation["passed"] and iteration_count < 2:
iteration_count += 1
decision = iterate_adjustment(scenario, decision, tools)
log_event(cycle_id, "iteration", f"第{iteration_count}次迭代调整")
validation = validate_dispatch(scenario, decision, tools)
log_event(cycle_id, "validation", json.dumps(validation, ensure_ascii=False))
memory_snapshot = {
"last_priority": scenario["priority"],
"last_cost": decision["kpi"]["cost"],
"last_carbon": decision["kpi"]["carbon"],
"last_reliability": decision["kpi"]["reliability"],
"iterations": iteration_count,
"last_score": decision["kpi"]["score"],
}
set_memory("microgrid_state", memory_snapshot)
log_event(cycle_id, "memory", json.dumps(memory_snapshot, ensure_ascii=False))
db.execute(
"INSERT INTO validations (cycle_id, passed, issues, created_at) VALUES (?, ?, ?, ?)",
(cycle_id, 1 if validation["passed"] else 0, json.dumps(validation["issues"], ensure_ascii=False), now_ts()),
)
db.execute(
"UPDATE cycles SET result = ?, status = ? WHERE id = ?",
(
json.dumps(
{
"scenario": scenario,
"decision": decision,
"validation": validation,
"memory": memory_snapshot,
"iterations": iteration_count,
},
ensure_ascii=False,
),
"completed" if validation["passed"] else "needs_review",
cycle_id,
),
)
db.commit()
return {
"id": cycle_id,
"status": "completed" if validation["passed"] else "needs_review",
"scenario": scenario,
"decision": decision,
"validation": validation,
"memory": memory_snapshot,
"iterations": iteration_count,
}
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/run_cycle", methods=["POST"])
def api_run_cycle():
payload = request.json or {}
result = run_cycle(payload)
return jsonify(result)
@app.route("/api/cycles", methods=["GET"])
def api_cycles():
db = get_db()
rows = db.execute(
"SELECT id, title, status, created_at FROM cycles ORDER BY created_at DESC LIMIT 30"
).fetchall()
return jsonify(
[
{
"id": row["id"],
"title": row["title"],
"status": row["status"],
"created_at": row["created_at"],
}
for row in rows
]
)
@app.route("/api/cycles/<cycle_id>", methods=["GET"])
def api_cycle_detail(cycle_id):
db = get_db()
row = db.execute("SELECT * FROM cycles WHERE id = ?", (cycle_id,)).fetchone()
if not row:
return jsonify({"error": "未找到任务"}), 404
result = json.loads(row["result"] or "{}")
return jsonify(result)
@app.route("/api/replay/<cycle_id>", methods=["GET"])
def api_replay(cycle_id):
db = get_db()
logs = db.execute(
"SELECT role, content, created_at FROM logs WHERE cycle_id = ? ORDER BY id ASC", (cycle_id,)
).fetchall()
return jsonify(
[
{"role": row["role"], "content": row["content"], "created_at": row["created_at"]}
for row in logs
]
)
@app.route("/api/memory", methods=["GET"])
def api_memory():
snapshot = get_memory("microgrid_state", {})
return jsonify(snapshot)
@app.route("/api/stats", methods=["GET"])
def api_stats():
db = get_db()
rows = db.execute("SELECT result, status, created_at FROM cycles").fetchall()
total = len(rows)
completed = sum(1 for r in rows if r["status"] == "completed")
costs = []
carbons = []
for row in rows:
try:
result = json.loads(row["result"] or "{}")
kpi = result.get("decision", {}).get("kpi", {})
if "cost" in kpi:
costs.append(kpi["cost"])
if "carbon" in kpi:
carbons.append(kpi["carbon"])
except Exception:
continue
return jsonify(
{
"total": total,
"completed": completed,
"avg_cost": round(sum(costs) / len(costs), 3) if costs else 0,
"avg_carbon": round(sum(carbons) / len(carbons), 3) if carbons else 0,
"last_run": rows[0]["created_at"] if rows else "暂无",
}
)
@app.route("/api/scenario_suggest", methods=["GET"])
def api_scenario_suggest():
last = get_memory("microgrid_state", {})
return jsonify(
{
"title": "智能建议场景",
"demand_mw": 110 + random.randint(-10, 12),
"solar_mw": 45 + random.randint(-8, 6),
"wind_mw": 30 + random.randint(-6, 6),
"storage_mwh": 90 + random.randint(-15, 15),
"storage_soc": round(0.35 + random.random() * 0.45, 2),
"priority": last.get("last_priority", "成本优先"),
"notes": "建议场景:参考历史记忆与负荷波动生成。",
}
)
def detect_binary(file_path):
with open(file_path, "rb") as f:
chunk = f.read(1024)
return b"\0" in chunk
def finalize_file(file_id, filename, storage_path, content_type):
size = os.path.getsize(storage_path)
sha256_hash = hashlib.sha256()
with open(storage_path, "rb") as f:
for block in iter(lambda: f.read(4096), b""):
sha256_hash.update(block)
is_binary = 1 if detect_binary(storage_path) else 0
db = get_db()
db.execute(
"INSERT INTO files (id, filename, size_bytes, sha256, content_type, is_binary, storage_path, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(file_id, filename, size, sha256_hash.hexdigest(), content_type or "", is_binary, storage_path, now_ts()),
)
db.commit()
return {
"id": file_id,
"filename": filename,
"size_bytes": size,
"sha256": sha256_hash.hexdigest(),
"is_binary": bool(is_binary),
}
@app.route("/api/upload", methods=["POST"])
def api_upload():
if "file" not in request.files:
return jsonify({"error": "未检测到文件"}), 400
file = request.files["file"]
if not file.filename:
return jsonify({"error": "文件名为空"}), 400
filename = os.path.basename(file.filename)
file_id = str(uuid.uuid4())
storage_path = os.path.join(INSTANCE_DIR, f"{file_id}_{filename}")
file.save(storage_path)
payload = finalize_file(file_id, filename, storage_path, file.content_type)
return jsonify({"status": "ok", "file": payload})
@app.route("/api/upload_chunk", methods=["POST"])
def api_upload_chunk():
file_id = request.headers.get("X-File-Id")
filename = request.headers.get("X-File-Name")
total = int(request.headers.get("X-Chunk-Total", "1"))
index = int(request.headers.get("X-Chunk-Index", "0"))
if not file_id or not filename:
return jsonify({"error": "缺少文件标识"}), 400
chunk = request.data
if not chunk:
return jsonify({"error": "分片内容为空"}), 400
temp_path = os.path.join(INSTANCE_DIR, f"{file_id}.part")
with open(temp_path, "ab") as f:
f.write(chunk)
if index + 1 == total:
final_name = os.path.basename(filename)
final_path = os.path.join(INSTANCE_DIR, f"{file_id}_{final_name}")
os.replace(temp_path, final_path)
payload = finalize_file(file_id, final_name, final_path, request.headers.get("X-Content-Type"))
return jsonify({"status": "completed", "file": payload})
return jsonify({"status": "chunk_received", "index": index})
@app.route("/api/files", methods=["GET"])
def api_files():
db = get_db()
rows = db.execute(
"SELECT id, filename, size_bytes, sha256, is_binary, created_at FROM files ORDER BY created_at DESC LIMIT 50"
).fetchall()
return jsonify(
[
{
"id": row["id"],
"filename": row["filename"],
"size_bytes": row["size_bytes"],
"sha256": row["sha256"],
"is_binary": bool(row["is_binary"]),
"created_at": row["created_at"],
}
for row in rows
]
)
@app.route("/api/files/<file_id>", methods=["GET"])
def api_download(file_id):
db = get_db()
row = db.execute("SELECT storage_path, filename FROM files WHERE id = ?", (file_id,)).fetchone()
if not row:
return jsonify({"error": "未找到文件"}), 404
return send_file(row["storage_path"], as_attachment=True, download_name=row["filename"])
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860, debug=True)