NILMbench / app.py
Pybunny's picture
Drop Upload-your-own tab
f1a0624 verified
"""NILMbench HuggingFace Space.
Three tabs:
1. Built-in single-frame example (FaustineCNN baseline, V/I bundled).
2. Single-frame upload (user supplies a V/I segment).
3. Benchmark your model: user uploads a .pt for the bundled
``DemoRegressor`` architecture (see examples/byom_demo.py in the GitHub
repo); the Space scores it on a subset of the dense House-2 set and
renders the same Markdown report the CLI produces.
Asset sources: model weights for the baseline come from
``Pybunny/nilmbench-faustine``; the dense benchmark split for tab 3 is
fetched once from ``Pybunny/nilmbench-ukdale`` and cached.
"""
# ----------------------------------------------------------------------
# Monkey-patch gradio_client schema walker BEFORE importing gradio.
# Newer gradio_client (auto-installed by pip's resolution of gradio>=4.44)
# crashes at startup with `TypeError: argument of type 'bool' is not
# iterable` when it walks a schema with `additionalProperties: True`
# (which gr.JSON outputs produce). This brings the / route down and
# launch() then errors with "localhost is not accessible". Returning
# "Any" for bool schemas is what the unbroken upstream code does.
# ----------------------------------------------------------------------
import gradio_client.utils as _gc_utils # noqa: E402
_orig_get_type = _gc_utils.get_type
_orig_to_python = _gc_utils._json_schema_to_python_type
def _safe_get_type(schema):
if isinstance(schema, bool):
return "Any" if schema else "None"
return _orig_get_type(schema)
def _safe_to_python(schema, defs):
if isinstance(schema, bool):
return "Any" if schema else "None"
return _orig_to_python(schema, defs)
_gc_utils.get_type = _safe_get_type
_gc_utils._json_schema_to_python_type = _safe_to_python
import json
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import gradio as gr
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from huggingface_hub import hf_hub_download
HERE = Path(__file__).resolve().parent
EXAMPLES_DIR = HERE / "examples"
MODEL_REPO = "Pybunny/nilmbench-faustine"
# UK-DALE House 2 calibration constants (from calibration_house_2.cfg).
V_PER_ADC = 1.88296904357e-7
I_PER_ADC = 4.77518864497e-8
ADC_FULL_SCALE = 2 ** 31
V_FACTOR = ADC_FULL_SCALE * V_PER_ADC # ~404.4
I_FACTOR = ADC_FULL_SCALE * I_PER_ADC # ~102.5
# ----------------------------------------------------------------------
# Model (self-contained so the Space has no dependency on the nilmbench pkg)
# ----------------------------------------------------------------------
class FaustineCNN(nn.Module):
def __init__(self, n_categories: int):
super().__init__()
self.conv_layers = nn.Sequential(
nn.Conv2d(2, 16, kernel_size=5, stride=2, padding=2),
nn.BatchNorm2d(16), nn.ReLU(inplace=True),
nn.Conv2d(16, 32, kernel_size=5, stride=2, padding=2),
nn.BatchNorm2d(32), nn.ReLU(inplace=True),
nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
nn.BatchNorm2d(64), nn.ReLU(inplace=True),
nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
nn.BatchNorm2d(128), nn.ReLU(inplace=True),
nn.AdaptiveAvgPool2d((1, 1)),
)
self.fc_layers = nn.Sequential(
nn.Linear(128, 1024),
nn.LayerNorm(1024),
nn.ReLU(inplace=True),
nn.Dropout(0.25),
nn.Linear(1024, 2 * n_categories),
)
self.n_categories = n_categories
def forward(self, x: torch.Tensor) -> torch.Tensor:
h = self.conv_layers(x).flatten(1)
h = self.fc_layers(h).view(x.size(0), self.n_categories, 2)
return F.softmax(h, dim=-1)[..., 0]
# ----------------------------------------------------------------------
# Asset loading (Hub)
# ----------------------------------------------------------------------
def load_assets():
classes_path = hf_hub_download(MODEL_REPO, "classes.json")
cutoffs_path = hf_hub_download(MODEL_REPO, "cutoffs.json")
weights_path = hf_hub_download(MODEL_REPO, "faustine_best.pt")
classes = json.loads(Path(classes_path).read_text())
cutoffs = json.loads(Path(cutoffs_path).read_text())["cutoffs_W"]
model = FaustineCNN(n_categories=len(classes))
state = torch.load(weights_path, map_location="cpu", weights_only=False)
if isinstance(state, dict) and "state_dict" in state:
state = state["state_dict"]
model.load_state_dict(state)
model.eval()
return model, classes, cutoffs
MODEL, CLASSES, CUTOFFS = load_assets()
# ----------------------------------------------------------------------
# Inference + plotting
# ----------------------------------------------------------------------
def _to_2d_image(vi_norm: np.ndarray) -> torch.Tensor:
if vi_norm.shape != (2, 96000):
raise ValueError(f"Expected (2, 96000), got {vi_norm.shape}")
img = vi_norm.reshape(2, 240, 400).astype(np.float32)
return torch.as_tensor(img).unsqueeze(0)
def predict(vi_norm: np.ndarray, aggregate_W: float) -> dict[str, float]:
with torch.no_grad():
scores = MODEL(_to_2d_image(vi_norm)).cpu().numpy().squeeze(0)
# FaustineCNN outputs per-category Bernoulli activations; renormalise
# across categories to obtain shares, then scale by the aggregate.
shares = scores / (scores.sum() + 1e-9)
raw = shares * float(aggregate_W)
out = {}
for k, cls in enumerate(CLASSES):
cut = CUTOFFS.get(cls, 0.0)
out[cls] = float(raw[k]) if raw[k] > cut else 0.0
return out
def make_overview_plot(vi_norm: np.ndarray, preds: dict[str, float],
truth: dict[str, float] | None) -> plt.Figure:
v = vi_norm[0].astype(np.float32) * V_FACTOR
i = vi_norm[1].astype(np.float32) * I_FACTOR
t = np.arange(len(v)) / 16000
fig = plt.figure(figsize=(8.0, 6.0))
gs = fig.add_gridspec(3, 1, height_ratios=[1.2, 1.2, 1.6], hspace=0.55)
ax_v = fig.add_subplot(gs[0])
ax_v.plot(t, v, color="#1a4f8a", lw=0.4)
ax_v.set_ylabel("Voltage (V)")
ax_v.set_xlim(0, 6); ax_v.grid(True, linestyle=":", alpha=0.4)
ax_i = fig.add_subplot(gs[1])
ax_i.plot(t, i, color="#7a1a1a", lw=0.4)
ax_i.set_ylabel("Current (A)"); ax_i.set_xlabel("Time (s)")
ax_i.set_xlim(0, 6); ax_i.grid(True, linestyle=":", alpha=0.4)
ax_p = fig.add_subplot(gs[2])
active = [(c, w) for c, w in preds.items() if w > 0]
active.sort(key=lambda kv: -kv[1])
if not active:
active = [("(all categories below cutoff)", 0.0)]
names = [c for c, _ in active]
vals = [w for _, w in active]
y_pos = np.arange(len(names))
ax_p.barh(y_pos, vals, color="#a63d40", edgecolor="#222", linewidth=0.4,
label="prediction")
if truth is not None:
tvals = [truth.get(c, 0.0) for c in names]
ax_p.barh(y_pos + 0.32, tvals, height=0.32,
color="#1a4f8a", alpha=0.6, edgecolor="#222", linewidth=0.4,
label="ground truth")
ax_p.set_yticks(y_pos); ax_p.set_yticklabels(names)
ax_p.invert_yaxis()
ax_p.set_xlabel("Predicted power (W)")
ax_p.grid(True, axis="x", linestyle=":", alpha=0.4)
if truth is not None:
ax_p.legend(loc="lower right", frameon=False, fontsize=9)
return fig
# ----------------------------------------------------------------------
# Gradio handlers
# ----------------------------------------------------------------------
def list_examples() -> list[str]:
if not EXAMPLES_DIR.exists():
return []
return sorted(p.stem for p in EXAMPLES_DIR.glob("*.npy"))
def load_example(name: str):
npy = EXAMPLES_DIR / f"{name}.npy"
meta = EXAMPLES_DIR / f"{name}.json"
vi = np.load(npy)
truth = None
aggregate = 0.0
if meta.exists():
m = json.loads(meta.read_text())
truth = m.get("truth")
aggregate = float(m.get("aggregate_W", 0.0))
if aggregate == 0.0 and truth is not None:
aggregate = sum(truth.values())
return vi, truth, aggregate
def run_example(name: str):
if not name:
return None, {}
vi, truth, agg = load_example(name)
preds = predict(vi, agg)
return make_overview_plot(vi, preds, truth), preds
def run_upload(file_obj, aggregate_W: float):
if file_obj is None:
return None, {}
vi = np.load(file_obj.name)
preds = predict(vi, aggregate_W)
return make_overview_plot(vi, preds, None), preds
# ----------------------------------------------------------------------
# Tab 3: full benchmark with a user-uploaded .pt for DemoRegressor
# ----------------------------------------------------------------------
# Self-contained copy of examples.byom_demo.DemoRegressor so the Space
# does not have to import the nilmbench package at module load time
# (lighter dep tree, faster cold start).
class DemoRegressor(nn.Module):
"""6 V/I stats -> linear -> softplus. Output: per-category power (W)."""
N_FEATURES = 6
def __init__(self, n_categories: int = 7):
super().__init__()
self.n_categories = n_categories
self.head = nn.Linear(self.N_FEATURES, n_categories)
@staticmethod
def _feats(x):
rms = (x * x).mean(dim=-1).clamp_min(0).sqrt()
absmean = x.abs().mean(dim=-1)
std = x.std(dim=-1)
return torch.cat([rms, absmean, std], dim=-1)
def forward(self, x):
return F.softplus(self.head(self._feats(x)))
_BENCH_DATA_DIR = None
def _bench_data_root():
"""Cache-aware snapshot_download of the benchmark/ split."""
global _BENCH_DATA_DIR
if _BENCH_DATA_DIR is not None:
return _BENCH_DATA_DIR
from huggingface_hub import snapshot_download
local = snapshot_download(
repo_id="Pybunny/nilmbench-ukdale", repo_type="dataset",
allow_patterns=["benchmark/*", "summary.json"],
)
_BENCH_DATA_DIR = Path(local)
return _BENCH_DATA_DIR
def _bench_subset(n_frames):
"""Memory-mapped read of the first n_frames frames from benchmark/.
Filters the labels to the 7-category benchmark scoring set
(electrical heating is listed in the file but never activates in House 2
and is excluded by the official protocol). This matches the shape of
the bundled byom_demo.pt and any other DemoRegressor checkpoint
trained via examples/byom_demo.py.
"""
BENCH_CLASSES = [
"always on", "cooking", "dishwasher", "electronics & lighting",
"fridge", "misc", "washing machine",
]
root = _bench_data_root() / "benchmark"
total = int(np.load(root / "x_vi_6s.npy", mmap_mode="r").shape[0])
n = max(1, min(int(n_frames), total))
x = np.asarray(np.load(root / "x_vi_6s.npy", mmap_mode="r")[:n],
dtype=np.float32)
lab = np.load(root / "labels_and_index.npz", allow_pickle=True)
all_cls = [str(c) for c in lab["class_names"]]
keep = [all_cls.index(c) for c in BENCH_CLASSES if c in all_cls]
y_all = lab["y_power"][:n].astype(np.float32)
y = y_all[:, keep]
classes = [all_cls[i] for i in keep]
return x, y, classes, total
def _score_demo_pt(weights_file, n_frames):
"""Load the user's .pt into DemoRegressor and produce a Markdown report."""
import json as _json
if weights_file is None:
return ("**Please upload a .pt file trained on the "
"`DemoRegressor` architecture** (see "
"[examples/byom_demo.py](https://github.com/Saharmgh/NILMbench/blob/main/examples/byom_demo.py)). "
"A bundled checkpoint is at "
"[examples/byom_demo.pt](https://github.com/Saharmgh/NILMbench/blob/main/examples/byom_demo.pt).",
None)
try:
x, y_true, classes, total = _bench_subset(n_frames)
except Exception as exc:
return (f"**Benchmark data download failed.**\n\n```\n{exc}\n```", None)
K = len(classes)
model = DemoRegressor(n_categories=K)
try:
state = torch.load(weights_file.name, map_location="cpu",
weights_only=False)
if isinstance(state, dict) and "state_dict" in state:
state = state["state_dict"]
model.load_state_dict(state, strict=True)
except Exception as exc:
return (f"**Weights failed to load** (does the checkpoint match "
f"`DemoRegressor(n_categories={K})`?).\n\n"
f"```\n{exc}\n```", None)
model.eval()
with torch.inference_mode():
x_t = torch.as_tensor(x)
y_pred = model(x_t).cpu().numpy().astype(np.float32)
# Use the nilmbench scorer, but installing it as a dep is heavy. Compute
# the headline numbers inline. theta_k defaults from the paper.
THETA = np.array([3, 50, 10, 5, 5, 10, 10], dtype=np.float32)
if K != 7:
THETA = np.full(K, 10.0, dtype=np.float32)
A = y_true > THETA
B = y_pred > THETA
err_ok = np.abs(y_pred - y_true) <= 20.0
union = (A | B).sum(axis=1)
keep = union > 0
inter = (A & B).sum(axis=1).astype(np.float32)
correct = (A & B & err_ok).sum(axis=1).astype(np.float32)
mj = float((correct[keep] / np.maximum(union[keep], 1)).mean()) if keep.any() else 0.0
jacc = float((inter[keep] / np.maximum(union[keep], 1)).mean()) if keep.any() else 0.0
tp = (A & B).sum(axis=1).astype(np.float32)
fp = (~A & B).sum(axis=1).astype(np.float32)
fn = (A & ~B).sum(axis=1).astype(np.float32)
f1d = tp + 0.5 * (fp + fn)
f1 = float(np.nanmean(np.where(f1d > 0, tp / np.maximum(f1d, 1), np.nan)))
P = y_true.sum(axis=1)
teca = float(np.nanmean(np.where(P > 0,
1.0 - np.abs(y_true - y_pred).sum(axis=1) / np.maximum(2 * P, 1e-9),
np.nan)))
mae = float(np.mean(np.abs(y_true - y_pred)))
per_class = []
for k, c in enumerate(classes):
Ak = A[:, k]; Bk = B[:, k]
eok = np.abs(y_pred[:, k] - y_true[:, k]) <= 20.0
unionk = (Ak | Bk).sum()
cork = (Ak & Bk & eok).sum()
per_class.append((c, float(cork / unionk) if unionk > 0 else 0.0))
# Paper baselines (Table 3 of the NILMbench manuscript; full 60 000 frames).
PAPER_BASELINES = [
# name, MJ_20W, F1, Jaccard, TECA, MAE_W
("DeepDFML", 0.316, 0.658, 0.532, 0.513, 38.64),
("COLD", 0.375, 0.714, 0.600, 0.580, 37.53),
("SchirmerCNN", 0.412, 0.766, 0.667, 0.622, 45.25),
("FaustineCNN", 0.504, 0.790, 0.698, 0.706, 29.64),
("FaustineCNN + recall-constr. cutoffs", 0.562, 0.811, 0.729, 0.739, 27.09),
("predict zero (trivial)", 0.000, 0.000, 0.000, 0.500, 67.60),
("predict House-2 mean (trivial)", 0.227, 0.579, 0.450, -0.105, 60.70),
("all to 'always on' (trivial)", 0.019, 0.557, 0.412, 0.165, 76.40),
]
md = []
md.append(f"# NILMbench — uploaded .pt\n")
md.append(f"_Your model scored on {len(x)} of {total} dense House-2 frames._\n")
md.append("## Comparison to paper baselines")
md.append("")
md.append("Baselines below are from Table 3 of the NILMbench paper, computed "
"on the full 60 000-frame dense House-2 set. **Your model is scored "
f"on the first {len(x)} frames only** (Space free-tier compute budget); "
"treat the comparison as directional. Use the `nilmbench` CLI locally "
"to score on all 60 000 frames for a fair comparison.\n")
md.append("| Model | MJ\\_{20W} | F1 | Jaccard | TECA | MAE (W) |")
md.append("|---|---|---|---|---|---|")
md.append(f"| **Your model (uploaded)** | **{mj:.4f}** | **{f1:.4f}** | "
f"**{jacc:.4f}** | **{teca:.4f}** | **{mae:.2f}** |")
for name, b_mj, b_f1, b_j, b_teca, b_mae in PAPER_BASELINES:
md.append(f"| {name} | {b_mj:.4f} | {b_f1:.4f} | {b_j:.4f} | "
f"{b_teca:.4f} | {b_mae:.2f} |")
md.append("")
md.append("## Per-category MJ\\_{20W} (your model)\n")
md.append("| Category | MJ_20W |")
md.append("|---|---|")
for c, v in per_class:
md.append(f"| {c} | {v:.4f} |")
md.append("")
import tempfile as _t
out = Path(_t.mkdtemp(prefix="nbench_report_")) / "score.json"
out.write_text(_json.dumps({
"MJ_20W": mj, "F1": f1, "Jaccard": jacc, "TECA": teca, "MAE_W": mae,
"n_frames": int(len(x)), "n_total": int(total),
"per_class_MJ_20W": dict(per_class),
}, indent=2, sort_keys=True))
return "\n".join(md), str(out)
# ----------------------------------------------------------------------
# UI
# ----------------------------------------------------------------------
def build_ui() -> gr.Blocks:
examples = list_examples()
with gr.Blocks(title="NILMbench demo") as demo:
gr.Markdown(
"# NILMbench demo\n"
"FaustineCNN trained on UK-DALE House 1, applied to a single "
"6-second 16 kHz V/I segment from House 2. Predicted power is "
"post-processed with the recall-constrained cutoffs from the paper.\n\n"
"Source code: <https://github.com/Saharmgh/NILMbench> · "
"Model: <https://huggingface.co/Pybunny/nilmbench-faustine>"
)
with gr.Tabs():
with gr.TabItem("Built-in example"):
ex = gr.Dropdown(examples, label="Example frame",
value=examples[0] if examples else None)
btn = gr.Button("Run", variant="primary")
plot_a = gr.Plot()
lab_a = gr.JSON(label="Predicted power per category (W)")
btn.click(run_example, ex, [plot_a, lab_a])
with gr.TabItem("Benchmark your model"):
gr.Markdown(
"Upload a `.pt` checkpoint trained on the bundled "
"[`DemoRegressor`](https://github.com/Saharmgh/NILMbench/blob/main/examples/byom_demo.py) "
"architecture (V/I summary stats → linear head, 7 outputs). "
"A sample checkpoint is in the repo at "
"[`examples/byom_demo.pt`](https://github.com/Saharmgh/NILMbench/blob/main/examples/byom_demo.pt). "
"The Space downloads the dense House-2 benchmark from "
"`Pybunny/nilmbench-ukdale` on first run (cached) and "
"scores your model on the selected number of frames. "
"For full 60 000-frame scoring or your own model "
"architecture, use the `nilmbench` CLI from the GitHub repo."
)
pt = gr.File(label="Trained .pt for DemoRegressor")
nf = gr.Slider(50, 5000, value=500, step=50,
label="Frames to score (free CPU; 500 ≈ 1 min)")
bb = gr.Button("Run benchmark", variant="primary")
rep = gr.Markdown()
jf = gr.File(label="score.json")
bb.click(_score_demo_pt, [pt, nf], [rep, jf])
return demo
if __name__ == "__main__":
build_ui().launch()