# grader.py
"""Score a submitted prediction table against the answer file and save comparison plots."""

import logging
import math
import os
from typing import Optional, Tuple

import matplotlib.pyplot as plt
import pandas as pd
from sklearn.metrics import mean_absolute_error, root_mean_squared_error
from sklearn.preprocessing import MinMaxScaler  # noqa: F401  (not used in this module; import retained)

from src.utils import evaluate_model  # function that computes the TOTAL score

logger = logging.getLogger(__name__)

ANSWER_PATH = "answer.csv"  # answer file deployed alongside the app in the Space

# Columns that identify one row in both the answer and the submission.
_MERGE_KEYS = ['DATE_TIME', 'PLANT_ID', 'INVERTER_ID']

_NAN = float("nan")


def _safe_minmax(series: pd.Series) -> pd.Series:
    """Min-max scale ``series`` to [0, 1] without failing on degenerate input.

    Args:
        series: Values to scale; must be convertible with ``astype(float)``.

    Returns:
        A float Series on the same index.
        - All values NaN (or empty input): every entry is NaN.
        - Zero range (all non-NaN values equal): non-NaN entries become 0.0,
          NaN entries stay NaN, because there is no spread to express.
        - Otherwise: ``(x - min) / (max - min)``.
    """
    s = series.astype(float)
    if s.notna().sum() == 0:
        return pd.Series(_NAN, index=s.index, dtype=float)

    val_min = s.min()
    val_max = s.max()
    if pd.isna(val_min) or pd.isna(val_max) or val_min == val_max:
        # Keep NaN where it was, replace everything else with 0.0.
        return s.where(s.isna(), 0.0)

    return (s - val_min) / (val_max - val_min)


def _paired_floats(
    y_true: Optional[pd.Series], y_pred: Optional[pd.Series]
) -> Optional[Tuple[pd.Series, pd.Series]]:
    """Return the rows where both series are non-NaN, as float series.

    Returns ``None`` when either input is ``None`` or no complete row remains.
    May raise if the values cannot be converted to float; callers wrap it.
    """
    if y_true is None or y_pred is None:
        return None
    pairs = pd.concat([y_true, y_pred], axis=1).dropna()
    if pairs.shape[0] == 0 or pairs.shape[1] < 2:
        return None
    return pairs.iloc[:, 0].astype(float), pairs.iloc[:, 1].astype(float)


def _safe_rmse(y_true: pd.Series, y_pred: pd.Series) -> float:
    """Compute RMSE over rows where both values are present; never raises.

    Args:
        y_true: Reference values.
        y_pred: Predicted values (aligned with ``y_true`` by index).

    Returns:
        The RMSE as a float, or NaN when no complete row exists, an input is
        ``None``, or the computation fails (e.g. non-numeric or infinite
        values rejected by the metric function).
    """
    try:
        pair = _paired_floats(y_true, y_pred)
        if pair is None:
            return _NAN
        return float(root_mean_squared_error(pair[0], pair[1]))
    except Exception:  # deliberate: a "safe" metric degrades to NaN
        logger.warning("RMSE computation failed; returning NaN", exc_info=True)
        return _NAN


def _safe_nmae(y_true: pd.Series, y_pred: pd.Series, mode: str = "range") -> float:
    """Compute a normalized MAE over rows where both values are present; never raises.

    Args:
        y_true: Reference values.
        y_pred: Predicted values (aligned with ``y_true`` by index).
        mode: ``"range"`` divides MAE by ``max(y_true) - min(y_true)``;
            any other value divides MAE by ``mean(y_true)``.

    Returns:
        The normalized MAE, or NaN when no complete row exists, an input is
        ``None``, the denominator is 0/NaN, or the computation fails.
    """
    try:
        pair = _paired_floats(y_true, y_pred)
        if pair is None:
            return _NAN
        truth, pred = pair
        mae = mean_absolute_error(truth, pred)
        denom = truth.max() - truth.min() if mode == "range" else truth.mean()
        if pd.isna(denom) or denom == 0:
            return _NAN
        return float(mae / denom)
    except Exception:  # deliberate: a "safe" metric degrades to NaN
        logger.warning("NMAE computation failed; returning NaN", exc_info=True)
        return _NAN


def _plot_series(idx, y1, y2, title, ylabel, out_path):
    """Plot ``y1`` ("Submission") and ``y2`` ("Answer") against ``idx`` and save to ``out_path``.

    The figure is always closed, even when plotting or saving raises, so that
    repeated calls do not accumulate open figures.
    """
    fig = plt.figure(figsize=(10, 5))
    try:
        plt.plot(idx, y1, label="Submission")
        plt.plot(idx, y2, label="Answer")
        plt.xlabel("Index")
        plt.ylabel(ylabel)
        plt.title(title)
        plt.legend()
        plt.tight_layout()
        plt.savefig(out_path)
    finally:
        plt.close(fig)


def _align_date_keys(answer: pd.DataFrame, sub: pd.DataFrame) -> None:
    """Give ``DATE_TIME`` the same dtype in both frames (modifies them in place).

    If both columns parse as datetimes, both are converted. If either fails,
    both are compared as text instead; converting only one side would make
    the merge fail on mismatched key dtypes.
    """
    if 'DATE_TIME' not in answer.columns or 'DATE_TIME' not in sub.columns:
        return
    try:
        parsed_answer = pd.to_datetime(answer['DATE_TIME'])
        parsed_sub = pd.to_datetime(sub['DATE_TIME'])
    except (ValueError, TypeError, OverflowError):
        logger.info("DATE_TIME could not be parsed on both sides; merging on text")
        answer['DATE_TIME'] = answer['DATE_TIME'].astype(str)
        sub['DATE_TIME'] = sub['DATE_TIME'].astype(str)
    else:
        answer['DATE_TIME'] = parsed_answer
        sub['DATE_TIME'] = parsed_sub


def _plot_comparison(frame: pd.DataFrame, pred_col: str, answer_col: str,
                     title: str, ylabel: str, out_path: str) -> None:
    """Best-effort wrapper around ``_plot_series``: logs and skips on any failure."""
    missing = [c for c in (pred_col, answer_col) if c not in frame.columns]
    if missing:
        logger.info("Skipping %s: missing column(s) %s", out_path, missing)
        return
    try:
        _plot_series(
            list(range(len(frame))),
            frame[pred_col],
            frame[answer_col],
            title=title,
            ylabel=ylabel,
            out_path=out_path,
        )
    except Exception:  # report plots must never fail the grading itself
        logger.warning("Could not render %s", out_path, exc_info=True)


def grade(submission_df: pd.DataFrame, team_id: str = "submission") -> Tuple[pd.DataFrame, str]:
    """Grade an uploaded submission against ``ANSWER_PATH`` and write report plots.

    Answer rows are left-joined with the submission on
    ``DATE_TIME``/``PLANT_ID``/``INVERTER_ID`` (a submission column named
    ``SOURCE_KEY`` is accepted as ``INVERTER_ID``). Answer rows without a
    usable prediction are excluded from each metric. Non-numeric cells in the
    value columns are treated as missing, and a submission without an
    ``AC_POWER`` or ``DAILY_YIELD`` column yields NaN for the metrics that
    need it instead of raising.

    Args:
        submission_df: DataFrame read from the user's uploaded CSV.
        team_id: Name of the sub-folder under ``output/`` for the plots.

    Returns:
        ``(score_df, report_dir)`` where ``score_df`` is a one-row DataFrame
        with columns ``RMSE_AC``, ``RMSE_AC_SCALED``, ``NMAE_RANGE``,
        ``NMAE_MEAN``, ``TOTAL`` and ``report_dir`` is the folder holding the
        saved PNG plots.

    Raises:
        KeyError: If the answer file lacks a required column or the
            submission lacks one of the merge key columns.
    """
    # --------------------------
    # 1) Normalize answer/submission and merge
    # --------------------------
    answer = pd.read_csv(ANSWER_PATH)
    answer = answer[['DATE_TIME', 'PLANT_ID', 'SOURCE_KEY', 'DC_POWER', 'AC_POWER', 'DAILY_YIELD']]
    answer = answer.rename(columns={
        'SOURCE_KEY': 'INVERTER_ID',
        'DC_POWER': 'ANS_DC_POWER',
        'AC_POWER': 'ANS_AC_POWER',
        'DAILY_YIELD': 'ANS_DAILY_YIELD',
    })

    # Accept the raw dataset's column name in the submission.
    sub = submission_df.copy()
    if 'SOURCE_KEY' in sub.columns and 'INVERTER_ID' not in sub.columns:
        sub = sub.rename(columns={"SOURCE_KEY": "INVERTER_ID"})

    # Identifier keys are compared as strings on both sides.
    for frame in (answer, sub):
        for key in ('PLANT_ID', 'INVERTER_ID'):
            if key in frame.columns:
                frame[key] = frame[key].astype(str)

    _align_date_keys(answer, sub)

    merged_df = pd.merge(
        answer, sub,
        on=_MERGE_KEYS,
        how='left',
        suffixes=('', '_SUB'),
    ).sort_values(by=_MERGE_KEYS).reset_index(drop=True)

    # --------------------------
    # 2) Scaling and metrics
    # --------------------------
    # Coerce value columns once so bad cells become NaN (i.e. "no prediction")
    # rather than raising deep inside a metric or plot call.
    for col in ('ANS_AC_POWER', 'AC_POWER', 'ANS_DAILY_YIELD', 'DAILY_YIELD'):
        if col in merged_df.columns:
            merged_df[col] = pd.to_numeric(merged_df[col], errors='coerce')

    # Stand-in for a value column the submission did not provide.
    nan_column = pd.Series(_NAN, index=merged_df.index, dtype=float)
    y_true_ac = merged_df.get('ANS_AC_POWER', nan_column)
    y_pred_ac = merged_df.get('AC_POWER', nan_column)

    # Prediction and answer are each scaled by their own min/max.
    merged_df['AC_POWER_SCALED'] = _safe_minmax(y_pred_ac)
    merged_df['ANS_AC_POWER_SCALED'] = _safe_minmax(y_true_ac)

    rmse_ac = _safe_rmse(y_true_ac, y_pred_ac)
    rmse_ac_scaled = _safe_rmse(merged_df['ANS_AC_POWER_SCALED'], merged_df['AC_POWER_SCALED'])

    # DAILY_YIELD metrics are only computed when both columns exist.
    nmae_range, nmae_mean = _NAN, _NAN
    if 'DAILY_YIELD' in merged_df.columns and 'ANS_DAILY_YIELD' in merged_df.columns:
        nmae_range = _safe_nmae(merged_df['ANS_DAILY_YIELD'], merged_df['DAILY_YIELD'], mode="range")
        nmae_mean = _safe_nmae(merged_df['ANS_DAILY_YIELD'], merged_df['DAILY_YIELD'], mode="mean")

    # TOTAL score: non-finite inputs are passed to evaluate_model as None.
    rmse_for_total = rmse_ac if math.isfinite(rmse_ac) else None
    nmae_for_total = nmae_range if math.isfinite(nmae_range) else None
    try:
        total = evaluate_model(rmse_for_total, nmae_for_total)
    except Exception:  # keep grading alive; TOTAL is reported as NaN
        logger.warning("evaluate_model failed; TOTAL set to NaN", exc_info=True)
        total = _NAN

    # Only metrics are returned here; per the original note, app.py prepends
    # team_id/timestamp itself.
    metrics = {
        "RMSE_AC": rmse_ac,
        "RMSE_AC_SCALED": rmse_ac_scaled,
        "NMAE_RANGE": nmae_range,
        "NMAE_MEAN": nmae_mean,
        "TOTAL": total,
    }
    score_df = pd.DataFrame([metrics])

    # --------------------------
    # 3) Reporting (save plots as PNG)
    # --------------------------
    # NOTE(review): team_id is interpolated into a filesystem path unmodified;
    # if it can come from user input, validate it at the caller.
    output_dir = f"output/{team_id}"
    os.makedirs(output_dir, exist_ok=True)

    # Plots use a plain 0..n-1 integer x-axis rather than the key columns.
    merged_df = merged_df.reset_index(drop=True)

    # (A) AC_POWER, raw values
    _plot_comparison(
        merged_df, 'AC_POWER', 'ANS_AC_POWER',
        title="AC_POWER Comparison (Raw)",
        ylabel="AC Power",
        out_path=f"{output_dir}/ac_power_raw.png",
    )

    # (B) AC_POWER, scaled values
    _plot_comparison(
        merged_df, 'AC_POWER_SCALED', 'ANS_AC_POWER_SCALED',
        title="AC_POWER Comparison (Scaled 0-1)",
        ylabel="Scaled AC Power",
        out_path=f"{output_dir}/ac_power_scaled.png",
    )

    # (C) AC_POWER, raw values, one plot per plant
    for plant_id in merged_df['PLANT_ID'].dropna().unique():
        plant_data = merged_df[merged_df['PLANT_ID'] == plant_id].reset_index(drop=True)
        _plot_comparison(
            plant_data, 'AC_POWER', 'ANS_AC_POWER',
            title=f"Plant {plant_id} - AC_POWER Comparison",
            ylabel="AC Power",
            out_path=f"{output_dir}/ac_power_{plant_id}.png",
        )

    # (D) DAILY_YIELD (skipped automatically when a column is absent)
    _plot_comparison(
        merged_df, 'DAILY_YIELD', 'ANS_DAILY_YIELD',
        title="DAILY_YIELD Comparison",
        ylabel="Daily Yield",
        out_path=f"{output_dir}/daily_yield.png",
    )

    return score_df, output_dir