#!/usr/bin/env python3
"""Preprocess campus building energy data and create per-building EDA files.
The script intentionally uses only the Python standard library so it can run in
minimal environments. It reads the wide minute-level `all_buildings_power.csv`,
converts watts to kW, converts UNIX timestamps to Asia/Kolkata time, aggregates
hourly/daily analysis-ready files, and writes EDA reports for each meter and
building type.
"""
from __future__ import annotations
import csv
import math
import statistics
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Iterable
from zoneinfo import ZoneInfo
# Two levels up from this file: the directory that contains this script's folder.
ROOT = Path(__file__).resolve().parents[1]
# Input CSVs. DATA_FILE is required by main(); the transformer file is optional
# (summarize_transformer_power() returns no rows when it does not exist).
DATA_FILE = ROOT / "energy_dataset" / "all_buildings_power.csv"
TRANSFORMER_POWER_FILE = ROOT / "energy_dataset" / "all_transformer_power.csv"
# Output directories; ensure_dirs() creates them together with their sub-folders.
OUT_DIR = ROOT / "preprocessed_outputs"
EDA_DIR = ROOT / "eda_by_building_type"
FULL_EDA_DIR = ROOT / "eda_energy_full"
# Timezone that every UNIX timestamp is converted to before calendar keys are derived.
IST = ZoneInfo("Asia/Kolkata")
# Meter identifiers. main() uses each entry as a column name when reading
# DATA_FILE, and its lower-cased form as an output file/folder slug.
METERS = [
    "Academic",
    "Boys_main",
    "Boys_backup",
    "Facilities",
    "Girls_main",
    "Girls_backup",
    "Lecture",
    "Library",
    "Mess",
]
# Per-meter annotations keyed by the names in METERS:
#   "type"    - building-type group used for type-level aggregation and folders
#   "display" - human-readable name used in report and chart titles
#   "note"    - free-text expectation copied verbatim into the reports
METER_META = {
    "Academic": {
        "type": "academic",
        "display": "Academic building",
        "note": "Main academic block; expected to follow working-day and teaching-hour patterns.",
    },
    "Boys_main": {
        "type": "hostel",
        "display": "Boys hostel mains",
        "note": "Main grid supply for boys hostel; residential load should be steadier than academic spaces.",
    },
    "Boys_backup": {
        "type": "hostel",
        "display": "Boys hostel UPS",
        "note": "Backup/UPS supply for boys hostel; useful for separating essential residential load.",
    },
    "Facilities": {
        "type": "facilities",
        "display": "Facilities building",
        "note": "Campus facilities load; may include operational equipment and irregular maintenance activity.",
    },
    "Girls_main": {
        "type": "hostel",
        "display": "Girls hostel mains",
        "note": "Main grid supply for girls hostel; residential load should remain active across weekends.",
    },
    "Girls_backup": {
        "type": "hostel",
        "display": "Girls hostel UPS",
        "note": "Backup/UPS supply for girls hostel; currently one of the cleanest meter series.",
    },
    "Lecture": {
        "type": "lecture",
        "display": "Lecture building",
        "note": "Lecture/classroom load; expected to be schedule-driven with many low or zero periods.",
    },
    "Library": {
        "type": "library",
        "display": "Library building",
        "note": "Library load; expected to reflect opening hours, study periods, and exam-season usage.",
    },
    "Mess": {
        "type": "mess",
        "display": "Dining/Mess building",
        "note": "Dining building load; expected to show meal-time equipment peaks.",
    },
}
# Rows of the "analysis aspects" tables rendered into the Markdown reports.
# write_analysis_plan() prints all three fields; main() prints "aspect" and "why".
EDA_ASPECTS = [
    {
        "aspect": "Data quality and coverage",
        "why": "Check whether each building has enough usable observations before comparing consumption.",
        "outputs": "missing %, observed rows, availability %, zero %, negative %",
    },
    {
        "aspect": "Load magnitude and ranking",
        "why": "Identify which buildings consume the most power and should be prioritized.",
        "outputs": "mean kW, median kW, p75/p95 kW, max kW",
    },
    {
        "aspect": "Temporal behavior",
        "why": "Campus energy is strongly tied to operating hours, class schedules, weekends, and seasons.",
        "outputs": "hourly profile, weekday profile, monthly profile, daily trend",
    },
    {
        "aspect": "Peak demand",
        "why": "Peak events drive capacity planning, demand response, and transformer stress.",
        "outputs": "top 10 peak timestamps, p95 kW, max kW",
    },
    {
        "aspect": "Stability and variability",
        "why": "Stable loads behave differently from occupancy-driven loads and need different models.",
        "outputs": "std kW, coefficient of variation, p95/median ratio",
    },
    {
        "aspect": "Building type comparison",
        "why": "Academic, hostel, library, mess, lecture, and facilities loads represent different use cases.",
        "outputs": "type-level hourly/daily cleaned files and type-level reports",
    },
]
# Ordered description of the shared EDA procedure; each entry is emitted as a
# Markdown bullet in the plan, the all-buildings report and every per-meter report.
COMMON_EDA_FLOW = [
    "1. Validate data coverage and missing/zero/negative values.",
    "2. Convert timestamp to Asia/Kolkata and power from W to kW.",
    "3. Aggregate minute data to hourly and daily clean datasets.",
    "4. Summarize distribution: mean, median, p75, p95, max, variability.",
    "5. Analyze temporal patterns by hour of day, weekday, month, and daily trend.",
    "6. Extract peak events for operational review.",
    "7. Compare the meter with its building type and with all other buildings.",
]
# Column names looked up in TRANSFORMER_POWER_FILE.
# NOTE(review): "transfomer" looks misspelled, but these strings are used as
# CSV keys at runtime - presumably they match the file header; verify before renaming.
TRANSFORMER_COLUMNS = ["transfomer_1", "transfomer_2", "transfomer_3"]
@dataclass
class SeriesStats:
count: int = 0
missing: int = 0
zero: int = 0
negative: int = 0
total_kw: float = 0.0
total_sq_kw: float = 0.0
min_kw: float | None = None
max_kw: float | None = None
first_seen: str | None = None
last_seen: str | None = None
values_for_quantiles: list[float] = field(default_factory=list)
def add(self, value: float | None, timestamp: str) -> None:
if value is None:
self.missing += 1
return
self.count += 1
self.total_kw += value
self.total_sq_kw += value * value
self.min_kw = value if self.min_kw is None else min(self.min_kw, value)
self.max_kw = value if self.max_kw is None else max(self.max_kw, value)
if value == 0:
self.zero += 1
if value < 0:
self.negative += 1
if self.first_seen is None:
self.first_seen = timestamp
self.last_seen = timestamp
self.values_for_quantiles.append(value)
@property
def mean_kw(self) -> float:
return self.total_kw / self.count if self.count else math.nan
@property
def std_kw(self) -> float:
if self.count <= 1:
return math.nan
variance = (self.total_sq_kw - (self.total_kw * self.total_kw / self.count)) / (self.count - 1)
return math.sqrt(max(variance, 0.0))
@dataclass
class Bucket:
sum_kw: float = 0.0
count: int = 0
missing: int = 0
def add(self, value: float | None) -> None:
if value is None:
self.missing += 1
return
self.sum_kw += value
self.count += 1
@property
def mean_kw(self) -> float:
return self.sum_kw / self.count if self.count else math.nan
@property
def availability_pct(self) -> float:
total = self.count + self.missing
return self.count / total * 100 if total else math.nan
def parse_watts(value: str) -> float | None:
value = value.strip()
if not value or value.upper() == "NA":
return None
try:
watts = float(value)
except ValueError:
return None
return watts / 1000.0
def parse_timestamp(value: str) -> int:
    """Return a UNIX timestamp cell as an integer.

    The cell may be a plain integer, a decimal, or scientific notation; it is
    parsed as a float first and then truncated toward zero.
    """
    cleaned = value.strip()
    as_float = float(cleaned)
    return int(as_float)
def fmt(value: float | int | None, digits: int = 3) -> str:
if value is None:
return ""
if isinstance(value, float) and (math.isnan(value) or math.isinf(value)):
return ""
return f"{value:.{digits}f}"
def quantile(sorted_values: list[float], q: float) -> float:
    """Return the ``q`` quantile of an already sorted list.

    The position ``(n - 1) * q`` is linearly interpolated between its two
    neighbouring elements. An empty list yields NaN and a single-element list
    yields that element.
    """
    size = len(sorted_values)
    if size == 0:
        return math.nan
    if size == 1:
        return sorted_values[0]

    position = (size - 1) * q
    below, above = math.floor(position), math.ceil(position)
    if below != above:
        weight_below = above - position
        weight_above = position - below
        return sorted_values[below] * weight_below + sorted_values[above] * weight_above
    # The position falls exactly on an element.
    return sorted_values[int(position)]
def ensure_dirs() -> None:
    """Create every output directory the script writes into.

    Covers the fixed output folders, one folder per meter under
    ``FULL_EDA_DIR / "buildings"`` and one per building type under
    ``FULL_EDA_DIR / "building_types"``. Existing directories are left alone.
    """
    fixed_dirs = [
        OUT_DIR,
        EDA_DIR,
        EDA_DIR / "per_meter",
        EDA_DIR / "by_type",
        EDA_DIR / "charts",
        FULL_EDA_DIR,
        FULL_EDA_DIR / "charts",
        FULL_EDA_DIR / "buildings",
        FULL_EDA_DIR / "building_types",
    ]
    meter_dirs = [FULL_EDA_DIR / "buildings" / meter.lower() for meter in METERS]
    type_names = sorted({meta["type"] for meta in METER_META.values()})
    type_dirs = [FULL_EDA_DIR / "building_types" / name for name in type_names]

    for directory in (*fixed_dirs, *meter_dirs, *type_dirs):
        directory.mkdir(parents=True, exist_ok=True)
def write_csv(path: Path, fieldnames: list[str], rows: Iterable[dict[str, object]]) -> None:
    """Write ``rows`` to ``path`` as UTF-8 CSV with a header row.

    ``fieldnames`` fixes the column order; ``rows`` may be any iterable of
    dictionaries keyed by those names and is consumed once.
    """
    with path.open("w", newline="", encoding="utf-8") as handle:
        table = csv.DictWriter(handle, fieldnames=fieldnames)
        table.writeheader()
        table.writerows(rows)
def svg_line_chart(path: Path, title: str, points: list[tuple[str, float]], y_label: str = "kW") -> None:
    """Write a simple line chart to ``path`` as a standalone SVG file.

    Args:
        path: Destination file; overwritten if it exists.
        title: Heading drawn at the top of the chart.
        points: ``(x label, value)`` pairs in drawing order. Non-finite values
            are dropped; the remaining points are spaced evenly along the x axis.
        y_label: Caption for the vertical axis.

    Nothing is written when no point has a finite value.
    """
    # Local import: labels and titles are free text and must be XML-escaped.
    from xml.sax.saxutils import escape

    width, height = 920, 360
    margin_left, margin_right, margin_top, margin_bottom = 70, 30, 50, 70
    plot_w = width - margin_left - margin_right
    plot_h = height - margin_top - margin_bottom

    usable = [(label, value) for label, value in points if math.isfinite(value)]
    if not usable:
        return

    values = [value for _, value in usable]
    min_v, max_v = min(values), max(values)
    if min_v == max_v:
        # A flat series would otherwise divide by zero; pad the range instead.
        min_v -= 1
        max_v += 1
    value_span = max_v - min_v
    last_index = max(len(usable) - 1, 1)
    baseline = margin_top + plot_h

    def x_at(index: int) -> float:
        return margin_left + (plot_w * index / last_index)

    coords = []
    for i, (_, value) in enumerate(usable):
        y = baseline - ((value - min_v) / value_span * plot_h)
        coords.append(f"{x_at(i):.1f},{y:.1f}")

    # Roughly eight evenly spaced x-axis labels, however long the series is.
    font = 'font-family="sans-serif"'
    tick_labels = []
    for i in range(0, len(usable), max(1, len(usable) // 8)):
        label, _ = usable[i]
        tick_labels.append(
            f'<text x="{x_at(i):.1f}" y="{baseline + 20}" text-anchor="middle" '
            f'{font} font-size="11">{escape(str(label))}</text>'
        )

    parts = [
        f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}" '
        f'viewBox="0 0 {width} {height}" role="img">',
        '<rect width="100%" height="100%" fill="#ffffff"/>',
        f'<text x="{width / 2:.1f}" y="28" text-anchor="middle" {font} font-size="18">{escape(title)}</text>',
        f'<line x1="{margin_left}" y1="{margin_top}" x2="{margin_left}" y2="{baseline}" stroke="#333333"/>',
        f'<line x1="{margin_left}" y1="{baseline}" x2="{width - margin_right}" y2="{baseline}" stroke="#333333"/>',
        f'<text x="{margin_left - 8}" y="{margin_top + 4}" text-anchor="end" {font} font-size="11">{max_v:.1f}</text>',
        f'<text x="{margin_left - 8}" y="{baseline + 4}" text-anchor="end" {font} font-size="11">{min_v:.1f}</text>',
        f'<text x="18" y="{margin_top + plot_h / 2:.1f}" text-anchor="middle" {font} font-size="12" '
        f'transform="rotate(-90 18 {margin_top + plot_h / 2:.1f})">{escape(y_label)}</text>',
        f'<polyline fill="none" stroke="#1f77b4" stroke-width="2" points="{" ".join(coords)}"/>',
    ]
    if len(coords) == 1:
        # A one-point polyline renders nothing, so mark the lone observation.
        cx, cy = coords[0].split(",")
        parts.append(f'<circle cx="{cx}" cy="{cy}" r="3" fill="#1f77b4"/>')
    parts.extend(tick_labels)
    parts.append("</svg>")

    path.write_text("\n".join(parts) + "\n", encoding="utf-8")
def svg_bar_chart(path: Path, title: str, bars: list[tuple[str, float]], y_label: str = "kW") -> None:
    """Write a simple vertical bar chart to ``path`` as a standalone SVG file.

    Args:
        path: Destination file; overwritten if it exists.
        title: Heading drawn at the top of the chart.
        bars: ``(label, value)`` pairs in drawing order. Non-finite values are
            dropped. Bars are scaled against the largest value.
        y_label: Caption for the vertical axis.

    Nothing is written when no bar has a finite value.
    """
    # Local import: labels and titles are free text and must be XML-escaped.
    from xml.sax.saxutils import escape

    width, height = 920, 420
    margin_left, margin_right, margin_top, margin_bottom = 70, 30, 55, 110
    plot_w = width - margin_left - margin_right
    plot_h = height - margin_top - margin_bottom

    clean_bars = [(label, value) for label, value in bars if math.isfinite(value)]
    if not clean_bars:
        return

    max_v = max(value for _, value in clean_bars)
    # Keep the scale positive so all-zero or all-negative input cannot divide by zero.
    max_v = max_v if max_v > 0 else 1
    step = plot_w / len(clean_bars)
    bar_w = step * 0.62
    baseline = margin_top + plot_h
    font = 'font-family="sans-serif"'

    rects = []
    labels = []
    for i, (label, value) in enumerate(clean_bars):
        x = margin_left + i * step + (step - bar_w) / 2
        # SVG rejects negative heights; negative values collapse onto the baseline.
        bar_h = max(value, 0) / max_v * plot_h
        y = baseline - bar_h
        center = x + bar_w / 2
        rects.append(
            f'<rect x="{x:.1f}" y="{y:.1f}" width="{bar_w:.1f}" height="{bar_h:.1f}" fill="#1f77b4"/>'
        )
        # Category label, rotated so long meter names do not overlap.
        labels.append(
            f'<text x="{center:.1f}" y="{baseline + 16}" text-anchor="end" {font} font-size="11" '
            f'transform="rotate(-35 {center:.1f} {baseline + 16})">{escape(str(label))}</text>'
        )
        # Value label above the bar; values are finite here, so plain
        # fixed-point formatting is sufficient.
        labels.append(
            f'<text x="{center:.1f}" y="{y - 6:.1f}" text-anchor="middle" {font} font-size="11">{value:.1f}</text>'
        )

    parts = [
        f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}" '
        f'viewBox="0 0 {width} {height}" role="img">',
        '<rect width="100%" height="100%" fill="#ffffff"/>',
        f'<text x="{width / 2:.1f}" y="30" text-anchor="middle" {font} font-size="18">{escape(title)}</text>',
        f'<line x1="{margin_left}" y1="{margin_top}" x2="{margin_left}" y2="{baseline}" stroke="#333333"/>',
        f'<line x1="{margin_left}" y1="{baseline}" x2="{width - margin_right}" y2="{baseline}" stroke="#333333"/>',
        f'<text x="{margin_left - 8}" y="{margin_top + 4}" text-anchor="end" {font} font-size="11">{max_v:.1f}</text>',
        f'<text x="18" y="{margin_top + plot_h / 2:.1f}" text-anchor="middle" {font} font-size="12" '
        f'transform="rotate(-90 18 {margin_top + plot_h / 2:.1f})">{escape(y_label)}</text>',
        *rects,
        *labels,
        "</svg>",
    ]
    path.write_text("\n".join(parts) + "\n", encoding="utf-8")
def quality_label(missing_pct: float, zero_pct: float) -> str:
    """Grade a meter's data quality from its missing and zero percentages.

    Tiers are tried from best to worst; the first one whose limits are met
    wins, and ``"weak"`` is returned when none applies. The ``"limited"`` tier
    only looks at the missing percentage.
    """
    # (label, max missing %, max zero % or None when zeros are not considered)
    tiers = (
        ("strong", 5, 10),
        ("usable", 15, 25),
        ("limited", 30, None),
    )
    for label, missing_limit, zero_limit in tiers:
        if not missing_pct <= missing_limit:
            continue
        if zero_limit is None or zero_pct <= zero_limit:
            return label
    return "weak"
def summarize_transformer_power() -> list[dict[str, object]]:
    """Summarize each transformer column of ``TRANSFORMER_POWER_FILE``.

    Returns one dictionary per entry of ``TRANSFORMER_COLUMNS`` with coverage
    counts, missing/zero percentages, mean/median/p95/max in kW and the first
    and last observed timestamps (Asia/Kolkata, ISO format). Numeric fields are
    pre-formatted strings. An empty list is returned when the file is absent.
    """
    if not TRANSFORMER_POWER_FILE.exists():
        return []

    trackers = {name: SeriesStats() for name in TRANSFORMER_COLUMNS}
    with TRANSFORMER_POWER_FILE.open("r", newline="", encoding="utf-8") as handle:
        for record in csv.DictReader(handle):
            epoch = parse_timestamp(record["timestamp"])
            stamp = datetime.fromtimestamp(epoch, tz=UTC).astimezone(IST).isoformat()
            for name, tracker in trackers.items():
                tracker.add(parse_watts(record[name]), stamp)

    def describe(name: str, tracker: SeriesStats) -> dict[str, object]:
        ordered = sorted(tracker.values_for_quantiles)
        seen_total = tracker.count + tracker.missing
        missing_share = tracker.missing / seen_total * 100 if seen_total else math.nan
        zero_share = tracker.zero / tracker.count * 100 if tracker.count else math.nan
        return {
            "transformer": name,
            "rows_total": seen_total,
            "observed_rows": tracker.count,
            "missing_rows": tracker.missing,
            "missing_pct": fmt(missing_share),
            "zero_pct": fmt(zero_share),
            "mean_kw": fmt(tracker.mean_kw),
            "median_kw": fmt(quantile(ordered, 0.50)),
            "p95_kw": fmt(quantile(ordered, 0.95)),
            "max_kw": fmt(tracker.max_kw),
            "first_seen_ist": tracker.first_seen or "",
            "last_seen_ist": tracker.last_seen or "",
        }

    return [describe(name, tracker) for name, tracker in trackers.items()]
def write_analysis_plan() -> None:
    """Write the EDA plan document.

    The same Markdown text is saved as ``eda_analysis_plan.md`` in ``EDA_DIR``
    and as ``00_eda_scope_and_flow.md`` in ``FULL_EDA_DIR``. It lists the
    guiding questions, the aspect table built from ``EDA_ASPECTS``, the steps
    from ``COMMON_EDA_FLOW`` and fixed interpretation / feature-coverage notes.
    """
    preamble = [
        "# EDA Analysis Plan Before Per-Building EDA",
        "",
        "This dataset should be analyzed with the same flow for all 9 building meters so comparisons stay fair.",
        "",
        "## Dataset-specific questions",
        "",
        "- Which meters have enough coverage to trust?",
        "- Which buildings consume the most energy on average and during peaks?",
        "- Which buildings show schedule-driven behavior by hour, weekday, and month?",
        "- Which loads are stable enough for baseline forecasting?",
        "- Which meters need caution because of missing data or many zeros?",
        "- How do building types differ: academic, hostel, facilities, lecture, library, and mess?",
        "",
        "## Recommended analysis aspects",
        "",
        "| Aspect | Why it matters | Output files/metrics |",
        "| --- | --- | --- |",
    ]
    aspect_table = [
        f"| {entry['aspect']} | {entry['why']} | {entry['outputs']} |" for entry in EDA_ASPECTS
    ]
    flow_section = ["", "## Common EDA flow used for every building", ""]
    flow_section += [f"- {step}" for step in COMMON_EDA_FLOW]
    closing = [
        "",
        "## Practical interpretation",
        "",
        "- Use `strong` meters for headline insights and modeling first.",
        "- Use `usable` meters for comparison after checking missing periods.",
        "- Treat `limited` meters carefully; avoid overclaiming precise trends.",
        "- Treat a high zero percentage as a separate operating-state signal, not automatically as clean data.",
        "",
        "## Current feature coverage",
        "",
        "| Feature | Current status | Notes |",
        "| --- | --- | --- |",
        "| Energy consumption | Calculated | Uses the 1-minute building and transformer power CSV files. |",
        "| Schedule/time pattern | Calculated as proxy | Uses hour-of-day, weekday/weekend, and month from timestamps. No explicit class timetable file is present. |",
        "| Occupancy | Not calculated yet | No occupancy CSV is present in the current workspace. Add it to join at 10-minute or hourly resolution. |",
        "| Weather | Not calculated yet | No weather CSV is present in the current workspace. Add it to join by timestamp before correlation/regression. |",
    ]

    document = "\n".join([*preamble, *aspect_table, *flow_section, *closing]) + "\n"
    for destination in (EDA_DIR / "eda_analysis_plan.md", FULL_EDA_DIR / "00_eda_scope_and_flow.md"):
        destination.write_text(document, encoding="utf-8")
def _to_float(text: str) -> float:
    """Turn a value produced by ``fmt`` back into a float; blank means NaN."""
    return float(text) if text else math.nan


def _mean_of_finite(values: Iterable[float]) -> float:
    """Mean of the non-NaN values, or NaN when none remain."""
    finite = [value for value in values if not math.isnan(value)]
    return statistics.fmean(finite) if finite else math.nan


def _sort_value(text: str, *, missing: float) -> float:
    """Numeric sort key for a formatted cell; blank cells sort as ``missing``."""
    number = _to_float(text)
    return missing if math.isnan(number) else number


def main() -> None:
    """Run the whole preprocessing and EDA export.

    Streams ``DATA_FILE`` once while accumulating per-meter statistics,
    hourly/daily buckets, hour-of-day / weekday / month profiles, type-level
    buckets and the ten largest readings per meter. It then writes the summary
    CSVs, cleaned hourly/daily CSVs, per-meter and per-type reports, charts and
    index files under ``OUT_DIR``, ``EDA_DIR`` and ``FULL_EDA_DIR``.

    Raises:
        FileNotFoundError: if ``DATA_FILE`` does not exist.
        KeyError: if the CSV lacks the ``timestamp`` column or a meter column.
    """
    ensure_dirs()
    write_analysis_plan()

    stats = {meter: SeriesStats() for meter in METERS}
    hourly = defaultdict(Bucket)
    daily = defaultdict(Bucket)
    hour_profile = defaultdict(Bucket)
    weekday_profile = defaultdict(Bucket)
    month_profile = defaultdict(Bucket)
    type_hourly = defaultdict(Bucket)
    type_daily = defaultdict(Bucket)
    top_peaks = {meter: [] for meter in METERS}
    row_count = 0

    # ---- Single pass over the minute-level data ---------------------------
    with DATA_FILE.open("r", newline="", encoding="utf-8") as file:
        for row in csv.DictReader(file):
            row_count += 1
            dt = datetime.fromtimestamp(parse_timestamp(row["timestamp"]), tz=UTC).astimezone(IST)
            dt_iso = dt.isoformat()
            hour_key = dt.strftime("%Y-%m-%d %H:00:00%z")
            day_key = dt.strftime("%Y-%m-%d")
            month_key = dt.strftime("%Y-%m")
            weekday_key = str(dt.weekday())
            hour_of_day = f"{dt.hour:02d}"
            values_by_type = defaultdict(list)

            for meter in METERS:
                meter_type = METER_META[meter]["type"]
                # DictReader fills the trailing fields of a short row with
                # None; read those as blank (missing) instead of crashing.
                value = parse_watts(row[meter] or "")
                stats[meter].add(value, dt_iso)
                hourly[(meter, hour_key)].add(value)
                daily[(meter, day_key)].add(value)
                hour_profile[(meter, hour_of_day)].add(value)
                weekday_profile[(meter, weekday_key)].add(value)
                month_profile[(meter, month_key)].add(value)

                if value is None:
                    type_hourly[(meter_type, hour_key)].add(None)
                    type_daily[(meter_type, day_key)].add(None)
                    continue

                values_by_type[meter_type].append(value)
                peaks = top_peaks[meter]
                # Only touch the top-10 list when the reading can enter it.
                # A tie with the current 10th value would be trimmed again by
                # the stable sort, so skipping it gives the same result.
                if len(peaks) < 10 or value > peaks[-1][0]:
                    peaks.append((value, dt_iso))
                    peaks.sort(key=lambda item: item[0], reverse=True)
                    del peaks[10:]

            # NOTE(review): each type bucket receives ONE value per minute (the
            # sum of that type's observed meters) but one "missing" entry per
            # missing meter, so a partially observed minute adds a partial sum
            # and the observed/missing counts are on different scales.
            for building_type, values in values_by_type.items():
                total_type_kw = sum(values)
                type_hourly[(building_type, hour_key)].add(total_type_kw)
                type_daily[(building_type, day_key)].add(total_type_kw)

    # ---- Per-meter summary -------------------------------------------------
    summary_rows = []
    for meter, item in stats.items():
        values = sorted(item.values_for_quantiles)
        total_points = item.count + item.missing
        summary_rows.append(
            {
                "meter": meter,
                "building_type": METER_META[meter]["type"],
                "display_name": METER_META[meter]["display"],
                "building_note": METER_META[meter]["note"],
                "rows_total": total_points,
                "observed_rows": item.count,
                "missing_rows": item.missing,
                "missing_pct": fmt(item.missing / total_points * 100 if total_points else math.nan),
                "zero_pct": fmt(item.zero / item.count * 100 if item.count else math.nan),
                "negative_pct": fmt(item.negative / item.count * 100 if item.count else math.nan),
                "mean_kw": fmt(item.mean_kw),
                "std_kw": fmt(item.std_kw),
                "min_kw": fmt(item.min_kw),
                "p25_kw": fmt(quantile(values, 0.25)),
                "median_kw": fmt(quantile(values, 0.50)),
                "p75_kw": fmt(quantile(values, 0.75)),
                "p95_kw": fmt(quantile(values, 0.95)),
                "max_kw": fmt(item.max_kw),
                "first_seen_ist": item.first_seen or "",
                "last_seen_ist": item.last_seen or "",
            }
        )
    summary_fields = list(summary_rows[0].keys())
    summary_by_meter = {row["meter"]: row for row in summary_rows}
    for target in (
        OUT_DIR / "building_preprocessing_summary.csv",
        FULL_EDA_DIR / "01_common_building_power_summary.csv",
    ):
        write_csv(target, summary_fields, summary_rows)

    # ---- Comparable "common flow" summary ---------------------------------
    common_flow_fields = [
        "meter",
        "building_type",
        "display_name",
        "building_note",
        "quality_label",
        "missing_pct",
        "zero_pct",
        "mean_kw",
        "median_kw",
        "p95_kw",
        "max_kw",
        "cv",
        "p95_to_median",
        "weekday_mean_kw",
        "weekend_mean_kw",
        "weekend_delta_pct",
        "first_seen_ist",
        "last_seen_ist",
    ]
    common_flow_rows = []
    for row in summary_rows:
        meter = row["meter"]
        # weekday() numbers Monday as 0, so 0-4 are working days and 5-6 the
        # weekend. Days without observations are ignored; if none remain the
        # mean is NaN rather than an exception.
        weekday_mean = _mean_of_finite(weekday_profile[(meter, str(day))].mean_kw for day in range(5))
        weekend_mean = _mean_of_finite(weekday_profile[(meter, str(day))].mean_kw for day in range(5, 7))
        mean_kw = _to_float(row["mean_kw"])
        median_kw = _to_float(row["median_kw"])
        std_kw = _to_float(row["std_kw"])
        p95_kw = _to_float(row["p95_kw"])
        missing_pct = _to_float(row["missing_pct"])
        zero_pct = _to_float(row["zero_pct"])
        cv = std_kw / mean_kw if mean_kw else math.nan
        p95_to_median = p95_kw / median_kw if median_kw else math.nan
        weekend_delta = (weekend_mean - weekday_mean) / weekday_mean * 100 if weekday_mean else math.nan
        common_flow_rows.append(
            {
                "meter": meter,
                "building_type": row["building_type"],
                "display_name": row["display_name"],
                "building_note": row["building_note"],
                "quality_label": quality_label(missing_pct, zero_pct),
                "missing_pct": row["missing_pct"],
                "zero_pct": row["zero_pct"],
                "mean_kw": row["mean_kw"],
                "median_kw": row["median_kw"],
                "p95_kw": row["p95_kw"],
                "max_kw": row["max_kw"],
                "cv": fmt(cv),
                "p95_to_median": fmt(p95_to_median),
                "weekday_mean_kw": fmt(weekday_mean),
                "weekend_mean_kw": fmt(weekend_mean),
                "weekend_delta_pct": fmt(weekend_delta),
                "first_seen_ist": row["first_seen_ist"],
                "last_seen_ist": row["last_seen_ist"],
            }
        )
    common_by_meter = {row["meter"]: row for row in common_flow_rows}
    for target in (
        EDA_DIR / "all_meters_common_flow_summary.csv",
        FULL_EDA_DIR / "02_common_all_meters_flow_summary.csv",
    ):
        write_csv(target, common_flow_fields, common_flow_rows)

    # ---- Transformer summary (optional input) ------------------------------
    transformer_rows = summarize_transformer_power()
    if transformer_rows:
        write_csv(
            FULL_EDA_DIR / "03_common_transformer_power_summary.csv",
            list(transformer_rows[0].keys()),
            transformer_rows,
        )
        svg_bar_chart(
            FULL_EDA_DIR / "charts" / "common_transformer_mean_power.svg",
            "Transformer mean power",
            [(row["transformer"], float(row["mean_kw"])) for row in transformer_rows if row["mean_kw"]],
        )

    # ---- Cross-building charts and ranking ---------------------------------
    # Blank cells (meters without data) sort last instead of raising on float("").
    ranked_rows = sorted(
        common_flow_rows,
        key=lambda row: (
            _sort_value(row["missing_pct"], missing=math.inf),
            _sort_value(row["zero_pct"], missing=math.inf),
        ),
    )
    by_mean_desc = sorted(
        common_flow_rows,
        key=lambda item: _sort_value(item["mean_kw"], missing=-math.inf),
        reverse=True,
    )
    svg_bar_chart(
        FULL_EDA_DIR / "charts" / "common_building_mean_power_ranking.svg",
        "Building mean power ranking",
        [(row["meter"], _to_float(row["mean_kw"])) for row in by_mean_desc],
    )
    by_missing_desc = sorted(
        common_flow_rows,
        key=lambda item: _sort_value(item["missing_pct"], missing=-math.inf),
        reverse=True,
    )
    svg_bar_chart(
        FULL_EDA_DIR / "charts" / "common_building_missing_pct.svg",
        "Building missing data percentage",
        [(row["meter"], _to_float(row["missing_pct"])) for row in by_missing_desc],
        y_label="%",
    )

    # ---- All-buildings Markdown report -------------------------------------
    all_building_lines = [
        "# All 9 Buildings Common EDA",
        "",
        "Each building meter is analyzed with the same flow so the outputs can be compared directly.",
        "",
        "## Analysis aspects before EDA",
        "",
        "| Aspect | Why it matters |",
        "| --- | --- |",
    ]
    all_building_lines.extend(f"| {item['aspect']} | {item['why']} |" for item in EDA_ASPECTS)
    all_building_lines.extend(["", "## Common flow", ""])
    all_building_lines.extend(f"- {step}" for step in COMMON_EDA_FLOW)
    all_building_lines.extend(
        [
            "",
            "## Building annotations",
            "",
            "| Meter | Display name | Type | Note |",
            "| --- | --- | --- | --- |",
        ]
    )
    all_building_lines.extend(
        f"| {row['meter']} | {row['display_name']} | {row['building_type']} | {row['building_note']} |"
        for row in common_flow_rows
    )
    all_building_lines.extend(
        [
            "",
            "## 9-building comparable summary",
            "",
            "| Rank by quality | Meter | Type | Quality | Missing % | Zero % | Mean kW | Median kW | P95 kW | CV | Weekend delta % |",
            "| ---: | --- | --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: |",
        ]
    )
    for rank, row in enumerate(ranked_rows, start=1):
        all_building_lines.append(
            f"| {rank} | {row['meter']} | {row['building_type']} | {row['quality_label']} | "
            f"{row['missing_pct']} | {row['zero_pct']} | {row['mean_kw']} | {row['median_kw']} | "
            f"{row['p95_kw']} | {row['cv']} | {row['weekend_delta_pct']} |"
        )
    all_building_lines.extend(
        [
            "",
            "## Recommended interpretation order",
            "",
            "- Start with `quality_label`, `missing_pct`, and `zero_pct`.",
            "- Use `mean_kw`, `median_kw`, and `p95_kw` for load ranking.",
            "- Use `cv` and `p95_to_median` for volatility.",
            "- Use `weekend_delta_pct` for schedule sensitivity.",
            "- Drill into each `per_meter/*_eda.md` file for the same meter-level flow.",
        ]
    )
    all_buildings_text = "\n".join(all_building_lines) + "\n"
    (EDA_DIR / "all_buildings_common_eda.md").write_text(all_buildings_text, encoding="utf-8")
    (FULL_EDA_DIR / "04_common_all_buildings_eda.md").write_text(all_buildings_text, encoding="utf-8")

    # ---- Cleaned hourly / daily files per meter ----------------------------
    hourly_rows = []
    for (meter, hour_key), bucket in sorted(hourly.items(), key=lambda item: item[0]):
        hourly_rows.append(
            {
                "datetime_hour_ist": hour_key,
                "meter": meter,
                "building_type": METER_META[meter]["type"],
                "mean_kw": fmt(bucket.mean_kw),
                "observed_minutes": bucket.count,
                "missing_minutes": bucket.missing,
                "availability_pct": fmt(bucket.availability_pct),
                # Mean kW over one hour equals kWh for that hour.
                "approx_kwh": fmt(bucket.mean_kw),
            }
        )
    write_csv(
        OUT_DIR / "building_power_hourly_clean.csv",
        [
            "datetime_hour_ist",
            "meter",
            "building_type",
            "mean_kw",
            "observed_minutes",
            "missing_minutes",
            "availability_pct",
            "approx_kwh",
        ],
        hourly_rows,
    )

    daily_rows = []
    for (meter, day_key), bucket in sorted(daily.items(), key=lambda item: item[0]):
        daily_rows.append(
            {
                "date_ist": day_key,
                "meter": meter,
                "building_type": METER_META[meter]["type"],
                "mean_kw": fmt(bucket.mean_kw),
                "observed_minutes": bucket.count,
                "missing_minutes": bucket.missing,
                "availability_pct": fmt(bucket.availability_pct),
                # Extrapolates the observed mean to a full 24 h day.
                "approx_kwh": fmt(bucket.mean_kw * 24),
            }
        )
    write_csv(
        OUT_DIR / "building_power_daily_clean.csv",
        [
            "date_ist",
            "meter",
            "building_type",
            "mean_kw",
            "observed_minutes",
            "missing_minutes",
            "availability_pct",
            "approx_kwh",
        ],
        daily_rows,
    )

    # ---- Cleaned hourly / daily files per building type --------------------
    type_hourly_rows = []
    for (building_type, hour_key), bucket in sorted(type_hourly.items(), key=lambda item: item[0]):
        type_hourly_rows.append(
            {
                "datetime_hour_ist": hour_key,
                "building_type": building_type,
                "mean_kw": fmt(bucket.mean_kw),
                "observed_meter_values": bucket.count,
                "missing_meter_values": bucket.missing,
                "availability_pct": fmt(bucket.availability_pct),
            }
        )
    write_csv(
        OUT_DIR / "building_type_hourly_clean.csv",
        [
            "datetime_hour_ist",
            "building_type",
            "mean_kw",
            "observed_meter_values",
            "missing_meter_values",
            "availability_pct",
        ],
        type_hourly_rows,
    )

    type_daily_rows = []
    for (building_type, day_key), bucket in sorted(type_daily.items(), key=lambda item: item[0]):
        type_daily_rows.append(
            {
                "date_ist": day_key,
                "building_type": building_type,
                "mean_kw": fmt(bucket.mean_kw),
                "observed_meter_values": bucket.count,
                "missing_meter_values": bucket.missing,
                "availability_pct": fmt(bucket.availability_pct),
                "approx_kwh": fmt(bucket.mean_kw * 24),
            }
        )
    write_csv(
        OUT_DIR / "building_type_daily_clean.csv",
        [
            "date_ist",
            "building_type",
            "mean_kw",
            "observed_meter_values",
            "missing_meter_values",
            "availability_pct",
            "approx_kwh",
        ],
        type_daily_rows,
    )

    # ---- Per-meter profiles, peaks, chart and report -----------------------
    profile_fields = ["mean_kw", "observed_points", "availability_pct"]
    weekday_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
    common_flow_md = "\n".join(f"- {step}" for step in COMMON_EDA_FLOW)

    for meter in METERS:
        meter_slug = meter.lower()
        meta = METER_META[meter]
        building_dir = FULL_EDA_DIR / "buildings" / meter_slug
        per_meter_dir = EDA_DIR / "per_meter"

        hourly_profile_rows = []
        for hour in [f"{i:02d}" for i in range(24)]:
            bucket = hour_profile[(meter, hour)]
            hourly_profile_rows.append(
                {
                    "hour_ist": hour,
                    "mean_kw": fmt(bucket.mean_kw),
                    "observed_points": bucket.count,
                    "availability_pct": fmt(bucket.availability_pct),
                }
            )
        for target in (per_meter_dir / f"{meter_slug}_hourly_profile.csv", building_dir / "hourly_profile.csv"):
            write_csv(target, ["hour_ist", *profile_fields], hourly_profile_rows)

        weekday_rows = []
        for weekday in range(7):
            bucket = weekday_profile[(meter, str(weekday))]
            weekday_rows.append(
                {
                    "weekday": weekday,
                    "weekday_name": weekday_names[weekday],
                    "mean_kw": fmt(bucket.mean_kw),
                    "observed_points": bucket.count,
                    "availability_pct": fmt(bucket.availability_pct),
                }
            )
        for target in (per_meter_dir / f"{meter_slug}_weekday_profile.csv", building_dir / "weekday_profile.csv"):
            write_csv(target, ["weekday", "weekday_name", *profile_fields], weekday_rows)

        monthly_rows = [
            {
                "month_ist": month,
                "mean_kw": fmt(bucket.mean_kw),
                "observed_points": bucket.count,
                "availability_pct": fmt(bucket.availability_pct),
            }
            for (profile_meter, month), bucket in sorted(month_profile.items(), key=lambda item: item[0])
            if profile_meter == meter
        ]
        for target in (per_meter_dir / f"{meter_slug}_monthly_profile.csv", building_dir / "monthly_profile.csv"):
            write_csv(target, ["month_ist", *profile_fields], monthly_rows)

        peak_rows = [
            {"rank": index + 1, "datetime_ist": timestamp, "kw": fmt(value)}
            for index, (value, timestamp) in enumerate(top_peaks[meter])
        ]
        for target in (per_meter_dir / f"{meter_slug}_top_peaks.csv", building_dir / "top_peaks.csv"):
            write_csv(target, ["rank", "datetime_ist", "kw"], peak_rows)

        hourly_chart_points = [(row["hour_ist"], _to_float(row["mean_kw"])) for row in hourly_profile_rows]
        chart_title = f"{meta['display']} - mean hourly load"
        for target in (EDA_DIR / "charts" / f"{meter_slug}_hourly_profile.svg", building_dir / "hourly_profile.svg"):
            svg_line_chart(target, chart_title, hourly_chart_points)

        summary = summary_by_meter[meter]
        common_summary = common_by_meter[meter]
        report_lines = [
            f"# {meta['display']} EDA",
            "## Common EDA flow",
            common_flow_md,
            "## Building note",
            meta["note"],
            "## Preprocessing notes",
            "- Source: `energy_dataset/all_buildings_power.csv`",
            "- Timezone: UNIX timestamp converted to `Asia/Kolkata` (`+05:30`)",
            "- Unit conversion: watts to kW",
            "- Missing values: `NA`/blank kept as missing during aggregation",
            "## Key metrics",
            "| Metric | Value |",
            "| --- | ---: |",
            f"| Building type | {summary['building_type']} |",
            f"| Observed rows | {summary['observed_rows']} |",
            f"| Missing rows | {summary['missing_rows']} |",
            f"| Missing % | {summary['missing_pct']} |",
            f"| Mean kW | {summary['mean_kw']} |",
            f"| Median kW | {summary['median_kw']} |",
            f"| P95 kW | {summary['p95_kw']} |",
            f"| Max kW | {summary['max_kw']} |",
            f"| Zero % of observed | {summary['zero_pct']} |",
            f"| Quality label | {common_summary['quality_label']} |",
            f"| Coefficient of variation | {common_summary['cv']} |",
            f"| P95 / median | {common_summary['p95_to_median']} |",
            f"| Weekday mean kW | {common_summary['weekday_mean_kw']} |",
            f"| Weekend mean kW | {common_summary['weekend_mean_kw']} |",
            f"| Weekend delta % | {common_summary['weekend_delta_pct']} |",
            "## How to read this meter",
            "- Use missing % and zero % first; these decide how much confidence to put in the patterns.",
            "- Use mean/median/p95/max to understand normal load versus stress load.",
            "- Use hourly, weekday, and monthly profiles to separate schedule effects from long-term changes.",
            "- Use top peaks to inspect unusual operating days or demand-response opportunities.",
            "## Files",
        ]
        # The flat per_meter/ layout prefixes files with the meter slug; the
        # per-building folder uses plain names and keeps its chart alongside.
        flat_files = [
            f"- `{meter_slug}_hourly_profile.csv`",
            f"- `{meter_slug}_weekday_profile.csv`",
            f"- `{meter_slug}_monthly_profile.csv`",
            f"- `{meter_slug}_top_peaks.csv`",
            f"- `../charts/{meter_slug}_hourly_profile.svg`",
        ]
        folder_files = [
            "- `hourly_profile.csv`",
            "- `weekday_profile.csv`",
            "- `monthly_profile.csv`",
            "- `top_peaks.csv`",
            "- `hourly_profile.svg`",
        ]
        md = "\n".join(report_lines + flat_files) + "\n"
        structured_md = "\n".join(report_lines + folder_files) + "\n"
        (per_meter_dir / f"{meter_slug}_eda.md").write_text(md, encoding="utf-8")
        (building_dir / "README.md").write_text(structured_md, encoding="utf-8")

    # ---- Per-building-type summary, chart and report -----------------------
    type_summary_fields = [
        "building_type",
        "meters",
        "observed_rows",
        "missing_rows",
        "missing_pct",
        "mean_of_meter_means_kw",
    ]
    type_summary_rows = []
    for building_type in sorted({meta["type"] for meta in METER_META.values()}):
        meters = [meter for meter in METERS if METER_META[meter]["type"] == building_type]
        selected = [row for row in summary_rows if row["meter"] in meters]
        observed = sum(int(row["observed_rows"]) for row in selected)
        missing = sum(int(row["missing_rows"]) for row in selected)
        mean_values = [float(row["mean_kw"]) for row in selected if row["mean_kw"]]
        type_summary_rows.append(
            {
                "building_type": building_type,
                "meters": ", ".join(meters),
                "observed_rows": observed,
                "missing_rows": missing,
                "missing_pct": fmt(missing / (missing + observed) * 100 if missing + observed else math.nan),
                "mean_of_meter_means_kw": fmt(statistics.fmean(mean_values) if mean_values else math.nan),
            }
        )

        type_dir = FULL_EDA_DIR / "building_types" / building_type
        type_daily_points = [
            (row["date_ist"], float(row["mean_kw"]))
            for row in type_daily_rows
            if row["building_type"] == building_type and row["mean_kw"]
        ]
        type_chart_title = f"{building_type.title()} - daily mean load"
        for target in (EDA_DIR / "charts" / f"{building_type}_daily_profile.svg", type_dir / "daily_profile.svg"):
            svg_line_chart(target, type_chart_title, type_daily_points)

        md_lines = [
            f"# {building_type.title()} Building Type EDA",
            "",
            "## Included meters",
            "",
            ", ".join(meters),
            "",
            "## Meter summary",
            "",
            "| Meter | Mean kW | Median kW | P95 kW | Missing % | Max kW |",
            "| --- | ---: | ---: | ---: | ---: | ---: |",
        ]
        md_lines.extend(
            f"| {row['meter']} | {row['mean_kw']} | {row['median_kw']} | {row['p95_kw']} | {row['missing_pct']} | {row['max_kw']} |"
            for row in selected
        )
        md_lines.extend(
            [
                "",
                "## Files",
                "",
                "- `../../preprocessed_outputs/building_type_hourly_clean.csv`",
                "- `../../preprocessed_outputs/building_type_daily_clean.csv`",
                f"- `../charts/{building_type}_daily_profile.svg`",
            ]
        )
        type_report = "\n".join(md_lines) + "\n"
        (EDA_DIR / "by_type" / f"{building_type}_eda.md").write_text(type_report, encoding="utf-8")
        (type_dir / "README.md").write_text(type_report, encoding="utf-8")

    for target in (EDA_DIR / "building_type_summary.csv", FULL_EDA_DIR / "05_common_building_type_summary.csv"):
        write_csv(target, type_summary_fields, type_summary_rows)

    # ---- Index files -------------------------------------------------------
    index_lines = [
        "# Building Energy Preprocessing and EDA Index",
        "",
        f"Source rows processed: `{row_count}`",
        "",
        "## Start here",
        "",
        "- `eda_analysis_plan.md`",
        "- `all_buildings_common_eda.md`",
        "- `all_meters_common_flow_summary.csv`",
        "",
        "## Preprocessed outputs",
        "",
        "- `preprocessed_outputs/building_preprocessing_summary.csv`",
        "- `preprocessed_outputs/building_power_hourly_clean.csv`",
        "- `preprocessed_outputs/building_power_daily_clean.csv`",
        "- `preprocessed_outputs/building_type_hourly_clean.csv`",
        "- `preprocessed_outputs/building_type_daily_clean.csv`",
        "",
        "## Per-meter reports",
        "",
    ]
    index_lines.extend(f"- `per_meter/{meter.lower()}_eda.md`" for meter in METERS)
    index_lines.extend(["", "## Per-building-type reports", ""])
    index_lines.extend(f"- `by_type/{row['building_type']}_eda.md`" for row in type_summary_rows)
    (EDA_DIR / "README.md").write_text("\n".join(index_lines) + "\n", encoding="utf-8")

    full_index_lines = [
        "# Full Energy EDA Output",
        "",
        "Common files are kept at this top level. Building-specific EDA is kept inside `buildings/`.",
        "",
        "## Common outputs",
        "",
        "- `00_eda_scope_and_flow.md`",
        "- `01_common_building_power_summary.csv`",
        "- `02_common_all_meters_flow_summary.csv`",
        "- `03_common_transformer_power_summary.csv`",
        "- `04_common_all_buildings_eda.md`",
        "- `05_common_building_type_summary.csv`",
        "- `charts/common_building_mean_power_ranking.svg`",
        "- `charts/common_building_missing_pct.svg`",
        "- `charts/common_transformer_mean_power.svg`",
        "",
        "## Per-building outputs",
        "",
    ]
    full_index_lines.extend(f"- `buildings/{meter.lower()}/README.md`" for meter in METERS)
    full_index_lines.extend(["", "## Per-building-type outputs", ""])
    full_index_lines.extend(f"- `building_types/{row['building_type']}/README.md`" for row in type_summary_rows)
    (FULL_EDA_DIR / "README.md").write_text("\n".join(full_index_lines) + "\n", encoding="utf-8")
# Run the export only when executed as a script, not when imported.
if __name__ == "__main__":
    main()