# code2-repo / risk_discovery.py
# Uploaded by Deepu1965 via huggingface_hub (commit 9b1c753, verified)
"""Unsupervised Risk Discovery System - No Hardcoded Categories!
"""
import re
from typing import Dict, List, Tuple, Any
from collections import Counter
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from sklearn.decomposition import LatentDirichletAllocation
class UnsupervisedRiskDiscovery:
    """
    Discovers risk patterns in legal contracts using unsupervised learning.
    NO hardcoded risk categories - learns everything from text!

    Pipeline: clean clause text -> TF-IDF vectorize -> K-Means cluster ->
    summarize each cluster (top terms + averaged regex-based risk features)
    and give it a human-readable name.
    """

    def __init__(self, n_clusters: int = 7, random_state: int = 42):
        """
        Args:
            n_clusters: Number of K-Means clusters (risk patterns) to discover.
            random_state: Seed for reproducible clustering.
        """
        self.n_clusters = n_clusters
        self.random_state = random_state
        # Initialize components: TF-IDF over 1-3 grams; min_df/max_df prune
        # very rare terms and near-universal boilerplate respectively.
        self.tfidf_vectorizer = TfidfVectorizer(
            max_features=10000,
            ngram_range=(1, 3),
            stop_words='english',
            lowercase=True,
            min_df=2,
            max_df=0.95
        )
        self.kmeans = KMeans(
            n_clusters=n_clusters,
            random_state=random_state,
            n_init=10
        )
        # Risk pattern storage (populated by discover_risk_patterns)
        self.discovered_patterns = {}  # pattern name -> cluster summary dict
        self.risk_features = {}
        self.cluster_labels = None     # per-clause cluster ids after fitting
        self.feature_matrix = None     # fitted sparse TF-IDF matrix
        # Legal language patterns (domain-agnostic). Matched case-insensitively
        # in extract_risk_features (several alternatives are capitalized).
        self.legal_indicators = {
            'obligation_strength': r'\b(?:shall|must|required|mandatory|obligated|bound)\b',
            'prohibition_terms': r'\b(?:shall not|must not|prohibited|forbidden|restricted)\b',
            'conditional_risk': r'\b(?:if|unless|provided|subject to|in the event|failure to)\b',
            'liability_terms': r'\b(?:liable|responsibility|damages|penalty|loss|harm)\b',
            'temporal_urgency': r'\b(?:immediately|within|before|after|deadline|expir)\b',
            'monetary_terms': r'\$|USD|dollar|payment|fee|cost|expense|fine',
            'parties': r'\b(?:Party|Parties|Company|Corporation|Licensor|Licensee|Vendor|Customer)\b',
            'dates': r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}|\d{1,2}[/-]\d{1,2}[/-]\d{2,4}'
        }
        # Legal complexity indicators
        self.complexity_indicators = {
            'modal_verbs': r'\b(?:shall|must|may|should|will|might|could|would)\b',
            'conditional_terms': r'\b(?:if|unless|provided|subject to|in the event|notwithstanding)\b',
            'legal_conjunctions': r'\b(?:whereas|therefore|furthermore|moreover|however)\b',
            'obligation_terms': r'\b(?:agrees?|undertakes?|covenants?|warrants?|represents?)\b'
        }

    def clean_clause_text(self, text: str) -> str:
        """Clean and normalize clause text.

        Collapses runs of whitespace, replaces characters other than word
        characters and common legal punctuation with spaces, and strips the
        result. Non-string input yields "".
        """
        if not isinstance(text, str):
            return ""
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters but keep legal punctuation
        text = re.sub(r'[^\w\s.,;:()"-]', ' ', text)
        # Clean up spacing
        return text.strip()

    def extract_risk_features(self, clause_text: str) -> Dict[str, float]:
        """
        Extract numerical features that indicate risk levels (domain-agnostic).

        Returns a dict with basic text statistics, per-indicator match counts
        and densities (matches per word), and three composite scores:
        obligation_strength, legal_complexity and risk_intensity.
        """
        text_lower = clause_text.lower()
        words = text_lower.split()
        word_count = len(words)
        features = {}
        # Basic text statistics
        features['clause_length'] = word_count
        # NOTE: re.split leaves a trailing empty segment after terminal
        # punctuation, so this is one more than the true sentence count;
        # kept as-is because downstream consumers may rely on the scale.
        features['sentence_count'] = len(re.split(r'[.!?]+', clause_text))
        features['avg_word_length'] = np.mean([len(word) for word in words]) if words else 0
        # Legal language intensity. IGNORECASE is required: some indicator
        # alternatives (Party, January, ...) are capitalized while the text
        # has been lowercased, and would otherwise never match.
        for pattern_name, pattern in self.legal_indicators.items():
            matches = len(re.findall(pattern, text_lower, flags=re.IGNORECASE))
            features[f'{pattern_name}_count'] = matches
            features[f'{pattern_name}_density'] = matches / word_count if words else 0
        # Legal complexity features (density only)
        for pattern_name, pattern in self.complexity_indicators.items():
            matches = len(re.findall(pattern, text_lower, flags=re.IGNORECASE))
            features[f'{pattern_name}_complexity'] = matches / word_count if words else 0
        # Composite risk indicators: weighted sums of the densities above.
        features['obligation_strength'] = (
            features.get('obligation_strength_density', 0) * 2 +
            features.get('modal_verbs_complexity', 0)
        )
        features['legal_complexity'] = (
            features.get('conditional_terms_complexity', 0) +
            features.get('legal_conjunctions_complexity', 0) +
            features.get('obligation_terms_complexity', 0)
        )
        features['risk_intensity'] = (
            features.get('liability_terms_density', 0) * 2 +
            features.get('prohibition_terms_density', 0) +
            features.get('conditional_risk_density', 0)
        )
        return features

    def discover_risk_patterns(self, clause_texts: List[str]) -> Dict[str, Any]:
        """
        Discover risk patterns using unsupervised clustering.
        Returns discovered risk types and their characteristics.

        Side effects: fits the vectorizer and K-Means model, and stores
        feature_matrix, cluster_labels and discovered_patterns on self.
        """
        print(f"🔍 Discovering risk patterns from {len(clause_texts)} clauses...")
        # Clean texts
        cleaned_texts = [self.clean_clause_text(text) for text in clause_texts]
        # Extract TF-IDF features
        print("📊 Extracting TF-IDF features...")
        self.feature_matrix = self.tfidf_vectorizer.fit_transform(cleaned_texts)
        # Perform clustering
        print(f"🎯 Clustering into {self.n_clusters} risk patterns...")
        self.cluster_labels = self.kmeans.fit_predict(self.feature_matrix)
        # Extract risk features for each clause (from the raw, uncleaned text)
        print("⚖️ Extracting legal risk features...")
        risk_features_list = [self.extract_risk_features(text) for text in clause_texts]
        # Analyze discovered clusters
        self.discovered_patterns = self._analyze_clusters(
            cleaned_texts, self.cluster_labels, risk_features_list
        )
        print("✅ Risk pattern discovery complete!")
        print(f"📋 Discovered {len(self.discovered_patterns)} risk patterns:")
        for i, (pattern_name, details) in enumerate(self.discovered_patterns.items()):
            print(f" {i+1}. {pattern_name}: {details['clause_count']} clauses")
            print(f" Key terms: {', '.join(details['key_terms'][:5])}")
            print(f" Risk intensity: {details['avg_risk_intensity']:.3f}")
        # Calculate quality metrics (local import keeps sklearn.metrics optional
        # until this method is actually used)
        from sklearn.metrics import silhouette_score
        try:
            silhouette = silhouette_score(self.feature_matrix, self.cluster_labels)
        except Exception:
            # silhouette_score raises ValueError when < 2 distinct labels;
            # fall back to 0 rather than aborting discovery.
            silhouette = 0.0
        # Return structured results for comparison
        return {
            'method': 'K-Means_Clustering',
            'n_clusters': self.n_clusters,
            'discovered_patterns': self.discovered_patterns,
            'cluster_labels': self.cluster_labels,
            'quality_metrics': {
                'silhouette_score': silhouette,
                'n_patterns': len(self.discovered_patterns)
            }
        }

    def _analyze_clusters(self, texts: List[str], labels: np.ndarray,
                          risk_features: List[Dict]) -> Dict[str, Any]:
        """Analyze and name discovered clusters.

        For each cluster: collect member clauses, read the 20 highest-weight
        TF-IDF terms off the cluster centroid, average the per-clause risk
        features, and build a summary dict keyed by a generated name.
        """
        patterns = {}
        # Get feature names
        feature_names = self.tfidf_vectorizer.get_feature_names_out()
        for cluster_id in range(self.n_clusters):
            # Get clauses (and their risk features) in this cluster
            cluster_mask = labels == cluster_id
            cluster_texts = [t for t, m in zip(texts, cluster_mask) if m]
            cluster_features = [f for f, m in zip(risk_features, cluster_mask) if m]
            # Get top terms for this cluster from the centroid weights
            cluster_center = self.kmeans.cluster_centers_[cluster_id]
            top_indices = cluster_center.argsort()[-20:][::-1]
            top_terms = [feature_names[i] for i in top_indices]
            # Calculate average risk features across the cluster's clauses
            avg_features = {}
            if cluster_features:
                for key in cluster_features[0].keys():
                    avg_features[key] = np.mean([f.get(key, 0) for f in cluster_features])
            # Generate cluster name based on top terms and risk characteristics
            cluster_name = self._generate_cluster_name(top_terms, avg_features)
            patterns[cluster_name] = {
                'cluster_id': cluster_id,
                'clause_count': len(cluster_texts),
                'key_terms': top_terms,
                'avg_risk_intensity': avg_features.get('risk_intensity', 0),
                'avg_legal_complexity': avg_features.get('legal_complexity', 0),
                'avg_obligation_strength': avg_features.get('obligation_strength', 0),
                'sample_clauses': cluster_texts[:3],
                'risk_features': avg_features
            }
        return patterns

    def _generate_cluster_name(self, top_terms: List[str], avg_features: Dict[str, float]) -> str:
        """Generate meaningful names for discovered clusters.

        Scores a fixed set of legal themes by keyword overlap with the top 10
        cluster terms, then prefixes the best theme with a risk-intensity tier.
        """
        # Analyze top terms to identify risk theme
        term_analysis = {
            'liability': ['liable', 'liability', 'damages', 'loss', 'harm', 'injury'],
            'obligation': ['shall', 'must', 'required', 'obligation', 'duty'],
            'indemnity': ['indemnify', 'indemnification', 'defend', 'hold harmless'],
            'termination': ['terminate', 'termination', 'end', 'expire', 'breach'],
            'intellectual_property': ['intellectual', 'property', 'patent', 'copyright', 'trademark'],
            'confidentiality': ['confidential', 'confidentiality', 'non-disclosure', 'proprietary'],
            'compliance': ['comply', 'compliance', 'regulation', 'law', 'legal']
        }
        # Score each theme: +1 per top term containing any theme keyword.
        # Ties resolve to the earliest theme in insertion order.
        theme_scores = {}
        for theme, keywords in term_analysis.items():
            score = sum(1 for term in top_terms[:10] if any(kw in term.lower() for kw in keywords))
            theme_scores[theme] = score
        # Get best matching theme
        best_theme = max(theme_scores, key=theme_scores.get) if theme_scores else 'general'
        # Add intensity modifier based on risk features
        risk_intensity = avg_features.get('risk_intensity', 0)
        if risk_intensity > 0.1:
            intensity = 'high_risk'
        elif risk_intensity > 0.05:
            intensity = 'moderate_risk'
        else:
            intensity = 'low_risk'
        return f"{intensity}_{best_theme}_pattern"

    def get_risk_labels(self, clause_texts: List[str]) -> List[int]:
        """Get risk cluster labels for new clause texts.

        Raises:
            ValueError: If discover_risk_patterns() has not been run yet.
        """
        if self.cluster_labels is None:
            raise ValueError("Must discover patterns first using discover_risk_patterns()")
        cleaned_texts = [self.clean_clause_text(text) for text in clause_texts]
        feature_matrix = self.tfidf_vectorizer.transform(cleaned_texts)
        # .tolist() so the return type matches the List[int] annotation
        # (predict() returns an ndarray).
        return self.kmeans.predict(feature_matrix).tolist()

    def get_discovered_risk_names(self) -> List[str]:
        """Get list of discovered risk pattern names.

        Raises:
            ValueError: If discover_risk_patterns() has not been run yet.
        """
        if not self.discovered_patterns:
            raise ValueError("Must discover patterns first using discover_risk_patterns()")
        return list(self.discovered_patterns.keys())
class LDARiskDiscovery:
    """
    LDA-based risk discovery system - wrapper around TopicModelingRiskDiscovery
    Provides a compatible interface with UnsupervisedRiskDiscovery while using LDA underneath.
    LDA (Latent Dirichlet Allocation) is superior for legal text because:
    - Discovers overlapping risk categories (clauses can belong to multiple topics)
    - Provides probability distributions over risk types
    - Better balance across discovered patterns
    - More interpretable topic-word distributions
    """

    def __init__(self, n_clusters: int = 7, doc_topic_prior: float = 0.1,
                 topic_word_prior: float = 0.01, max_iter: int = 20,
                 max_features: int = 5000, learning_method: str = 'batch',
                 random_state: int = 42):
        """
        Initialize LDA risk discovery system.
        Args:
            n_clusters: Number of risk topics to discover
            doc_topic_prior: Alpha parameter (document-topic concentration, lower = more focused)
            topic_word_prior: Beta parameter (topic-word concentration, lower = more focused)
            max_iter: Maximum iterations for LDA training
            max_features: Vocabulary size for feature extraction
            learning_method: 'batch' (more accurate) or 'online' (faster for large datasets)
            random_state: Random seed for reproducibility
        """
        # Local import: the backend module is only needed once an instance
        # is actually created.
        from risk_discovery_alternatives import TopicModelingRiskDiscovery
        self.n_clusters = n_clusters
        self.random_state = random_state
        # Initialize LDA backend
        self.lda_backend = TopicModelingRiskDiscovery(
            n_topics=n_clusters,
            random_state=random_state
        )
        # Override LDA parameters on the backend's pre-built estimator.
        self.lda_backend.lda_model.doc_topic_prior = doc_topic_prior
        self.lda_backend.lda_model.topic_word_prior = topic_word_prior
        self.lda_backend.lda_model.max_iter = max_iter
        self.lda_backend.lda_model.learning_method = learning_method
        self.lda_backend.vectorizer.max_features = max_features
        # Storage for compatibility with UnsupervisedRiskDiscovery
        self.discovered_patterns = {}
        self.cluster_labels = None  # Will store dominant topic per document
        self.feature_matrix = None
        # Legal language patterns (same as UnsupervisedRiskDiscovery for
        # compatibility). Matched case-insensitively in extract_risk_features.
        self.legal_indicators = {
            'obligation_strength': r'\b(?:shall|must|required|mandatory|obligated|bound)\b',
            'prohibition_terms': r'\b(?:shall not|must not|prohibited|forbidden|restricted)\b',
            'conditional_risk': r'\b(?:if|unless|provided|subject to|in the event|failure to)\b',
            'liability_terms': r'\b(?:liable|responsibility|damages|penalty|loss|harm)\b',
            'temporal_urgency': r'\b(?:immediately|within|before|after|deadline|expir)\b',
            'monetary_terms': r'\$|USD|dollar|payment|fee|cost|expense|fine',
            'parties': r'\b(?:Party|Parties|Company|Corporation|Licensor|Licensee|Vendor|Customer)\b',
            'dates': r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}|\d{1,2}[/-]\d{1,2}[/-]\d{2,4}'
        }
        # Legal complexity indicators
        self.complexity_indicators = {
            'modal_verbs': r'\b(?:shall|must|may|should|will|might|could|would)\b',
            'conditional_terms': r'\b(?:if|unless|provided|subject to|in the event|notwithstanding)\b',
            'legal_conjunctions': r'\b(?:whereas|therefore|furthermore|moreover|however)\b',
            'obligation_terms': r'\b(?:agrees?|undertakes?|covenants?|warrants?|represents?)\b'
        }

    def discover_risk_patterns(self, clause_texts: List[str]) -> Dict[str, Any]:
        """
        Discover risk patterns using LDA topic modeling.
        Compatible with UnsupervisedRiskDiscovery interface.
        Args:
            clause_texts: List of legal clause texts
        Returns:
            Dictionary with discovered patterns and quality metrics
        """
        print(f"🔍 Discovering risk patterns using LDA (n_topics={self.n_clusters})...")
        print(" 📊 LDA provides balanced, overlapping risk categories")
        print(" 🎯 Best for legal text with multi-faceted risks")
        # Run LDA discovery on the backend
        results = self.lda_backend.discover_risk_patterns(clause_texts)
        # Store results under the same attribute names the K-Means class uses
        self.discovered_patterns = results.get('discovered_topics', {})
        self.cluster_labels = results.get('topic_labels', None)
        self.feature_matrix = self.lda_backend.feature_matrix
        # Add keywords field for compatibility with trainer (mirrors top_words)
        for topic_name, topic_info in self.discovered_patterns.items():
            if 'keywords' not in topic_info and 'top_words' in topic_info:
                topic_info['keywords'] = topic_info['top_words']
        print(f"✅ LDA discovery complete: {len(self.discovered_patterns)} risk topics found")
        return results

    def get_risk_labels(self, clause_texts: List[str]) -> List[int]:
        """
        Get dominant topic labels for new clause texts.
        Returns the most probable topic for each clause.
        Args:
            clause_texts: List of legal clause texts
        Returns:
            List of topic IDs (0 to n_clusters-1)
        Raises:
            ValueError: If discover_risk_patterns() has not been run yet.
        """
        if self.cluster_labels is None:
            raise ValueError("Must discover patterns first using discover_risk_patterns()")
        # Clean and transform new clauses with the backend's pipeline
        cleaned_texts = [self.lda_backend._clean_text(text) for text in clause_texts]
        feature_matrix = self.lda_backend.vectorizer.transform(cleaned_texts)
        # Get topic distribution and extract the dominant topic per document
        doc_topic_dist = self.lda_backend.lda_model.transform(feature_matrix)
        return doc_topic_dist.argmax(axis=1).tolist()

    def get_discovered_risk_names(self) -> List[str]:
        """Get list of discovered risk topic names.

        Raises:
            ValueError: If discover_risk_patterns() has not been run yet.
        """
        if not self.discovered_patterns:
            raise ValueError("Must discover patterns first using discover_risk_patterns()")
        return list(self.discovered_patterns.keys())

    def get_topic_distribution(self, clause_texts: List[str]) -> np.ndarray:
        """
        Get full probability distribution over topics for clauses.
        This is unique to LDA - shows membership in ALL topics with probabilities.
        Args:
            clause_texts: List of legal clause texts
        Returns:
            Array of shape (n_clauses, n_topics) with probability distributions
        Raises:
            ValueError: If discover_risk_patterns() has not been run yet
                (consistent with get_risk_labels; previously this surfaced an
                opaque sklearn NotFittedError instead).
        """
        if self.cluster_labels is None:
            raise ValueError("Must discover patterns first using discover_risk_patterns()")
        cleaned = [self.lda_backend._clean_text(c) for c in clause_texts]
        feature_matrix = self.lda_backend.vectorizer.transform(cleaned)
        return self.lda_backend.lda_model.transform(feature_matrix)

    def clean_clause_text(self, text: str) -> str:
        """Clean and normalize clause text - for compatibility with trainer.

        Collapses whitespace, replaces characters other than word characters
        and common legal punctuation with spaces, and strips the result.
        Non-string input yields "".
        """
        if not isinstance(text, str):
            return ""
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters but keep legal punctuation
        text = re.sub(r'[^\w\s.,;:()"-]', ' ', text)
        # Clean up spacing
        return text.strip()

    def extract_risk_features(self, clause_text: str) -> Dict[str, float]:
        """
        Extract numerical features that indicate risk levels.
        Required by trainer for generating synthetic severity/importance scores.

        Returns the same feature dict shape as
        UnsupervisedRiskDiscovery.extract_risk_features.
        """
        text_lower = clause_text.lower()
        words = text_lower.split()
        word_count = len(words)
        features = {}
        # Basic text statistics
        features['clause_length'] = word_count
        # NOTE: re.split leaves a trailing empty segment after terminal
        # punctuation, so this is one more than the true sentence count;
        # kept as-is because downstream consumers may rely on the scale.
        features['sentence_count'] = len(re.split(r'[.!?]+', clause_text))
        features['avg_word_length'] = np.mean([len(word) for word in words]) if words else 0
        # Legal language intensity. IGNORECASE is required: some indicator
        # alternatives (Party, January, ...) are capitalized while the text
        # has been lowercased, and would otherwise never match.
        for pattern_name, pattern in self.legal_indicators.items():
            matches = len(re.findall(pattern, text_lower, flags=re.IGNORECASE))
            features[f'{pattern_name}_count'] = matches
            features[f'{pattern_name}_density'] = matches / word_count if words else 0
        # Legal complexity features (density only)
        for pattern_name, pattern in self.complexity_indicators.items():
            matches = len(re.findall(pattern, text_lower, flags=re.IGNORECASE))
            features[f'{pattern_name}_complexity'] = matches / word_count if words else 0
        # Composite risk indicators: weighted sums of the densities above.
        features['obligation_strength'] = (
            features.get('obligation_strength_density', 0) * 2 +
            features.get('modal_verbs_complexity', 0)
        )
        features['legal_complexity'] = (
            features.get('conditional_terms_complexity', 0) +
            features.get('legal_conjunctions_complexity', 0) +
            features.get('obligation_terms_complexity', 0)
        )
        features['risk_intensity'] = (
            features.get('liability_terms_density', 0) * 2 +
            features.get('prohibition_terms_density', 0) +
            features.get('conditional_risk_density', 0)
        )
        return features