"""
Budget audit pipeline — logs every slot spend and provides rollup reports.

Appends one row per control tick to a parquet file. The weekly rollup
verifies that cumulative sacrifice stays within the 5% annual ceiling.

Usage:
    # In control_loop.py tick():
    from src.budget_audit import BudgetAuditLog
    audit = BudgetAuditLog()
    audit.log_slot(tick_result)

    # Weekly report:
    python -m src.budget_audit --report
"""
|
|
from __future__ import annotations

import logging
from dataclasses import dataclass
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Optional

import pandas as pd

from config.settings import (
    DATA_DIR,
    MAX_ENERGY_REDUCTION_PCT,
    SYSTEM_CAPACITY_KW,
)
|
|
logger = logging.getLogger(__name__)

# Location of the audit log: a single parquet file inside a dedicated
# sub-directory of the project's data directory.
AUDIT_DIR = DATA_DIR / "budget_audit"
AUDIT_PATH = AUDIT_DIR / "slot_log.parquet"
|
|
|
|
@dataclass
class SlotRecord:
    """One row in the audit log.

    The field names mirror the column names that ``BudgetAuditLog.log_slot``
    writes. Field order is part of the positional constructor signature and
    must not be rearranged.
    """

    # When the tick happened.
    timestamp: datetime
    # Calendar day of the tick. NOTE(review): ``log_slot`` stores this column
    # as an ISO string rather than a ``date`` object; confirm which form
    # consumers of this dataclass expect.
    date: date
    # Index of the control slot (``log_slot`` uses -1 when it is unknown).
    slot_index: int
    # Offset requested by the plan, in degrees.
    planned_offset_deg: float
    # Offset recorded as applied, in degrees.
    actual_offset_deg: float
    # Energy cost attributed to this slot, in kWh.
    energy_cost_kwh: float
    # Budget spent / remaining after this slot, in kWh.
    budget_spent_kwh: float
    budget_remaining_kwh: float
    # Whether the gate check passed for this slot.
    gate_passed: bool
    # Free-form origin tag and stage identifier of the tick.
    source: str
    stage_id: str
|
|
|
|
class BudgetAuditLog:
    """Append-only parquet log for budget slot spends.

    Each call to :meth:`log_slot` adds one row to a single parquet file; the
    summary methods read that file back and aggregate it with pandas.

    Columns written per row: ``timestamp``, ``date`` (ISO ``YYYY-MM-DD``
    string), ``slot_index``, ``planned_offset_deg``, ``actual_offset_deg``,
    ``energy_cost_kwh``, ``budget_spent_kwh``, ``budget_remaining_kwh``,
    ``gate_passed``, ``source`` and ``stage_id``.
    """

    def __init__(self, path: Path = AUDIT_PATH):
        """Bind the log to *path* and create its parent directory if missing."""
        self.path = path
        self.path.parent.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _build_record(tick_result) -> dict:
        """Map the attributes of *tick_result* onto one audit-row dict."""
        # Read the clock at most once so ``timestamp`` and ``date`` can never
        # disagree (e.g. across midnight, or UTC vs. local time). A missing
        # or ``None`` timestamp falls back to the current UTC time.
        stamp = getattr(tick_result, "timestamp", None)
        if stamp is None:
            stamp = datetime.now(timezone.utc)

        # Timestamps without a ``date()`` method (e.g. plain strings) cannot
        # be converted, so the current local day is recorded instead.
        day = stamp.date() if hasattr(stamp, "date") else date.today()

        return {
            "timestamp": stamp,
            "date": str(day),
            "slot_index": getattr(tick_result, "slot_index", -1),
            "planned_offset_deg": getattr(tick_result, "plan_offset_deg", 0.0),
            # NOTE(review): the "actual offset" column is filled from
            # ``target_angle`` — confirm that this is the intended attribute.
            "actual_offset_deg": getattr(tick_result, "target_angle", 0.0),
            "energy_cost_kwh": getattr(tick_result, "energy_cost_kwh", 0.0),
            "budget_spent_kwh": getattr(tick_result, "budget_spent_kwh", 0.0),
            "budget_remaining_kwh": getattr(tick_result, "budget_remaining_kwh", 0.0),
            "gate_passed": getattr(tick_result, "live_gate_passed", False),
            "source": getattr(tick_result, "source", ""),
            "stage_id": getattr(tick_result, "stage_id", "unknown"),
        }

    def log_slot(self, tick_result) -> None:
        """Append a tick result to the audit log.

        Attributes are read from *tick_result* with ``getattr``, so any that
        are missing fall back to neutral defaults.

        The existing file is read, the new row is concatenated and the result
        is written back, so the cost of one call grows with the size of the
        log. The write goes to a sibling ``.tmp`` file that then replaces the
        real file, so an interrupted write cannot truncate the existing log.

        This method is best-effort: any exception is logged as a warning
        (with traceback) and swallowed so that auditing never interrupts the
        caller.
        """
        try:
            record = self._build_record(tick_result)
            new_row = pd.DataFrame([record])

            if self.path.exists():
                existing = pd.read_parquet(self.path)
                combined = pd.concat([existing, new_row], ignore_index=True)
            else:
                combined = new_row

            # Write-then-rename; a stale temp file left by an earlier failed
            # attempt is simply overwritten here.
            tmp_path = self.path.with_name(self.path.name + ".tmp")
            combined.to_parquet(tmp_path, index=False)
            tmp_path.replace(self.path)

            logger.debug(
                "Audit log: slot %d, cost=%.4f kWh",
                record["slot_index"],
                record["energy_cost_kwh"],
            )
        except Exception as exc:
            logger.warning("Budget audit log failed: %s", exc, exc_info=True)

    def load(self) -> pd.DataFrame:
        """Load the full audit log, or an empty frame if no file exists yet."""
        if self.path.exists():
            return pd.read_parquet(self.path)
        return pd.DataFrame()

    def daily_summary(self, target_date: Optional[date] = None) -> dict:
        """Summarize a single day's budget usage.

        Args:
            target_date: Day to summarize; defaults to ``date.today()``.

        Returns:
            ``{"error": "No audit data"}`` when the log is empty; a reduced
            dict (``date``, ``slots``, ``total_cost_kwh``) when the day has
            no rows; otherwise the full summary, which additionally carries
            ``interventions``, ``max_offset_deg`` and
            ``budget_remaining_kwh``.
        """
        df = self.load()
        if df.empty:
            return {"error": "No audit data"}

        if target_date is None:
            target_date = date.today()
        day_key = str(target_date)

        # The ``date`` column holds ISO strings, so compare as strings.
        day = df[df["date"] == day_key]
        if day.empty:
            return {"date": day_key, "slots": 0, "total_cost_kwh": 0.0}

        return {
            "date": day_key,
            "slots": len(day),
            "total_cost_kwh": round(float(day["energy_cost_kwh"].sum()), 4),
            "interventions": int(day["gate_passed"].sum()),
            "max_offset_deg": round(float(day["actual_offset_deg"].abs().max()), 1),
            # Last row in file order is taken as the end-of-day balance.
            "budget_remaining_kwh": round(float(day["budget_remaining_kwh"].iloc[-1]), 4),
        }

    def weekly_report(self, *, full_load_hours_per_day: float = 6.0) -> dict:
        """Generate a weekly rollup report for budget compliance.

        NOTE: despite the name, no seven-day window is applied; the report
        aggregates every row currently in the log.

        The average daily cost (total cost divided by the number of distinct
        logged days) is extrapolated to 365 days and compared with a ceiling
        of ``MAX_ENERGY_REDUCTION_PCT`` percent of the annual potential,
        where the annual potential is
        ``SYSTEM_CAPACITY_KW * full_load_hours_per_day * 365``.

        Args:
            full_load_hours_per_day: Hours per day the system is assumed to
                deliver its full capacity (presumably peak-sun-hours —
                confirm). Defaults to 6.0.

        Returns:
            ``{"error": "No audit data"}`` when the log is empty, otherwise a
            dict of JSON-serializable totals, projection, ceiling and
            compliance flag.
        """
        df = self.load()
        if df.empty:
            return {"error": "No audit data"}

        total_cost = float(df["energy_cost_kwh"].sum())
        days = int(df["date"].nunique())

        daily_potential_kwh = SYSTEM_CAPACITY_KW * full_load_hours_per_day
        annual_potential_kwh = daily_potential_kwh * 365
        ceiling_kwh = annual_potential_kwh * MAX_ENERGY_REDUCTION_PCT / 100.0

        # ``days`` can only be zero if every ``date`` value is missing.
        daily_rate = total_cost / days if days > 0 else 0.0
        projected_annual = daily_rate * 365

        if ceiling_kwh > 0:
            utilization_pct = round(projected_annual / ceiling_kwh * 100, 1)
        else:
            utilization_pct = 0

        return {
            "period_days": days,
            "total_cost_kwh": round(total_cost, 3),
            "daily_avg_kwh": round(daily_rate, 4),
            "projected_annual_kwh": round(projected_annual, 1),
            "ceiling_kwh": round(ceiling_kwh, 1),
            "ceiling_pct": MAX_ENERGY_REDUCTION_PCT,
            "utilization_pct": utilization_pct,
            # Plain ``bool`` keeps the report serializable with ``json``.
            "compliant": bool(projected_annual <= ceiling_kwh),
            "total_interventions": int(df["gate_passed"].sum()),
            "intervention_rate_pct": round(float(df["gate_passed"].mean()) * 100, 1),
        }
|
|
|
|
| |
| |
| |
|
|
def _main(argv=None) -> None:
    """Command-line entry point: print audit reports as JSON.

    ``--report`` prints the rollup report, ``--daily YYYY-MM-DD`` prints the
    summary for that day, and with no option a one-line overview followed by
    the rollup report is printed (or a notice when the log is empty).

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    import argparse
    import json

    parser = argparse.ArgumentParser(description="Budget audit report")
    parser.add_argument("--report", action="store_true", help="Weekly rollup report")
    # Parsing the date inside argparse turns a malformed value into a normal
    # usage error instead of an uncaught ValueError traceback.
    parser.add_argument(
        "--daily",
        type=date.fromisoformat,
        metavar="YYYY-MM-DD",
        help="Daily summary for YYYY-MM-DD",
    )
    args = parser.parse_args(argv)

    audit = BudgetAuditLog()

    if args.report:
        print(json.dumps(audit.weekly_report(), indent=2))
    elif args.daily is not None:
        print(json.dumps(audit.daily_summary(args.daily), indent=2))
    else:
        df = audit.load()
        if df.empty:
            print("No audit data yet.")
        else:
            print(f"Audit log: {len(df)} slots, {df['date'].nunique()} days")
            print(json.dumps(audit.weekly_report(), indent=2))


if __name__ == "__main__":
    _main()
|
|