| """ |
| Export Utilities for AegisLM Reporting System |
| |
| Production-grade utilities for JSON and CSV report export with integrity verification. |
| """ |
|
|
import csv
import hashlib
import json
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, Optional, List
|
|
| |
# Make the parent of this file's directory importable. Presumably this is what
# lets the absolute ``schemas`` import that follows resolve when the module is
# loaded outside of an installed package -- confirm against the project layout.
current_dir = Path(__file__).parent
backend_dir = current_dir.parent
if not any(entry == str(backend_dir) for entry in sys.path):
    # Front of the list so this directory wins over later sys.path entries.
    sys.path.insert(0, str(backend_dir))
|
|
| from schemas.report_schema import ( |
| FullReport, SummaryReport, CSVReportData, ReportFormat, ReportType |
| ) |
|
|
|
|
class ReportExporter:
    """
    Production-grade report exporter with integrity verification.

    Handles JSON and CSV export with file management and checksums.

    Every export is written into ``reports_dir`` under a timestamped name of
    the form ``run_<run_id>_<kind>_<YYYYmmdd_HHMMSS>.<ext>``. CSV exports also
    get a ``<csv stem>_metadata.json`` sidecar that records the CSV's SHA-256
    checksum, size and row count.
    """

    # Appended to a CSV file's stem to form the name of its metadata sidecar.
    _METADATA_SUFFIX = "_metadata.json"

    # Receives warnings from the best-effort steps (data extraction and
    # parsing of existing files), which never abort the calling operation.
    _logger = logging.getLogger(__name__)

    def __init__(self, reports_dir: Optional[str] = None):
        """
        Initialize report exporter.

        Args:
            reports_dir: Directory for storing reports. When omitted, a
                ``reports`` directory next to this file's parent directory
                is used. The directory is created if it does not exist.
        """
        if reports_dir is None:
            self.reports_dir = Path(__file__).parent.parent / "reports"
        else:
            self.reports_dir = Path(reports_dir)

        self.reports_dir.mkdir(parents=True, exist_ok=True)

    def export_json_report(self, report_data: Dict[str, Any], run_id: str,
                           report_type: ReportType = ReportType.FULL) -> str:
        """
        Export report as JSON file.

        The report is wrapped in an envelope with an ``export_metadata``
        section. The ``file_checksum`` (SHA-256) and ``file_size_bytes``
        fields of that section describe the UTF-8 serialization of the
        envelope *before* those two fields were added, since a file cannot
        contain its own hash. ``datetime`` values inside ``report_data`` are
        written as ISO-8601 strings.

        Args:
            report_data: Report data to export
            run_id: Associated run ID
            report_type: Type of report

        Returns:
            File path of exported report

        Raises:
            ValueError: If export fails
        """
        try:
            timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            file_path = self._unique_path(
                f"run_{run_id}_{report_type.value}_{timestamp}.json"
            )

            export_metadata = {
                "exported_at": datetime.utcnow().isoformat(),
                "export_format": "json",
                "report_type": report_type.value,
                "run_id": run_id,
                "file_version": "1.0",
            }
            export_data = {
                "export_metadata": export_metadata,
                "report_data": report_data,
            }

            # Serialize in memory first. This yields the checksum and size
            # without writing the file twice, and a serialization error can
            # no longer leave a half-written report on disk.
            unsigned = self._dump_json(export_data).encode("utf-8")
            export_metadata["file_checksum"] = hashlib.sha256(unsigned).hexdigest()
            export_metadata["file_size_bytes"] = len(unsigned)

            self._write_json(file_path, export_data)
            return str(file_path)

        except Exception as e:
            raise ValueError(f"Failed to export JSON report: {str(e)}") from e

    def export_csv_summary(self, report_data: Dict[str, Any], run_id: str) -> str:
        """
        Export report summary as CSV file.

        Writes a one-row summary CSV plus its metadata sidecar. If no summary
        row can be extracted from ``report_data``, a placeholder row is
        written instead so the file is never empty.

        Args:
            report_data: Report data to export
            run_id: Associated run ID

        Returns:
            File path of exported CSV

        Raises:
            ValueError: If export fails
        """
        try:
            timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            file_path = self._write_csv_report(
                filename=f"run_{run_id}_summary_{timestamp}.csv",
                run_id=run_id,
                rows=self._extract_csv_data(report_data, run_id),
                placeholder_header=[
                    'run_id', 'model_name', 'dataset_name', 'status', 'created_at'
                ],
            )
            return str(file_path)

        except Exception as e:
            raise ValueError(f"Failed to export CSV summary: {str(e)}") from e

    def export_csv_detailed(self, report_data: Dict[str, Any], run_id: str) -> str:
        """
        Export detailed report data as CSV file.

        Writes one row per metric (long format) plus the metadata sidecar. If
        no rows can be extracted from ``report_data``, a placeholder row is
        written instead so the file is never empty.

        Args:
            report_data: Report data to export
            run_id: Associated run ID

        Returns:
            File path of exported CSV

        Raises:
            ValueError: If export fails
        """
        try:
            timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            file_path = self._write_csv_report(
                filename=f"run_{run_id}_detailed_{timestamp}.csv",
                run_id=run_id,
                rows=self._extract_detailed_csv_data(report_data, run_id),
                placeholder_header=[
                    'run_id', 'metric_type', 'metric_name', 'metric_value', 'timestamp'
                ],
            )
            return str(file_path)

        except Exception as e:
            raise ValueError(f"Failed to export detailed CSV: {str(e)}") from e

    def export_batch_reports(self, reports: List[Dict[str, Any]],
                             format: ReportFormat = ReportFormat.JSON) -> List[str]:
        """
        Export multiple reports in batch.

        Each report dict supplies its own ``run_id`` (default ``'unknown'``)
        and ``report_type`` (default ``'full'``). For CSV, summary reports
        use the summary layout and every other type uses the detailed one.
        The batch stops at the first failure; files already written by
        earlier items are left in place.

        Args:
            reports: List of report data to export
            format: Export format

        Returns:
            List of file paths

        Raises:
            ValueError: If batch export fails
        """
        file_paths = []

        try:
            for report in reports:
                run_id = report.get('run_id', 'unknown')
                report_type = ReportType(report.get('report_type', 'full'))

                if format == ReportFormat.JSON:
                    file_path = self.export_json_report(report, run_id, report_type)
                elif format == ReportFormat.CSV:
                    if report_type == ReportType.SUMMARY:
                        file_path = self.export_csv_summary(report, run_id)
                    else:
                        file_path = self.export_csv_detailed(report, run_id)
                else:
                    raise ValueError(f"Unsupported export format: {format}")

                file_paths.append(file_path)

            return file_paths

        except Exception as e:
            raise ValueError(f"Failed to export batch reports: {str(e)}") from e

    def get_report_file_info(self, file_path: str) -> Dict[str, Any]:
        """
        Get information about a report file.

        Always returns path, name, size, timestamps, extension and SHA-256
        checksum. For ``.json`` and ``.csv`` files, whatever export details
        can be parsed from the file (or its sidecar) are merged in.

        Args:
            file_path: Path to report file

        Returns:
            File information dictionary

        Raises:
            ValueError: If file not found or invalid
        """
        try:
            file_path = Path(file_path)

            if not file_path.exists():
                raise ValueError(f"File not found: {file_path}")

            stat = file_path.stat()
            extension = file_path.suffix.lower()

            info = {
                "file_path": str(file_path),
                "file_name": file_path.name,
                "file_size_bytes": stat.st_size,
                # NOTE: st_ctime is the metadata-change time on most Unix
                # systems and the creation time only on Windows.
                "created_at": datetime.fromtimestamp(stat.st_ctime).isoformat(),
                "modified_at": datetime.fromtimestamp(stat.st_mtime).isoformat(),
                "file_extension": extension,
            }

            info["file_checksum"] = self._calculate_file_checksum(file_path)

            if extension == '.json':
                info.update(self._get_json_file_info(file_path))
            elif extension == '.csv':
                info.update(self._get_csv_file_info(file_path))

            return info

        except Exception as e:
            raise ValueError(f"Failed to get file info: {str(e)}") from e

    def list_reports(self, run_id: Optional[str] = None,
                     format: Optional[ReportFormat] = None) -> List[Dict[str, Any]]:
        """
        List available reports.

        Scans ``reports_dir`` (non-recursively). CSV metadata sidecars are
        not listed, because they describe a report rather than being one.
        Files whose info cannot be read are skipped.

        Args:
            run_id: Optional run ID filter. Matches files named
                ``run_<run_id>_...``, the naming used by the export methods.
            format: Optional format filter

        Returns:
            List of report file information, newest ``created_at`` first

        Raises:
            ValueError: If the reports directory cannot be listed
        """
        reports = []

        try:
            # Match on the full "run_<id>_" prefix: a plain substring test
            # would let run "1" match any name containing a "1", timestamps
            # included. IDs that themselves contain "_" can still overlap.
            name_prefix = f"run_{run_id}_" if run_id else None

            for file_path in self.reports_dir.iterdir():
                if not file_path.is_file():
                    continue

                if file_path.name.endswith(self._METADATA_SUFFIX):
                    continue

                extension = file_path.suffix.lower()
                if format == ReportFormat.JSON and extension != '.json':
                    continue
                if format == ReportFormat.CSV and extension != '.csv':
                    continue

                # Filter on the name before hashing/parsing the file.
                if name_prefix and not file_path.name.startswith(name_prefix):
                    continue

                try:
                    reports.append(self.get_report_file_info(str(file_path)))
                except ValueError:
                    # Best effort: one unreadable file must not hide the rest.
                    continue

            reports.sort(key=lambda x: x['created_at'], reverse=True)

            return reports

        except Exception as e:
            raise ValueError(f"Failed to list reports: {str(e)}") from e

    def delete_report(self, file_path: str) -> bool:
        """
        Delete a report file.

        When the report is a CSV, its ``<stem>_metadata.json`` sidecar is
        removed as well. No other file is touched.

        Args:
            file_path: Path to report file

        Returns:
            True if deleted, False otherwise (missing file or any error)
        """
        try:
            file_path = Path(file_path)

            if not file_path.exists():
                return False

            file_path.unlink()

            if file_path.suffix.lower() == '.csv':
                metadata_path = self._metadata_path_for(file_path)
                if metadata_path.exists():
                    metadata_path.unlink()

            return True

        except (OSError, TypeError, ValueError):
            return False

    def _unique_path(self, filename: str) -> Path:
        """Return a path for ``filename`` in ``reports_dir`` that does not exist yet.

        On a name collision ``_1``, ``_2``, ... is appended to the *original*
        stem until a free name is found.
        """
        candidate = self.reports_dir / filename
        if not candidate.exists():
            return candidate

        stem, suffix = candidate.stem, candidate.suffix
        counter = 1
        while True:
            candidate = self.reports_dir / f"{stem}_{counter}{suffix}"
            if not candidate.exists():
                return candidate
            counter += 1

    def _metadata_path_for(self, csv_path: Path) -> Path:
        """Return the metadata sidecar path that belongs to ``csv_path``."""
        return csv_path.with_name(f"{csv_path.stem}{self._METADATA_SUFFIX}")

    @staticmethod
    def _json_default(value: Any) -> Any:
        """Serialize ``datetime`` as ISO-8601; reject every other unknown type."""
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    def _dump_json(self, payload: Dict[str, Any]) -> str:
        """Serialize ``payload`` with the formatting used for all exported JSON."""
        return json.dumps(payload, indent=2, ensure_ascii=False, sort_keys=True,
                          default=self._json_default)

    def _write_json(self, path: Path, payload: Dict[str, Any]) -> None:
        """Write ``payload`` to ``path`` as UTF-8 JSON."""
        with open(path, 'w', encoding='utf-8') as f:
            f.write(self._dump_json(payload))

    @staticmethod
    def _iso_timestamp(value: Any) -> str:
        """Return ``value`` as an ISO-8601 string.

        ``None`` becomes the current UTC time, strings are passed through
        unchanged, and anything else must provide ``isoformat()``.
        """
        if value is None:
            return datetime.utcnow().isoformat()
        if isinstance(value, str):
            return value
        return value.isoformat()

    def _write_csv_report(self, filename: str, run_id: str,
                          rows: List[Dict[str, Any]],
                          placeholder_header: List[str]) -> Path:
        """Write ``rows`` to a new CSV file plus its metadata sidecar.

        The column order is taken from the keys of the first row. When
        ``rows`` is empty, ``placeholder_header`` and a single row of
        ``'Unknown'`` values are written instead.

        Returns:
            Path of the CSV file that was written
        """
        file_path = self._unique_path(filename)

        # newline='' lets the csv module control line endings itself.
        with open(file_path, 'w', newline='', encoding='utf-8') as f:
            if rows:
                writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
                writer.writeheader()
                writer.writerows(rows)
            else:
                writer = csv.writer(f)
                writer.writerow(placeholder_header)
                writer.writerow([run_id, 'Unknown', 'Unknown', 'Unknown',
                                 datetime.utcnow().isoformat()])

        metadata = {
            "export_metadata": {
                "exported_at": datetime.utcnow().isoformat(),
                "export_format": "csv",
                "run_id": run_id,
                "file_version": "1.0",
                "file_checksum": self._calculate_file_checksum(file_path),
                "file_size_bytes": file_path.stat().st_size,
                # The placeholder row counts as one data row.
                "csv_rows": len(rows) if rows else 1,
            }
        }

        # Name the sidecar after the final (possibly de-duplicated) CSV name
        # so that two exports can never share one metadata file.
        self._write_json(self._metadata_path_for(file_path), metadata)

        return file_path

    def _extract_csv_data(self, report_data: Dict[str, Any], run_id: str) -> List[Dict[str, Any]]:
        """Extract CSV data from report data.

        Builds a single summary row from the ``experiment`` and ``audit``
        sections. Best effort: on any failure a warning is logged and an
        empty list is returned, which makes the caller write a placeholder.
        """
        csv_rows = []

        try:
            # "or {}" also covers sections that are present but set to None.
            experiment = report_data.get('experiment') or {}
            audit = report_data.get('audit') or {}

            # Keep real strings (including str-based enum members) as they
            # are so the joined text is their value, not their repr.
            attack_types = ','.join(
                item if isinstance(item, str) else str(item)
                for item in (experiment.get('attack_types') or [])
            )

            csv_row = CSVReportData(
                run_id=run_id,
                model_name=experiment.get('model_name', 'Unknown'),
                dataset_name=experiment.get('dataset_name'),
                dataset_version=experiment.get('dataset_version'),
                attack_types=attack_types,
                created_at=self._iso_timestamp(experiment.get('created_at')),
                execution_time_ms=experiment.get('execution_time_ms'),
                status=experiment.get('status', 'Unknown'),
                total_attacks=experiment.get('total_attacks', 0),
                successful_attacks=experiment.get('successful_attacks', 0),
                failed_attacks=experiment.get('failed_attacks', 0),
                success_rate=experiment.get('success_rate'),
                robustness_score=experiment.get('robustness_score'),
                risk_score=experiment.get('risk_score'),
                hallucination_rate=experiment.get('hallucination_rate'),
                toxicity_rate=experiment.get('toxicity_rate'),
                confidence_score=experiment.get('confidence_score'),
                config_hash=audit.get('config_hash', ''),
                result_checksum=audit.get('result_checksum', ''),
                audit_status=audit.get('audit_status', 'Unknown'),
                integrity_level=audit.get('integrity_level', 'Unknown'),
                confidence_level=audit.get('confidence_level')
            )

            csv_rows.append(csv_row.dict())

        except Exception as e:
            self._logger.warning("Failed to extract CSV data: %s", e)

        return csv_rows

    def _extract_detailed_csv_data(self, report_data: Dict[str, Any], run_id: str) -> List[Dict[str, Any]]:
        """Extract detailed CSV data from report data.

        Produces one row per metric with the columns ``run_id``,
        ``metric_type``, ``metric_name``, ``metric_value`` and ``timestamp``.
        Score and audit metrics that are ``None`` are omitted; the model name
        and the attack counters are always emitted. Best effort: on failure a
        warning is logged and the rows collected so far are returned.
        """
        csv_rows = []

        try:
            # "or {}" also covers sections that are present but set to None.
            experiment = report_data.get('experiment') or {}
            audit = report_data.get('audit') or {}

            experiment_ts = self._iso_timestamp(experiment.get('created_at'))
            audit_ts = self._iso_timestamp(audit.get('verification_timestamp'))

            # (metric_type, timestamp, skip_none, [(metric_name, value), ...])
            metric_groups = [
                ('basic_info', experiment_ts, False, [
                    ('model_name', experiment.get('model_name', 'Unknown')),
                ]),
                ('score', experiment_ts, True, [
                    ('robustness_score', experiment.get('robustness_score')),
                    ('risk_score', experiment.get('risk_score')),
                    ('confidence_score', experiment.get('confidence_score')),
                    ('hallucination_rate', experiment.get('hallucination_rate')),
                    ('toxicity_rate', experiment.get('toxicity_rate')),
                ]),
                ('attack', experiment_ts, False, [
                    ('total_attacks', experiment.get('total_attacks', 0)),
                    ('successful_attacks', experiment.get('successful_attacks', 0)),
                    ('failed_attacks', experiment.get('failed_attacks', 0)),
                    ('success_rate', experiment.get('success_rate', 0)),
                ]),
                ('audit', audit_ts, True, [
                    ('integrity_level', audit.get('integrity_level')),
                    ('confidence_level', audit.get('confidence_level')),
                    ('replay_count', audit.get('replay_count', 0)),
                    ('verification_confidence', audit.get('confidence_score')),
                ]),
            ]

            for metric_type, timestamp, skip_none, metrics in metric_groups:
                for metric_name, metric_value in metrics:
                    if skip_none and metric_value is None:
                        continue
                    csv_rows.append({
                        'run_id': run_id,
                        'metric_type': metric_type,
                        'metric_name': metric_name,
                        'metric_value': metric_value,
                        'timestamp': timestamp,
                    })

        except Exception as e:
            self._logger.warning("Failed to extract detailed CSV data: %s", e)

        return csv_rows

    def _calculate_file_checksum(self, file_path: Path) -> str:
        """Calculate SHA-256 checksum of file."""
        hash_sha256 = hashlib.sha256()

        with open(file_path, "rb") as f:
            # Read in fixed-size chunks so large reports are never loaded
            # into memory in one piece.
            for chunk in iter(lambda: f.read(4096), b""):
                hash_sha256.update(chunk)

        return hash_sha256.hexdigest()

    def _get_json_file_info(self, file_path: Path) -> Dict[str, Any]:
        """Get JSON file specific information.

        Reads the ``export_metadata`` envelope and selected ``experiment`` /
        ``audit`` fields. Best effort: on failure a warning is logged and
        whatever was collected so far is returned.
        """
        info: Dict[str, Any] = {}

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            if 'export_metadata' in data:
                metadata = data['export_metadata']
                info.update({
                    'export_format': metadata.get('export_format'),
                    'report_type': metadata.get('report_type'),
                    'run_id': metadata.get('run_id'),
                    'exported_at': metadata.get('exported_at'),
                    'file_version': metadata.get('file_version')
                })

            if 'report_data' in data:
                report_data = data['report_data']

                if 'experiment' in report_data:
                    experiment = report_data['experiment']
                    info.update({
                        'model_name': experiment.get('model_name'),
                        'dataset_name': experiment.get('dataset_name'),
                        'status': experiment.get('status')
                    })

                if 'audit' in report_data:
                    audit = report_data['audit']
                    info.update({
                        'audit_status': audit.get('audit_status'),
                        'integrity_level': audit.get('integrity_level')
                    })

        except Exception as e:
            self._logger.warning("Failed to parse JSON file info: %s", e)

        return info

    def _get_csv_file_info(self, file_path: Path) -> Dict[str, Any]:
        """Get CSV file specific information.

        Merges the ``export_metadata`` of the ``<stem>_metadata.json``
        sidecar (when present) and then sets ``csv_rows`` to the number of
        data records actually found in the CSV, excluding the header. Best
        effort: on failure a warning is logged and whatever was collected so
        far is returned.
        """
        info: Dict[str, Any] = {}

        try:
            metadata_path = self._metadata_path_for(file_path)
            if metadata_path.exists():
                with open(metadata_path, 'r', encoding='utf-8') as f:
                    metadata = json.load(f)

                if 'export_metadata' in metadata:
                    export_metadata = metadata['export_metadata']
                    info.update({
                        'export_format': export_metadata.get('export_format'),
                        'run_id': export_metadata.get('run_id'),
                        'exported_at': export_metadata.get('exported_at'),
                        'file_version': export_metadata.get('file_version'),
                        'csv_rows': export_metadata.get('csv_rows', 0)
                    })

            # Count parsed records rather than physical lines so that quoted
            # fields containing line breaks are not counted as extra rows.
            with open(file_path, 'r', newline='', encoding='utf-8') as f:
                record_count = sum(1 for _ in csv.reader(f))

            # The measured count takes precedence over the sidecar's value.
            info['csv_rows'] = max(record_count - 1, 0)

        except Exception as e:
            self._logger.warning("Failed to parse CSV file info: %s", e)

        return info
|
|
|
|
| |
# Module-wide exporter instance; stays None until get_report_exporter() is
# called for the first time.
_report_exporter = None


def get_report_exporter() -> ReportExporter:
    """
    Return the shared module-level ReportExporter, creating it on first use.

    The instance is built with the default constructor arguments and then
    reused by every later call.

    Returns:
        Global report exporter instance
    """
    global _report_exporter
    if _report_exporter is not None:
        return _report_exporter

    _report_exporter = ReportExporter()
    return _report_exporter
|
|
|
|
| |
def export_json_report(report_data: Dict[str, Any], run_id: str,
                       report_type: ReportType = ReportType.FULL) -> str:
    """
    Write a JSON report using the shared module-level exporter.

    Convenience wrapper that forwards its arguments unchanged to
    ``ReportExporter.export_json_report``.

    Args:
        report_data: Report data to export
        run_id: Associated run ID
        report_type: Type of report

    Returns:
        File path of exported report
    """
    return get_report_exporter().export_json_report(report_data, run_id, report_type)
|
|
|
|
def export_csv_summary(report_data: Dict[str, Any], run_id: str) -> str:
    """
    Write a summary CSV using the shared module-level exporter.

    Convenience wrapper that forwards its arguments unchanged to
    ``ReportExporter.export_csv_summary``.

    Args:
        report_data: Report data to export
        run_id: Associated run ID

    Returns:
        File path of exported CSV
    """
    return get_report_exporter().export_csv_summary(report_data, run_id)
|
|
|
|
def export_csv_detailed(report_data: Dict[str, Any], run_id: str) -> str:
    """
    Write a detailed CSV using the shared module-level exporter.

    Convenience wrapper that forwards its arguments unchanged to
    ``ReportExporter.export_csv_detailed``.

    Args:
        report_data: Report data to export
        run_id: Associated run ID

    Returns:
        File path of exported CSV
    """
    return get_report_exporter().export_csv_detailed(report_data, run_id)
|
|