hack-a-thon / src /grader.py
KuuwangE's picture
์ •์ƒํ™”
0c6d96d unverified
# grader.py
import os
import math
import pandas as pd
import matplotlib.pyplot as plt
from typing import Tuple
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, root_mean_squared_error
from src.utils import evaluate_model  # function that computes the TOTAL score
ANSWER_PATH = "answer.csv"  # ground-truth answer file placed in the Space alongside the app
def _safe_minmax(series: pd.Series) -> pd.Series:
    """Min-max scale ``series`` to the [0, 1] range without failing on bad data.

    Values that cannot be interpreted as numbers, as well as +/-inf, are
    treated as missing (NaN) so that one bad cell neither raises nor
    stretches the range to infinity and flattens every other value.

    Args:
        series: Values to scale; any dtype accepted by ``pd.to_numeric``.

    Returns:
        A float Series on the same index as ``series``:
          * all NaN when there is no usable value,
          * 0.0 for every usable value (NaN elsewhere) when min == max,
            since a zero range carries no relative information,
          * ``(x - min) / (max - min)`` otherwise, with NaN kept as NaN.
    """
    values = pd.to_numeric(series, errors="coerce").astype(float)
    # NaN compares unequal to inf, so existing NaN entries stay NaN here.
    values = values.where(values.abs() != math.inf)

    usable = values.notna()
    if not usable.any():
        return pd.Series(float("nan"), index=values.index, dtype=float)

    val_min = values.min()
    val_max = values.max()
    if val_min == val_max:
        # Zero range: emit 0.0 where a value exists and NaN where it is missing.
        zeros = pd.Series(0.0, index=values.index, dtype=float)
        return zeros.where(usable.to_numpy())

    return (values - val_min) / (val_max - val_min)
def _safe_rmse(y_true: pd.Series, y_pred: pd.Series) -> float:
    """Return the RMSE between two series, or NaN when it cannot be computed.

    Rows where either side is missing or not numeric are excluded before
    the metric is evaluated. The function never raises for bad data.

    Args:
        y_true: Reference values. ``None`` is tolerated and yields NaN.
        y_pred: Predicted values, aligned with ``y_true`` by index.
            ``None`` is tolerated and yields NaN.

    Returns:
        The RMSE over the rows present on both sides, or NaN when an input
        is ``None``, no complete row remains, or the metric call fails.
    """
    nan = float("nan")
    # pd.concat silently drops None entries, which would leave a single
    # column and break the positional lookups below, so reject None first.
    if y_true is None or y_pred is None:
        return nan

    pairs = pd.concat(
        [
            pd.to_numeric(y_true, errors="coerce"),
            pd.to_numeric(y_pred, errors="coerce"),
        ],
        axis=1,
    ).dropna()
    if pairs.empty:
        return nan

    # Columns are read by position because both inputs may share a name.
    expected = pairs.iloc[:, 0].astype(float)
    predicted = pairs.iloc[:, 1].astype(float)
    try:
        return float(root_mean_squared_error(expected, predicted))
    except Exception:
        # Deliberately broad: grading must degrade to NaN rather than crash.
        return nan
def _safe_nmae(y_true: pd.Series, y_pred: pd.Series, mode: str = "range") -> float:
    """Return the normalised mean absolute error, or NaN when it is undefined.

    Rows where either side is missing or not numeric are excluded first.

    Args:
        y_true: Reference values. ``None`` is tolerated and yields NaN.
        y_pred: Predicted values, aligned with ``y_true`` by index.
            ``None`` is tolerated and yields NaN.
        mode: ``"range"`` divides the MAE by ``max(y_true) - min(y_true)``;
            any other value divides it by the magnitude of ``mean(y_true)``.

    Returns:
        ``MAE / denominator``, or NaN when an input is ``None``, no complete
        row remains, the denominator is zero/NaN, or the metric call fails.
    """
    nan = float("nan")
    # pd.concat silently drops None entries, which would leave a single
    # column and break the positional lookups below, so reject None first.
    if y_true is None or y_pred is None:
        return nan

    pairs = pd.concat(
        [
            pd.to_numeric(y_true, errors="coerce"),
            pd.to_numeric(y_pred, errors="coerce"),
        ],
        axis=1,
    ).dropna()
    if pairs.empty:
        return nan

    # Columns are read by position because both inputs may share a name.
    expected = pairs.iloc[:, 0].astype(float)
    predicted = pairs.iloc[:, 1].astype(float)
    try:
        if mode == "range":
            denom = expected.max() - expected.min()
        else:
            # Use the magnitude so a negative mean cannot flip the sign of
            # an error metric.
            denom = abs(expected.mean())
        if pd.isna(denom) or denom == 0:
            return nan
        return float(mean_absolute_error(expected, predicted) / denom)
    except Exception:
        # Deliberately broad: grading must degrade to NaN rather than crash.
        return nan
def _plot_series(idx, y1, y2, title, ylabel, out_path):
    """Save a two-line comparison chart to ``out_path``.

    Args:
        idx: X positions shared by both lines.
        y1: Values drawn with the legend label "Submission".
        y2: Values drawn with the legend label "Answer".
        title: Chart title.
        ylabel: Y-axis label (the x-axis is always labelled "Index").
        out_path: Destination path passed to ``Figure.savefig``.

    Exceptions raised while drawing or saving propagate to the caller, but
    the figure is closed in every case so repeated failures do not pile up
    open figures.
    """
    fig, ax = plt.subplots(figsize=(10, 5))
    try:
        ax.plot(idx, y1, label="Submission")
        ax.plot(idx, y2, label="Answer")
        ax.set_xlabel("Index")
        ax.set_ylabel(ylabel)
        ax.set_title(title)
        ax.legend()
        fig.tight_layout()
        fig.savefig(out_path)
    finally:
        # Close this specific figure even when plotting or saving failed.
        plt.close(fig)
def _try_plot(idx, y1, y2, title, ylabel, out_path):
    """Draw one chart via ``_plot_series``; log instead of raising on failure."""
    try:
        _plot_series(idx, y1, y2, title=title, ylabel=ylabel, out_path=out_path)
    except Exception:
        # Charts are a best-effort extra: a rendering problem must not discard
        # metrics that were already computed, but it should leave a trace.
        import logging

        logging.getLogger(__name__).warning(
            "could not render chart %s", out_path, exc_info=True
        )


def grade(submission_df: pd.DataFrame, team_id: str = "submission") -> Tuple[pd.DataFrame, str]:
    """Score an uploaded submission against the answer file and save charts.

    Args:
        submission_df: DataFrame built from the CSV the user uploaded. It is
            joined to the answer on DATE_TIME, PLANT_ID and INVERTER_ID
            (a SOURCE_KEY column is accepted as an alias for INVERTER_ID).
            AC_POWER and DAILY_YIELD are optional: a missing column yields
            NaN for the metrics that depend on it.
        team_id: Name of the sub-directory under ``output/`` that receives
            the charts.
            NOTE(review): this value is interpolated into a filesystem path
            as-is; confirm the caller never passes untrusted text with path
            separators.

    Returns:
        ``(score_df, report_dir)`` where ``score_df`` is a one-row DataFrame
        with RMSE_AC, RMSE_AC_SCALED, NMAE_RANGE, NMAE_MEAN and TOTAL, and
        ``report_dir`` is the directory containing the PNG charts.
    """
    nan = float("nan")
    key_cols = ['DATE_TIME', 'PLANT_ID', 'INVERTER_ID']

    # --------------------------
    # 1) Normalise answer / submission and merge
    # --------------------------
    answer = pd.read_csv(ANSWER_PATH)
    answer = answer[['DATE_TIME', 'PLANT_ID', 'SOURCE_KEY', 'DC_POWER', 'AC_POWER', 'DAILY_YIELD']]
    answer = answer.rename(columns={
        'SOURCE_KEY': 'INVERTER_ID',
        'DC_POWER': 'ANS_DC_POWER',
        'AC_POWER': 'ANS_AC_POWER',
        'DAILY_YIELD': 'ANS_DAILY_YIELD',
    })

    # Work on a copy so the caller's DataFrame is never modified.
    sub = submission_df.copy()
    if 'SOURCE_KEY' in sub.columns and 'INVERTER_ID' not in sub.columns:
        sub = sub.rename(columns={"SOURCE_KEY": "INVERTER_ID"})

    # Compare identifier columns as strings on both sides of the join.
    for frame in (sub, answer):
        for col in ('PLANT_ID', 'INVERTER_ID'):
            if col in frame.columns:
                frame[col] = frame[col].astype(str)

    # Parse timestamps; when parsing fails the original values are kept.
    for frame in (answer, sub):
        if 'DATE_TIME' in frame.columns:
            try:
                frame['DATE_TIME'] = pd.to_datetime(frame['DATE_TIME'])
            except Exception:
                pass

    # Left join: every answer row is kept; unmatched rows get NaN predictions.
    merged_df = pd.merge(
        answer, sub,
        on=key_cols,
        how='left',
        suffixes=('', '_SUB'),
    ).sort_values(by=key_cols).reset_index(drop=True)

    # --------------------------
    # 2) Scaling and metrics
    # --------------------------
    has_ac = 'AC_POWER' in merged_df.columns
    has_yield = 'DAILY_YIELD' in merged_df.columns and 'ANS_DAILY_YIELD' in merged_df.columns

    rmse_ac, rmse_ac_scaled = nan, nan
    if has_ac:
        # Submission and answer are each scaled by their own min/max.
        merged_df['AC_POWER_SCALED'] = _safe_minmax(merged_df['AC_POWER'])
        merged_df['ANS_AC_POWER_SCALED'] = _safe_minmax(merged_df['ANS_AC_POWER'])
        rmse_ac = _safe_rmse(merged_df['ANS_AC_POWER'], merged_df['AC_POWER'])
        rmse_ac_scaled = _safe_rmse(
            merged_df['ANS_AC_POWER_SCALED'], merged_df['AC_POWER_SCALED']
        )

    nmae_range, nmae_mean = nan, nan
    if has_yield:
        nmae_range = _safe_nmae(merged_df['ANS_DAILY_YIELD'], merged_df['DAILY_YIELD'], mode="range")
        nmae_mean = _safe_nmae(merged_df['ANS_DAILY_YIELD'], merged_df['DAILY_YIELD'], mode="mean")

    # TOTAL score: evaluate_model is handed None in place of NaN/inf inputs.
    rmse_for_total = rmse_ac if not (pd.isna(rmse_ac) or math.isinf(rmse_ac)) else None
    nmae_for_total = nmae_range if not (pd.isna(nmae_range) or math.isinf(nmae_range)) else None
    try:
        total = evaluate_model(rmse_for_total, nmae_for_total)
    except Exception:
        total = nan

    # app.py inserts team_id / timestamp in front of these columns, so the
    # grader is only responsible for the metrics themselves.
    score_df = pd.DataFrame([{
        "RMSE_AC": rmse_ac,
        "RMSE_AC_SCALED": rmse_ac_scaled,
        "NMAE_RANGE": nmae_range,
        "NMAE_MEAN": nmae_mean,
        "TOTAL": total,
    }])

    # --------------------------
    # 3) Reporting (save charts as PNG)
    # --------------------------
    output_dir = f"output/{team_id}"
    os.makedirs(output_dir, exist_ok=True)

    # Plot against a plain 0..n-1 integer axis instead of the timestamps.
    idx = list(range(len(merged_df)))

    if has_ac:
        # (A) AC_POWER, raw values
        _try_plot(
            idx,
            merged_df['AC_POWER'],
            merged_df['ANS_AC_POWER'],
            title="AC_POWER Comparison (Raw)",
            ylabel="AC Power",
            out_path=f"{output_dir}/ac_power_raw.png",
        )

        # (B) AC_POWER, scaled values
        _try_plot(
            idx,
            merged_df['AC_POWER_SCALED'],
            merged_df['ANS_AC_POWER_SCALED'],
            title="AC_POWER Comparison (Scaled 0-1)",
            ylabel="Scaled AC Power",
            out_path=f"{output_dir}/ac_power_scaled.png",
        )

        # (C) AC_POWER, raw values, one chart per plant
        for plant_id in merged_df['PLANT_ID'].dropna().unique():
            plant_data = merged_df[merged_df['PLANT_ID'] == plant_id].reset_index(drop=True)
            _try_plot(
                list(range(len(plant_data))),
                plant_data['AC_POWER'],
                plant_data['ANS_AC_POWER'],
                title=f"Plant {plant_id} - AC_POWER Comparison",
                ylabel="AC Power",
                out_path=f"{output_dir}/ac_power_{plant_id}.png",
            )

    # (D) DAILY_YIELD, only when the submission provides it
    if has_yield:
        _try_plot(
            idx,
            merged_df['DAILY_YIELD'],
            merged_df['ANS_DAILY_YIELD'],
            title="DAILY_YIELD Comparison",
            ylabel="Daily Yield",
            out_path=f"{output_dir}/daily_yield.png",
        )

    return score_df, output_dir