Spaces:
Runtime error
Runtime error
| import os | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score | |
| from typing import Dict, List | |
| import json | |
| from utils.except_dir import cust_listdir | |
class MetricsEvaluator:
    """Compare per-video prediction CSVs against label CSVs and report metrics.

    The prediction directory is expected to contain one sub-directory per
    category; each sub-directory holds one CSV per video. A label CSV with the
    same file name is looked up under the matching sub-directory of the label
    directory. Every CSV column except ``frame`` is treated as an event column
    and scored independently.
    """

    # Event types that are aggregated across categories.
    _EVENT_TYPES = ('falldown', 'violence', 'fire')
    # Ratio metrics averaged across categories for the "overall" report.
    _RATE_METRICS = ('f1', 'accuracy', 'precision', 'recall', 'specificity')
    # Confusion-matrix cells stored per video as ``<event>_<cell>`` columns.
    _COUNT_NAMES = ('tp', 'tn', 'fp', 'fn')

    def __init__(self, pred_dir: str, label_dir: str, save_dir: str):
        """
        Args:
            pred_dir: Directory containing the prediction CSV files.
            label_dir: Directory containing the ground-truth CSV files.
            save_dir: Directory where the results are written.
        """
        self.pred_dir = pred_dir
        self.label_dir = label_dir
        self.save_dir = save_dir

    def evaluate(self) -> Dict:
        """Run the full evaluation and write all report files.

        Files written under ``save_dir``:
            * ``<category>/<category>_metrics.csv`` - per-video metrics plus a
              trailing ``average`` row.
            * ``all_categories_metrics.csv`` - per-video rows of every category
              (without the ``average`` rows).
            * ``overall_metrics.json`` - per-category averages, the mean of
              those averages per event type, and the accumulated metrics.
            * ``accumulated_metrics.json`` - the accumulated metrics only.

        Returns:
            The dictionary produced by ``calculate_accumulated_metrics``.
        """
        # Make sure the top-level reports can be written even when there is
        # no category directory at all.
        os.makedirs(self.save_dir, exist_ok=True)

        category_metrics = {}  # category -> "average" row as a dict
        all_metrics = {
            event_type: {metric_type: [] for metric_type in self._RATE_METRICS}
            for event_type in self._EVENT_TYPES
        }
        per_category_frames = []

        for category in cust_listdir(self.pred_dir):
            pred_category_path = os.path.join(self.pred_dir, category)
            if not os.path.isdir(pred_category_path):
                continue
            label_category_path = os.path.join(self.label_dir, category)
            save_category_path = os.path.join(self.save_dir, category)
            os.makedirs(save_category_path, exist_ok=True)

            metrics_df = self._evaluate_category(category, pred_category_path, label_category_path)
            metrics_df['category'] = category
            metrics_df.to_csv(os.path.join(save_category_path, f"{category}_metrics.csv"), index=False)
            per_category_frames.append(metrics_df)

            # The last row is the per-category average.
            category_metrics[category] = metrics_df.iloc[-1].to_dict()

            # Collect the category averages for the overall report. Columns are
            # named "<event>_<metric>"; split on the first underscore only so
            # metric names that contain underscores stay intact.
            for col in metrics_df.columns:
                if col == 'video_name':
                    continue
                event_type, separator, metric_type = col.partition('_')
                if separator and metric_type in all_metrics.get(event_type, {}):
                    all_metrics[event_type][metric_type].append(category_metrics[category][col])

        # Drop the trailing "average" row of every category before merging.
        per_video_frames = [df.iloc[:-1] for df in per_category_frames]
        if per_video_frames:
            combined_metrics_df = pd.concat(per_video_frames, ignore_index=True)
        else:
            # pd.concat raises on an empty list; fall back to an empty table.
            combined_metrics_df = pd.DataFrame(columns=['video_name'])
        combined_metrics_df.to_csv(os.path.join(self.save_dir, "all_categories_metrics.csv"), index=False)

        print("\nCategory-wise Average Metrics:")
        for category, metrics in category_metrics.items():
            print(f"\n{category}:")
            for metric_name, value in metrics.items():
                if metric_name == "video_name":
                    continue
                if isinstance(value, str):
                    print(f"{metric_name}: {value}")
                    continue
                try:
                    print(f"{metric_name}: {float(value):.3f}")
                except (ValueError, TypeError):
                    print(f"{metric_name}: {value}")

        print("\n" + "="*50)
        print("Overall Average Metrics Across All Categories:")
        print("="*50)
        overall_metrics = {}
        for event_type, per_metric_values in all_metrics.items():
            print(f"\n{event_type}:")
            overall_metrics[event_type] = {}
            for metric_type, values in per_metric_values.items():
                avg_value = self._mean_or_nan(values)
                print(f"{metric_type}: {avg_value:.3f}")
                overall_metrics[event_type][metric_type] = avg_value

        # Compute the accumulated metrics BEFORE dumping the summary so that
        # they are actually part of overall_metrics.json.
        accumulated_metrics = self.calculate_accumulated_metrics(combined_metrics_df)

        final_results = {
            "category_metrics": {},
            "overall_metrics": overall_metrics,
            "accumulated_metrics": accumulated_metrics,
        }
        numeric_types = (int, float, np.integer, np.floating)
        for category, metrics in category_metrics.items():
            # Only numeric entries are stored; 'video_name' and other string
            # fields (e.g. 'category') are left out.
            final_results["category_metrics"][category] = {
                metric_name: float(value)
                for metric_name, value in metrics.items()
                if metric_name != "video_name" and isinstance(value, numeric_types)
            }

        json_path = os.path.join(self.save_dir, "overall_metrics.json")
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(final_results, f, indent=4)

        accumulated_json_path = os.path.join(self.save_dir, "accumulated_metrics.json")
        with open(accumulated_json_path, 'w', encoding='utf-8') as f:
            json.dump(accumulated_metrics, f, indent=4)

        return accumulated_metrics

    def _evaluate_category(self, category: str, pred_path: str, label_path: str) -> pd.DataFrame:
        """Score every prediction CSV of one category.

        Args:
            category: Category name (kept for interface compatibility; not
                used in the computation).
            pred_path: Directory with the prediction CSVs of this category.
            label_path: Directory with the label CSVs of this category.

        Returns:
            A DataFrame with one row per scored video, columns named
            ``<event>_<metric>``, and a final row whose ``video_name`` is
            ``'average'`` holding the column means.
        """
        results = []
        metrics_columns = ['video_name']

        for pred_file in cust_listdir(pred_path):
            if not pred_file.endswith('.csv'):
                continue
            video_name = os.path.splitext(pred_file)[0]

            # The label CSV must carry the same file name as the prediction.
            label_path_full = os.path.join(label_path, f"{video_name}.csv")
            if not os.path.exists(label_path_full):
                print(f"Warning: Label file not found for {video_name}")
                continue

            pred_df = pd.read_csv(os.path.join(pred_path, pred_file))
            label_df = pd.read_csv(label_path_full)

            video_metrics = {'video_name': video_name}
            event_columns = [col for col in pred_df.columns if col != 'frame']
            for cat in event_columns:
                if cat not in label_df.columns:
                    # Same best-effort policy as for a missing label file.
                    print(f"Warning: Label column '{cat}' not found for {video_name}")
                    continue
                metrics = self._calculate_metrics(label_df[cat].values, pred_df[cat].values)
                for metric_name, value in metrics.items():
                    col_name = f"{cat}_{metric_name}"
                    video_metrics[col_name] = value
                    if col_name not in metrics_columns:
                        metrics_columns.append(col_name)
            results.append(video_metrics)

        metrics_df = pd.DataFrame(results, columns=metrics_columns)

        # Append the column means as a final "average" row.
        avg_metrics = {'video_name': 'average'}
        for col in metrics_columns[1:]:  # skip video_name
            avg_metrics[col] = metrics_df[col].mean()
        metrics_df = pd.concat([metrics_df, pd.DataFrame([avg_metrics])], ignore_index=True)
        return metrics_df

    def calculate_accumulated_metrics(self, all_categories_metrics_df: pd.DataFrame) -> Dict:
        """Compute metrics from confusion-matrix counts summed over all rows.

        Args:
            all_categories_metrics_df: Table with ``<event>_tp``, ``<event>_tn``,
                ``<event>_fp`` and ``<event>_fn`` columns. A missing column is
                treated as a count of zero.

        Returns:
            A dict with one entry per event type plus ``'micro_avg'``, which is
            computed from the counts summed over all event types. Every entry
            holds the counts and the derived metrics; ratios whose denominator
            is zero are reported as 0.
        """
        accumulated_results = {"micro_avg": {}}
        totals = dict.fromkeys(self._COUNT_NAMES, 0)

        for category in self._EVENT_TYPES:
            counts = {
                name: self._column_total(all_categories_metrics_df, f"{category}_{name}")
                for name in self._COUNT_NAMES
            }
            accumulated_results[category] = self._metrics_from_counts(**counts)
            for name, count in counts.items():
                totals[name] += count

        accumulated_results["micro_avg"] = self._metrics_from_counts(**totals)
        return accumulated_results

    def _calculate_metrics(self, y_true: np.ndarray, y_pred: np.ndarray) -> Dict:
        """Compute the per-video metrics for one event column.

        Confusion-matrix cells count positions where the label/prediction pair
        equals (0, 0), (0, 1), (1, 0) and (1, 1) respectively.

        Args:
            y_true: Ground-truth values.
            y_pred: Predicted values, aligned with ``y_true``.

        Returns:
            Dict with ``f1``, ``accuracy``, ``precision``, ``recall``,
            ``specificity`` and the integer counts ``tp``, ``tn``, ``fp``,
            ``fn``. Specificity is 0 when there is no negative label.
        """
        tn = np.sum((y_true == 0) & (y_pred == 0))
        fp = np.sum((y_true == 0) & (y_pred == 1))
        fn = np.sum((y_true == 1) & (y_pred == 0))
        tp = np.sum((y_true == 1) & (y_pred == 1))
        metrics = {
            'f1': f1_score(y_true, y_pred, zero_division=0),
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_score(y_true, y_pred, zero_division=0),
            'recall': recall_score(y_true, y_pred, zero_division=0),
            'specificity': tn / (tn + fp) if (tn + fp) > 0 else 0,
            'tp': int(tp),
            'tn': int(tn),
            'fp': int(fp),
            'fn': int(fn),
        }
        return metrics

    @staticmethod
    def _mean_or_nan(values: List) -> float:
        """Return the mean of ``values`` as a float, or NaN for an empty list."""
        return float(np.mean(values)) if len(values) > 0 else float('nan')

    @staticmethod
    def _column_total(df: pd.DataFrame, column: str) -> int:
        """Return the sum of ``column`` as a Python int, or 0 if it is absent."""
        if column not in df.columns:
            return 0
        return int(df[column].sum())

    @staticmethod
    def _safe_ratio(numerator: float, denominator: float) -> float:
        """Return ``numerator / denominator``, or 0.0 for a non-positive denominator."""
        return numerator / denominator if denominator > 0 else 0.0

    @classmethod
    def _metrics_from_counts(cls, tp: int, tn: int, fp: int, fn: int) -> Dict:
        """Derive all reported metrics from one set of confusion-matrix counts.

        The counts must be Python ints: the MCC denominator multiplies four
        sums, which overflows fixed-width numpy integers for large counts.
        """
        recall = cls._safe_ratio(tp, tp + fn)       # TPR
        specificity = cls._safe_ratio(tn, tn + fp)  # TNR
        metrics = {
            'tp': tp,
            'tn': tn,
            'fp': fp,
            'fn': fn,
            'accuracy': cls._safe_ratio(tp + tn, tp + tn + fp + fn),
            'precision': cls._safe_ratio(tp, tp + fp),
            'recall': recall,
            'specificity': specificity,
            'f1': cls._safe_ratio(2 * tp, 2 * tp + fp + fn),
        }
        metrics['balanced_accuracy'] = (recall + specificity) / 2
        rate_product = recall * specificity
        metrics['g_mean'] = float(np.sqrt(rate_product)) if rate_product > 0 else 0.0
        # Matthews correlation coefficient; the product is an exact Python int
        # and is converted to float only for the square root.
        mcc_denominator = float(np.sqrt(float((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))))
        metrics['mcc'] = cls._safe_ratio(tp * tn - fp * fn, mcc_denominator)
        # Negative predictive value.
        metrics['npv'] = cls._safe_ratio(tn, tn + fn)
        # False alarm rate = FPR = 1 - specificity.
        metrics['far'] = 1 - specificity
        return metrics