"""
Budget audit pipeline — logs every slot spend and provides rollup reports.

Appends one row per control tick to a parquet file. The weekly rollup
verifies that cumulative sacrifice stays within the 5% annual ceiling.

Usage:
    # In control_loop.py tick():
    from src.budget_audit import BudgetAuditLog
    audit = BudgetAuditLog()
    audit.log_slot(tick_result)

    # Weekly report:
    python -m src.budget_audit --report

    # Summary for a single day:
    python -m src.budget_audit --daily YYYY-MM-DD
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Optional
import pandas as pd
from config.settings import (
DATA_DIR,
MAX_ENERGY_REDUCTION_PCT,
SYSTEM_CAPACITY_KW,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory under DATA_DIR that holds the budget audit files.
AUDIT_DIR = DATA_DIR / "budget_audit"
# Default parquet file used by BudgetAuditLog when no path is given.
AUDIT_PATH = AUDIT_DIR / "slot_log.parquet"
@dataclass
class SlotRecord:
    """One row in the audit log.

    The field names match the keys of the record dict that
    ``BudgetAuditLog.log_slot`` builds for every tick.
    """

    # Tick timestamp; log_slot falls back to the current UTC time.
    timestamp: datetime
    # NOTE(review): log_slot stores this column as a string (``str(date)``),
    # not as a ``date`` object — confirm which type consumers expect.
    date: date
    # log_slot writes -1 when the tick result has no ``slot_index``.
    slot_index: int
    # Filled from ``tick_result.plan_offset_deg`` (presumably degrees).
    planned_offset_deg: float
    # Filled from ``tick_result.target_angle`` (presumably degrees).
    actual_offset_deg: float
    # Presumably energy in kWh, per the name suffix — TODO confirm units.
    energy_cost_kwh: float
    budget_spent_kwh: float
    budget_remaining_kwh: float
    # Filled from ``tick_result.live_gate_passed``; defaults to False.
    gate_passed: bool
    # Defaults to "" in log_slot when the tick result has no ``source``.
    source: str
    # Defaults to "unknown" in log_slot when the tick has no ``stage_id``.
    stage_id: str
class BudgetAuditLog:
    """Append-only parquet log for budget slot spends.

    Every call to :meth:`log_slot` adds one row to the parquet file at
    ``path``. The existing log is read, the new row is concatenated, and
    the result is written to a temporary file that then replaces the log,
    so an interrupted write cannot leave a truncated audit file behind.

    Args:
        path: Location of the parquet log. Its parent directory is created
            on construction if it does not exist yet.
    """

    # Approximate peak sun hours per day used to estimate the daily energy
    # potential of the system in ``weekly_report``.
    _PEAK_SUN_HOURS_PER_DAY = 6.0

    def __init__(self, path: Path = AUDIT_PATH):
        self.path = path
        self.path.parent.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _build_record(tick_result) -> dict:
        """Map a tick result onto the audit-log columns."""
        # Resolve "now" once so the fallback timestamp and the fallback
        # date always agree (previously a UTC timestamp could be paired
        # with a local-time date, and the clock was read twice).
        now_utc = datetime.now(timezone.utc)
        stamp = getattr(tick_result, "timestamp", None)
        if stamp is None:
            stamp = now_utc
        # Values without a ``date()`` method (e.g. strings) are kept as the
        # timestamp, but the date then falls back to the current UTC date.
        day = stamp.date() if hasattr(stamp, "date") else now_utc.date()
        return {
            "timestamp": stamp,
            "date": str(day),
            "slot_index": getattr(tick_result, "slot_index", -1),
            "planned_offset_deg": getattr(tick_result, "plan_offset_deg", 0.0),
            "actual_offset_deg": getattr(tick_result, "target_angle", 0.0),
            "energy_cost_kwh": getattr(tick_result, "energy_cost_kwh", 0.0),
            "budget_spent_kwh": getattr(tick_result, "budget_spent_kwh", 0.0),
            "budget_remaining_kwh": getattr(
                tick_result, "budget_remaining_kwh", 0.0
            ),
            "gate_passed": getattr(tick_result, "live_gate_passed", False),
            "source": getattr(tick_result, "source", ""),
            "stage_id": getattr(tick_result, "stage_id", "unknown"),
        }

    def _write_atomic(self, frame: pd.DataFrame) -> None:
        """Write ``frame`` to ``self.path`` via a temp file and a rename."""
        tmp_path = self.path.with_name(self.path.name + ".tmp")
        try:
            frame.to_parquet(tmp_path, index=False)
            tmp_path.replace(self.path)
        finally:
            # After a successful replace the temp file is gone; this only
            # cleans up a partial file left by a failed write.
            if tmp_path.exists():
                tmp_path.unlink()

    def log_slot(self, tick_result) -> None:
        """Append a tick result to the audit log.

        Best-effort: any failure is logged as a warning (with traceback)
        and swallowed, so auditing never raises into the caller.

        Args:
            tick_result: Object exposing the tick's attributes. Attributes
                that are missing are replaced by neutral defaults (see
                ``_build_record``).
        """
        try:
            record = self._build_record(tick_result)
            new_row = pd.DataFrame([record])
            if self.path.exists():
                existing = pd.read_parquet(self.path)
                combined = pd.concat([existing, new_row], ignore_index=True)
            else:
                combined = new_row
            self._write_atomic(combined)
            logger.debug(
                "Audit log: slot %d, cost=%.4f kWh",
                record["slot_index"],
                record["energy_cost_kwh"],
            )
        except Exception as exc:
            logger.warning("Budget audit log failed: %s", exc, exc_info=True)

    def load(self) -> pd.DataFrame:
        """Load the full audit log.

        Returns:
            The stored rows, or an empty DataFrame when no log file exists.
        """
        if self.path.exists():
            return pd.read_parquet(self.path)
        return pd.DataFrame()

    def daily_summary(self, target_date: Optional[date] = None) -> dict:
        """Summarize a single day's budget usage.

        Args:
            target_date: Day to summarize; defaults to ``date.today()``.

        Returns:
            ``{"error": "No audit data"}`` when the log is empty, a reduced
            dict (``date``, ``slots``, ``total_cost_kwh``) when the day has
            no rows, otherwise the full summary for that day.
        """
        df = self.load()
        if df.empty:
            return {"error": "No audit data"}
        if target_date is None:
            target_date = date.today()
        # The ``date`` column is stored as an ISO string, so compare on
        # the string form of the requested date.
        day_key = str(target_date)
        day = df[df["date"] == day_key]
        if day.empty:
            return {"date": day_key, "slots": 0, "total_cost_kwh": 0.0}
        return {
            "date": day_key,
            "slots": len(day),
            "total_cost_kwh": round(float(day["energy_cost_kwh"].sum()), 4),
            # Number of rows whose gate_passed flag is set.
            "interventions": int(day["gate_passed"].sum()),
            "max_offset_deg": round(
                float(day["actual_offset_deg"].abs().max()), 1
            ),
            # Value recorded by the last row logged for that day.
            "budget_remaining_kwh": round(
                float(day["budget_remaining_kwh"].iloc[-1]), 4
            ),
        }

    def weekly_report(self) -> dict:
        """Generate a weekly rollup report for budget compliance.

        The report is computed over every row in the log (no date filter
        is applied). The observed average daily cost is projected to a
        full year and compared with the ceiling derived from
        ``SYSTEM_CAPACITY_KW`` and ``MAX_ENERGY_REDUCTION_PCT``.

        Returns:
            ``{"error": "No audit data"}`` when the log is empty, otherwise
            a dict with totals, the projected annual cost, the ceiling and
            a ``compliant`` flag.
        """
        df = self.load()
        if df.empty:
            return {"error": "No audit data"}
        total_cost = float(df["energy_cost_kwh"].sum())
        days = df["date"].nunique()
        daily_potential_kwh = SYSTEM_CAPACITY_KW * self._PEAK_SUN_HOURS_PER_DAY
        annual_potential_kwh = daily_potential_kwh * 365
        ceiling_kwh = annual_potential_kwh * MAX_ENERGY_REDUCTION_PCT / 100.0
        # Project annual rate from observed data
        if days > 0:
            daily_rate = total_cost / days
            projected_annual = daily_rate * 365
        else:
            daily_rate = 0
            projected_annual = 0
        compliant = projected_annual <= ceiling_kwh
        # Guard against a zero ceiling to avoid dividing by zero.
        if ceiling_kwh > 0:
            utilization_pct = round(projected_annual / ceiling_kwh * 100, 1)
        else:
            utilization_pct = 0
        return {
            "period_days": days,
            "total_cost_kwh": round(total_cost, 3),
            "daily_avg_kwh": round(daily_rate, 4),
            "projected_annual_kwh": round(projected_annual, 1),
            "ceiling_kwh": round(ceiling_kwh, 1),
            "ceiling_pct": MAX_ENERGY_REDUCTION_PCT,
            "utilization_pct": utilization_pct,
            "compliant": compliant,
            "total_interventions": int(df["gate_passed"].sum()),
            "intervention_rate_pct": round(
                float(df["gate_passed"].mean()) * 100, 1
            ),
        }
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Command-line entry point: print the weekly rollup, a daily summary,
    # or (with no flags) a short overview followed by the weekly rollup.
    import argparse
    import json

    parser = argparse.ArgumentParser(description="Budget audit report")
    parser.add_argument(
        "--report", action="store_true", help="Weekly rollup report"
    )
    # Parse the date inside argparse so a malformed value yields a usage
    # error instead of an uncaught ValueError traceback.
    parser.add_argument(
        "--daily",
        type=date.fromisoformat,
        metavar="YYYY-MM-DD",
        help="Daily summary for YYYY-MM-DD",
    )
    args = parser.parse_args()

    audit = BudgetAuditLog()
    # --report takes precedence when both flags are given.
    if args.report:
        print(json.dumps(audit.weekly_report(), indent=2))
    elif args.daily is not None:
        print(json.dumps(audit.daily_summary(args.daily), indent=2))
    else:
        df = audit.load()
        if df.empty:
            print("No audit data yet.")
        else:
            print(f"Audit log: {len(df)} slots, {df['date'].nunique()} days")
            print(json.dumps(audit.weekly_report(), indent=2))
|