# api/src/budget_audit.py
# safraeli's picture
# Vectorize Farquhar, DI ControlLoop, gate pipeline, budget audit, chatbot Hebrew
# 15be6bb verified
"""
Budget audit pipeline — logs every slot spend and provides rollup reports.

Appends one row per control tick to a parquet file. The weekly rollup
projects the cumulative sacrifice logged so far to an annual figure and
verifies it stays within the 5% annual ceiling (configured as
``MAX_ENERGY_REDUCTION_PCT``).

Usage:
    # In control_loop.py tick():
    from src.budget_audit import BudgetAuditLog
    audit = BudgetAuditLog()
    audit.log_slot(tick_result)

    # Weekly report:
    python -m src.budget_audit --report

    # Single-day summary:
    python -m src.budget_audit --daily YYYY-MM-DD
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import date, datetime, timezone
from pathlib import Path
from typing import Optional
import pandas as pd
from config.settings import (
DATA_DIR,
MAX_ENERGY_REDUCTION_PCT,
SYSTEM_CAPACITY_KW,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory (under the configured DATA_DIR) that holds the budget audit files.
AUDIT_DIR = DATA_DIR / "budget_audit"
# Parquet file used as the default log location by BudgetAuditLog.
AUDIT_PATH = AUDIT_DIR / "slot_log.parquet"
@dataclass
class SlotRecord:
    """Typed description of a single audit-log row.

    The field names mirror the columns written by ``BudgetAuditLog.log_slot``.
    Fields are positional in the order declared below and none has a default.
    """

    # --- when / which slot -------------------------------------------------
    timestamp: datetime  # instant the slot was recorded
    date: date  # calendar day the slot belongs to
    slot_index: int  # index of the slot

    # --- tracker offsets (degrees) -----------------------------------------
    planned_offset_deg: float
    actual_offset_deg: float

    # --- energy accounting (kWh) -------------------------------------------
    energy_cost_kwh: float
    budget_spent_kwh: float
    budget_remaining_kwh: float

    # --- provenance ---------------------------------------------------------
    gate_passed: bool  # whether the gate check passed for this slot
    source: str
    stage_id: str
class BudgetAuditLog:
    """Append-only parquet log for budget slot spends.

    ``log_slot`` adds one row per call to the parquet file at ``path``; the
    reporting methods read that file back with pandas and aggregate it.
    """

    def __init__(self, path: Path = AUDIT_PATH):
        """Store the log location and create its parent directory if needed.

        Args:
            path: Parquet file to append to. Defaults to ``AUDIT_PATH``.
        """
        self.path = path
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def log_slot(self, tick_result) -> None:
        """Append a tick result to the audit log.

        Every column is read from ``tick_result`` with ``getattr`` and a
        default, so an object missing some attributes still yields a row.
        This method is best-effort: any failure is logged as a warning and
        never raised to the caller.

        Args:
            tick_result: Object exposing (some of) ``timestamp``,
                ``slot_index``, ``plan_offset_deg``, ``target_angle``,
                ``energy_cost_kwh``, ``budget_spent_kwh``,
                ``budget_remaining_kwh``, ``live_gate_passed``, ``source``
                and ``stage_id``.
        """
        try:
            # Resolve the timestamp exactly once so the "timestamp" and "date"
            # columns always describe the same instant. A missing (or None)
            # timestamp falls back to the current UTC time.
            timestamp = getattr(tick_result, "timestamp", None)
            if timestamp is None:
                timestamp = datetime.now(timezone.utc)
            if hasattr(timestamp, "date"):
                day = str(timestamp.date())
            else:
                # Timestamp of an unexpected type: fall back to today's date.
                day = str(date.today())

            record = {
                "timestamp": timestamp,
                "date": day,
                "slot_index": getattr(tick_result, "slot_index", -1),
                "planned_offset_deg": getattr(tick_result, "plan_offset_deg", 0.0),
                # NOTE(review): the "actual offset" column is filled from
                # ``target_angle`` — confirm this is an offset and not an
                # absolute angle.
                "actual_offset_deg": getattr(tick_result, "target_angle", 0.0),
                "energy_cost_kwh": getattr(tick_result, "energy_cost_kwh", 0.0),
                "budget_spent_kwh": getattr(tick_result, "budget_spent_kwh", 0.0),
                "budget_remaining_kwh": getattr(tick_result, "budget_remaining_kwh", 0.0),
                "gate_passed": getattr(tick_result, "live_gate_passed", False),
                "source": getattr(tick_result, "source", ""),
                "stage_id": getattr(tick_result, "stage_id", "unknown"),
            }
            new_row = pd.DataFrame([record])
            # NOTE: each append re-reads and rewrites the whole file, so the
            # cost of a tick grows with the size of the log.
            if self.path.exists():
                existing = pd.read_parquet(self.path)
                combined = pd.concat([existing, new_row], ignore_index=True)
            else:
                combined = new_row
            self._write_atomic(combined)
            logger.debug(
                "Audit log: slot %d, cost=%.4f kWh",
                record["slot_index"],
                record["energy_cost_kwh"],
            )
        except Exception as exc:
            # Deliberate best-effort: auditing must not break the control tick.
            logger.warning("Budget audit log failed: %s", exc)

    def _write_atomic(self, frame: pd.DataFrame) -> None:
        """Write ``frame`` to ``self.path`` via a temporary sibling file.

        The data is first written next to the target and then moved over it
        with ``Path.replace``, so a failure while serialising leaves the
        previous log file untouched instead of truncated.
        """
        tmp_path = self.path.with_name(self.path.name + ".tmp")
        try:
            frame.to_parquet(tmp_path, index=False)
            tmp_path.replace(self.path)
        finally:
            # Only present here if the write or the swap failed.
            if tmp_path.exists():
                tmp_path.unlink()

    def load(self) -> pd.DataFrame:
        """Load the full audit log.

        Returns:
            The logged rows, or an empty ``DataFrame`` (no columns) when the
            log file does not exist yet.
        """
        if self.path.exists():
            return pd.read_parquet(self.path)
        return pd.DataFrame()

    def daily_summary(self, target_date: Optional[date] = None) -> dict:
        """Summarize a single day's budget usage.

        Args:
            target_date: Day to summarize; defaults to ``date.today()``.

        Returns:
            ``{"error": ...}`` when the log is empty, a three-key stub when
            the day has no rows, otherwise totals for that day.
        """
        df = self.load()
        if df.empty:
            return {"error": "No audit data"}
        if target_date is None:
            target_date = date.today()
        # The "date" column is stored as an ISO string, so compare as strings.
        day_key = str(target_date)
        day = df[df["date"] == day_key]
        if day.empty:
            return {"date": day_key, "slots": 0, "total_cost_kwh": 0.0}
        return {
            "date": day_key,
            "slots": len(day),
            "total_cost_kwh": round(float(day["energy_cost_kwh"].sum()), 4),
            "interventions": int(day["gate_passed"].sum()),
            "max_offset_deg": round(float(day["actual_offset_deg"].abs().max()), 1),
            # Remaining budget as of the last row logged for that day.
            "budget_remaining_kwh": round(float(day["budget_remaining_kwh"].iloc[-1]), 4),
        }

    def weekly_report(self) -> dict:
        """Generate a rollup report for budget compliance.

        Despite the name, the report covers every row in the log (no 7-day
        filter is applied). The average daily cost over the distinct logged
        dates is extrapolated to 365 days and compared with the ceiling
        derived from ``SYSTEM_CAPACITY_KW`` and ``MAX_ENERGY_REDUCTION_PCT``.

        Returns:
            ``{"error": ...}`` when the log is empty, otherwise a dict with
            the totals, the projection, the ceiling and a ``compliant`` flag.
        """
        df = self.load()
        if df.empty:
            return {"error": "No audit data"}

        total_cost = float(df["energy_cost_kwh"].sum())
        days = df["date"].nunique()

        daily_potential_kwh = SYSTEM_CAPACITY_KW * 6.0  # ~6 peak sun hours
        annual_potential_kwh = daily_potential_kwh * 365
        ceiling_kwh = annual_potential_kwh * MAX_ENERGY_REDUCTION_PCT / 100.0

        # Project annual rate from observed data
        if days > 0:
            daily_rate = total_cost / days
            projected_annual = daily_rate * 365
        else:
            daily_rate = 0.0
            projected_annual = 0.0

        compliant = projected_annual <= ceiling_kwh
        return {
            "period_days": days,
            "total_cost_kwh": round(total_cost, 3),
            "daily_avg_kwh": round(daily_rate, 4),
            "projected_annual_kwh": round(projected_annual, 1),
            "ceiling_kwh": round(ceiling_kwh, 1),
            "ceiling_pct": MAX_ENERGY_REDUCTION_PCT,
            "utilization_pct": round(projected_annual / ceiling_kwh * 100, 1) if ceiling_kwh > 0 else 0,
            "compliant": compliant,
            "total_interventions": int(df["gate_passed"].sum()),
            "intervention_rate_pct": round(float(df["gate_passed"].mean()) * 100, 1),
        }
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Command-line entry point:
    #   --report            print the rollup report as JSON
    #   --daily YYYY-MM-DD  print the summary for one day as JSON
    #   (no flag)           print a one-line overview followed by the rollup
    import argparse
    import json

    parser = argparse.ArgumentParser(description="Budget audit report")
    parser.add_argument("--report", action="store_true", help="Weekly rollup report")
    # Parsing the date through argparse turns a malformed value into a normal
    # usage error (exit status 2) instead of an uncaught ValueError traceback.
    parser.add_argument(
        "--daily",
        type=date.fromisoformat,
        metavar="YYYY-MM-DD",
        help="Daily summary for YYYY-MM-DD",
    )
    args = parser.parse_args()

    audit = BudgetAuditLog()
    if args.report:
        print(json.dumps(audit.weekly_report(), indent=2))
    elif args.daily is not None:
        print(json.dumps(audit.daily_summary(args.daily), indent=2))
    else:
        df = audit.load()
        if df.empty:
            print("No audit data yet.")
        else:
            print(f"Audit log: {len(df)} slots, {df['date'].nunique()} days")
            print(json.dumps(audit.weekly_report(), indent=2))