OptiQ / api /routes /digital_twin.py
mohammademad2003's picture
first commit
7477316
"""
Digital Twin endpoint — Load scaling and scenario analysis.
"""
from __future__ import annotations
import json
from typing import Optional
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel, Field
from src.grid.loader import load_network, clone_network
from src.grid.power_flow import get_baseline, evaluate_topology
from src.hybrid.pipeline import run_hybrid_pipeline
from src.evaluation.metrics import compute_impact
from api.auth import optional_auth, FirebaseUser
from api.database import log_usage, log_audit
router = APIRouter()
class DigitalTwinRequest(BaseModel):
system: str = Field(default="case33bw", description="IEEE test system name")
load_multiplier: float = Field(
default=1.0,
ge=0.5,
le=2.0,
description="Load scaling factor (0.5 = 50% load, 1.3 = 130% load)"
)
optimize: bool = Field(default=False, description="Whether to run optimization")
method: str = Field(default="hybrid", description="Optimization method if optimize=True")
@router.post("/digital-twin")
def digital_twin(req: DigitalTwinRequest, user: FirebaseUser = Depends(optional_auth)):
"""Digital Twin simulation with load scaling.
Scales all loads by the given multiplier and optionally runs optimization.
This simulates different operating conditions (peak, off-peak, etc.).
"""
try:
net = load_network(req.system)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
# Store original loads
original_p = net.load["p_mw"].copy()
original_q = net.load["q_mvar"].copy()
# Scale loads
net.load["p_mw"] *= req.load_multiplier
net.load["q_mvar"] *= req.load_multiplier
# Get baseline with scaled loads
baseline = get_baseline(net)
if not baseline.get("converged"):
raise HTTPException(status_code=500, detail="Power flow did not converge for this load level")
result = {
"system": req.system,
"load_multiplier": req.load_multiplier,
"total_load_mw": float(net.load["p_mw"].sum()),
"baseline": baseline,
}
# Run optimization if requested
if req.optimize:
use_quantum = req.method in ("quantum", "hybrid")
use_ai = req.method in ("ai", "hybrid")
pipeline_result = run_hybrid_pipeline(
net,
use_quantum=use_quantum,
use_ai=use_ai,
quantum_iters=300,
quantum_restarts=3,
quantum_top_k=5,
)
if "error" not in pipeline_result:
result["optimized"] = pipeline_result["optimized"]
result["impact"] = pipeline_result["impact"]
result["method"] = pipeline_result["method"]
result["timings"] = pipeline_result["timings"]
# Log if authenticated
if user:
try:
impact = pipeline_result["impact"]
log_usage(
firebase_uid=user.uid,
system=req.system,
method=f"digital_twin_{req.method}",
baseline_loss_kw=baseline["total_loss_kw"],
optimized_loss_kw=pipeline_result["optimized"]["total_loss_kw"],
energy_saved_kwh=impact["energy_saved_mwh_year"] * 1000,
co2_saved_kg=impact["co2_saved_tonnes_year"] * 1000,
money_saved_usd=impact["cost_saved_usd_year"],
computation_time_sec=pipeline_result["timings"]["total_sec"],
load_multiplier=req.load_multiplier,
switches_changed=json.dumps(pipeline_result["optimized"].get("open_lines", [])),
)
except Exception:
pass
else:
result["optimization_error"] = pipeline_result["error"]
return result
@router.get("/digital-twin/scenarios")
def get_scenarios(system: str = "case33bw"):
"""Get predefined load scenarios for digital twin analysis."""
try:
net = load_network(system)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
base_load = float(net.load["p_mw"].sum())
scenarios = [
{"name": "Light Load (70%)", "multiplier": 0.7, "load_mw": round(base_load * 0.7, 2)},
{"name": "Morning (85%)", "multiplier": 0.85, "load_mw": round(base_load * 0.85, 2)},
{"name": "Nominal (100%)", "multiplier": 1.0, "load_mw": round(base_load, 2)},
{"name": "Peak (115%)", "multiplier": 1.15, "load_mw": round(base_load * 1.15, 2)},
{"name": "Heavy Load (130%)", "multiplier": 1.3, "load_mw": round(base_load * 1.3, 2)},
]
return {"system": system, "base_load_mw": base_load, "scenarios": scenarios}