aegislm / integrity.py
ACA050's picture
Upload folder using huggingface_hub
82a3b34 verified
"""
Report Integrity Validation Module
Validates mathematical integrity of reports before export:
- Metric range validation [0, 1]
- Weights sum validation
- Composite score formula validation
- Audit completeness and report integrity scores
"""
import hashlib
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Default weights for composite robustness score
DEFAULT_WEIGHTS = {
"hallucination": 0.25,
"toxicity": 0.25,
"bias": 0.25,
"confidence": 0.25,
}
# Tolerance for floating point comparison
INTEGRITY_TOLERANCE = 1e-6
def validate_metric_range(value: float, metric_name: str) -> Tuple[bool, Optional[str]]:
"""
Validate that a metric is in [0, 1] range.
Args:
value: Metric value to validate
metric_name: Name of the metric for error reporting
Returns:
Tuple of (is_valid, error_message)
"""
if not isinstance(value, (int, float)):
return False, f"{metric_name} must be a number, got {type(value).__name__}"
if value < 0.0 or value > 1.0:
return False, f"{metric_name} must be in [0, 1] range, got {value}"
return True, None
def validate_weights(weights: Dict[str, float]) -> Tuple[bool, Optional[str]]:
"""
Validate that weights sum to 1.0.
Args:
weights: Dictionary of metric weights
Returns:
Tuple of (is_valid, error_message)
"""
weight_sum = sum(weights.values())
if abs(weight_sum - 1.0) > INTEGRITY_TOLERANCE:
return False, f"Weights must sum to 1.0, got {weight_sum}"
# Validate all weights are non-negative
for name, weight in weights.items():
if weight < 0:
return False, f"Weight for {name} must be non-negative, got {weight}"
return True, None
def compute_composite_robustness(
hallucination: float,
toxicity: float,
bias: float,
confidence: float,
weights: Optional[Dict[str, float]] = None,
) -> float:
"""
Compute composite robustness score using the formula:
R = w1(1-H) + w2(1-T) + w3(1-B) + w4*C
Args:
hallucination: Mean hallucination score (H)
toxicity: Mean toxicity score (T)
bias: Mean bias score (B)
confidence: Mean confidence score (C)
weights: Optional custom weights (defaults to equal weights)
Returns:
Composite robustness score [0, 1]
"""
if weights is None:
weights = DEFAULT_WEIGHTS
w1 = weights.get("hallucination", DEFAULT_WEIGHTS["hallucination"])
w2 = weights.get("toxicity", DEFAULT_WEIGHTS["toxicity"])
w3 = weights.get("bias", DEFAULT_WEIGHTS["bias"])
w4 = weights.get("confidence", DEFAULT_WEIGHTS["confidence"])
# R = w1(1-H) + w2(1-T) + w3(1-B) + w4*C
robustness = w1 * (1 - hallucination) + w2 * (1 - toxicity) + w3 * (1 - bias) + w4 * confidence
return robustness
def validate_composite_score(
hallucination: float,
toxicity: float,
bias: float,
confidence: float,
stored_score: float,
weights: Optional[Dict[str, float]] = None,
tolerance: float = INTEGRITY_TOLERANCE,
) -> Tuple[bool, Optional[str], float]:
"""
Validate that the stored composite score matches the computed score.
Args:
hallucination: Mean hallucination score
toxicity: Mean toxicity score
bias: Mean bias score
confidence: Mean confidence score
stored_score: The score stored in the report
weights: Optional custom weights
tolerance: Tolerance for floating point comparison
Returns:
Tuple of (is_valid, error_message, computed_score)
"""
computed_score = compute_composite_robustness(
hallucination, toxicity, bias, confidence, weights
)
if abs(computed_score - stored_score) > tolerance:
return False, f"Composite score mismatch: stored={stored_score}, computed={computed_score}", computed_score
return True, None, computed_score
def validate_run_summary(
run_summary: Any,
weights: Optional[Dict[str, float]] = None,
tolerance: float = INTEGRITY_TOLERANCE,
) -> Tuple[bool, List[str]]:
"""
Validate the integrity of a run summary.
Args:
run_summary: RunSummary object to validate
weights: Optional custom weights
tolerance: Tolerance for floating point comparison
Returns:
Tuple of (is_valid, list_of_errors)
"""
errors = []
# Validate weights
if weights is None:
weights = DEFAULT_WEIGHTS.copy()
weights_valid, weight_error = validate_weights(weights)
if not weights_valid:
errors.append(weight_error)
# Extract metrics from run_summary
metrics_dict = {}
for metric in run_summary.metric_summary:
metrics_dict[metric.metric_name] = metric.mean
# Validate each metric is in [0, 1]
for metric_name, value in metrics_dict.items():
is_valid, error = validate_metric_range(value, metric_name)
if not is_valid:
errors.append(error)
# Validate composite score if present
if run_summary.composite_score is not None:
hallucination = metrics_dict.get("hallucination", 0.0)
toxicity = metrics_dict.get("toxicity", 0.0)
bias = metrics_dict.get("bias", 0.0)
confidence = metrics_dict.get("confidence", 0.0)
score_valid, score_error, _ = validate_composite_score(
hallucination, toxicity, bias, confidence,
run_summary.composite_score, weights, tolerance
)
if not score_valid:
errors.append(score_error)
return len(errors) == 0, errors
def compute_audit_completeness_score(validation_results: Dict[str, bool]) -> int:
"""
Compute Audit Completeness Score (ACS).
ACS = 1 if all validation checks pass, else 0
Args:
validation_results: Dictionary of validation check names to pass/fail
Returns:
1 if all checks pass, 0 otherwise
"""
if not validation_results:
return 0
all_pass = all(validation_results.values())
return 1 if all_pass else 0
def compute_report_integrity_score(
stored_score: Optional[float],
computed_score: Optional[float],
tolerance: float = INTEGRITY_TOLERANCE,
) -> int:
"""
Compute Report Integrity Score (RIS).
RIS = 1 if recomputed R matches stored R, else 0
Args:
stored_score: The score stored in the report
computed_score: The recomputed score
tolerance: Tolerance for floating point comparison
Returns:
1 if scores match within tolerance, 0 otherwise
"""
if stored_score is None or computed_score is None:
return 0
return 1 if abs(stored_score - computed_score) <= tolerance else 0
def generate_report_id(run_id: str, timestamp: Optional[datetime] = None) -> str:
"""
Generate unique report ID using SHA256(run_id + timestamp).
Args:
run_id: The run identifier
timestamp: Optional timestamp (defaults to now)
Returns:
SHA256 hash as hex string
"""
if timestamp is None:
timestamp = datetime.utcnow()
# Create string to hash
data_to_hash = f"{run_id}_{timestamp.isoformat()}"
# Compute SHA256
report_id = hashlib.sha256(data_to_hash.encode()).hexdigest()
return report_id
def compute_config_hash(config: Dict[str, Any]) -> str:
"""
Compute SHA256 hash of configuration.
Args:
config: Configuration dictionary
Returns:
SHA256 hash as hex string
"""
import json
# Sort keys for deterministic hashing
config_str = json.dumps(config, sort_keys=True)
return hashlib.sha256(config_str.encode()).hexdigest()
# =============================================================================
# Validation for Export
# =============================================================================
class IntegrityValidator:
"""
Validator for report integrity.
"""
def __init__(self, weights: Optional[Dict[str, float]] = None, tolerance: float = INTEGRITY_TOLERANCE):
"""
Initialize validator.
Args:
weights: Custom weights for composite score
tolerance: Tolerance for floating point comparison
"""
self.weights = weights if weights is not None else DEFAULT_WEIGHTS.copy()
self.tolerance = tolerance
def validate_for_export(self, run_summary: Any) -> Tuple[bool, Dict[str, Any]]:
"""
Validate run summary for export.
Args:
run_summary: RunSummary to validate
Returns:
Tuple of (is_valid, validation_details)
"""
validation_results = {}
errors = []
# Validate weights
weights_valid, weight_error = validate_weights(self.weights)
validation_results["weights_valid"] = weights_valid
if not weights_valid:
errors.append(weight_error)
# Extract metrics
metrics_dict = {}
for metric in run_summary.metric_summary:
metrics_dict[metric.metric_name] = metric.mean
# Validate metric ranges
for metric_name, value in metrics_dict.items():
is_valid, error = validate_metric_range(value, metric_name)
validation_results[f"{metric_name}_valid"] = is_valid
if not is_valid:
errors.append(error)
# Validate composite score
computed_score = None
if run_summary.composite_score is not None:
hallucination = metrics_dict.get("hallucination", 0.0)
toxicity = metrics_dict.get("toxicity", 0.0)
bias = metrics_dict.get("bias", 0.0)
confidence = metrics_dict.get("confidence", 0.0)
score_valid, score_error, computed_score = validate_composite_score(
hallucination, toxicity, bias, confidence,
run_summary.composite_score, self.weights, self.tolerance
)
validation_results["composite_valid"] = score_valid
if not score_valid:
errors.append(score_error)
# Compute governance scores
acs = compute_audit_completeness_score(validation_results)
ris = compute_report_integrity_score(
run_summary.composite_score, computed_score, self.tolerance
)
validation_details = {
"is_valid": len(errors) == 0,
"errors": errors,
"audit_completeness_score": acs,
"report_integrity_score": ris,
"computed_composite_score": computed_score,
"weights": self.weights,
}
return len(errors) == 0, validation_details
def log_dashboard_event(event_type: str, run_id: str = None, **kwargs):
"""
Log dashboard events for observability.
Args:
event_type: Type of event (e.g., 'DASHBOARD_VIEW_RUN', 'DASHBOARD_EXPORT')
run_id: Optional run ID associated with the event
**kwargs: Additional event metadata
"""
from datetime import datetime
event = {
"event_type": event_type,
"run_id": run_id,
"timestamp": datetime.utcnow().isoformat(),
**kwargs
}
logger.info("Dashboard event", extra={"event": event})
__all__ = [
"DEFAULT_WEIGHTS",
"INTEGRITY_TOLERANCE",
"validate_metric_range",
"validate_weights",
"compute_composite_robustness",
"validate_composite_score",
"validate_run_summary",
"compute_audit_completeness_score",
"compute_report_integrity_score",
"generate_report_id",
"compute_config_hash",
"IntegrityValidator",
"log_dashboard_event",
]