# code2-repo / hierarchical_risk.py
# Uploaded via huggingface_hub by Deepu1965 (commit 9b1c753, verified)
"""
Hierarchical Risk Modeling: Clause-to-Contract Level Aggregation
This module implements hierarchical risk assessment that aggregates clause-level
predictions to contract-level risk scores and insights.
"""
import numpy as np
import torch
from typing import Dict, List, Any, Tuple
from collections import defaultdict
import warnings
class HierarchicalRiskAggregator:
    """
    Aggregates clause-level risk predictions to contract-level risk assessment.

    Supported aggregation strategies:
    - 'max':               worst-case scenario (highest-severity clause wins)
    - 'mean':              simple average across all clauses
    - 'weighted_mean':     importance-weighted average
    - 'severity_weighted': severity-weighted average (emphasizes risky clauses)
    - 'risk_distribution': distribution/entropy-based analysis
    """

    def __init__(self):
        """Initialize the hierarchical risk aggregator."""
        # Dispatch table: method name -> bound aggregation callable.
        # aggregate_contract_risk looks strategies up here, so adding a new
        # strategy only requires a new entry.
        self.aggregation_methods = {
            'max': self._aggregate_max,
            'mean': self._aggregate_mean,
            'weighted_mean': self._aggregate_weighted,
            'risk_distribution': self._aggregate_distribution,
            'severity_weighted': self._aggregate_severity_weighted
        }

    def aggregate_contract_risk(self,
                                clause_predictions: List[Dict[str, Any]],
                                method: str = 'weighted_mean') -> Dict[str, Any]:
        """
        Aggregate clause-level predictions to contract-level risk assessment.

        Args:
            clause_predictions: List of dicts, each with keys
                'predicted_risk_id', 'confidence', 'severity_score' and
                'importance_score'.
            method: Aggregation method ('max', 'mean', 'weighted_mean',
                'severity_weighted', 'risk_distribution'). Unknown values
                fall back to 'weighted_mean' with a warning.

        Returns:
            Dict with contract-level fields ('contract_risk_id',
            'contract_severity', 'contract_importance', 'contract_confidence',
            ...) plus 'num_clauses', 'clause_statistics', 'risk_distribution'
            and 'high_risk_clauses'. Returns {'error': ...} when
            clause_predictions is empty.
        """
        if not clause_predictions:
            return {'error': 'No clause predictions provided'}
        if method not in self.aggregation_methods:
            warnings.warn(f"Unknown method {method}, using 'weighted_mean'")
            method = 'weighted_mean'

        # Vectorize the per-clause fields once for all downstream helpers.
        risk_ids = np.array([p['predicted_risk_id'] for p in clause_predictions])
        confidences = np.array([p['confidence'] for p in clause_predictions])
        severities = np.array([p['severity_score'] for p in clause_predictions])
        importances = np.array([p['importance_score'] for p in clause_predictions])

        # Apply the selected aggregation strategy.
        contract_risk = self.aggregation_methods[method](
            risk_ids, confidences, severities, importances
        )

        # Common statistics are attached regardless of aggregation method.
        contract_risk.update({
            'num_clauses': len(clause_predictions),
            'clause_statistics': self._compute_clause_statistics(
                risk_ids, confidences, severities, importances
            ),
            'risk_distribution': self._compute_risk_distribution(risk_ids, severities),
            'high_risk_clauses': self._identify_high_risk_clauses(
                clause_predictions, threshold=7.0
            )
        })
        return contract_risk

    def _aggregate_max(self, risk_ids, confidences, severities, importances) -> Dict[str, Any]:
        """Maximum-risk aggregation: the whole contract inherits the fields of
        its single highest-severity clause (worst-case scenario)."""
        max_severity_idx = np.argmax(severities)
        return {
            'contract_risk_id': int(risk_ids[max_severity_idx]),
            'contract_severity': float(severities[max_severity_idx]),
            'contract_importance': float(importances[max_severity_idx]),
            'contract_confidence': float(confidences[max_severity_idx]),
            'aggregation_method': 'max',
            'rationale': 'Based on highest severity clause'
        }

    def _aggregate_mean(self, risk_ids, confidences, severities, importances) -> Dict[str, Any]:
        """Simple mean aggregation; the contract risk id is the most common
        clause-level risk type (ties resolved by lowest id via np.unique order)."""
        unique_risks, counts = np.unique(risk_ids, return_counts=True)
        dominant_risk = unique_risks[np.argmax(counts)]
        return {
            'contract_risk_id': int(dominant_risk),
            'contract_severity': float(np.mean(severities)),
            'contract_importance': float(np.mean(importances)),
            'contract_confidence': float(np.mean(confidences)),
            'aggregation_method': 'mean',
            'rationale': 'Based on average across all clauses'
        }

    def _weighted_core(self, risk_ids, confidences, severities, importances,
                       weights, method_name: str, rationale: str) -> Dict[str, Any]:
        """Shared core for weight-based aggregation.

        Computes weight-averaged severity/importance/confidence and picks the
        risk type with the largest total weight. Used by both
        _aggregate_weighted and _aggregate_severity_weighted (previously
        duplicated code).
        """
        # Accumulate the total weight carried by each risk type.
        risk_weights = defaultdict(float)
        for risk_id, weight in zip(risk_ids, weights):
            risk_weights[risk_id] += weight
        dominant_risk = max(risk_weights.items(), key=lambda x: x[1])[0]
        return {
            'contract_risk_id': int(dominant_risk),
            'contract_severity': float(np.sum(severities * weights)),
            'contract_importance': float(np.sum(importances * weights)),
            'contract_confidence': float(np.sum(confidences * weights)),
            'aggregation_method': method_name,
            'rationale': rationale
        }

    def _aggregate_weighted(self, risk_ids, confidences, severities, importances) -> Dict[str, Any]:
        """Importance-weighted aggregation."""
        # Normalize importance scores to use as weights; fall back to uniform
        # weights when all importances are zero (avoids division by zero).
        total = np.sum(importances)
        weights = importances / total if total > 0 else np.ones_like(importances) / len(importances)
        return self._weighted_core(
            risk_ids, confidences, severities, importances, weights,
            'weighted_mean', 'Weighted by clause importance scores'
        )

    def _aggregate_severity_weighted(self, risk_ids, confidences, severities, importances) -> Dict[str, Any]:
        """Severity-weighted aggregation (emphasizes high-risk clauses)."""
        # Use severity as weights; fall back to uniform weights when all
        # severities are zero (avoids division by zero).
        total = np.sum(severities)
        weights = severities / total if total > 0 else np.ones_like(severities) / len(severities)
        return self._weighted_core(
            risk_ids, confidences, severities, importances, weights,
            'severity_weighted', 'Weighted by clause severity (emphasizes high-risk clauses)'
        )

    def _aggregate_distribution(self, risk_ids, confidences, severities, importances) -> Dict[str, Any]:
        """Risk distribution-based aggregation.

        Reports mean statistics, adds a 'risk_diversity' entropy measure, and
        picks as dominant the risk type with the highest average severity.
        """
        unique_risks, counts = np.unique(risk_ids, return_counts=True)
        risk_proportions = counts / len(risk_ids)
        # Shannon entropy (natural log); epsilon guards log(0).
        entropy = -np.sum(risk_proportions * np.log(risk_proportions + 1e-10))
        # Dominant risk = the type whose clauses have the highest mean severity.
        risk_severities = {}
        for risk_id in unique_risks:
            mask = risk_ids == risk_id
            risk_severities[risk_id] = np.mean(severities[mask])
        dominant_risk = max(risk_severities.items(), key=lambda x: x[1])[0]
        return {
            'contract_risk_id': int(dominant_risk),
            'contract_severity': float(np.mean(severities)),
            'contract_importance': float(np.mean(importances)),
            'contract_confidence': float(np.mean(confidences)),
            'risk_diversity': float(entropy),
            'aggregation_method': 'risk_distribution',
            'rationale': 'Based on risk distribution analysis'
        }

    def _compute_clause_statistics(self, risk_ids, confidences, severities, importances) -> Dict[str, float]:
        """Compute a statistical summary (mean/std/min/max) of clause-level predictions."""
        return {
            'mean_severity': float(np.mean(severities)),
            'std_severity': float(np.std(severities)),
            'max_severity': float(np.max(severities)),
            'min_severity': float(np.min(severities)),
            'mean_importance': float(np.mean(importances)),
            'std_importance': float(np.std(importances)),
            'mean_confidence': float(np.mean(confidences)),
            'std_confidence': float(np.std(confidences))
        }

    def _compute_risk_distribution(self, risk_ids, severities) -> Dict[int, Dict[str, float]]:
        """Compute the distribution of risk types with per-type count,
        proportion, and average/max severity."""
        risk_dist = {}
        unique_risks = np.unique(risk_ids)
        for risk_id in unique_risks:
            mask = risk_ids == risk_id
            risk_dist[int(risk_id)] = {
                'count': int(np.sum(mask)),
                'proportion': float(np.mean(mask)),
                'avg_severity': float(np.mean(severities[mask])),
                'max_severity': float(np.max(severities[mask]))
            }
        return risk_dist

    def _identify_high_risk_clauses(self, clause_predictions: List[Dict[str, Any]],
                                    threshold: float = 7.0) -> List[Dict[str, Any]]:
        """Return clauses whose severity_score >= threshold, sorted by
        severity in descending order, each annotated with its original index."""
        high_risk = []
        for idx, pred in enumerate(clause_predictions):
            if pred['severity_score'] >= threshold:
                high_risk.append({
                    'clause_index': idx,
                    'risk_id': pred['predicted_risk_id'],
                    'severity': pred['severity_score'],
                    'importance': pred['importance_score'],
                    'confidence': pred['confidence']
                })
        high_risk.sort(key=lambda x: x['severity'], reverse=True)
        return high_risk

    def compare_contracts(self, contract_a_predictions: List[Dict[str, Any]],
                          contract_b_predictions: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Compare risk profiles of two contracts.

        Args:
            contract_a_predictions: Clause predictions for contract A
            contract_b_predictions: Clause predictions for contract B

        Returns:
            Comparison analysis including relative risk levels and differences,
            or {'error': ...} when either contract has no clause predictions.
        """
        contract_a = self.aggregate_contract_risk(contract_a_predictions, method='weighted_mean')
        contract_b = self.aggregate_contract_risk(contract_b_predictions, method='weighted_mean')
        # Bug fix: aggregate_contract_risk returns {'error': ...} for empty
        # input, which previously caused a KeyError on 'contract_severity'.
        if 'error' in contract_a or 'error' in contract_b:
            return {'error': 'Cannot compare contracts without clause predictions'}

        severity_diff = contract_a['contract_severity'] - contract_b['contract_severity']
        importance_diff = contract_a['contract_importance'] - contract_b['contract_importance']
        # Ties (diff == 0) resolve to Contract B, matching the original logic.
        riskier_contract = 'Contract A' if severity_diff > 0 else 'Contract B'
        risk_difference = abs(severity_diff)
        return {
            'contract_a': {
                'severity': contract_a['contract_severity'],
                'importance': contract_a['contract_importance'],
                'num_clauses': contract_a['num_clauses'],
                'high_risk_clauses': len(contract_a['high_risk_clauses'])
            },
            'contract_b': {
                'severity': contract_b['contract_severity'],
                'importance': contract_b['contract_importance'],
                'num_clauses': contract_b['num_clauses'],
                'high_risk_clauses': len(contract_b['high_risk_clauses'])
            },
            'comparison': {
                'riskier_contract': riskier_contract,
                'severity_difference': float(severity_diff),
                'importance_difference': float(importance_diff),
                'risk_magnitude': 'high' if risk_difference > 2.0 else 'moderate' if risk_difference > 1.0 else 'low'
            }
        }

    def generate_contract_report(self, clause_predictions: List[Dict[str, Any]],
                                 contract_name: str = "Contract") -> str:
        """
        Generate a human-readable report of contract risk assessment.

        Args:
            clause_predictions: Clause-level predictions
            contract_name: Name/identifier for the contract

        Returns:
            Formatted text report (an error message string for empty input).
        """
        contract_risk = self.aggregate_contract_risk(clause_predictions, method='weighted_mean')
        # Bug fix: empty input yields {'error': ...}; previously this raised
        # KeyError when formatting the report below.
        if 'error' in contract_risk:
            return f"Cannot generate report for {contract_name}: {contract_risk['error']}"

        report = f"\n{'='*70}\n"
        report += f"CONTRACT RISK ASSESSMENT REPORT: {contract_name}\n"
        report += f"{'='*70}\n\n"
        report += f"📊 OVERALL ASSESSMENT\n"
        report += f"{'-'*70}\n"
        report += f"Risk Category ID: {contract_risk['contract_risk_id']}\n"
        report += f"Overall Severity: {contract_risk['contract_severity']:.2f}/10.0\n"
        report += f"Overall Importance: {contract_risk['contract_importance']:.2f}/10.0\n"
        report += f"Confidence Level: {contract_risk['contract_confidence']:.2%}\n"
        report += f"Number of Clauses Analyzed: {contract_risk['num_clauses']}\n\n"

        # Map numeric severity onto a coarse traffic-light risk level.
        severity = contract_risk['contract_severity']
        if severity >= 8.0:
            risk_level = "🔴 CRITICAL RISK"
        elif severity >= 6.0:
            risk_level = "🟠 HIGH RISK"
        elif severity >= 4.0:
            risk_level = "🟡 MODERATE RISK"
        else:
            risk_level = "🟢 LOW RISK"
        report += f"Risk Level: {risk_level}\n\n"

        # High-risk clauses (severity >= 7.0), top 5 only.
        high_risk = contract_risk['high_risk_clauses']
        if high_risk:
            report += f"⚠️ HIGH-RISK CLAUSES (Severity ≥ 7.0)\n"
            report += f"{'-'*70}\n"
            for clause in high_risk[:5]:
                report += f"Clause {clause['clause_index']}: "
                report += f"Severity={clause['severity']:.2f}, "
                report += f"Importance={clause['importance']:.2f}, "
                report += f"Confidence={clause['confidence']:.2%}\n"
            if len(high_risk) > 5:
                report += f"... and {len(high_risk) - 5} more high-risk clauses\n"
            report += "\n"

        # Risk distribution, sorted by average severity (descending).
        report += f"📈 RISK DISTRIBUTION\n"
        report += f"{'-'*70}\n"
        risk_dist = contract_risk['risk_distribution']
        for risk_id, stats in sorted(risk_dist.items(), key=lambda x: x[1]['avg_severity'], reverse=True):
            report += f"Risk Type {risk_id}: "
            report += f"{stats['count']} clauses ({stats['proportion']:.1%}), "
            report += f"Avg Severity={stats['avg_severity']:.2f}\n"
        report += f"\n{'='*70}\n"
        return report
class RiskDependencyAnalyzer:
    """
    Analyzes dependencies and interactions between different risk types in a contract.

    This helps identify:
    - Co-occurrence patterns (which risks tend to appear together)
    - Risk amplification (how one risk type affects others)
    - Risk chains (sequences of related risks)
    """

    def __init__(self):
        """Initialize the risk dependency analyzer."""
        # Cached result of the most recent analyze_risk_cooccurrence call.
        self.cooccurrence_matrix = None

    def analyze_risk_cooccurrence(self, clause_predictions: List[Dict[str, Any]],
                                  num_risk_types: int = 7) -> np.ndarray:
        """
        Analyze co-occurrence of risk types within a contract.

        Args:
            clause_predictions: Clause-level predictions
            num_risk_types: Total number of risk types

        Returns:
            Co-occurrence matrix (num_risk_types x num_risk_types); entry
            (i, j) is 1 when both risk types appear in this contract.
        """
        # Perf fix: build the membership set once. The original performed an
        # O(n) list-membership test for every (i, j) pair.
        present = {p['predicted_risk_id'] for p in clause_predictions}
        cooccur = np.zeros((num_risk_types, num_risk_types))
        # Mark every pair of present risk types. Ids outside [0, num_risk_types)
        # are skipped, matching the original range-based loop's behavior.
        for i in present:
            if not 0 <= i < num_risk_types:
                continue
            for j in present:
                if 0 <= j < num_risk_types:
                    cooccur[i, j] += 1
        self.cooccurrence_matrix = cooccur
        return cooccur

    def find_risk_chains(self, clause_predictions: List[Dict[str, Any]],
                         window_size: int = 3) -> List[List[int]]:
        """
        Identify sequences of related risks (risk chains) in contract clauses.

        Args:
            clause_predictions: Clause-level predictions (ordered by clause position)
            window_size: Size of sliding window to find risk chains

        Returns:
            List of risk chains (sequences of risk IDs); empty when there are
            fewer clauses than the window size.
        """
        if len(clause_predictions) < window_size:
            return []
        risk_ids = [p['predicted_risk_id'] for p in clause_predictions]
        chains = []
        # Slide a fixed-size window over the clause sequence.
        for i in range(len(risk_ids) - window_size + 1):
            chain = risk_ids[i:i + window_size]
            # Only keep chains with at least 2 different risk types —
            # a window of one repeated risk is not a "chain".
            if len(set(chain)) >= 2:
                chains.append(chain)
        return chains

    def compute_risk_correlation(self, contract_predictions: List[List[Dict[str, Any]]],
                                 num_risk_types: int = 7) -> np.ndarray:
        """
        Compute correlation between risk types across multiple contracts.

        Args:
            contract_predictions: List of contract predictions (each is list of clause predictions)
            num_risk_types: Total number of risk types

        Returns:
            Correlation matrix showing how risk types co-occur across contracts.
            NOTE(review): rows for risk types that are constant across all
            contracts (always present or always absent) come out as NaN from
            np.corrcoef — callers should handle NaN entries.
        """
        # Binary presence matrix: contracts x risk_types.
        num_contracts = len(contract_predictions)
        risk_matrix = np.zeros((num_contracts, num_risk_types))
        for contract_idx, clause_preds in enumerate(contract_predictions):
            risk_ids = [p['predicted_risk_id'] for p in clause_preds]
            for risk_id in set(risk_ids):
                # Bug fix: guard out-of-range ids, which previously raised
                # IndexError (the co-occurrence analyzer already ignores them).
                if 0 <= risk_id < num_risk_types:
                    risk_matrix[contract_idx, risk_id] = 1
        # Correlate risk-type presence vectors across contracts.
        correlation = np.corrcoef(risk_matrix.T)
        return correlation

    def analyze_risk_amplification(self, clause_predictions: List[Dict[str, Any]]) -> Dict[int, Dict[str, float]]:
        """
        Summarize per-risk-type severity/importance statistics.

        NOTE(review): despite the name, this computes independent per-type
        summaries; it does not measure cross-type amplification effects.

        Args:
            clause_predictions: Clause-level predictions

        Returns:
            Dict mapping risk_id to avg/max severity, avg importance,
            clause count and severity variance for that risk type.
        """
        # Group clauses by their predicted risk type.
        risk_groups = defaultdict(list)
        for pred in clause_predictions:
            risk_groups[pred['predicted_risk_id']].append(pred)
        amplification = {}
        for risk_id, clauses in risk_groups.items():
            severities = [c['severity_score'] for c in clauses]
            importances = [c['importance_score'] for c in clauses]
            amplification[risk_id] = {
                'avg_severity': float(np.mean(severities)),
                'max_severity': float(np.max(severities)),
                'avg_importance': float(np.mean(importances)),
                'clause_count': len(clauses),
                'severity_variance': float(np.var(severities))
            }
        return amplification