| | |
| | import os |
| | import sys |
| | import json |
| | import time |
| | import argparse |
| | import numpy as np |
| | import pandas as pd |
| | import seaborn as sns |
| | from tqdm import tqdm |
| | from typing import Any |
| | from typing import List |
| | from typing import Dict |
| | from scipy import stats |
| | from pathlib import Path |
| | from typing import Tuple |
| | from datetime import datetime |
| | from dataclasses import asdict |
| | import matplotlib.pyplot as plt |
| | from dataclasses import dataclass |
| | from collections import defaultdict |
| | from sklearn.metrics import roc_auc_score |
| | from sklearn.metrics import confusion_matrix |
| | from sklearn.metrics import average_precision_score |
| | from sklearn.metrics import precision_recall_fscore_support |
| |
|
| |
|
| | |
| | sys.path.append(str(Path(__file__).parent.parent)) |
| |
|
| | from config.enums import Domain |
| | from services.orchestrator import DetectionOrchestrator |
| |
|
| |
|
| |
|
@dataclass
class EvaluationSample:
    """
    One text to be evaluated, paired with its ground-truth label.
    """
    text_id: str       # unique identifier (stem of the source .txt file)
    domain: str        # content domain directory name (e.g. "academic")
    ground_truth: str  # true label: "human" or "ai"
    text: str          # raw text content read from disk
    file_path: str     # path of the originating file, stored as a string
    subset: str        # "clean", "paraphrased" or "cross_model"
| |
|
| |
|
| | @dataclass |
| | class SingleEvalResult: |
| | """ |
| | Result for a single sample |
| | """ |
| | text_id : str |
| | domain : str |
| | ground_truth : str |
| | prediction : str |
| | verdict : str |
| | synthetic_prob : float |
| | authentic_prob : float |
| | hybrid_prob : float |
| | confidence : float |
| | uncertainty : float |
| | processing_time : float |
| | is_correct : bool | None |
| | subset : str |
| | word_count : int |
| |
|
| |
|
@dataclass
class AggregatedMetrics:
    """
    Aggregated performance metrics for 4-class system
    """
    # Binary classification quality over decisive predictions only
    # ("ai"/"hybrid" count as positive, "human" as negative; "uncertain"
    # predictions are excluded — see calculate_metrics()).
    precision : float
    recall : float
    f1 : float
    accuracy : float

    # Selective-prediction view: fraction of samples given a decisive
    # verdict, and the accuracy/F1 achieved at that coverage.
    coverage : float
    accuracy_at_cov : float
    f1_at_cov : float

    # Probabilistic quality over ALL filtered samples, scored by the raw
    # synthetic probability: ranking (AUROC/AUPRC) and calibration (ECE).
    auroc : float
    auprc : float
    ece : float

    # 4-class behaviour: share of AI-labelled samples flagged "hybrid",
    # and share of all samples where the system abstained ("uncertain").
    hybrid_detection_rate : float
    abstention_rate : float

    # 2x2 confusion matrix over decisive predictions (rows = truth,
    # cols = prediction), plus ground-truth counts per class.
    confusion_matrix : List[List[int]]
    support : Dict[str, int]

    # Raw verdict string -> occurrence count over the filtered samples.
    verdict_distribution : Dict[str, int]
| |
|
| |
|
@dataclass
class LengthBucketMetrics:
    """
    Performance metrics for a specific text length range
    """
    # Half-open word-count range: min_words <= word_count < max_words.
    min_words : int
    # float because the open-ended top bucket uses float("inf").
    max_words : float
    # Human-readable bucket name, e.g. "Short (100-200)".
    label : str
    # Number of decisive (non-hybrid, non-uncertain) samples in the bucket.
    sample_count : int
    # Binary classification quality over the decisive samples
    # (only a plain "ai" prediction counts as positive here).
    precision : float
    recall : float
    f1 : float
    accuracy : float
    # Mean detector confidence across decisive samples.
    mean_confidence : float
    # False positives / false negatives as a fraction of decisive samples.
    fp_rate : float
    fn_rate : float
    # Mean per-sample processing time in seconds.
    avg_proc_time : float
    # Fraction of the bucket's samples predicted "hybrid" or "uncertain".
    abstention_rate : float
| |
|
| |
|
class TextAuthEvaluator:
    """
    Comprehensive evaluation framework for TEXT-AUTH (4-class system)

    Handles verdicts:
    - "Synthetically-Generated" → prediction = "ai"
    - "Authentically-Written" → prediction = "human"
    - "Hybrid" → prediction = "hybrid"
    - "Uncertain" → prediction = "uncertain"
    """
    def __init__(self, dataset_path: str = "evaluation", output_dir: str = "evaluation/results"):
        """
        Initialize evaluator

        Arguments:
        ----------
        dataset_path { str } : Path to evaluation directory

        output_dir { str } : Directory to save results

        Raises:
        -------
        RuntimeError : if the detection orchestrator fails to initialize
        """
        self.dataset_path = Path(dataset_path)
        self.output_dir = Path(output_dir)
        # Create the results directory (including missing parents) up front
        # so save_results()/plot_visualizations() can write unconditionally.
        self.output_dir.mkdir(exist_ok = True,
                              parents = True,
                              )

        print("\nInitializing TEXT-AUTH Detection Orchestrator...")
        # NOTE(review): worker count and parallel/skip flags are hard-coded
        # here — confirm these defaults suit the evaluation host.
        self.orchestrator = DetectionOrchestrator.create_with_executor(max_workers = 4,
                                                                       enable_language_detection = False,
                                                                       parallel_execution = True,
                                                                       skip_expensive_metrics = False,
                                                                       )

        # Fail fast: without a working orchestrator no sample can be scored.
        if not self.orchestrator.initialize():
            raise RuntimeError("Failed to initialize detection orchestrator")

        print("\nOrchestrator initialized successfully\n")

        # Per-sample results accumulated by run_evaluation().
        self.results: List[SingleEvalResult] = list()
        # Dataset-level metadata loaded from metadata.json (empty if absent).
        self.metadata: Dict[str, Any] = dict()

        self._load_metadata()
| |
|
| | def _load_metadata(self): |
| | """ |
| | Load dataset metadata |
| | """ |
| | metadata_path = self.dataset_path / "metadata.json" |
| |
|
| | if metadata_path.exists(): |
| | with open(metadata_path, 'r') as f: |
| | self.metadata = json.load(f) |
| |
|
| | print(f"\nDataset: {self.metadata.get('dataset_name', 'Unknown')}") |
| | print(f" Version: {self.metadata.get('version', 'Unknown')}") |
| | print(f" Total samples: {self.metadata.get('total_samples', 'Unknown')}") |
| | print(f" Human: {self.metadata.get('human_samples', 'Unknown')}") |
| | print(f" AI: {self.metadata.get('ai_samples', 'Unknown')}") |
| |
|
| | if ('challenge_samples' in self.metadata): |
| | challenges = self.metadata['challenge_samples'] |
| | print(f" Paraphrased: {challenges.get('paraphrased', 0)}") |
| | print(f" Cross-model: {challenges.get('cross_model', 0)}") |
| |
|
| | print() |
| |
|
| | else: |
| | print("\nNo metadata.json found - run create_metadata.py first\n") |
| | |
| |
|
| | def load_dataset(self, domains: List[str] = None, max_samples_per_domain: int = None, subset_filter: str = None) -> List[EvaluationSample]: |
| | """ |
| | Load evaluation dataset |
| | |
| | Arguments: |
| | ---------- |
| | domains { list } : List of domains to evaluate (None = all) |
| | |
| | max_samples_per_domain { int } : Limit samples per domain |
| | |
| | subset_filter { str } : Only load specific subset |
| | |
| | Returns: |
| | -------- |
| | { list } : List of EvaluationSample objects |
| | """ |
| | samples = list() |
| | |
| | |
| | if (subset_filter is None or (subset_filter == "clean")): |
| | for subset_name, subset_dir in [("human", "human"), ("ai", "ai_generated")]: |
| | subset_path = self.dataset_path / subset_dir |
| | |
| | if not subset_path.exists(): |
| | print(f"Directory not found: {subset_path}") |
| | continue |
| | |
| | for domain_dir in subset_path.iterdir(): |
| | if not domain_dir.is_dir(): |
| | continue |
| | |
| | domain = domain_dir.name |
| | if domains and domain not in domains: |
| | continue |
| | |
| | files = list(domain_dir.glob("*.txt")) |
| | if max_samples_per_domain: |
| | files = files[:max_samples_per_domain] |
| | |
| | for file_path in files: |
| | try: |
| | with open(file_path, 'r', encoding = 'utf-8') as f: |
| | text = f.read() |
| | |
| | samples.append(EvaluationSample(text_id = file_path.stem, |
| | domain = domain, |
| | ground_truth = subset_name, |
| | text = text, |
| | file_path = str(file_path), |
| | subset = "clean", |
| | ) |
| | ) |
| |
|
| | except Exception as e: |
| | print(f"Error loading {file_path}: {e}") |
| | |
| | |
| | if subset_filter is None or subset_filter in ["paraphrased", "cross_model"]: |
| | adversarial_path = self.dataset_path / "adversarial" |
| | |
| | if adversarial_path.exists(): |
| | for challenge_name in ["paraphrased", "cross_model"]: |
| | if subset_filter and subset_filter != challenge_name: |
| | continue |
| | |
| | challenge_path = adversarial_path / challenge_name |
| | |
| | if not challenge_path.exists(): |
| | continue |
| | |
| | files = list(challenge_path.glob("*.txt")) |
| | |
| | for file_path in files: |
| | try: |
| | with open(file_path, 'r', encoding = 'utf-8') as f: |
| | text = f.read() |
| | |
| | |
| | domain = "general" |
| | for possible_domain in ["academic", "creative", "ai_ml", "software_dev", "technical_doc", "engineering", "science", "business", "legal", "medical", "journalism", "marketing", "social_media", "blog_personal", "tutorial", "general"]: |
| | if possible_domain in file_path.stem: |
| | domain = possible_domain |
| | break |
| | |
| | |
| | if domains and domain not in domains: |
| | continue |
| | |
| | samples.append(EvaluationSample(text_id = file_path.stem, |
| | domain = domain, |
| | ground_truth = "ai", |
| | text = text, |
| | file_path = str(file_path), |
| | subset = challenge_name, |
| | ) |
| | ) |
| | |
| | except Exception as e: |
| | print(f"Error loading {file_path}: {e}") |
| | |
| | print(f"\nLoaded {len(samples)} samples") |
| | |
| | return samples |
| | |
| |
|
| | def _map_verdict_to_prediction(self, verdict: str) -> str: |
| | """ |
| | Map system verdict to evaluation prediction class |
| | |
| | Arguments: |
| | ---------- |
| | verdict { str } : Raw verdict from system |
| | |
| | Returns: |
| | -------- |
| | { str } : Mapped prediction ("human", "ai", "hybrid", "uncertain") |
| | """ |
| | verdict_lower = verdict.lower() |
| | |
| | if (("synthetic" in verdict_lower) or ("generated" in verdict_lower)): |
| | return "ai" |
| | |
| | elif (("authentic" in verdict_lower) or ("written" in verdict_lower)): |
| | return "human" |
| | |
| | elif ("hybrid" in verdict_lower): |
| | return "hybrid" |
| | |
| | else: |
| | return "uncertain" |
| | |
| |
|
    def run_evaluation(self, samples: List[EvaluationSample]):
        """
        Run evaluation on all samples

        Feeds each sample through the detection orchestrator, maps the raw
        verdict onto the 4-class prediction space, grades the result and
        appends a SingleEvalResult to ``self.results``. Samples whose
        analysis raises are logged and skipped.

        Arguments:
        ----------
        samples { list } : List of EvaluationSample objects
        """
        print(f"\nEvaluating {len(samples)} samples...")
        print("=" * 70)

        for i, sample in enumerate(tqdm(samples, desc = "Processing")):
            try:
                start_time = time.time()

                result = self.orchestrator.analyze(text = sample.text)

                # Wall-clock time for the full analyze() call.
                proc_time = time.time() - start_time

                # Unpack the ensemble output of the orchestrator.
                ensemble = result.ensemble_result
                verdict = ensemble.final_verdict
                prediction = self._map_verdict_to_prediction(verdict)
                synthetic_prob = ensemble.synthetic_probability
                authentic_prob = ensemble.authentic_probability
                hybrid_prob = ensemble.hybrid_probability
                confidence = ensemble.overall_confidence
                uncertainty = ensemble.uncertainty_score
                word_count = len(sample.text.split())

                # Grade the prediction. Stays None for "uncertain"
                # predictions, which downstream metrics then exclude.
                is_correct = None

                if ((prediction == "ai") and (sample.ground_truth == "ai")):
                    is_correct = True

                elif ((prediction == "human") and (sample.ground_truth == "human")):
                    is_correct = True

                elif ((prediction in ["ai", "human"]) and (prediction != sample.ground_truth)):
                    is_correct = False

                # A "hybrid" call counts as correct on AI-authored text and
                # incorrect on purely human-written text.
                elif ((prediction == "hybrid") and (sample.ground_truth == "ai")):
                    is_correct = True

                elif ((prediction == "hybrid") and (sample.ground_truth == "human")):
                    is_correct = False

                eval_result = SingleEvalResult(text_id = sample.text_id,
                                               domain = sample.domain,
                                               ground_truth = sample.ground_truth,
                                               prediction = prediction,
                                               verdict = verdict,
                                               synthetic_prob = synthetic_prob,
                                               authentic_prob = authentic_prob,
                                               hybrid_prob = hybrid_prob,
                                               confidence = confidence,
                                               uncertainty = uncertainty,
                                               processing_time = proc_time,
                                               is_correct = is_correct,
                                               subset = sample.subset,
                                               word_count = word_count,
                                               )

                self.results.append(eval_result)

            except Exception as e:
                # Best-effort evaluation: report the failure and move on.
                print(f"\nError processing sample {i}: {e}")
                continue

        print("\n" + "=" * 70)
        print(f"Evaluation complete: {len(self.results)}/{len(samples)} samples processed")
| | |
| |
|
| | def calculate_metrics(self, domain: str = None, subset: str = None) -> AggregatedMetrics: |
| | """ |
| | Calculate aggregated metrics for 4-class system |
| | |
| | Arguments: |
| | ---------- |
| | domain { str } : Calculate for specific domain only |
| | |
| | subset { str } : Calculate for specific subset only |
| | |
| | Returns: |
| | -------- |
| | { AggregatedMetrics } : Aggregated metrics |
| | """ |
| | |
| | filtered = self.results |
| | |
| | if domain: |
| | filtered = [r for r in filtered if r.domain == domain] |
| | |
| | if subset: |
| | filtered = [r for r in filtered if r.subset == subset] |
| | |
| | if not filtered: |
| | return None |
| | |
| | |
| | decisive = [r for r in filtered if r.prediction != "uncertain"] |
| | uncertain = [r for r in filtered if r.prediction == "uncertain"] |
| | |
| | |
| | coverage = len(decisive) / len(filtered) if filtered else 0.0 |
| | |
| | |
| | verdict_dist = {"Synthetically-Generated" : sum(1 for r in filtered if r.verdict == "Synthetically-Generated"), |
| | "Authentically-Written" : sum(1 for r in filtered if r.verdict == "Authentically-Written"), |
| | "Hybrid" : sum(1 for r in filtered if r.verdict == "Hybrid"), |
| | "Uncertain" : sum(1 for r in filtered if r.verdict == "Uncertain"), |
| | } |
| | |
| | |
| | if decisive: |
| | |
| | y_true_binary = [1 if r.ground_truth == "ai" else 0 for r in decisive] |
| | y_pred_binary = [1 if r.prediction in ["ai", "hybrid"] else 0 for r in decisive] |
| | |
| | |
| | precision, recall, f1, support_array = precision_recall_fscore_support(y_true_binary, |
| | y_pred_binary, |
| | average = 'binary', |
| | pos_label = 1, |
| | zero_division = 0, |
| | ) |
| | |
| | accuracy = sum(1 for i, r in enumerate(decisive) if y_true_binary[i] == y_pred_binary[i]) / len(decisive) |
| | |
| | |
| | cm = confusion_matrix(y_true_binary, y_pred_binary) |
| | |
| | |
| | support = {"human" : sum(1 for r in decisive if r.ground_truth == "human"), |
| | "ai" : sum(1 for r in decisive if r.ground_truth == "ai"), |
| | } |
| | |
| | else: |
| | precision = recall = f1 = accuracy = 0.0 |
| | cm = [[0, 0], [0, 0]] |
| | support = {"human" : 0, "ai" : 0} |
| | |
| | |
| | y_true_prob = [1 if r.ground_truth == "ai" else 0 for r in filtered] |
| | y_scores = [r.synthetic_prob for r in filtered] |
| | |
| | try: |
| | auroc = roc_auc_score(y_true_prob, y_scores) |
| | |
| | except: |
| | auroc = 0.0 |
| | |
| | try: |
| | auprc = average_precision_score(y_true_prob, y_scores) |
| | |
| | except: |
| | auprc = 0.0 |
| | |
| | |
| | ece = self._calculate_ece(filtered) |
| | |
| | |
| | ai_samples = [r for r in filtered if r.ground_truth == "ai"] |
| | hybrid_detection_rate = sum(1 for r in ai_samples if r.prediction == "hybrid") / len(ai_samples) if ai_samples else 0.0 |
| | |
| | |
| | abstention_rate = len(uncertain) / len(filtered) if filtered else 0.0 |
| | |
| | return AggregatedMetrics(precision = precision, |
| | recall = recall, |
| | f1 = f1, |
| | accuracy = accuracy, |
| | coverage = coverage, |
| | accuracy_at_cov = accuracy, |
| | f1_at_cov = f1, |
| | auroc = auroc, |
| | auprc = auprc, |
| | ece = ece, |
| | hybrid_detection_rate = hybrid_detection_rate, |
| | abstention_rate = abstention_rate, |
| | confusion_matrix = cm.tolist(), |
| | support = support, |
| | verdict_distribution = verdict_dist, |
| | ) |
| | |
| |
|
| | def _calculate_ece(self, results: List[SingleEvalResult], n_bins: int = 10) -> float: |
| | """ |
| | Calculate Expected Calibration Error |
| | |
| | Arguments: |
| | ---------- |
| | results { list } : List of evaluation results |
| | |
| | n_bins { int } : Number of confidence bins |
| | |
| | Returns: |
| | -------- |
| | { float } : ECE value |
| | """ |
| | |
| | decisive = [r for r in results if r.prediction != "uncertain"] |
| | |
| | if not decisive: |
| | return 0.0 |
| | |
| | confidences = np.array([r.confidence for r in decisive]) |
| | predictions = np.array([1 if r.prediction in ["ai", "hybrid"] else 0 for r in decisive]) |
| | labels = np.array([1 if r.ground_truth == "ai" else 0 for r in decisive]) |
| | |
| | ece = 0.0 |
| | |
| | for i in range(n_bins): |
| | bin_lower = i / n_bins |
| | bin_upper = (i + 1) / n_bins |
| | |
| | in_bin = (confidences > bin_lower) & (confidences <= bin_upper) |
| | |
| | if (np.sum(in_bin) > 0): |
| | bin_accuracy = np.mean(predictions[in_bin] == labels[in_bin]) |
| | bin_confidence = np.mean(confidences[in_bin]) |
| | bin_size = np.sum(in_bin) |
| | |
| | ece += (bin_size / len(decisive)) * abs(bin_accuracy - bin_confidence) |
| | |
| | return ece |
| | |
| |
|
    def analyze_by_length(self) -> Dict[str, LengthBucketMetrics]:
        """
        Analyze performance across different text lengths

        Buckets ``self.results`` by word count and computes binary metrics
        over the decisive predictions in each bucket. NOTE: unlike
        calculate_metrics(), "hybrid" predictions are treated as abstentions
        here and only a plain "ai" prediction counts as positive.

        Returns:
        --------
        { dict } : Dictionary mapping label to LengthBucketMetrics
        """
        # Half-open word-count ranges: min_words <= word_count < max_words.
        length_buckets = [(0, 100, "Very Short (0-100)"),
                          (100, 200, "Short (100-200)"),
                          (200, 400, "Medium (200-400)"),
                          (400, 600, "Medium-Long (400-600)"),
                          (600, 1000, "Long (600-1000)"),
                          (1000, float("inf"), "Very Long (1000+)"),
                          ]

        bucket_metrics = dict()

        for min_words, max_words, label in length_buckets:

            filtered = [r for r in self.results if (min_words <= r.word_count < max_words)]

            if not filtered:
                continue

            # Hybrid and uncertain predictions both count as abstentions
            # for the per-length breakdown.
            abstained = [r for r in filtered if r.prediction in ["hybrid", "uncertain"]]
            decisive = [r for r in filtered if r.prediction not in ["hybrid", "uncertain"]]

            # Skip buckets with too few decisive samples for stable metrics.
            if (len(decisive) < 5):
                continue

            y_true = [1 if (r.ground_truth == "ai") else 0 for r in decisive]
            y_pred = [1 if (r.prediction == "ai") else 0 for r in decisive]

            if not y_true:
                continue

            # Manual confusion counts with AI as the positive class.
            tp = sum(1 for i, _ in enumerate(y_true) if (y_true[i] == 1) and (y_pred[i] == 1))
            fp = sum(1 for i, _ in enumerate(y_true) if (y_true[i] == 0) and (y_pred[i] == 1))
            fn = sum(1 for i, _ in enumerate(y_true) if (y_true[i] == 1) and (y_pred[i] == 0))

            precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
            f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
            # Accuracy is computed only over samples that were graded
            # (is_correct is None for ungraded predictions).
            valid = [r for r in decisive if r.is_correct is not None]
            accuracy = sum(r.is_correct for r in valid) / len(valid) if valid else 0.0

            mean_conf = np.mean([r.confidence for r in decisive])
            # FP/FN expressed as a fraction of the bucket's decisive samples.
            fp_rate = fp / len(decisive) if len(decisive) > 0 else 0.0
            fn_rate = fn / len(decisive) if len(decisive) > 0 else 0.0
            avg_time = np.mean([r.processing_time for r in decisive])

            bucket_metrics[label] = LengthBucketMetrics(min_words = min_words,
                                                        max_words = max_words,
                                                        label = label,
                                                        sample_count = len(decisive),
                                                        precision = precision,
                                                        recall = recall,
                                                        f1 = f1,
                                                        accuracy = accuracy,
                                                        mean_confidence = mean_conf,
                                                        fp_rate = fp_rate,
                                                        fn_rate = fn_rate,
                                                        avg_proc_time = avg_time,
                                                        abstention_rate = len(abstained) / (len(decisive) + len(abstained)) if (len(decisive) + len(abstained)) > 0 else 0.0,
                                                        )

        return bucket_metrics
| |
|
| |
|
    def print_length_analysis(self):
        """
        Print length-based performance analysis

        Renders the per-bucket table produced by analyze_by_length(),
        highlights the best/worst buckets, then runs the length-vs-F1
        correlation check.
        """
        print(f"\n{'=' * 80}")
        print("PERFORMANCE BY TEXT LENGTH")
        print("=" * 80)

        bucket_metrics = self.analyze_by_length()

        if not bucket_metrics:
            print(" No length analysis available")
            return

        # Table header followed by one row per length bucket.
        print(f"\n{'Length Range':<25s} {'Samples':>8s} {'F1':>8s} {'Precision':>10s} {'Recall':>8s} {'Accuracy':>10s} {'Abstain':>10s} {'Time(s)':>8s}")
        print("─" * 80)

        for label, metrics in bucket_metrics.items():
            print(f"{label:<25s} {metrics.sample_count:>8d} "
                  f"{metrics.f1:>8.3f} {metrics.precision:>10.3f} "
                  f"{metrics.recall:>8.3f} {metrics.accuracy:>10.3f} "
                  f"{metrics.abstention_rate:>10.2%} "
                  f"{metrics.avg_proc_time:>8.2f}"
                  )

        # Highlight extremes only when there is something to compare.
        if (len(bucket_metrics) > 1):
            best = max(bucket_metrics.items(), key = lambda x: x[1].f1)
            worst = min(bucket_metrics.items(), key = lambda x: x[1].f1)

            print(f"\n Best Performance: {best[0]} (F1: {best[1].f1:.3f})")
            print(f" Worst Performance: {worst[0]} (F1: {worst[1].f1:.3f})")

        self._analyze_length_correlation(bucket_metrics = bucket_metrics)
| |
|
| |
|
| | def _analyze_length_correlation(self, bucket_metrics: Dict[str, LengthBucketMetrics]): |
| | """ |
| | Analyze correlation between text length and performance |
| | |
| | Arguments: |
| | ---------- |
| | bucket_metrics { dict } : Dictionary of length bucket metrics |
| | """ |
| | if (len(bucket_metrics) < 3): |
| | return |
| | |
| | |
| | lengths = list() |
| | f1_scores = list() |
| | |
| | for metrics in bucket_metrics.values(): |
| | |
| | if (metrics.sample_count < 5): |
| | continue |
| |
|
| | |
| | if ((metrics.f1 == 0.0) and (metrics.precision == 0.0) and (metrics.recall == 0.0)): |
| | continue |
| |
|
| | |
| | if np.isinf(metrics.max_words): |
| | avg_length = metrics.min_words |
| |
|
| | else: |
| | avg_length = (metrics.min_words + metrics.max_words) / 2 |
| |
|
| | lengths.append(avg_length) |
| | f1_scores.append(metrics.f1) |
| |
|
| | lengths = np.asarray(lengths, dtype = float) |
| | f1_scores = np.asarray(f1_scores, dtype = float) |
| |
|
| | |
| | if (len(lengths) < 3): |
| | print("\n Length-Performance Correlation:") |
| | print(" Skipped (insufficient valid buckets)") |
| | return |
| |
|
| | if (not np.all(np.isfinite(lengths)) or not np.all(np.isfinite(f1_scores))): |
| | print("\n Length-Performance Correlation:") |
| | print(" Skipped (NaN / Inf detected)") |
| | return |
| |
|
| | if (np.std(f1_scores) == 0.0): |
| | print("\n Length-Performance Correlation:") |
| | print(" Skipped (zero variance in F1 scores)") |
| | return |
| |
|
| | |
| | corr, p_value = stats.pearsonr(lengths, f1_scores) |
| |
|
| | print(f"\n Length-Performance Correlation:") |
| | print(f" Pearson r = {corr:.3f} (p-value: {p_value:.4f})") |
| |
|
| | if p_value < 0.05: |
| | if (corr > 0.3): |
| | print(" → Significant POSITIVE correlation — performance improves with length\n") |
| |
|
| | elif (corr < -0.3): |
| | print(" → Significant NEGATIVE correlation — performance degrades with length\n") |
| |
|
| | else: |
| | print(" → Weak but statistically significant correlation\n") |
| | |
| | else: |
| | print(" → No statistically significant correlation\n") |
| |
|
| |
|
    def generate_report(self):
        """
        Generate comprehensive evaluation report

        Prints overall, per-domain, per-subset and per-length summaries of
        ``self.results`` to stdout. Assumes run_evaluation() has been run.
        """
        print("\n" + "=" * 70)
        print("EVALUATION REPORT - 4-CLASS SYSTEM")
        print("=" * 70)

        # Overall metrics across every processed sample (None when empty).
        overall = self.calculate_metrics()

        if overall:
            print("\nOverall Performance (Decisive Predictions):")
            print(f" Coverage: {overall.coverage:.1%} (decisive predictions)")
            print(f" Accuracy: {overall.accuracy:.1%}")
            print(f" Precision (AI): {overall.precision:.1%}")
            print(f" Recall (AI): {overall.recall:.1%}")
            print(f" F1 Score: {overall.f1:.1%}")
            print(f" AUROC: {overall.auroc:.3f}")
            print(f" AUPRC: {overall.auprc:.3f}")
            print(f" ECE (Calibration): {overall.ece:.3f}")

            print(f"\n4-Class Specific Metrics:")
            print(f" Abstention Rate: {overall.abstention_rate:.1%}")
            print(f" Hybrid Detection Rate: {overall.hybrid_detection_rate:.1%}")

            print("\n Verdict Distribution:")
            for verdict, count in overall.verdict_distribution.items():
                # Percentages are relative to every processed sample.
                pct = count / len(self.results) * 100
                print(f" {verdict:30s}: {count:4d} ({pct:5.1f}%)")

        # Per-domain breakdown; only domains with at least 5 decisive
        # samples (support counts cover decisive predictions only).
        print("\n" + "-" * 70)
        print("Per-Domain Performance:")
        print("-" * 70)
        print(f"{'Domain':<20s} {'F1':>8s} {'Coverage':>10s} {'Abstain':>10s} {'Hybrid%':>10s}")
        print("-" * 70)

        domain_scores = list()

        for domain in sorted(set(r.domain for r in self.results)):
            metrics = self.calculate_metrics(domain = domain)

            if metrics and (metrics.support['ai'] + metrics.support['human']) >= 5:
                domain_scores.append((domain, metrics.f1, metrics.coverage))
                print(f"{domain:<20s} {metrics.f1:>8.1%} {metrics.coverage:>10.1%} {metrics.abstention_rate:>10.1%} {metrics.hybrid_detection_rate:>10.1%}")

        # Per-subset breakdown (clean / paraphrased / cross_model).
        print("\n" + "-" * 70)
        print("Per-Subset Performance:")
        print("-" * 70)

        for subset in sorted(set(r.subset for r in self.results)):
            metrics = self.calculate_metrics(subset = subset)

            if metrics:
                print(f"\n {subset.upper()}:")
                print(f" Samples: {metrics.support['human'] + metrics.support['ai']}")
                print(f" F1 Score: {metrics.f1:.1%}")
                print(f" Coverage: {metrics.coverage:.1%}")
                print(f" Abstention: {metrics.abstention_rate:.1%}")
                print(f" Hybrid Detection: {metrics.hybrid_detection_rate:.1%}")

        # Length-bucket table plus correlation analysis.
        self.print_length_analysis()

        print("\n" + "=" * 70)
| | |
| |
|
| | def save_results(self): |
| | """ |
| | Save evaluation results |
| | """ |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | |
| | |
| | overall = self.calculate_metrics() |
| | |
| | |
| | length_metrics = self.analyze_by_length() |
| | length_dict = {k: asdict(v) for k, v in length_metrics.items()} |
| | |
| | |
| | abstention_stats = {"total_uncertain" : sum(1 for r in self.results if r.prediction == "uncertain"), |
| | "total_hybrid" : sum(1 for r in self.results if r.prediction == "hybrid"), |
| | "abstention_rate" : sum(1 for r in self.results if r.prediction == "uncertain") / len(self.results) if self.results else 0, |
| | "hybrid_rate" : sum(1 for r in self.results if r.prediction == "hybrid") / len(self.results) if self.results else 0, |
| | "avg_uncertainty" : np.mean([r.uncertainty for r in self.results if r.prediction == "uncertain"]) if any(r.prediction == "uncertain" for r in self.results) else 0, |
| | } |
| | |
| | |
| | results_dict = [asdict(r) for r in self.results] |
| | json_path = self.output_dir / f"evaluation_results_{timestamp}.json" |
| |
|
| | with open(json_path, 'w') as f: |
| | json.dump(obj = {'metadata' : self.metadata, |
| | 'overall_metrics' : asdict(overall) if overall else {}, |
| | 'length_metrics' : length_dict, |
| | 'abstention' : abstention_stats, |
| | 'timestamp' : timestamp, |
| | 'results' : results_dict, |
| | }, |
| | fp = f, |
| | indent = 4, |
| | ) |
| |
|
| | print(f"\n✓ JSON results saved: {json_path}") |
| | |
| | |
| | df = pd.DataFrame(data = results_dict) |
| | csv_path = self.output_dir / f"evaluation_results_{timestamp}.csv" |
| | |
| | df.to_csv(csv_path, index = False) |
| | print(f"✓ CSV results saved: {csv_path}") |
| | |
| |
|
    def plot_visualizations(self):
        """
        Generate comprehensive evaluation visualizations

        Builds a 2x2 figure (confusion matrix, per-domain F1, verdict
        distribution, per-subset behaviour), saves it to a timestamped PNG
        under ``self.output_dir``, then renders the length-analysis plots.
        """
        fig, axes = plt.subplots(nrows = 2,
                                 ncols = 2,
                                 figsize = (18, 14),
                                 )

        plt.suptitle('TEXT-AUTH Evaluation Results (4-Class System)',
                     fontsize = 18,
                     fontweight = 'bold',
                     y = 0.98,
                     )

        # Panel 1 (top-left): confusion matrix over decisive predictions.
        ax1 = axes[0, 0]
        overall = self.calculate_metrics()

        if overall:
            cm = np.array(overall.confusion_matrix)

            sns.heatmap(cm,
                        annot = True,
                        fmt = 'd',
                        cmap = 'Blues',
                        ax = ax1,
                        xticklabels = ['Human', 'AI/Hybrid'],
                        yticklabels = ['Human', 'AI'],
                        )

            ax1.set_title('Confusion Matrix\n(Decisive Predictions Only)')
            ax1.set_xlabel('Predicted')
            ax1.set_ylabel('Actual')

        # Panel 2 (top-right): per-domain F1 for domains with >= 10
        # decisive samples.
        ax2 = axes[0, 1]
        domain_scores = list()

        for domain in sorted(set(r.domain for r in self.results)):
            metrics = self.calculate_metrics(domain = domain)

            if metrics and (metrics.support['ai'] + metrics.support['human']) >= 10:
                domain_scores.append((domain, metrics.f1))

        # Sort ascending so the best domain lands at the top of the barh.
        domain_scores.sort(key = lambda x: x[1])

        if domain_scores:
            domain_labels, domain_f1 = zip(*domain_scores)

            ax2.barh(domain_labels, domain_f1, color = 'steelblue')

            if overall:
                # Dashed reference line marking the overall F1.
                ax2.axvline(x = overall.f1,
                            color = 'red',
                            linestyle = '--',
                            linewidth = 1.5,
                            label = f'Overall ({overall.f1:.1%})',
                            )

            ax2.set_xlim([0, 1])
            ax2.set_xlabel('F1 Score')
            ax2.set_title('F1 Score by Domain')
            ax2.grid(axis = 'x', alpha = 0.3)
            ax2.legend()

        # Panel 3 (bottom-left): pie chart of the 4-class verdict counts.
        ax3 = axes[1, 0]

        if overall:
            verdict_counts = overall.verdict_distribution
            labels = list(verdict_counts.keys())
            sizes = list(verdict_counts.values())
            # One fixed colour per verdict class, in insertion order.
            colors = ['#fee2e2', '#d1fae5', '#e9d5ff', '#fef3c7']
            explode = (0.05, 0.05, 0.05, 0.05)

            ax3.pie(sizes,
                    labels = labels,
                    autopct = '%1.1f%%',
                    colors = colors,
                    explode = explode,
                    startangle = 90,
                    textprops = {'fontsize': 9})
            ax3.set_title('4-Class Verdict Distribution')

        # Panel 4 (bottom-right): F1 / coverage / abstention per subset.
        ax4 = axes[1, 1]
        subset_scores = list()

        for subset in sorted(set(r.subset for r in self.results)):
            metrics = self.calculate_metrics(subset = subset)

            if metrics:
                subset_scores.append((subset, metrics.f1, metrics.coverage, metrics.abstention_rate))

        if subset_scores:
            subset_labels = [s[0] for s in subset_scores]
            subset_f1 = [s[1] for s in subset_scores]
            subset_cov = [s[2] for s in subset_scores]
            subset_abs = [s[3] for s in subset_scores]

            x = np.arange(len(subset_labels))
            width = 0.25

            ax4.bar(x - width, subset_f1, width, label = 'F1 Score', color = 'steelblue')
            ax4.bar(x, subset_cov, width, label = 'Coverage', color = 'lightcoral')
            ax4.bar(x + width, subset_abs, width, label = 'Abstention', color = 'gold')

            ax4.set_ylabel('Score / Rate')
            ax4.set_title('Performance & Behavior by Subset')
            ax4.set_xticks(x)
            ax4.set_xticklabels(subset_labels, rotation = 45, ha = 'right')
            ax4.legend()
            ax4.grid(axis = 'y', alpha = 0.3)
            ax4.set_ylim([0, 1])

        # Leave headroom for the suptitle.
        plt.tight_layout(rect = [0, 0, 1, 0.96])

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        plot_path = self.output_dir / f"evaluation_plots_{timestamp}.png"

        plt.savefig(plot_path, dpi = 300, bbox_inches = 'tight')
        plt.close()

        print(f"✓ Main plots saved: {plot_path}")

        # Companion figure: metrics as a function of text length.
        self.plot_length_visualizations()
| |
|
| |
|
    def plot_length_visualizations(self):
        """
        Generate length-based performance visualizations

        Builds a 2x2 figure (classification metrics, sample distribution,
        processing time, abstention rate — each per length bucket) and saves
        it to a timestamped PNG. Skipped when fewer than two buckets exist.
        """
        bucket_metrics = self.analyze_by_length()

        # Nothing meaningful to plot with fewer than two buckets.
        if not bucket_metrics or len(bucket_metrics) < 2:
            return

        fig, axes = plt.subplots(nrows = 2,
                                 ncols = 2,
                                 figsize = (16, 12),
                                 )

        plt.suptitle('Performance Analysis by Text Length',
                     fontsize = 16,
                     fontweight = 'bold',
                     y = 0.98,
                     )

        labels = list(bucket_metrics.keys())
        metrics_list = list(bucket_metrics.values())

        # Panel 1: grouped bars of F1 / precision / recall per bucket.
        ax1 = axes[0, 0]

        f1_vals = [m.f1 for m in metrics_list]
        precision_vals = [m.precision for m in metrics_list]
        recall_vals = [m.recall for m in metrics_list]

        x = np.arange(len(labels))
        width = 0.25

        ax1.bar(x - width, f1_vals, width, label = 'F1', color = 'steelblue')
        ax1.bar(x, precision_vals, width, label = 'Precision', color = 'lightcoral')
        ax1.bar(x + width, recall_vals, width, label = 'Recall', color = 'lightgreen')

        ax1.set_ylabel('Score')
        ax1.set_title('Classification Metrics by Length')
        ax1.set_xticks(x)
        ax1.set_xticklabels(labels, rotation = 45, ha = 'right', fontsize = 9)
        ax1.legend()
        ax1.grid(axis = 'y', alpha = 0.3)
        ax1.set_ylim([0, 1])

        # Panel 2: decisive sample counts per bucket.
        ax2 = axes[0, 1]

        sample_counts = [m.sample_count for m in metrics_list]

        ax2.bar(labels, sample_counts, color = 'mediumpurple')
        ax2.set_ylabel('Number of Samples')
        ax2.set_title('Sample Distribution by Length')
        # NOTE(review): set_xticklabels without a preceding set_xticks is
        # flagged by recent matplotlib (FixedFormatter/FixedLocator warning)
        # — confirm against the pinned matplotlib version.
        ax2.set_xticklabels(labels, rotation = 45, ha = 'right', fontsize = 9)
        ax2.grid(axis = 'y', alpha = 0.3)

        # Panel 3: mean per-sample processing time per bucket.
        ax3 = axes[1, 0]

        proc_times = [m.avg_proc_time for m in metrics_list]

        ax3.plot(labels, proc_times, marker = 'o', linewidth = 2, markersize = 8, color = 'darkorange')
        ax3.set_ylabel('Processing Time (seconds)')
        ax3.set_title('Average Processing Time by Length')
        ax3.set_xticklabels(labels, rotation = 45, ha = 'right', fontsize = 9)
        ax3.grid(alpha = 0.3)

        # Panel 4: abstention rate (hybrid + uncertain) per bucket, in %.
        ax4 = axes[1, 1]

        abstention_rates = [m.abstention_rate * 100 for m in metrics_list]

        ax4.bar(labels, abstention_rates, color = 'gold')
        ax4.set_ylabel('Abstention Rate (%)')
        ax4.set_title('Abstention Rate by Length')
        ax4.set_xticklabels(labels, rotation = 45, ha = 'right', fontsize = 9)
        ax4.grid(axis = 'y', alpha = 0.3)

        # Leave headroom for the suptitle.
        plt.tight_layout(rect = [0, 0, 1, 0.96])

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        plot_path = self.output_dir / f"length_analysis_{timestamp}.png"

        plt.savefig(plot_path, dpi = 300, bbox_inches = 'tight')
        plt.close()

        print(f"✓ Length analysis plots saved: {plot_path}")
| |
|
| |
|
def main():
    """
    Command-line entry point for the TEXT-AUTH evaluation pipeline.

    Parses CLI options, loads the requested dataset slice, runs the
    evaluation and emits the report, saved results and plots.

    Returns 0 on success, 1 when no samples could be loaded.
    """
    parser = argparse.ArgumentParser(description='Run TEXT-AUTH evaluation (4-class system)')
    parser.add_argument('--dataset', type=str, default='evaluation', help='Path to evaluation directory')
    parser.add_argument('--output', type=str, default='evaluation/results', help='Output directory for results')
    parser.add_argument('--quick-test', action='store_true', help='Run quick test on 10 samples per domain')
    parser.add_argument('--samples', type=int, default=None, help='Maximum samples per domain')
    parser.add_argument('--domains', type=str, nargs='+', default=None, help='Specific domains to evaluate')
    parser.add_argument('--subset', type=str, choices=['clean', 'paraphrased', 'cross_model'], help='Evaluate only specific subset')
    args = parser.parse_args()

    evaluator = TextAuthEvaluator(dataset_path=args.dataset, output_dir=args.output)

    # --quick-test caps every domain at 10 samples regardless of --samples.
    per_domain_cap = 10 if args.quick_test else args.samples
    samples = evaluator.load_dataset(domains=args.domains,
                                     max_samples_per_domain=per_domain_cap,
                                     subset_filter=args.subset)

    if not samples:
        print("No samples loaded. Check dataset path and run data collection scripts.")
        return 1

    evaluator.run_evaluation(samples)
    evaluator.generate_report()
    evaluator.save_results()
    evaluator.plot_visualizations()

    print("\n✓ Evaluation complete!\n")
    return 0
| |
|
| |
|
| |
|
| | |
# Script entry point — propagate main()'s return value (0 or 1) as the
# process exit code.
if __name__ == "__main__":
    sys.exit(main())