|
|
"""
Hierarchical Risk Modeling: Clause-to-Contract Level Aggregation

This module implements hierarchical risk assessment that aggregates clause-level
predictions to contract-level risk scores and insights.
"""
|
|
import numpy as np |
|
|
import torch |
|
|
from typing import Dict, List, Any, Tuple |
|
|
from collections import defaultdict |
|
|
import warnings |
|
|
|
|
|
|
|
|
class HierarchicalRiskAggregator:
    """
    Aggregates clause-level risk predictions to a contract-level risk assessment.

    Each clause prediction is a dict with the keys ``predicted_risk_id``,
    ``confidence``, ``severity_score`` and ``importance_score``.

    Supported aggregation strategies (keys of ``aggregation_methods``):
    - 'max': take the single clause with the highest severity
    - 'mean': unweighted averages, most frequent risk id
    - 'weighted_mean': averages weighted by clause importance
    - 'severity_weighted': averages weighted by clause severity
    - 'risk_distribution': unweighted averages plus an entropy-based
      diversity measure over the risk ids
    """

    def __init__(self):
        """Initialize the aggregator and register the available strategies."""
        self.aggregation_methods = {
            'max': self._aggregate_max,
            'mean': self._aggregate_mean,
            'weighted_mean': self._aggregate_weighted,
            'risk_distribution': self._aggregate_distribution,
            'severity_weighted': self._aggregate_severity_weighted
        }

    def aggregate_contract_risk(self,
                                clause_predictions: List[Dict[str, Any]],
                                method: str = 'weighted_mean',
                                high_risk_threshold: float = 7.0) -> Dict[str, Any]:
        """
        Aggregate clause-level predictions to contract-level risk assessment.

        Args:
            clause_predictions: List of dictionaries containing clause-level
                predictions. Each dict must have the keys: predicted_risk_id,
                confidence, severity_score, importance_score.
            method: Aggregation method ('max', 'mean', 'weighted_mean',
                'risk_distribution', 'severity_weighted'). An unknown name
                triggers a warning and falls back to 'weighted_mean'.
            high_risk_threshold: Clauses whose severity_score is greater than
                or equal to this value are listed under 'high_risk_clauses'.

        Returns:
            Dictionary containing the contract-level risk assessment produced
            by the chosen strategy, extended with 'num_clauses',
            'clause_statistics', 'risk_distribution' and 'high_risk_clauses'.
            When ``clause_predictions`` is empty, a dict with only an 'error'
            key is returned instead.
        """
        if not clause_predictions:
            return {'error': 'No clause predictions provided'}

        if method not in self.aggregation_methods:
            warnings.warn(f"Unknown method {method}, using 'weighted_mean'")
            method = 'weighted_mean'

        # Pull each per-clause field out into its own array.
        risk_ids = np.array([p['predicted_risk_id'] for p in clause_predictions])
        confidences = np.array([p['confidence'] for p in clause_predictions])
        severities = np.array([p['severity_score'] for p in clause_predictions])
        importances = np.array([p['importance_score'] for p in clause_predictions])

        contract_risk = self.aggregation_methods[method](
            risk_ids, confidences, severities, importances
        )

        # Method-independent summary information.
        contract_risk.update({
            'num_clauses': len(clause_predictions),
            'clause_statistics': self._compute_clause_statistics(
                risk_ids, confidences, severities, importances
            ),
            'risk_distribution': self._compute_risk_distribution(risk_ids, severities),
            'high_risk_clauses': self._identify_high_risk_clauses(
                clause_predictions, threshold=high_risk_threshold
            )
        })

        return contract_risk

    def _aggregate_max(self, risk_ids, confidences, severities, importances) -> Dict[str, Any]:
        """Maximum risk aggregation: report the clause with the highest severity."""
        # np.argmax returns the first index on ties.
        worst = np.argmax(severities)

        return {
            'contract_risk_id': int(risk_ids[worst]),
            'contract_severity': float(severities[worst]),
            'contract_importance': float(importances[worst]),
            'contract_confidence': float(confidences[worst]),
            'aggregation_method': 'max',
            'rationale': 'Based on highest severity clause'
        }

    def _aggregate_mean(self, risk_ids, confidences, severities, importances) -> Dict[str, Any]:
        """Simple mean aggregation; the risk id is the most frequent one."""
        unique_risks, counts = np.unique(risk_ids, return_counts=True)
        # On a frequency tie the smallest id wins (np.unique sorts its output).
        dominant_risk = unique_risks[np.argmax(counts)]

        return {
            'contract_risk_id': int(dominant_risk),
            'contract_severity': float(np.mean(severities)),
            'contract_importance': float(np.mean(importances)),
            'contract_confidence': float(np.mean(confidences)),
            'aggregation_method': 'mean',
            'rationale': 'Based on average across all clauses'
        }

    def _aggregate_with_weights(self, risk_ids, confidences, severities, importances,
                                raw_weights, method_name: str, rationale: str) -> Dict[str, Any]:
        """
        Shared implementation of the weighted aggregation strategies.

        ``raw_weights`` is normalized to sum to 1; when its sum is not
        positive, uniform weights are used instead. The contract risk id is
        the risk id with the largest total weight (first seen wins ties).
        """
        total = np.sum(raw_weights)
        if total > 0:
            weights = raw_weights / total
        else:
            weights = np.ones_like(raw_weights) / len(raw_weights)

        # Accumulate the weight mass carried by each risk id.
        risk_weights = defaultdict(float)
        for risk_id, weight in zip(risk_ids, weights):
            risk_weights[risk_id] += weight
        dominant_risk = max(risk_weights.items(), key=lambda item: item[1])[0]

        return {
            'contract_risk_id': int(dominant_risk),
            'contract_severity': float(np.sum(severities * weights)),
            'contract_importance': float(np.sum(importances * weights)),
            'contract_confidence': float(np.sum(confidences * weights)),
            'aggregation_method': method_name,
            'rationale': rationale
        }

    def _aggregate_weighted(self, risk_ids, confidences, severities, importances) -> Dict[str, Any]:
        """Importance-weighted aggregation."""
        return self._aggregate_with_weights(
            risk_ids, confidences, severities, importances,
            raw_weights=importances,
            method_name='weighted_mean',
            rationale='Weighted by clause importance scores'
        )

    def _aggregate_severity_weighted(self, risk_ids, confidences, severities, importances) -> Dict[str, Any]:
        """Severity-weighted aggregation (emphasizes high-risk clauses)."""
        return self._aggregate_with_weights(
            risk_ids, confidences, severities, importances,
            raw_weights=severities,
            method_name='severity_weighted',
            rationale='Weighted by clause severity (emphasizes high-risk clauses)'
        )

    def _aggregate_distribution(self, risk_ids, confidences, severities, importances) -> Dict[str, Any]:
        """
        Risk distribution-based aggregation.

        Reports unweighted means, the Shannon entropy (natural log) of the
        risk-id proportions as 'risk_diversity', and as risk id the one whose
        clauses have the highest mean severity.
        """
        unique_risks, counts = np.unique(risk_ids, return_counts=True)
        risk_proportions = counts / len(risk_ids)

        # The small epsilon is a defensive guard against log(0).
        entropy = -np.sum(risk_proportions * np.log(risk_proportions + 1e-10))

        # Mean severity per risk id.
        risk_severities = {}
        for risk_id in unique_risks:
            mask = risk_ids == risk_id
            risk_severities[risk_id] = np.mean(severities[mask])

        dominant_risk = max(risk_severities.items(), key=lambda item: item[1])[0]

        return {
            'contract_risk_id': int(dominant_risk),
            'contract_severity': float(np.mean(severities)),
            'contract_importance': float(np.mean(importances)),
            'contract_confidence': float(np.mean(confidences)),
            'risk_diversity': float(entropy),
            'aggregation_method': 'risk_distribution',
            'rationale': 'Based on risk distribution analysis'
        }

    def _compute_clause_statistics(self, risk_ids, confidences, severities, importances) -> Dict[str, float]:
        """Compute a statistical summary of the clause-level predictions."""
        return {
            'mean_severity': float(np.mean(severities)),
            'std_severity': float(np.std(severities)),
            'max_severity': float(np.max(severities)),
            'min_severity': float(np.min(severities)),
            'mean_importance': float(np.mean(importances)),
            'std_importance': float(np.std(importances)),
            'mean_confidence': float(np.mean(confidences)),
            'std_confidence': float(np.std(confidences))
        }

    def _compute_risk_distribution(self, risk_ids, severities) -> Dict[int, Dict[str, float]]:
        """Compute count, proportion and severity statistics per risk id."""
        risk_dist = {}

        for risk_id in np.unique(risk_ids):
            mask = risk_ids == risk_id
            risk_dist[int(risk_id)] = {
                'count': int(np.sum(mask)),
                # Mean of a boolean mask is the fraction of clauses with this id.
                'proportion': float(np.mean(mask)),
                'avg_severity': float(np.mean(severities[mask])),
                'max_severity': float(np.max(severities[mask]))
            }

        return risk_dist

    def _identify_high_risk_clauses(self, clause_predictions: List[Dict[str, Any]],
                                    threshold: float = 7.0) -> List[Dict[str, Any]]:
        """
        Identify clauses whose severity_score is >= ``threshold``.

        Returns:
            One summary dict per matching clause (with its position as
            'clause_index'), sorted by severity in descending order.
        """
        high_risk = [
            {
                'clause_index': idx,
                'risk_id': pred['predicted_risk_id'],
                'severity': pred['severity_score'],
                'importance': pred['importance_score'],
                'confidence': pred['confidence']
            }
            for idx, pred in enumerate(clause_predictions)
            if pred['severity_score'] >= threshold
        ]

        # Stable sort: clauses with equal severity keep their original order.
        return sorted(high_risk, key=lambda clause: clause['severity'], reverse=True)

    def compare_contracts(self, contract_a_predictions: List[Dict[str, Any]],
                          contract_b_predictions: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Compare risk profiles of two contracts.

        Both contracts are aggregated with the 'weighted_mean' method.

        Args:
            contract_a_predictions: Clause predictions for contract A
            contract_b_predictions: Clause predictions for contract B

        Returns:
            Comparison analysis including relative risk levels and differences.
            'riskier_contract' is 'Contract A', 'Contract B', or 'Equal' when
            both aggregated severities are identical. If either contract has
            no clause predictions, a dict with only an 'error' key is returned.
        """
        # Without this guard the aggregate result would be an error dict and
        # the key lookups below would fail.
        for label, predictions in (('Contract A', contract_a_predictions),
                                   ('Contract B', contract_b_predictions)):
            if not predictions:
                return {'error': f'No clause predictions provided for {label}'}

        contract_a = self.aggregate_contract_risk(contract_a_predictions, method='weighted_mean')
        contract_b = self.aggregate_contract_risk(contract_b_predictions, method='weighted_mean')

        severity_diff = contract_a['contract_severity'] - contract_b['contract_severity']
        importance_diff = contract_a['contract_importance'] - contract_b['contract_importance']

        if severity_diff > 0:
            riskier_contract = 'Contract A'
        elif severity_diff < 0:
            riskier_contract = 'Contract B'
        else:
            riskier_contract = 'Equal'

        risk_difference = abs(severity_diff)
        if risk_difference > 2.0:
            risk_magnitude = 'high'
        elif risk_difference > 1.0:
            risk_magnitude = 'moderate'
        else:
            risk_magnitude = 'low'

        def summarize(contract: Dict[str, Any]) -> Dict[str, Any]:
            return {
                'severity': contract['contract_severity'],
                'importance': contract['contract_importance'],
                'num_clauses': contract['num_clauses'],
                'high_risk_clauses': len(contract['high_risk_clauses'])
            }

        return {
            'contract_a': summarize(contract_a),
            'contract_b': summarize(contract_b),
            'comparison': {
                'riskier_contract': riskier_contract,
                'severity_difference': float(severity_diff),
                'importance_difference': float(importance_diff),
                'risk_magnitude': risk_magnitude
            }
        }

    def generate_contract_report(self, clause_predictions: List[Dict[str, Any]],
                                 contract_name: str = "Contract") -> str:
        """
        Generate a human-readable report of contract risk assessment.

        The contract is aggregated with the 'weighted_mean' method.

        Args:
            clause_predictions: Clause-level predictions
            contract_name: Name/identifier for the contract

        Returns:
            Formatted text report. For empty ``clause_predictions`` the report
            contains only the header and a note that nothing was provided.
        """
        banner = '=' * 70
        rule = '-' * 70

        parts = [
            f"\n{banner}\n",
            f"CONTRACT RISK ASSESSMENT REPORT: {contract_name}\n",
            f"{banner}\n\n",
        ]

        if not clause_predictions:
            parts.append("No clause predictions provided\n")
            parts.append(f"\n{banner}\n")
            return "".join(parts)

        contract_risk = self.aggregate_contract_risk(clause_predictions, method='weighted_mean')

        parts.extend([
            "📊 OVERALL ASSESSMENT\n",
            f"{rule}\n",
            f"Risk Category ID: {contract_risk['contract_risk_id']}\n",
            f"Overall Severity: {contract_risk['contract_severity']:.2f}/10.0\n",
            f"Overall Importance: {contract_risk['contract_importance']:.2f}/10.0\n",
            f"Confidence Level: {contract_risk['contract_confidence']:.2%}\n",
            f"Number of Clauses Analyzed: {contract_risk['num_clauses']}\n\n",
        ])

        # Map the aggregated severity onto a coarse risk label.
        severity = contract_risk['contract_severity']
        if severity >= 8.0:
            risk_level = "🔴 CRITICAL RISK"
        elif severity >= 6.0:
            risk_level = "🟠 HIGH RISK"
        elif severity >= 4.0:
            risk_level = "🟡 MODERATE RISK"
        else:
            risk_level = "🟢 LOW RISK"

        parts.append(f"Risk Level: {risk_level}\n\n")

        # List at most the five most severe high-risk clauses.
        high_risk = contract_risk['high_risk_clauses']
        if high_risk:
            parts.append("⚠️ HIGH-RISK CLAUSES (Severity ≥ 7.0)\n")
            parts.append(f"{rule}\n")
            for clause in high_risk[:5]:
                parts.append(f"Clause {clause['clause_index']}: ")
                parts.append(f"Severity={clause['severity']:.2f}, ")
                parts.append(f"Importance={clause['importance']:.2f}, ")
                parts.append(f"Confidence={clause['confidence']:.2%}\n")
            if len(high_risk) > 5:
                parts.append(f"... and {len(high_risk) - 5} more high-risk clauses\n")
            parts.append("\n")

        # Risk types ordered by average severity, highest first.
        parts.append("📈 RISK DISTRIBUTION\n")
        parts.append(f"{rule}\n")
        risk_dist = contract_risk['risk_distribution']
        ordered = sorted(risk_dist.items(), key=lambda item: item[1]['avg_severity'], reverse=True)
        for risk_id, stats in ordered:
            parts.append(f"Risk Type {risk_id}: ")
            parts.append(f"{stats['count']} clauses ({stats['proportion']:.1%}), ")
            parts.append(f"Avg Severity={stats['avg_severity']:.2f}\n")

        parts.append(f"\n{banner}\n")

        return "".join(parts)
|
|
|
|
|
|
|
|
class RiskDependencyAnalyzer:
    """
    Analyzes relationships between the risk types predicted for contract clauses.

    Provided analyses:
    - Co-occurrence: which risk types are both present in one contract
    - Risk chains: windows of consecutive clauses mixing several risk types
    - Correlation: how risk-type presence correlates across contracts
    - Per-risk-type severity/importance statistics
    """

    def __init__(self):
        """Initialize the risk dependency analyzer."""
        # Result of the most recent analyze_risk_cooccurrence call, if any.
        self.cooccurrence_matrix = None

    def analyze_risk_cooccurrence(self, clause_predictions: List[Dict[str, Any]],
                                  num_risk_types: int = 7) -> np.ndarray:
        """
        Analyze co-occurrence of risk types within a contract.

        Entry ``[i, j]`` is 1.0 when both risk type ``i`` and risk type ``j``
        appear among the clause predictions, and 0.0 otherwise (it is a
        presence indicator, not a count). Risk ids outside
        ``range(num_risk_types)`` are ignored. The result is also stored in
        ``self.cooccurrence_matrix``.

        Args:
            clause_predictions: Clause-level predictions
            num_risk_types: Total number of risk types

        Returns:
            Co-occurrence matrix (num_risk_types x num_risk_types)
        """
        risk_ids = [p['predicted_risk_id'] for p in clause_predictions]

        # Determine presence once per risk type instead of once per matrix cell.
        present = np.array([i in risk_ids for i in range(num_risk_types)], dtype=bool)

        # Cell (i, j) is set exactly when both types are present.
        cooccur = np.outer(present, present).astype(float)

        self.cooccurrence_matrix = cooccur
        return cooccur

    def find_risk_chains(self, clause_predictions: List[Dict[str, Any]],
                         window_size: int = 3) -> List[List[int]]:
        """
        Identify sequences of related risks (risk chains) in contract clauses.

        Slides a window of ``window_size`` consecutive clauses over the
        predictions and keeps every window containing at least two distinct
        risk ids.

        Args:
            clause_predictions: Clause-level predictions (ordered by clause position)
            window_size: Size of sliding window to find risk chains

        Returns:
            List of risk chains (sequences of risk IDs). Empty when
            ``window_size`` is below 2 (no window could hold two distinct ids)
            or larger than the number of clauses.
        """
        # A non-positive window would otherwise produce meaningless slices.
        if window_size < 2 or len(clause_predictions) < window_size:
            return []

        risk_ids = [p['predicted_risk_id'] for p in clause_predictions]

        windows = (
            risk_ids[start:start + window_size]
            for start in range(len(risk_ids) - window_size + 1)
        )
        return [chain for chain in windows if len(set(chain)) >= 2]

    def compute_risk_correlation(self, contract_predictions: List[List[Dict[str, Any]]],
                                 num_risk_types: int = 7) -> np.ndarray:
        """
        Compute correlation between risk types across multiple contracts.

        Builds a contracts x risk-types presence matrix (1 when the risk type
        occurs in the contract) and returns the correlation of its columns.

        Args:
            contract_predictions: List of contract predictions (each is list of clause predictions)
            num_risk_types: Total number of risk types

        Returns:
            Correlation matrix (num_risk_types x num_risk_types) as computed by
            ``np.corrcoef``. Entries involving a risk type whose presence is
            constant across all contracts are NaN.

        Raises:
            IndexError: If a risk id is >= ``num_risk_types``.
        """
        num_contracts = len(contract_predictions)
        risk_matrix = np.zeros((num_contracts, num_risk_types))

        for contract_idx, clause_preds in enumerate(contract_predictions):
            # NOTE(review): negative ids index from the end of the row rather
            # than raising - confirm ids are always non-negative.
            for risk_id in {p['predicted_risk_id'] for p in clause_preds}:
                risk_matrix[contract_idx, risk_id] = 1

        # Transpose so that each risk type is one variable (row) for corrcoef.
        return np.corrcoef(risk_matrix.T)

    def analyze_risk_amplification(self, clause_predictions: List[Dict[str, Any]]) -> Dict[int, Dict[str, float]]:
        """
        Summarize severity and importance per risk type.

        Groups the clauses by ``predicted_risk_id`` and computes statistics
        for each group; no cross-risk-type interaction is modeled here.

        Args:
            clause_predictions: Clause-level predictions

        Returns:
            Dictionary mapping risk_id to 'avg_severity', 'max_severity',
            'avg_importance', 'clause_count' and 'severity_variance'.
        """
        risk_groups = defaultdict(list)
        for pred in clause_predictions:
            risk_groups[pred['predicted_risk_id']].append(pred)

        amplification = {}
        for risk_id, clauses in risk_groups.items():
            severities = [c['severity_score'] for c in clauses]
            importances = [c['importance_score'] for c in clauses]

            amplification[risk_id] = {
                'avg_severity': float(np.mean(severities)),
                'max_severity': float(np.max(severities)),
                'avg_importance': float(np.mean(importances)),
                'clause_count': len(clauses),
                'severity_variance': float(np.var(severities))
            }

        return amplification
|
|
|