import os import json import uuid import math import random import sqlite3 import datetime import hashlib import requests from flask import Flask, render_template, request, jsonify, g, send_file from dotenv import load_dotenv load_dotenv() BASE_DIR = os.path.dirname(os.path.abspath(__file__)) INSTANCE_DIR = os.path.join(BASE_DIR, "instance") os.makedirs(INSTANCE_DIR, exist_ok=True) app = Flask(__name__) app.config["DATABASE"] = os.path.join(INSTANCE_DIR, "microgrid.db") app.config["SILICONFLOW_API_KEY"] = os.getenv("SILICONFLOW_API_KEY", "").strip() app.config["SILICONFLOW_BASE_URL"] = os.getenv("SILICONFLOW_BASE_URL", "https://api.siliconflow.cn/v1").strip() app.config["SILICONFLOW_MODEL"] = os.getenv("SILICONFLOW_MODEL", "deepseek-ai/DeepSeek-V3").strip() def get_db(): db = getattr(g, "_database", None) if db is None: db = g._database = sqlite3.connect(app.config["DATABASE"]) db.row_factory = sqlite3.Row return db @app.teardown_appcontext def close_connection(exception): db = getattr(g, "_database", None) if db is not None: db.close() def now_ts(): return datetime.datetime.now().isoformat() def init_db(): db = sqlite3.connect(app.config["DATABASE"]) cursor = db.cursor() cursor.execute( """ CREATE TABLE IF NOT EXISTS cycles ( id TEXT PRIMARY KEY, title TEXT, scenario TEXT, result TEXT, status TEXT, created_at TEXT ) """ ) cursor.execute( """ CREATE TABLE IF NOT EXISTS logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, cycle_id TEXT, role TEXT, content TEXT, created_at TEXT ) """ ) cursor.execute( """ CREATE TABLE IF NOT EXISTS memories ( key TEXT PRIMARY KEY, value TEXT, updated_at TEXT ) """ ) cursor.execute( """ CREATE TABLE IF NOT EXISTS validations ( id INTEGER PRIMARY KEY AUTOINCREMENT, cycle_id TEXT, passed INTEGER, issues TEXT, created_at TEXT ) """ ) cursor.execute( """ CREATE TABLE IF NOT EXISTS files ( id TEXT PRIMARY KEY, filename TEXT, size_bytes INTEGER, sha256 TEXT, content_type TEXT, is_binary INTEGER, storage_path TEXT, created_at TEXT ) """ ) db.commit() cursor.execute("SELECT COUNT(1) FROM cycles") count = cursor.fetchone()[0] if count == 0: sample_id = str(uuid.uuid4()) scenario = { "title": "示例:园区A峰值调度", "demand_mw": 120, "solar_mw": 42, "wind_mw": 28, "storage_mwh": 80, "storage_soc": 0.45, "priority": "成本优先", "notes": "示例备注:保障冷链负荷,允许需求响应10%。", } db.execute( "INSERT INTO cycles (id, title, scenario, result, status, created_at) VALUES (?, ?, ?, ?, ?, ?)", ( sample_id, scenario["title"], json.dumps(scenario, ensure_ascii=False), json.dumps( { "scenario": scenario, "decision": { "dispatch": { "solar_mw": 42, "wind_mw": 28, "storage_discharge_mw": 18, "storage_charge_mw": 0, "grid_import_mw": 32, "grid_export_mw": 0, "demand_response_mw": 0, "storage_soc_next": 0.22, }, "kpi": {"cost": 27.5, "carbon": 16.8, "reliability": 0.99}, "notes": "示例调度已完成", }, "validation": {"passed": True, "issues": ["校验通过"]}, "memory": { "last_priority": "成本优先", "last_cost": 27.5, "last_carbon": 16.8, "last_reliability": 0.99, "iterations": 0, }, "iterations": 0, }, ensure_ascii=False, ), "completed", now_ts(), ), ) db.commit() db.close() init_db() def log_event(cycle_id, role, content): db = get_db() db.execute( "INSERT INTO logs (cycle_id, role, content, created_at) VALUES (?, ?, ?, ?)", (cycle_id, role, content, now_ts()), ) db.commit() def set_memory(key, value): db = get_db() row = db.execute("SELECT key FROM memories WHERE key = ?", (key,)).fetchone() payload = json.dumps(value, ensure_ascii=False) if row: db.execute("UPDATE memories SET value = ?, updated_at = ? WHERE key = ?", (payload, now_ts(), key)) else: db.execute("INSERT INTO memories (key, value, updated_at) VALUES (?, ?, ?)", (key, payload, now_ts())) db.commit() def get_memory(key, default=None): db = get_db() row = db.execute("SELECT value FROM memories WHERE key = ?", (key,)).fetchone() if not row: return default try: return json.loads(row["value"]) except json.JSONDecodeError: return default def call_llm_json(system_prompt, user_prompt): api_key = app.config["SILICONFLOW_API_KEY"] if not api_key: return None url = f"{app.config['SILICONFLOW_BASE_URL']}/chat/completions" headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} payload = { "model": app.config["SILICONFLOW_MODEL"], "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], "temperature": 0.2, "max_tokens": 900, "response_format": {"type": "json_object"}, } try: response = requests.post(url, headers=headers, json=payload, timeout=25) response.raise_for_status() data = response.json() content = data.get("choices", [{}])[0].get("message", {}).get("content", "") return json.loads(content) except Exception: return None def tool_forecast(scenario): demand = scenario["demand_mw"] solar = scenario["solar_mw"] wind = scenario["wind_mw"] forecast = [] for hour in range(6): factor = 0.92 + random.random() * 0.16 forecast.append( { "hour": hour + 1, "demand_mw": round(demand * factor, 2), "solar_mw": round(solar * (0.7 + random.random() * 0.4), 2), "wind_mw": round(wind * (0.6 + random.random() * 0.5), 2), } ) return forecast def tool_price(): schedule = [] base = 0.52 + random.random() * 0.12 for hour in range(6): peak = 1.25 if hour in (2, 3, 4) else 1.0 schedule.append(round(base * peak + random.random() * 0.08, 3)) return { "energy_price": round(base, 3), "carbon_price": round(0.08 + random.random() * 0.05, 3), "schedule": schedule, } def tool_grid_constraints(): return {"max_import_mw": 80.0, "max_export_mw": 40.0} def tool_demand_response(): return {"max_response_mw": 12.0, "response_cost": 0.15} def planner_agent(scenario, tools): system_prompt = "你是微电网调度规划智能体,输出JSON,包含目标、优先级、步骤。" user_prompt = f"场景: {json.dumps(scenario, ensure_ascii=False)}\n工具: {json.dumps(tools, ensure_ascii=False)}" result = call_llm_json(system_prompt, user_prompt) if result: return result priority = scenario.get("priority", "成本优先") return { "目标": "在满足负荷的前提下优化成本与碳排", "优先级": priority, "步骤": [ "读取需求与可再生出力", "评估储能SOC与可调度能力", "结合电价与碳价决定充放电与并网", "生成调度方案并进入校验", ], } def risk_agent(scenario, tools, plan): system_prompt = "你是电力系统风险评估智能体,输出JSON,包含风险列表与缓释建议。" user_prompt = f"计划: {json.dumps(plan, ensure_ascii=False)}\n场景: {json.dumps(scenario, ensure_ascii=False)}" result = call_llm_json(system_prompt, user_prompt) if result: return result risks = [] if scenario["storage_soc"] < 0.2: risks.append("储能SOC偏低,抗波动能力下降") if scenario["demand_mw"] > 120: risks.append("负荷偏高,可能逼近并网上限") return { "风险": risks or ["暂无明显风险"], "缓释": "必要时启用需求响应与负荷切分策略", } def optimizer_agent(scenario, tools, plan, risk): demand = scenario["demand_mw"] solar = scenario["solar_mw"] wind = scenario["wind_mw"] storage_mwh = scenario["storage_mwh"] storage_soc = scenario["storage_soc"] grid_limit = tools["grid"]["max_import_mw"] export_limit = tools["grid"]["max_export_mw"] response_limit = tools["response"]["max_response_mw"] max_rate = storage_mwh * 0.5 available_discharge = storage_mwh * storage_soc forecast = tools["forecast"] schedule_price = tools["price"]["schedule"] carbon_price = tools["price"]["carbon_price"] grid_carbon = 0.72 priority = scenario.get("priority", "成本优先") weight_cost = 0.5 weight_carbon = 0.3 weight_reliability = 0.2 if priority == "碳优先": weight_cost, weight_carbon, weight_reliability = 0.2, 0.6, 0.2 elif priority == "可靠性优先": weight_cost, weight_carbon, weight_reliability = 0.2, 0.2, 0.6 hourly = [] total_cost = 0.0 total_carbon = 0.0 total_demand = 0.0 total_unserved = 0.0 soc = storage_soc for idx, point in enumerate(forecast): hour_price = schedule_price[idx] demand_h = point["demand_mw"] solar_h = point["solar_mw"] wind_h = point["wind_mw"] total_demand += demand_h max_rate_h = storage_mwh * 0.5 available_discharge_h = storage_mwh * soc net = demand_h - solar_h - wind_h if priority == "碳优先": discharge = min(max_rate_h, available_discharge_h, max(0.0, net * 0.8)) elif priority == "成本优先" and hour_price < 0.6: discharge = min(max_rate_h * 0.4, available_discharge_h, max(0.0, net)) else: discharge = min(max_rate_h, available_discharge_h, max(0.0, net)) remaining = net - discharge grid_import = min(max(0.0, remaining), grid_limit) demand_response = min(response_limit, max(0.0, remaining - grid_import)) unserved = max(0.0, remaining - grid_import - demand_response) surplus = max(0.0, solar_h + wind_h + discharge - demand_h) charge = min(max_rate_h, storage_mwh * (1 - soc), surplus) export = max(0.0, min(export_limit, surplus - charge)) soc = soc + (charge - discharge) / storage_mwh if storage_mwh > 0 else soc soc = max(0.0, min(1.0, soc)) cost = grid_import * hour_price + demand_response * tools["response"]["response_cost"] carbon = grid_import * grid_carbon - (solar_h + wind_h) * 0.18 total_cost += cost total_carbon += carbon total_unserved += unserved hourly.append( { "hour": point["hour"], "demand_mw": round(demand_h, 2), "solar_mw": round(solar_h, 2), "wind_mw": round(wind_h, 2), "storage_discharge_mw": round(discharge, 2), "storage_charge_mw": round(charge, 2), "grid_import_mw": round(grid_import, 2), "grid_export_mw": round(export, 2), "demand_response_mw": round(demand_response, 2), "unserved_mw": round(unserved, 2), "soc": round(soc, 3), "price": hour_price, } ) reliability = round(1 - (total_unserved / total_demand if total_demand > 0 else 0), 3) cost = round(total_cost, 3) carbon = round(total_carbon, 3) score = round((100 - cost) * weight_cost + (100 - abs(carbon)) * weight_carbon + reliability * 100 * weight_reliability, 2) return { "dispatch": { "solar_mw": round(solar, 2), "wind_mw": round(wind, 2), "storage_soc_next": round(soc, 3), "hourly": hourly, }, "kpi": {"cost": cost, "carbon": carbon, "reliability": reliability, "score": score}, "notes": "已综合考虑电价、碳价、需求响应与并网约束", "plan": plan, "risk": risk, } def validate_dispatch(scenario, decision, tools): issues = [] hourly = decision["dispatch"].get("hourly", []) grid_limit = tools["grid"]["max_import_mw"] for point in hourly: supplied = ( point["solar_mw"] + point["wind_mw"] + point["storage_discharge_mw"] + point["grid_import_mw"] - point["storage_charge_mw"] - point["grid_export_mw"] ) if supplied + 0.01 < point["demand_mw"]: issues.append(f"小时{point['hour']}供给不足") if point["grid_import_mw"] > grid_limit + 0.01: issues.append(f"小时{point['hour']}并网功率超限") if not (0 <= point["soc"] <= 1): issues.append(f"小时{point['hour']}储能SOC越界") passed = len(issues) == 0 return {"passed": passed, "issues": issues or ["校验通过"]} def iterate_adjustment(scenario, decision, tools): dispatch = decision["dispatch"].copy() hourly = [] grid_limit = tools["grid"]["max_import_mw"] for point in dispatch.get("hourly", []): if point["unserved_mw"] > 0: extra = min(point["unserved_mw"], grid_limit - point["grid_import_mw"]) point["grid_import_mw"] = round(point["grid_import_mw"] + extra, 2) point["unserved_mw"] = round(max(0.0, point["unserved_mw"] - extra), 2) hourly.append(point) dispatch["hourly"] = hourly decision["dispatch"] = dispatch return decision def run_cycle(payload): cycle_id = str(uuid.uuid4()) title = payload.get("title", "未命名调度") scenario = { "title": title, "demand_mw": float(payload.get("demand_mw", 120)), "solar_mw": float(payload.get("solar_mw", 40)), "wind_mw": float(payload.get("wind_mw", 30)), "storage_mwh": float(payload.get("storage_mwh", 80)), "storage_soc": float(payload.get("storage_soc", 0.45)), "priority": payload.get("priority", "成本优先"), "notes": payload.get("notes", ""), } tools = { "forecast": tool_forecast(scenario), "price": tool_price(), "grid": tool_grid_constraints(), "response": tool_demand_response(), } db = get_db() db.execute( "INSERT INTO cycles (id, title, scenario, result, status, created_at) VALUES (?, ?, ?, ?, ?, ?)", (cycle_id, title, json.dumps(scenario, ensure_ascii=False), "{}", "running", now_ts()), ) db.commit() log_event(cycle_id, "system", "进入闭环:推理/决策/工具行动/状态记忆/校验/迭代") log_event(cycle_id, "tool_action", f"工具预测: {json.dumps(tools, ensure_ascii=False)}") plan = planner_agent(scenario, tools) log_event(cycle_id, "reasoning", json.dumps(plan, ensure_ascii=False)) risk = risk_agent(scenario, tools, plan) log_event(cycle_id, "reasoning", json.dumps(risk, ensure_ascii=False)) decision = optimizer_agent(scenario, tools, plan, risk) log_event(cycle_id, "decision", json.dumps(decision, ensure_ascii=False)) validation = validate_dispatch(scenario, decision, tools) log_event(cycle_id, "validation", json.dumps(validation, ensure_ascii=False)) iteration_count = 0 while not validation["passed"] and iteration_count < 2: iteration_count += 1 decision = iterate_adjustment(scenario, decision, tools) log_event(cycle_id, "iteration", f"第{iteration_count}次迭代调整") validation = validate_dispatch(scenario, decision, tools) log_event(cycle_id, "validation", json.dumps(validation, ensure_ascii=False)) memory_snapshot = { "last_priority": scenario["priority"], "last_cost": decision["kpi"]["cost"], "last_carbon": decision["kpi"]["carbon"], "last_reliability": decision["kpi"]["reliability"], "iterations": iteration_count, "last_score": decision["kpi"]["score"], } set_memory("microgrid_state", memory_snapshot) log_event(cycle_id, "memory", json.dumps(memory_snapshot, ensure_ascii=False)) db.execute( "INSERT INTO validations (cycle_id, passed, issues, created_at) VALUES (?, ?, ?, ?)", (cycle_id, 1 if validation["passed"] else 0, json.dumps(validation["issues"], ensure_ascii=False), now_ts()), ) db.execute( "UPDATE cycles SET result = ?, status = ? WHERE id = ?", ( json.dumps( { "scenario": scenario, "decision": decision, "validation": validation, "memory": memory_snapshot, "iterations": iteration_count, }, ensure_ascii=False, ), "completed" if validation["passed"] else "needs_review", cycle_id, ), ) db.commit() return { "id": cycle_id, "status": "completed" if validation["passed"] else "needs_review", "scenario": scenario, "decision": decision, "validation": validation, "memory": memory_snapshot, "iterations": iteration_count, } @app.route("/") def index(): return render_template("index.html") @app.route("/api/run_cycle", methods=["POST"]) def api_run_cycle(): payload = request.json or {} result = run_cycle(payload) return jsonify(result) @app.route("/api/cycles", methods=["GET"]) def api_cycles(): db = get_db() rows = db.execute( "SELECT id, title, status, created_at FROM cycles ORDER BY created_at DESC LIMIT 30" ).fetchall() return jsonify( [ { "id": row["id"], "title": row["title"], "status": row["status"], "created_at": row["created_at"], } for row in rows ] ) @app.route("/api/cycles/", methods=["GET"]) def api_cycle_detail(cycle_id): db = get_db() row = db.execute("SELECT * FROM cycles WHERE id = ?", (cycle_id,)).fetchone() if not row: return jsonify({"error": "未找到任务"}), 404 result = json.loads(row["result"] or "{}") return jsonify(result) @app.route("/api/replay/", methods=["GET"]) def api_replay(cycle_id): db = get_db() logs = db.execute( "SELECT role, content, created_at FROM logs WHERE cycle_id = ? ORDER BY id ASC", (cycle_id,) ).fetchall() return jsonify( [ {"role": row["role"], "content": row["content"], "created_at": row["created_at"]} for row in logs ] ) @app.route("/api/memory", methods=["GET"]) def api_memory(): snapshot = get_memory("microgrid_state", {}) return jsonify(snapshot) @app.route("/api/stats", methods=["GET"]) def api_stats(): db = get_db() rows = db.execute("SELECT result, status, created_at FROM cycles").fetchall() total = len(rows) completed = sum(1 for r in rows if r["status"] == "completed") costs = [] carbons = [] for row in rows: try: result = json.loads(row["result"] or "{}") kpi = result.get("decision", {}).get("kpi", {}) if "cost" in kpi: costs.append(kpi["cost"]) if "carbon" in kpi: carbons.append(kpi["carbon"]) except Exception: continue return jsonify( { "total": total, "completed": completed, "avg_cost": round(sum(costs) / len(costs), 3) if costs else 0, "avg_carbon": round(sum(carbons) / len(carbons), 3) if carbons else 0, "last_run": rows[0]["created_at"] if rows else "暂无", } ) @app.route("/api/scenario_suggest", methods=["GET"]) def api_scenario_suggest(): last = get_memory("microgrid_state", {}) return jsonify( { "title": "智能建议场景", "demand_mw": 110 + random.randint(-10, 12), "solar_mw": 45 + random.randint(-8, 6), "wind_mw": 30 + random.randint(-6, 6), "storage_mwh": 90 + random.randint(-15, 15), "storage_soc": round(0.35 + random.random() * 0.45, 2), "priority": last.get("last_priority", "成本优先"), "notes": "建议场景:参考历史记忆与负荷波动生成。", } ) def detect_binary(file_path): with open(file_path, "rb") as f: chunk = f.read(1024) return b"\0" in chunk def finalize_file(file_id, filename, storage_path, content_type): size = os.path.getsize(storage_path) sha256_hash = hashlib.sha256() with open(storage_path, "rb") as f: for block in iter(lambda: f.read(4096), b""): sha256_hash.update(block) is_binary = 1 if detect_binary(storage_path) else 0 db = get_db() db.execute( "INSERT INTO files (id, filename, size_bytes, sha256, content_type, is_binary, storage_path, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", (file_id, filename, size, sha256_hash.hexdigest(), content_type or "", is_binary, storage_path, now_ts()), ) db.commit() return { "id": file_id, "filename": filename, "size_bytes": size, "sha256": sha256_hash.hexdigest(), "is_binary": bool(is_binary), } @app.route("/api/upload", methods=["POST"]) def api_upload(): if "file" not in request.files: return jsonify({"error": "未检测到文件"}), 400 file = request.files["file"] if not file.filename: return jsonify({"error": "文件名为空"}), 400 filename = os.path.basename(file.filename) file_id = str(uuid.uuid4()) storage_path = os.path.join(INSTANCE_DIR, f"{file_id}_{filename}") file.save(storage_path) payload = finalize_file(file_id, filename, storage_path, file.content_type) return jsonify({"status": "ok", "file": payload}) @app.route("/api/upload_chunk", methods=["POST"]) def api_upload_chunk(): file_id = request.headers.get("X-File-Id") filename = request.headers.get("X-File-Name") total = int(request.headers.get("X-Chunk-Total", "1")) index = int(request.headers.get("X-Chunk-Index", "0")) if not file_id or not filename: return jsonify({"error": "缺少文件标识"}), 400 chunk = request.data if not chunk: return jsonify({"error": "分片内容为空"}), 400 temp_path = os.path.join(INSTANCE_DIR, f"{file_id}.part") with open(temp_path, "ab") as f: f.write(chunk) if index + 1 == total: final_name = os.path.basename(filename) final_path = os.path.join(INSTANCE_DIR, f"{file_id}_{final_name}") os.replace(temp_path, final_path) payload = finalize_file(file_id, final_name, final_path, request.headers.get("X-Content-Type")) return jsonify({"status": "completed", "file": payload}) return jsonify({"status": "chunk_received", "index": index}) @app.route("/api/files", methods=["GET"]) def api_files(): db = get_db() rows = db.execute( "SELECT id, filename, size_bytes, sha256, is_binary, created_at FROM files ORDER BY created_at DESC LIMIT 50" ).fetchall() return jsonify( [ { "id": row["id"], "filename": row["filename"], "size_bytes": row["size_bytes"], "sha256": row["sha256"], "is_binary": bool(row["is_binary"]), "created_at": row["created_at"], } for row in rows ] ) @app.route("/api/files/", methods=["GET"]) def api_download(file_id): db = get_db() row = db.execute("SELECT storage_path, filename FROM files WHERE id = ?", (file_id,)).fetchone() if not row: return jsonify({"error": "未找到文件"}), 404 return send_file(row["storage_path"], as_attachment=True, download_name=row["filename"]) if __name__ == "__main__": app.run(host="0.0.0.0", port=7860, debug=True)