code2-repo / risk_discovery_alternatives.py
Deepu1965's picture
Upload folder using huggingface_hub
9b1c753 verified
"""
Alternative Risk Discovery Methods for Comparison
This module implements 3 alternative approaches to risk pattern discovery:
1. Topic Modeling (LDA) - Discovers latent risk topics
2. Hierarchical Clustering (Agglomerative) - Discovers nested risk hierarchies
3. Density-Based Clustering (DBSCAN) - Discovers risk clusters of varying shapes
Each method provides a different perspective on risk patterns in legal contracts.
"""
import re
import numpy as np
from typing import Dict, List, Tuple, Any
from collections import Counter, defaultdict
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation, NMF
from sklearn.cluster import AgglomerativeClustering, DBSCAN
from sklearn.metrics import silhouette_score
import warnings
class TopicModelingRiskDiscovery:
"""
Risk discovery using Latent Dirichlet Allocation (LDA) topic modeling.
Discovers risk patterns as latent topics where each clause is a mixture of topics.
Better for discovering overlapping risk categories and multi-faceted risks.
Advantages:
- Handles overlapping risk types naturally
- Provides probability distribution over risk types
- Discovers interpretable topic words
- Works well with legal text (documents with multiple themes)
Disadvantages:
- Requires more tuning (alpha, beta parameters)
- Slower than K-Means
- Less clear cluster boundaries
"""
def __init__(self, n_topics: int = 7, random_state: int = 42):
self.n_topics = n_topics
self.random_state = random_state
# Use CountVectorizer for LDA (works better than TF-IDF)
self.vectorizer = CountVectorizer(
max_features=5000,
ngram_range=(1, 2),
stop_words='english',
lowercase=True,
min_df=3,
max_df=0.85
)
# LDA model
self.lda_model = LatentDirichletAllocation(
n_components=n_topics,
random_state=random_state,
max_iter=20,
learning_method='batch',
doc_topic_prior=0.1, # Alpha - document-topic density
topic_word_prior=0.01, # Beta - topic-word density
n_jobs=-1
)
self.discovered_topics = {}
self.topic_labels = None
self.feature_matrix = None
self.topic_word_distribution = None
def discover_risk_patterns(self, clauses: List[str]) -> Dict[str, Any]:
"""
Discover risk patterns using LDA topic modeling.
Args:
clauses: List of legal clause texts
Returns:
Dictionary with discovered topics and assignments
"""
print(f"πŸ” Discovering risk topics using LDA (n_topics={self.n_topics})...")
# Clean clauses
cleaned_clauses = [self._clean_text(c) for c in clauses]
# Create document-term matrix
print(" πŸ“Š Creating document-term matrix...")
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses)
feature_names = self.vectorizer.get_feature_names_out()
# Fit LDA model
print(" 🧠 Fitting LDA model...")
self.lda_model.fit(self.feature_matrix)
# Get topic-word distribution
self.topic_word_distribution = self.lda_model.components_
# Get document-topic distribution
doc_topic_dist = self.lda_model.transform(self.feature_matrix)
# Assign each document to dominant topic
self.topic_labels = np.argmax(doc_topic_dist, axis=1)
# Extract top words for each topic
print(" πŸ“ Extracting topic keywords...")
n_top_words = 15
for topic_idx in range(self.n_topics):
top_word_indices = np.argsort(self.topic_word_distribution[topic_idx])[-n_top_words:][::-1]
top_words = [feature_names[i] for i in top_word_indices]
top_weights = [self.topic_word_distribution[topic_idx][i] for i in top_word_indices]
# Generate topic name from top words
topic_name = self._generate_topic_name(top_words)
# Count clauses in this topic
clause_count = np.sum(self.topic_labels == topic_idx)
self.discovered_topics[topic_idx] = {
'topic_id': topic_idx,
'topic_name': topic_name,
'top_words': top_words,
'word_weights': top_weights,
'clause_count': int(clause_count),
'proportion': float(clause_count / len(clauses))
}
# Compute perplexity and log-likelihood
perplexity = self.lda_model.perplexity(self.feature_matrix)
log_likelihood = self.lda_model.score(self.feature_matrix)
print(f"βœ… LDA discovery complete: {self.n_topics} topics found")
print(f" Perplexity: {perplexity:.2f} (lower is better)")
print(f" Log-likelihood: {log_likelihood:.2f}")
return {
'method': 'LDA_Topic_Modeling',
'n_topics': self.n_topics,
'discovered_topics': self.discovered_topics,
'topic_labels': self.topic_labels,
'doc_topic_distribution': doc_topic_dist,
'perplexity': perplexity,
'log_likelihood': log_likelihood,
'quality_metrics': {
'perplexity': perplexity,
'avg_topic_diversity': self._compute_topic_diversity()
}
}
def get_clause_topic_distribution(self, clause_idx: int) -> Dict[int, float]:
"""Get probability distribution over topics for a specific clause"""
if self.feature_matrix is None:
return {}
doc_topic_dist = self.lda_model.transform(self.feature_matrix)
return {topic_id: float(prob) for topic_id, prob in enumerate(doc_topic_dist[clause_idx])}
def _clean_text(self, text: str) -> str:
"""Clean clause text"""
if not isinstance(text, str):
return ""
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _generate_topic_name(self, top_words: List[str]) -> str:
"""Generate descriptive name from top words"""
# Look for common legal risk themes
themes = {
'liability': ['liability', 'liable', 'damages', 'loss', 'harm', 'injury'],
'indemnity': ['indemnify', 'indemnification', 'hold', 'harmless', 'defend'],
'termination': ['terminate', 'termination', 'cancel', 'end', 'expire'],
'intellectual_property': ['intellectual', 'property', 'ip', 'patent', 'copyright', 'trademark'],
'confidentiality': ['confidential', 'confidentiality', 'disclosure', 'nda', 'secret'],
'payment': ['payment', 'pay', 'fee', 'price', 'cost', 'charge'],
'compliance': ['comply', 'compliance', 'regulation', 'law', 'legal', 'regulatory'],
'warranty': ['warranty', 'warrant', 'represent', 'guarantee', 'assure']
}
# Score each theme
theme_scores = defaultdict(int)
for word in top_words[:10]:
for theme, keywords in themes.items():
if any(keyword in word.lower() for keyword in keywords):
theme_scores[theme] += 1
# Pick best theme or use top words
if theme_scores:
best_theme = max(theme_scores.items(), key=lambda x: x[1])[0]
return f"Topic_{best_theme.upper()}"
else:
return f"Topic_{top_words[0].upper()}_{top_words[1].upper()}"
def _compute_topic_diversity(self) -> float:
"""Compute average diversity of topics (entropy of word distribution)"""
diversities = []
for topic_idx in range(self.n_topics):
word_dist = self.topic_word_distribution[topic_idx]
word_dist = word_dist / np.sum(word_dist) # Normalize
entropy = -np.sum(word_dist * np.log(word_dist + 1e-10))
diversities.append(entropy)
return float(np.mean(diversities))
class HierarchicalRiskDiscovery:
"""
Risk discovery using Hierarchical Agglomerative Clustering.
Discovers nested risk hierarchies where similar risks are grouped at multiple levels.
Better for understanding relationships between risk types.
Advantages:
- Discovers hierarchical structure (parent-child risk relationships)
- No need to specify number of clusters upfront
- Deterministic results
- Can cut dendrogram at different levels
Disadvantages:
- Slower for large datasets (O(nΒ²) or O(nΒ³))
- Memory intensive
- Cannot handle very large datasets
"""
def __init__(self, n_clusters: int = 7, linkage: str = 'ward', random_state: int = 42):
self.n_clusters = n_clusters
self.linkage = linkage # 'ward', 'average', 'complete', 'single'
self.random_state = random_state
# TF-IDF vectorizer
self.vectorizer = TfidfVectorizer(
max_features=8000,
ngram_range=(1, 3),
stop_words='english',
lowercase=True,
min_df=2,
max_df=0.90
)
# Hierarchical clustering model
self.clustering_model = AgglomerativeClustering(
n_clusters=n_clusters,
linkage=linkage
)
self.discovered_clusters = {}
self.cluster_labels = None
self.feature_matrix = None
def discover_risk_patterns(self, clauses: List[str]) -> Dict[str, Any]:
"""
Discover risk patterns using hierarchical clustering.
Args:
clauses: List of legal clause texts
Returns:
Dictionary with discovered clusters and hierarchy
"""
print(f"πŸ” Discovering risk patterns using Hierarchical Clustering (n_clusters={self.n_clusters})...")
# Clean clauses
cleaned_clauses = [self._clean_text(c) for c in clauses]
# Create TF-IDF matrix
print(" πŸ“Š Creating TF-IDF feature matrix...")
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses)
feature_names = self.vectorizer.get_feature_names_out()
# Fit hierarchical clustering
print(f" 🧠 Fitting Hierarchical Clustering (linkage={self.linkage})...")
self.cluster_labels = self.clustering_model.fit_predict(self.feature_matrix.toarray())
# Analyze each cluster
print(" πŸ“ Analyzing discovered clusters...")
for cluster_id in range(self.n_clusters):
cluster_mask = self.cluster_labels == cluster_id
cluster_indices = np.where(cluster_mask)[0]
# Get representative clauses
cluster_clauses = [clauses[i] for i in cluster_indices]
# Extract top TF-IDF terms for this cluster
cluster_tfidf = self.feature_matrix[cluster_mask].mean(axis=0)
top_term_indices = np.argsort(np.asarray(cluster_tfidf).flatten())[-15:][::-1]
top_terms = [feature_names[i] for i in top_term_indices]
top_scores = [float(cluster_tfidf[0, i]) for i in top_term_indices]
# Generate cluster name
cluster_name = self._generate_cluster_name(top_terms)
self.discovered_clusters[cluster_id] = {
'cluster_id': cluster_id,
'cluster_name': cluster_name,
'top_terms': top_terms,
'term_scores': top_scores,
'clause_count': int(len(cluster_indices)),
'proportion': float(len(cluster_indices) / len(clauses)),
'sample_clauses': cluster_clauses[:3] # First 3 clauses as examples
}
# Compute silhouette score
if len(clauses) < 10000: # Only for reasonable sizes
silhouette = silhouette_score(self.feature_matrix, self.cluster_labels)
else:
silhouette = None
print(f"βœ… Hierarchical clustering complete: {self.n_clusters} clusters found")
if silhouette:
print(f" Silhouette Score: {silhouette:.3f} (range: -1 to 1, higher is better)")
return {
'method': 'Hierarchical_Agglomerative_Clustering',
'n_clusters': self.n_clusters,
'linkage': self.linkage,
'discovered_clusters': self.discovered_clusters,
'cluster_labels': self.cluster_labels,
'quality_metrics': {
'silhouette_score': silhouette if silhouette else 'N/A',
'avg_cluster_size': float(np.mean([c['clause_count'] for c in self.discovered_clusters.values()]))
}
}
def _clean_text(self, text: str) -> str:
"""Clean clause text"""
if not isinstance(text, str):
return ""
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _generate_cluster_name(self, top_terms: List[str]) -> str:
"""Generate descriptive name from top terms"""
# Legal risk theme detection
themes = {
'LIABILITY': ['liability', 'liable', 'damages', 'loss'],
'INDEMNITY': ['indemnify', 'indemnification', 'hold', 'harmless'],
'TERMINATION': ['terminate', 'termination', 'cancel', 'expire'],
'IP': ['intellectual', 'property', 'patent', 'copyright'],
'CONFIDENTIAL': ['confidential', 'nda', 'disclosure', 'secret'],
'PAYMENT': ['payment', 'pay', 'fee', 'price'],
'COMPLIANCE': ['comply', 'compliance', 'regulation', 'law'],
'WARRANTY': ['warranty', 'warrant', 'represent', 'guarantee']
}
for theme, keywords in themes.items():
if any(keyword in term.lower() for term in top_terms[:5] for keyword in keywords):
return f"RISK_{theme}"
return f"RISK_{top_terms[0].upper()}_{top_terms[1].upper()}"
class DensityBasedRiskDiscovery:
"""
Risk discovery using DBSCAN (Density-Based Spatial Clustering).
Discovers risk clusters based on density, identifying core risks and outliers.
Better for finding unusual/rare risk patterns and handling noise.
Advantages:
- Discovers clusters of arbitrary shapes
- Identifies outliers/noise (rare risk patterns)
- No need to specify number of clusters
- Robust to outliers
Disadvantages:
- Sensitive to hyperparameters (eps, min_samples)
- Struggles with varying density clusters
- Can produce many small clusters
"""
def __init__(self, eps: float = 0.5, min_samples: int = 5, random_state: int = 42):
self.eps = eps # Maximum distance between samples
self.min_samples = min_samples # Minimum samples in neighborhood
self.random_state = random_state
# TF-IDF vectorizer
self.vectorizer = TfidfVectorizer(
max_features=6000,
ngram_range=(1, 2),
stop_words='english',
lowercase=True,
min_df=3,
max_df=0.85
)
# DBSCAN model
self.dbscan_model = DBSCAN(
eps=eps,
min_samples=min_samples,
metric='cosine',
n_jobs=-1
)
self.discovered_clusters = {}
self.cluster_labels = None
self.feature_matrix = None
self.outlier_indices = []
def discover_risk_patterns(self, clauses: List[str], auto_tune: bool = True) -> Dict[str, Any]:
"""
Discover risk patterns using DBSCAN.
Args:
clauses: List of legal clause texts
auto_tune: If True, automatically tune eps parameter
Returns:
Dictionary with discovered clusters and outliers
"""
print(f"πŸ” Discovering risk patterns using DBSCAN...")
# Clean clauses
cleaned_clauses = [self._clean_text(c) for c in clauses]
# Create TF-IDF matrix
print(" πŸ“Š Creating TF-IDF feature matrix...")
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses)
feature_names = self.vectorizer.get_feature_names_out()
# Auto-tune eps if requested
if auto_tune:
print(" πŸ”§ Auto-tuning eps parameter...")
self.eps = self._auto_tune_eps(self.feature_matrix)
self.dbscan_model.eps = self.eps
print(f" Selected eps={self.eps:.3f}")
# Fit DBSCAN
print(f" 🧠 Fitting DBSCAN (eps={self.eps}, min_samples={self.min_samples})...")
self.cluster_labels = self.dbscan_model.fit_predict(self.feature_matrix)
# Identify unique clusters (excluding noise label -1)
unique_clusters = [c for c in np.unique(self.cluster_labels) if c != -1]
n_clusters = len(unique_clusters)
n_noise = np.sum(self.cluster_labels == -1)
print(f" πŸ“Š Found {n_clusters} clusters and {n_noise} outliers/noise points")
# Analyze each cluster
print(" πŸ“ Analyzing discovered clusters...")
for cluster_id in unique_clusters:
cluster_mask = self.cluster_labels == cluster_id
cluster_indices = np.where(cluster_mask)[0]
# Get representative clauses
cluster_clauses = [clauses[i] for i in cluster_indices]
# Extract top TF-IDF terms
cluster_tfidf = self.feature_matrix[cluster_mask].mean(axis=0)
top_term_indices = np.argsort(np.asarray(cluster_tfidf).flatten())[-15:][::-1]
top_terms = [feature_names[i] for i in top_term_indices]
top_scores = [float(cluster_tfidf[0, i]) for i in top_term_indices]
# Generate cluster name
cluster_name = self._generate_cluster_name(top_terms, cluster_id)
self.discovered_clusters[cluster_id] = {
'cluster_id': cluster_id,
'cluster_name': cluster_name,
'top_terms': top_terms,
'term_scores': top_scores,
'clause_count': int(len(cluster_indices)),
'proportion': float(len(cluster_indices) / len(clauses)),
'is_core_cluster': len(cluster_indices) >= self.min_samples * 3
}
# Analyze outliers/noise
self.outlier_indices = np.where(self.cluster_labels == -1)[0]
outlier_clauses = [clauses[i] for i in self.outlier_indices]
print(f"βœ… DBSCAN discovery complete: {n_clusters} clusters, {n_noise} outliers")
return {
'method': 'DBSCAN_Density_Based_Clustering',
'n_clusters': n_clusters,
'n_outliers': int(n_noise),
'eps': self.eps,
'min_samples': self.min_samples,
'discovered_clusters': self.discovered_clusters,
'cluster_labels': self.cluster_labels,
'outlier_indices': self.outlier_indices.tolist(),
'outlier_clauses': outlier_clauses[:10], # First 10 outliers
'quality_metrics': {
'n_clusters': n_clusters,
'outlier_ratio': float(n_noise / len(clauses)),
'avg_cluster_size': float(np.mean([c['clause_count'] for c in self.discovered_clusters.values()])) if n_clusters > 0 else 0
}
}
def _clean_text(self, text: str) -> str:
"""Clean clause text"""
if not isinstance(text, str):
return ""
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _auto_tune_eps(self, feature_matrix, sample_size: int = 1000) -> float:
"""
Auto-tune eps parameter using k-distance graph.
Uses a sample of data to estimate optimal eps.
"""
from sklearn.neighbors import NearestNeighbors
# Sample data if too large
n_samples = min(sample_size, feature_matrix.shape[0])
if feature_matrix.shape[0] > sample_size:
indices = np.random.choice(feature_matrix.shape[0], sample_size, replace=False)
sample_matrix = feature_matrix[indices]
else:
sample_matrix = feature_matrix
# Compute k-nearest neighbors
k = self.min_samples
nbrs = NearestNeighbors(n_neighbors=k, metric='cosine').fit(sample_matrix)
distances, _ = nbrs.kneighbors(sample_matrix)
# Get k-th nearest neighbor distance
k_distances = np.sort(distances[:, -1])
# Use elbow method: find point where distances increase rapidly
# Simple heuristic: use 90th percentile
eps = np.percentile(k_distances, 90)
return float(eps)
def _generate_cluster_name(self, top_terms: List[str], cluster_id: int) -> str:
"""Generate descriptive name from top terms"""
# Legal risk theme detection
themes = {
'LIABILITY': ['liability', 'liable', 'damages', 'loss'],
'INDEMNITY': ['indemnify', 'indemnification', 'hold', 'harmless'],
'TERMINATION': ['terminate', 'termination', 'cancel', 'expire'],
'IP': ['intellectual', 'property', 'patent', 'copyright'],
'CONFIDENTIAL': ['confidential', 'nda', 'disclosure', 'secret'],
'PAYMENT': ['payment', 'pay', 'fee', 'price'],
'COMPLIANCE': ['comply', 'compliance', 'regulation', 'law'],
'WARRANTY': ['warranty', 'warrant', 'represent', 'guarantee']
}
for theme, keywords in themes.items():
if any(keyword in term.lower() for term in top_terms[:5] for keyword in keywords):
return f"RISK_{theme}_C{cluster_id}"
return f"RISK_CLUSTER_{cluster_id}_{top_terms[0].upper()}"
def get_outlier_analysis(self) -> Dict[str, Any]:
"""
Analyze outlier/noise points to identify rare risk patterns.
Returns:
Dictionary with outlier analysis
"""
if len(self.outlier_indices) == 0:
return {'message': 'No outliers found'}
return {
'n_outliers': len(self.outlier_indices),
'outlier_ratio': len(self.outlier_indices) / len(self.cluster_labels),
'interpretation': 'Outliers may represent rare or unique risk patterns that do not fit common categories'
}
class NMFRiskDiscovery:
"""
Risk discovery using Non-negative Matrix Factorization (NMF).
NMF decomposes the document-term matrix into interpretable parts-based representations.
Different from clustering - learns additive combinations of basis patterns.
Advantages:
- βœ… Parts-based decomposition (additive patterns)
- βœ… Highly interpretable results
- βœ… Non-negative weights (intuitive)
- βœ… Fast convergence
- βœ… Works well with TF-IDF
Disadvantages:
- ❌ Requires non-negative features
- ❌ Sensitive to initialization
- ❌ May not capture global structure
"""
def __init__(self, n_components: int = 7, random_state: int = 42):
self.n_components = n_components
self.random_state = random_state
# TF-IDF vectorizer
self.vectorizer = TfidfVectorizer(
max_features=8000,
ngram_range=(1, 2),
stop_words='english',
lowercase=True,
min_df=3,
max_df=0.85,
norm='l2' # Important for NMF
)
# NMF model - handle different scikit-learn versions
# Versions < 1.0: use 'alpha' and 'l1_ratio'
# Versions >= 1.0: use 'alpha_W', 'alpha_H', 'l1_ratio'
# Very old versions: neither parameter exists
import sklearn
sklearn_version = tuple(map(int, sklearn.__version__.split('.')[:2]))
nmf_params = {
'n_components': n_components,
'random_state': random_state,
'init': 'nndsvda',
'max_iter': 500
}
# Add regularization params if supported
if sklearn_version >= (1, 0):
# scikit-learn >= 1.0
nmf_params['alpha_W'] = 0.1
nmf_params['alpha_H'] = 0.1
nmf_params['l1_ratio'] = 0.5
elif sklearn_version >= (0, 19):
# scikit-learn 0.19 to 0.24
nmf_params['alpha'] = 0.1
nmf_params['l1_ratio'] = 0.5
# else: very old version, use basic params only
self.nmf_model = NMF(**nmf_params)
self.discovered_components = {}
self.component_labels = None
self.feature_matrix = None
self.W_matrix = None # Document-component matrix
self.H_matrix = None # Component-feature matrix
def discover_risk_patterns(self, clauses: List[str]) -> Dict[str, Any]:
"""
Discover risk patterns using NMF decomposition.
Args:
clauses: List of legal clause texts
Returns:
Dictionary with discovered components and assignments
"""
print(f"πŸ” Discovering risk patterns using NMF (n_components={self.n_components})...")
# Clean clauses
cleaned_clauses = [self._clean_text(c) for c in clauses]
# Create TF-IDF matrix
print(" πŸ“Š Creating TF-IDF feature matrix...")
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses)
feature_names = self.vectorizer.get_feature_names_out()
# Fit NMF model
print(" 🧠 Fitting NMF model...")
self.W_matrix = self.nmf_model.fit_transform(self.feature_matrix)
self.H_matrix = self.nmf_model.components_
# Assign each document to dominant component
self.component_labels = np.argmax(self.W_matrix, axis=1)
# Extract top words for each component
print(" πŸ“ Extracting component keywords...")
n_top_words = 15
for component_idx in range(self.n_components):
top_word_indices = np.argsort(self.H_matrix[component_idx])[-n_top_words:][::-1]
top_words = [feature_names[i] for i in top_word_indices]
top_weights = [self.H_matrix[component_idx][i] for i in top_word_indices]
# Generate component name
component_name = self._generate_component_name(top_words)
# Count clauses in this component
clause_count = np.sum(self.component_labels == component_idx)
# Get average component weight (strength)
avg_weight = np.mean(self.W_matrix[:, component_idx])
self.discovered_components[component_idx] = {
'component_id': component_idx,
'component_name': component_name,
'top_words': top_words,
'word_weights': top_weights,
'clause_count': int(clause_count),
'proportion': float(clause_count / len(clauses)),
'avg_strength': float(avg_weight)
}
# Compute reconstruction error
reconstruction_error = self.nmf_model.reconstruction_err_
# Compute sparsity (how sparse are the representations)
sparsity = np.mean(self.W_matrix == 0)
print(f"βœ… NMF discovery complete: {self.n_components} components found")
print(f" Reconstruction error: {reconstruction_error:.2f}")
print(f" Sparsity: {sparsity:.2%}")
return {
'method': 'NMF_Matrix_Factorization',
'n_components': self.n_components,
'discovered_components': self.discovered_components,
'component_labels': self.component_labels,
'component_strengths': self.W_matrix,
'quality_metrics': {
'reconstruction_error': float(reconstruction_error),
'sparsity': float(sparsity),
'avg_component_strength': float(np.mean(np.max(self.W_matrix, axis=1)))
}
}
def get_clause_composition(self, clause_idx: int) -> Dict[int, float]:
"""Get component composition for a specific clause"""
if self.W_matrix is None:
return {}
return {comp_id: float(weight) for comp_id, weight in enumerate(self.W_matrix[clause_idx])}
def _clean_text(self, text: str) -> str:
"""Clean clause text"""
if not isinstance(text, str):
return ""
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _generate_component_name(self, top_words: List[str]) -> str:
"""Generate descriptive name from top words"""
themes = {
'LIABILITY': ['liability', 'liable', 'damages', 'loss'],
'INDEMNITY': ['indemnify', 'indemnification', 'hold', 'harmless'],
'TERMINATION': ['terminate', 'termination', 'cancel', 'expire'],
'IP': ['intellectual', 'property', 'patent', 'copyright'],
'CONFIDENTIAL': ['confidential', 'nda', 'disclosure', 'secret'],
'PAYMENT': ['payment', 'pay', 'fee', 'price'],
'COMPLIANCE': ['comply', 'compliance', 'regulation', 'law'],
'WARRANTY': ['warranty', 'warrant', 'represent', 'guarantee']
}
for theme, keywords in themes.items():
if any(keyword in term.lower() for term in top_words[:5] for keyword in keywords):
return f"COMPONENT_{theme}"
return f"COMPONENT_{top_words[0].upper()}_{top_words[1].upper()}"
class SpectralClusteringRiskDiscovery:
"""
Risk discovery using Spectral Clustering.
Uses graph theory and eigenvalues to cluster data. Excellent for non-convex clusters
that other methods miss. Based on similarity graph construction.
Advantages:
- βœ… Handles non-convex clusters (arbitrary shapes)
- βœ… Uses graph structure (captures relationships)
- βœ… Theoretically sound (spectral graph theory)
- βœ… Good for manifold-structured data
Disadvantages:
- ❌ Computationally expensive (eigenvalue decomposition)
- ❌ Memory intensive for large datasets
- ❌ Sensitive to similarity metric
- ❌ Requires number of clusters
"""
def __init__(self, n_clusters: int = 7, affinity: str = 'rbf', random_state: int = 42):
self.n_clusters = n_clusters
self.affinity = affinity # 'rbf', 'nearest_neighbors', 'precomputed'
self.random_state = random_state
# TF-IDF vectorizer
self.vectorizer = TfidfVectorizer(
max_features=6000,
ngram_range=(1, 2),
stop_words='english',
lowercase=True,
min_df=3,
max_df=0.85
)
# Import spectral clustering
from sklearn.cluster import SpectralClustering
# Spectral clustering model
self.spectral_model = SpectralClustering(
n_clusters=n_clusters,
affinity=affinity,
random_state=random_state,
n_init=10,
assign_labels='kmeans' # or 'discretize'
)
self.discovered_clusters = {}
self.cluster_labels = None
self.feature_matrix = None
def discover_risk_patterns(self, clauses: List[str]) -> Dict[str, Any]:
"""
Discover risk patterns using Spectral Clustering.
Args:
clauses: List of legal clause texts
Returns:
Dictionary with discovered clusters
"""
print(f"πŸ” Discovering risk patterns using Spectral Clustering (n_clusters={self.n_clusters})...")
# Clean clauses
cleaned_clauses = [self._clean_text(c) for c in clauses]
# Create TF-IDF matrix
print(" πŸ“Š Creating TF-IDF feature matrix...")
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses)
feature_names = self.vectorizer.get_feature_names_out()
# Fit spectral clustering
print(f" 🧠 Fitting Spectral Clustering (affinity={self.affinity})...")
print(" (This may take a while for large datasets...)")
# For very large datasets, sample for affinity matrix
if self.feature_matrix.shape[0] > 5000:
print(f" Large dataset detected ({self.feature_matrix.shape[0]} clauses)")
print(" Using nearest neighbors affinity for efficiency...")
self.spectral_model.affinity = 'nearest_neighbors'
self.spectral_model.n_neighbors = 10
self.cluster_labels = self.spectral_model.fit_predict(self.feature_matrix)
# Analyze each cluster
print(" πŸ“ Analyzing discovered clusters...")
for cluster_id in range(self.n_clusters):
cluster_mask = self.cluster_labels == cluster_id
cluster_indices = np.where(cluster_mask)[0]
if len(cluster_indices) == 0:
continue
# Get representative clauses
cluster_clauses = [clauses[i] for i in cluster_indices]
# Extract top TF-IDF terms
cluster_tfidf = self.feature_matrix[cluster_mask].mean(axis=0)
top_term_indices = np.argsort(np.asarray(cluster_tfidf).flatten())[-15:][::-1]
top_terms = [feature_names[i] for i in top_term_indices]
top_scores = [float(cluster_tfidf[0, i]) for i in top_term_indices]
# Generate cluster name
cluster_name = self._generate_cluster_name(top_terms)
self.discovered_clusters[cluster_id] = {
'cluster_id': cluster_id,
'cluster_name': cluster_name,
'top_terms': top_terms,
'term_scores': top_scores,
'clause_count': int(len(cluster_indices)),
'proportion': float(len(cluster_indices) / len(clauses))
}
# Compute silhouette score if dataset not too large
if len(clauses) < 10000:
from sklearn.metrics import silhouette_score
silhouette = silhouette_score(self.feature_matrix, self.cluster_labels)
else:
silhouette = None
print(f"βœ… Spectral clustering complete: {len(self.discovered_clusters)} clusters found")
if silhouette:
print(f" Silhouette Score: {silhouette:.3f}")
return {
'method': 'Spectral_Clustering',
'n_clusters': self.n_clusters,
'affinity': self.affinity,
'discovered_clusters': self.discovered_clusters,
'cluster_labels': self.cluster_labels,
'quality_metrics': {
'silhouette_score': silhouette if silhouette else 'N/A',
'n_clusters_found': len(self.discovered_clusters)
}
}
def _clean_text(self, text: str) -> str:
"""Clean clause text"""
if not isinstance(text, str):
return ""
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _generate_cluster_name(self, top_terms: List[str]) -> str:
"""Generate descriptive name from top terms"""
themes = {
'LIABILITY': ['liability', 'liable', 'damages', 'loss'],
'INDEMNITY': ['indemnify', 'indemnification', 'hold', 'harmless'],
'TERMINATION': ['terminate', 'termination', 'cancel', 'expire'],
'IP': ['intellectual', 'property', 'patent', 'copyright'],
'CONFIDENTIAL': ['confidential', 'nda', 'disclosure', 'secret'],
'PAYMENT': ['payment', 'pay', 'fee', 'price'],
'COMPLIANCE': ['comply', 'compliance', 'regulation', 'law'],
'WARRANTY': ['warranty', 'warrant', 'represent', 'guarantee']
}
for theme, keywords in themes.items():
if any(keyword in term.lower() for term in top_terms[:5] for keyword in keywords):
return f"SPECTRAL_{theme}"
return f"SPECTRAL_{top_terms[0].upper()}_{top_terms[1].upper()}"
class GaussianMixtureRiskDiscovery:
"""
Risk discovery using Gaussian Mixture Models (GMM).
Probabilistic model that assumes data comes from mixture of Gaussian distributions.
Provides soft clustering with probability estimates.
Advantages:
- βœ… Probabilistic (soft clustering)
- βœ… Provides uncertainty estimates
- βœ… Can model elliptical clusters
- βœ… Flexible covariance structures
- βœ… Works with EM algorithm (handles missing data)
Disadvantages:
- ❌ Assumes Gaussian distributions
- ❌ Sensitive to initialization
- ❌ Can get stuck in local optima
- ❌ Computationally intensive
"""
def __init__(self, n_components: int = 7, covariance_type: str = 'diag', random_state: int = 42):
self.n_components = n_components
self.covariance_type = covariance_type # 'full', 'tied', 'diag', 'spherical'
self.random_state = random_state
# TF-IDF vectorizer
self.vectorizer = TfidfVectorizer(
max_features=5000,
ngram_range=(1, 2),
stop_words='english',
lowercase=True,
min_df=3,
max_df=0.85
)
# Import GMM
from sklearn.mixture import GaussianMixture
# GMM model
self.gmm_model = GaussianMixture(
n_components=n_components,
covariance_type=covariance_type,
random_state=random_state,
n_init=10,
max_iter=200
)
self.discovered_components = {}
self.component_labels = None
self.feature_matrix = None
self.probabilities = None
def discover_risk_patterns(self, clauses: List[str]) -> Dict[str, Any]:
"""
Discover risk patterns using Gaussian Mixture Model.
Args:
clauses: List of legal clause texts
Returns:
Dictionary with discovered components and probabilities
"""
print(f"πŸ” Discovering risk patterns using GMM (n_components={self.n_components})...")
# Clean clauses
cleaned_clauses = [self._clean_text(c) for c in clauses]
# Create TF-IDF matrix
print(" πŸ“Š Creating TF-IDF feature matrix...")
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses)
feature_names = self.vectorizer.get_feature_names_out()
# Reduce dimensionality for GMM (dense matrix needed)
print(" πŸ”„ Reducing dimensionality (GMM requires dense matrix)...")
from sklearn.decomposition import TruncatedSVD
svd = TruncatedSVD(n_components=min(100, self.feature_matrix.shape[1] - 1), random_state=self.random_state)
X_reduced = svd.fit_transform(self.feature_matrix)
# Fit GMM model
print(f" 🧠 Fitting Gaussian Mixture Model (covariance={self.covariance_type})...")
self.gmm_model.fit(X_reduced)
# Get predictions and probabilities
self.component_labels = self.gmm_model.predict(X_reduced)
self.probabilities = self.gmm_model.predict_proba(X_reduced)
# Analyze each component
print(" πŸ“ Analyzing discovered components...")
for component_id in range(self.n_components):
component_mask = self.component_labels == component_id
component_indices = np.where(component_mask)[0]
if len(component_indices) == 0:
continue
# Get representative clauses
component_clauses = [clauses[i] for i in component_indices]
# Extract top TF-IDF terms
component_tfidf = self.feature_matrix[component_mask].mean(axis=0)
top_term_indices = np.argsort(np.asarray(component_tfidf).flatten())[-15:][::-1]
top_terms = [feature_names[i] for i in top_term_indices]
top_scores = [float(component_tfidf[0, i]) for i in top_term_indices]
# Generate component name
component_name = self._generate_component_name(top_terms)
# Compute average probability for this component
avg_probability = np.mean(self.probabilities[component_mask, component_id])
self.discovered_components[component_id] = {
'component_id': component_id,
'component_name': component_name,
'top_terms': top_terms,
'term_scores': top_scores,
'clause_count': int(len(component_indices)),
'proportion': float(len(component_indices) / len(clauses)),
'avg_confidence': float(avg_probability)
}
# Compute BIC and AIC (model selection criteria)
bic = self.gmm_model.bic(X_reduced)
aic = self.gmm_model.aic(X_reduced)
print(f"βœ… GMM discovery complete: {len(self.discovered_components)} components found")
print(f" BIC: {bic:.2f} (lower is better)")
print(f" AIC: {aic:.2f} (lower is better)")
return {
'method': 'Gaussian_Mixture_Model',
'n_components': self.n_components,
'covariance_type': self.covariance_type,
'discovered_components': self.discovered_components,
'component_labels': self.component_labels,
'probabilities': self.probabilities,
'quality_metrics': {
'bic': float(bic),
'aic': float(aic),
'avg_confidence': float(np.mean(np.max(self.probabilities, axis=1)))
}
}
def get_clause_probabilities(self, clause_idx: int) -> Dict[int, float]:
"""Get probability distribution over components for a specific clause"""
if self.probabilities is None:
return {}
return {comp_id: float(prob) for comp_id, prob in enumerate(self.probabilities[clause_idx])}
def _clean_text(self, text: str) -> str:
"""Clean clause text"""
if not isinstance(text, str):
return ""
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _generate_component_name(self, top_terms: List[str]) -> str:
"""Generate descriptive name from top terms"""
themes = {
'LIABILITY': ['liability', 'liable', 'damages', 'loss'],
'INDEMNITY': ['indemnify', 'indemnification', 'hold', 'harmless'],
'TERMINATION': ['terminate', 'termination', 'cancel', 'expire'],
'IP': ['intellectual', 'property', 'patent', 'copyright'],
'CONFIDENTIAL': ['confidential', 'nda', 'disclosure', 'secret'],
'PAYMENT': ['payment', 'pay', 'fee', 'price'],
'COMPLIANCE': ['comply', 'compliance', 'regulation', 'law'],
'WARRANTY': ['warranty', 'warrant', 'represent', 'guarantee']
}
for theme, keywords in themes.items():
if any(keyword in term.lower() for term in top_terms[:5] for keyword in keywords):
return f"GMM_{theme}"
return f"GMM_{top_terms[0].upper()}_{top_terms[1].upper()}"
class MiniBatchKMeansRiskDiscovery:
"""
Risk discovery using Mini-Batch K-Means.
Scalable version of K-Means that uses mini-batches for faster computation.
Ideal for very large datasets (100K+ clauses).
Advantages:
- βœ… Extremely fast (processes mini-batches)
- βœ… Scalable to millions of samples
- βœ… Low memory footprint
- βœ… Online learning (can update incrementally)
- βœ… Similar quality to standard K-Means
Disadvantages:
- ❌ Slightly less accurate than standard K-Means
- ❌ Results vary with batch size
- ❌ Still requires number of clusters
"""
def __init__(self, n_clusters: int = 7, batch_size: int = 1000, random_state: int = 42):
self.n_clusters = n_clusters
self.batch_size = batch_size
self.random_state = random_state
# TF-IDF vectorizer
self.vectorizer = TfidfVectorizer(
max_features=10000,
ngram_range=(1, 3),
stop_words='english',
lowercase=True,
min_df=2,
max_df=0.95
)
# Import Mini-Batch K-Means
from sklearn.cluster import MiniBatchKMeans
# Mini-Batch K-Means model
self.kmeans_model = MiniBatchKMeans(
n_clusters=n_clusters,
random_state=random_state,
batch_size=batch_size,
n_init=10,
max_iter=300,
reassignment_ratio=0.01
)
self.discovered_clusters = {}
self.cluster_labels = None
self.feature_matrix = None
def discover_risk_patterns(self, clauses: List[str]) -> Dict[str, Any]:
"""
Discover risk patterns using Mini-Batch K-Means.
Args:
clauses: List of legal clause texts
Returns:
Dictionary with discovered clusters
"""
print(f"πŸ” Discovering risk patterns using Mini-Batch K-Means (n_clusters={self.n_clusters})...")
# Clean clauses
cleaned_clauses = [self._clean_text(c) for c in clauses]
# Create TF-IDF matrix
print(" πŸ“Š Creating TF-IDF feature matrix...")
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses)
feature_names = self.vectorizer.get_feature_names_out()
# Fit Mini-Batch K-Means
print(f" 🧠 Fitting Mini-Batch K-Means (batch_size={self.batch_size})...")
self.cluster_labels = self.kmeans_model.fit_predict(self.feature_matrix)
# Analyze each cluster
print(" πŸ“ Analyzing discovered clusters...")
for cluster_id in range(self.n_clusters):
cluster_mask = self.cluster_labels == cluster_id
cluster_indices = np.where(cluster_mask)[0]
if len(cluster_indices) == 0:
continue
# Get cluster center
cluster_center = self.kmeans_model.cluster_centers_[cluster_id]
# Get top terms from cluster center
top_term_indices = np.argsort(cluster_center)[-15:][::-1]
top_terms = [feature_names[i] for i in top_term_indices]
top_scores = [float(cluster_center[i]) for i in top_term_indices]
# Generate cluster name
cluster_name = self._generate_cluster_name(top_terms)
# Compute cluster cohesion (inertia contribution)
from scipy.spatial.distance import cdist
distances = cdist(
self.feature_matrix[cluster_mask].toarray(),
[cluster_center],
metric='euclidean'
)
avg_distance = np.mean(distances)
self.discovered_clusters[cluster_id] = {
'cluster_id': cluster_id,
'cluster_name': cluster_name,
'top_terms': top_terms,
'term_scores': top_scores,
'clause_count': int(len(cluster_indices)),
'proportion': float(len(cluster_indices) / len(clauses)),
'avg_distance_to_center': float(avg_distance)
}
# Compute inertia (total within-cluster sum of squares)
inertia = self.kmeans_model.inertia_
print(f"βœ… Mini-Batch K-Means complete: {self.n_clusters} clusters found")
print(f" Inertia: {inertia:.2f} (lower is better)")
print(f" Speed boost vs standard K-Means: ~3-5x faster")
return {
'method': 'MiniBatch_KMeans',
'n_clusters': self.n_clusters,
'batch_size': self.batch_size,
'discovered_clusters': self.discovered_clusters,
'cluster_labels': self.cluster_labels,
'quality_metrics': {
'inertia': float(inertia),
'avg_cluster_cohesion': float(np.mean([c['avg_distance_to_center'] for c in self.discovered_clusters.values()]))
}
}
def _clean_text(self, text: str) -> str:
"""Clean clause text"""
if not isinstance(text, str):
return ""
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _generate_cluster_name(self, top_terms: List[str]) -> str:
"""Generate descriptive name from top terms"""
themes = {
'LIABILITY': ['liability', 'liable', 'damages', 'loss'],
'INDEMNITY': ['indemnify', 'indemnification', 'hold', 'harmless'],
'TERMINATION': ['terminate', 'termination', 'cancel', 'expire'],
'IP': ['intellectual', 'property', 'patent', 'copyright'],
'CONFIDENTIAL': ['confidential', 'nda', 'disclosure', 'secret'],
'PAYMENT': ['payment', 'pay', 'fee', 'price'],
'COMPLIANCE': ['comply', 'compliance', 'regulation', 'law'],
'WARRANTY': ['warranty', 'warrant', 'represent', 'guarantee']
}
for theme, keywords in themes.items():
if any(keyword in term.lower() for term in top_terms[:5] for keyword in keywords):
return f"MB_{theme}"
return f"MB_{top_terms[0].upper()}_{top_terms[1].upper()}"
# Utility function to compare all methods
def compare_risk_discovery_methods(clauses: List[str], n_patterns: int = 7,
include_advanced: bool = True) -> Dict[str, Any]:
"""
Compare all risk discovery methods on the same dataset.
Args:
clauses: List of legal clause texts
n_patterns: Number of risk patterns/clusters to discover
include_advanced: If True, includes advanced methods (slower but comprehensive)
Returns:
Comparison results with metrics for each method
"""
print("="*80)
print("πŸ”¬ COMPARING RISK DISCOVERY METHODS")
print(f" Methods to test: {9 if include_advanced else 4}")
print("="*80)
results = {}
# ===== BASIC METHODS (Fast) =====
# 1. K-Means (Original)
print("\n" + "="*80)
print("METHOD 1: K-Means Clustering (Original) - FAST")
print("="*80)
from risk_discovery import UnsupervisedRiskDiscovery
kmeans_discovery = UnsupervisedRiskDiscovery(n_clusters=n_patterns)
results['kmeans'] = kmeans_discovery.discover_risk_patterns(clauses)
# 2. LDA Topic Modeling
print("\n" + "="*80)
print("METHOD 2: LDA Topic Modeling - PROBABILISTIC")
print("="*80)
lda_discovery = TopicModelingRiskDiscovery(n_topics=n_patterns)
results['lda'] = lda_discovery.discover_risk_patterns(clauses)
# 3. Hierarchical Clustering
print("\n" + "="*80)
print("METHOD 3: Hierarchical Clustering - STRUCTURE")
print("="*80)
hierarchical_discovery = HierarchicalRiskDiscovery(n_clusters=n_patterns)
results['hierarchical'] = hierarchical_discovery.discover_risk_patterns(clauses)
# 4. DBSCAN
print("\n" + "="*80)
print("METHOD 4: DBSCAN (Density-Based) - OUTLIERS")
print("="*80)
dbscan_discovery = DensityBasedRiskDiscovery(eps=0.3, min_samples=5)
results['dbscan'] = dbscan_discovery.discover_risk_patterns(clauses, auto_tune=True)
if include_advanced:
# ===== ADVANCED METHODS =====
# 5. NMF (Non-negative Matrix Factorization)
print("\n" + "="*80)
print("METHOD 5: NMF (Matrix Factorization) - PARTS-BASED")
print("="*80)
nmf_discovery = NMFRiskDiscovery(n_components=n_patterns)
results['nmf'] = nmf_discovery.discover_risk_patterns(clauses)
# 6. Spectral Clustering
print("\n" + "="*80)
print("METHOD 6: Spectral Clustering - GRAPH-BASED")
print("="*80)
spectral_discovery = SpectralClusteringRiskDiscovery(n_clusters=n_patterns)
results['spectral'] = spectral_discovery.discover_risk_patterns(clauses)
# 7. Gaussian Mixture Model
print("\n" + "="*80)
print("METHOD 7: Gaussian Mixture Model - PROBABILISTIC SOFT")
print("="*80)
gmm_discovery = GaussianMixtureRiskDiscovery(n_components=n_patterns)
results['gmm'] = gmm_discovery.discover_risk_patterns(clauses)
# 8. Mini-Batch K-Means
print("\n" + "="*80)
print("METHOD 8: Mini-Batch K-Means - ULTRA FAST")
print("="*80)
minibatch_discovery = MiniBatchKMeansRiskDiscovery(n_clusters=n_patterns)
results['minibatch_kmeans'] = minibatch_discovery.discover_risk_patterns(clauses)
# 9. Risk-o-meter (Doc2Vec + SVM) - Chakrabarti et al., 2018
print("\n" + "="*80)
print("METHOD 9: Risk-o-meter (Doc2Vec + SVM) - PAPER BASELINE")
print("="*80)
print("πŸ“„ Based on: Chakrabarti et al., 2018")
print(" Achievement: 91% accuracy on termination clauses")
try:
from risk_o_meter import RiskOMeterFramework
risk_o_meter = RiskOMeterFramework(
vector_size=100,
epochs=30,
verbose=True
)
results['risk_o_meter'] = risk_o_meter.discover_risk_patterns(clauses, n_patterns)
except ImportError:
print("⚠️ Risk-o-meter requires gensim. Install with: pip install gensim>=4.3.0")
print(" Skipping Risk-o-meter comparison...")
except Exception as e:
print(f"⚠️ Risk-o-meter error: {e}")
print(" Skipping Risk-o-meter comparison...")
# Generate comparison summary
print("\n" + "="*80)
print("πŸ“Š COMPARISON SUMMARY")
print("="*80)
summary = {
'n_clauses': len(clauses),
'target_patterns': n_patterns,
'methods_compared': 9 if include_advanced else 4,
'method_results': {}
}
for method_name, method_results in results.items():
n_discovered = method_results.get('n_clusters') or method_results.get('n_topics', 0)
print(f"\n{method_name.upper()}:")
print(f" Patterns Discovered: {n_discovered}")
if 'quality_metrics' in method_results:
print(f" Quality Metrics: {method_results['quality_metrics']}")
summary['method_results'][method_name] = {
'n_patterns': n_discovered,
'method': method_results['method'],
'quality_metrics': method_results.get('quality_metrics', {})
}
print("\n" + "="*80)
print("βœ… COMPARISON COMPLETE")
print("="*80)
return {
'summary': summary,
'detailed_results': results
}