| """ |
| T2.3 · prioritizer.py |
| Appliance load-shedding plan generator. |
| Given a 24h forecast and a business's appliance list, |
| outputs per-appliance, per-hour ON/OFF plan maximizing |
| expected revenue under the 'drop luxury before critical' rule. |
| |
| Usage: |
| from prioritizer import plan, load_data |
| appliances, businesses = load_data() |
| forecast = [...] # from forecaster.py |
| result = plan(forecast, appliances, business_id="salon") |
| print(result) |
| """ |
|
|
| import json |
| from pathlib import Path |
| from typing import Optional |
|
|
| |
| |
| SHED_ORDER = {"luxury": 1, "comfort": 2, "critical": 3} |
| CATEGORY_REVENUE_WEIGHT = {"critical": 1.0, "comfort": 0.6, "luxury": 0.2} |
|
|
| |
| RISK_SHED_FRACTION = { |
| "LOW": 0.0, |
| "MEDIUM": 0.33, |
| "HIGH": 0.66, |
| } |
|
|
|
|
| def load_data(appliances_path="appliances.json", businesses_path="businesses.json"): |
| with open(appliances_path) as f: |
| appliances = json.load(f) |
| with open(businesses_path) as f: |
| businesses = json.load(f) |
| return appliances, businesses |
|
|
|
|
| def get_business_appliances(appliances: list, business: dict) -> list: |
| """Filter appliances to those used by this business.""" |
| ap_map = {a["id"]: a for a in appliances} |
| return [ap_map[aid] for aid in business["appliance_ids"] if aid in ap_map] |
|
|
|
|
| def plan(forecast: list[dict], appliances: list, business_id: str = "salon", |
| businesses_path: str = "businesses.json") -> dict: |
| """ |
| Core planning function. |
| |
| Algorithm: |
| 1. For each hour, determine outage risk level from forecast. |
| 2. Sort appliances by shed priority (luxury first, critical last). |
| 3. Apply shed depth based on risk: LOW=none, MEDIUM=shed luxury, |
| HIGH=shed luxury+comfort. Critical never shed unless P>0.5. |
| 4. Within each category, break ties by lowest revenue-per-watt (shed cheapest first). |
| 5. Calculate expected revenue saved vs naïve full-on. |
| |
| Returns dict with 24-hour plan per appliance + summary stats. |
| """ |
| |
| with open(businesses_path) as f: |
| businesses = json.load(f) |
| biz_map = {b["id"]: b for b in businesses} |
| business = biz_map[business_id] |
| biz_appliances = get_business_appliances(appliances, business) |
|
|
| |
| |
| def shed_sort_key(ap): |
| cat_priority = SHED_ORDER[ap["category"]] |
| rev = ap["revenue_if_running_rwf_per_h"] |
| return (cat_priority, rev) |
|
|
| sorted_appliances = sorted(biz_appliances, key=shed_sort_key) |
|
|
| hourly_plan = [] |
| total_revenue_plan = 0 |
| total_revenue_naive = 0 |
|
|
| for hour_data in forecast: |
| h = hour_data["hour"] |
| p_out = hour_data["p_outage"] |
| risk = hour_data["risk_level"] |
| exp_dur = hour_data["expected_duration_min"] |
|
|
| |
| frac_lost = (exp_dur / 60.0) * p_out |
| frac_lost = min(frac_lost, 1.0) |
|
|
| |
| |
| |
| |
| |
| categories_to_shed = set() |
| if risk == "HIGH": |
| categories_to_shed = {"luxury", "comfort"} |
| elif risk == "MEDIUM": |
| categories_to_shed = {"luxury"} |
| if p_out > 0.50: |
| categories_to_shed.add("critical") |
|
|
| appliance_states = [] |
| hour_revenue_plan = 0 |
| hour_revenue_naive = 0 |
|
|
| for ap in biz_appliances: |
| |
| is_peak = h in business.get("peak_hours", []) |
| if ap["category"] == "critical" and is_peak: |
| |
| state = "ON" |
| elif ap["category"] in categories_to_shed: |
| state = "OFF" |
| else: |
| state = "ON" |
|
|
| |
| base_rev = ap["revenue_if_running_rwf_per_h"] |
| naive_rev = base_rev * (1 - frac_lost) |
| plan_rev = base_rev if state == "ON" else 0 |
|
|
| |
| if state == "ON": |
| plan_rev = base_rev * (1 - frac_lost) |
|
|
| hour_revenue_plan += plan_rev |
| hour_revenue_naive += naive_rev |
|
|
| appliance_states.append({ |
| "appliance_id": ap["id"], |
| "name": ap["name"], |
| "category": ap["category"], |
| "state": state, |
| "watts": ap["watts_avg"] if state == "ON" else 0, |
| "revenue_rwf": round(plan_rev, 0), |
| "shed_reason": f"Risk={risk}, P={p_out:.2f}" if state == "OFF" else None, |
| }) |
|
|
| total_revenue_plan += hour_revenue_plan |
| total_revenue_naive += hour_revenue_naive |
|
|
| hourly_plan.append({ |
| "hour_offset": hour_data["hour_offset"], |
| "timestamp": hour_data["timestamp"], |
| "hour": h, |
| "p_outage": p_out, |
| "risk_level": risk, |
| "expected_duration_min": exp_dur, |
| "appliances": appliance_states, |
| "hour_revenue_plan_rwf": round(hour_revenue_plan, 0), |
| "hour_revenue_naive_rwf": round(hour_revenue_naive, 0), |
| }) |
|
|
| revenue_saved = total_revenue_plan - total_revenue_naive |
| |
| |
| disruption_penalty = total_revenue_naive * 0.20 |
| net_benefit = revenue_saved + disruption_penalty |
|
|
| return { |
| "business": business["name"], |
| "business_id": business_id, |
| "plan": hourly_plan, |
| "summary": { |
| "total_revenue_plan_rwf": round(total_revenue_plan, 0), |
| "total_revenue_naive_rwf": round(total_revenue_naive, 0), |
| "revenue_saved_rwf": round(revenue_saved, 0), |
| "disruption_penalty_avoided_rwf": round(disruption_penalty, 0), |
| "net_benefit_rwf": round(net_benefit, 0), |
| "hours_with_shed": sum( |
| 1 for h in hourly_plan |
| if any(a["state"] == "OFF" for a in h["appliances"]) |
| ), |
| }, |
| } |
|
|
|
|
| def format_digest(plan_result: dict, forecast: list) -> list[str]: |
| """ |
| Generate 3 SMS messages (max 160 chars each) for the morning digest. |
| Designed for feature phone delivery. |
| """ |
| biz = plan_result["business"] |
| summary = plan_result["summary"] |
| hourly = plan_result["plan"] |
|
|
| |
| high_risk_hours = [h for h in hourly if h["risk_level"] == "HIGH"] |
| med_risk_hours = [h for h in hourly if h["risk_level"] == "MEDIUM"] |
|
|
| if high_risk_hours: |
| risk_times = ",".join([str(h["hour"]) + "h" for h in high_risk_hours[:3]]) |
| risk_word = "HIGH" |
| elif med_risk_hours: |
| risk_times = ",".join([str(h["hour"]) + "h" for h in med_risk_hours[:3]]) |
| risk_word = "MED" |
| else: |
| risk_times = "none" |
| risk_word = "LOW" |
|
|
| |
| shed_hours = [h for h in hourly if any(a["state"] == "OFF" for a in h["appliances"])] |
| if shed_hours: |
| sample_hour = shed_hours[0] |
| shed_names = [a["name"].split()[0] for a in sample_hour["appliances"] |
| if a["state"] == "OFF"][:2] |
| shed_str = "+".join(shed_names) |
| else: |
| shed_str = "none" |
|
|
| net = int(summary["net_benefit_rwf"]) |
| saved_str = f"{net:,}RWF" |
|
|
| sms1 = f"UMURIRO FORECAST 24H: Risk={risk_word} at {risk_times}. Shed: {shed_str}. Est.save: {saved_str}. Stay alert!" |
| sms2 = f"PLAN: Turn OFF {shed_str} during risk hrs ({risk_times}). Keep dryer+clippers+lights ON. Generator ready?" |
| sms3 = f"If no signal by 13h, use YESTERDAY plan. Risk valid 6h. Call 0788-GRID for live update. Good business!" |
|
|
| |
| sms1 = sms1[:160] |
| sms2 = sms2[:160] |
| sms3 = sms3[:160] |
|
|
| return [sms1, sms2, sms3] |
|
|
|
|
| def print_plan(plan_result: dict): |
| """Pretty-print the 24h plan to terminal.""" |
| print(f"\n{'='*70}") |
| print(f" LOAD-SHEDDING PLAN — {plan_result['business']}") |
| print(f"{'='*70}") |
| print(f"{'Hour':>5} {'Time':>12} {'Risk':>6} {'P(out)':>7} | Appliances OFF") |
| print("-" * 70) |
| for h in plan_result["plan"]: |
| off = [a["name"][:12] for a in h["appliances"] if a["state"] == "OFF"] |
| off_str = ", ".join(off) if off else "—" |
| print(f"{h['hour']:>5} {h['timestamp'][11:]:>12} {h['risk_level']:>6} " |
| f"{h['p_outage']:>7.3f} | {off_str}") |
| print("-" * 70) |
| s = plan_result["summary"] |
| print(f" Net benefit vs naïve: {s['net_benefit_rwf']:,.0f} RWF") |
| print(f" Revenue (plan): {s['total_revenue_plan_rwf']:,.0f} RWF") |
| print(f" Shed hours: {s['hours_with_shed']}/24") |
| print(f"{'='*70}\n") |
|
|
|
|
| if __name__ == "__main__": |
| import sys |
| from forecaster import Forecaster |
|
|
| business_id = sys.argv[1] if len(sys.argv) > 1 else "salon" |
|
|
| print(f"Fitting forecaster...") |
| fc = Forecaster().fit("grid_history.csv") |
| forecast = fc.predict_next_24h() |
|
|
| appliances, businesses = load_data() |
|
|
| print(f"\nGenerating plan for: {business_id}") |
| result = plan(forecast, appliances, business_id=business_id) |
| print_plan(result) |
|
|
| |
| sms_msgs = format_digest(result, forecast) |
| print("📱 Morning SMS Digest (3×160 chars):") |
| for i, msg in enumerate(sms_msgs, 1): |
| print(f" SMS {i} ({len(msg)} chars): {msg}") |
|
|