|
|
""" |
|
|
Alternative Risk Discovery Methods for Comparison |
|
|
|
|
|
This module implements seven alternative approaches to risk pattern discovery:

1. Topic Modeling (LDA) - Discovers latent risk topics
2. Hierarchical Clustering (Agglomerative) - Discovers nested risk hierarchies
3. Density-Based Clustering (DBSCAN) - Discovers risk clusters of varying shapes
4. Non-negative Matrix Factorization (NMF) - Discovers additive, parts-based risk patterns
5. Spectral Clustering - Discovers non-convex risk clusters via graph structure
6. Gaussian Mixture Models (GMM) - Provides soft, probabilistic cluster assignments
7. Mini-Batch K-Means - Scales clustering to very large clause sets

A compare_risk_discovery_methods() helper runs the methods side by side on the
same clause set and reports per-method quality metrics.
|
|
|
|
|
Each method provides a different perspective on risk patterns in legal contracts. |
|
|
""" |
|
|
import re

import numpy as np

from typing import Dict, List, Any

from collections import defaultdict

from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer

from sklearn.decomposition import LatentDirichletAllocation, NMF

from sklearn.cluster import AgglomerativeClustering, DBSCAN

from sklearn.metrics import silhouette_score
|
|
|
|
|
|
|
|
class TopicModelingRiskDiscovery: |
|
|
""" |
|
|
Risk discovery using Latent Dirichlet Allocation (LDA) topic modeling. |
|
|
|
|
|
Discovers risk patterns as latent topics where each clause is a mixture of topics. |
|
|
Better for discovering overlapping risk categories and multi-faceted risks. |
|
|
|
|
|
Advantages: |
|
|
- Handles overlapping risk types naturally |
|
|
- Provides probability distribution over risk types |
|
|
- Discovers interpretable topic words |
|
|
- Works well with legal text (documents with multiple themes) |
|
|
|
|
|
Disadvantages: |
|
|
- Requires more tuning (alpha, beta parameters) |
|
|
- Slower than K-Means |
|
|
- Less clear cluster boundaries |
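
    Example (illustrative sketch; `clauses` is assumed to be a reasonably
    large list of clause strings, since the vectorizer filters terms by
    min_df/max_df):

        lda = TopicModelingRiskDiscovery(n_topics=7)
        result = lda.discover_risk_patterns(clauses)
        result['doc_topic_distribution'][0]   # topic mixture of the first clause
        lda.get_clause_topic_distribution(0)  # same, as a {topic_id: prob} dict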
|
|
""" |
|
|
|
|
|
def __init__(self, n_topics: int = 7, random_state: int = 42): |
|
|
self.n_topics = n_topics |
|
|
self.random_state = random_state |
|
|
|
|
|
|
|
|
self.vectorizer = CountVectorizer( |
|
|
max_features=5000, |
|
|
ngram_range=(1, 2), |
|
|
stop_words='english', |
|
|
lowercase=True, |
|
|
min_df=3, |
|
|
max_df=0.85 |
|
|
) |
|
|
|
|
|
|
|
|
self.lda_model = LatentDirichletAllocation( |
|
|
n_components=n_topics, |
|
|
random_state=random_state, |
|
|
max_iter=20, |
|
|
learning_method='batch', |
|
|
doc_topic_prior=0.1, |
|
|
topic_word_prior=0.01, |
|
|
n_jobs=-1 |
|
|
) |
|
|
|
|
|
self.discovered_topics = {} |
|
|
self.topic_labels = None |
|
|
self.feature_matrix = None |
|
|
self.topic_word_distribution = None |
|
|
|
|
|
def discover_risk_patterns(self, clauses: List[str]) -> Dict[str, Any]: |
|
|
""" |
|
|
Discover risk patterns using LDA topic modeling. |
|
|
|
|
|
Args: |
|
|
clauses: List of legal clause texts |
|
|
|
|
|
Returns: |
|
|
Dictionary with discovered topics and assignments |
|
|
""" |
|
|
print(f"π Discovering risk topics using LDA (n_topics={self.n_topics})...") |
|
|
|
|
|
|
|
|
cleaned_clauses = [self._clean_text(c) for c in clauses] |
|
|
|
|
|
|
|
|
print(" π Creating document-term matrix...") |
|
|
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses) |
|
|
feature_names = self.vectorizer.get_feature_names_out() |
|
|
|
|
|
|
|
|
print(" π§ Fitting LDA model...") |
|
|
self.lda_model.fit(self.feature_matrix) |
|
|
|
|
|
|
|
|
self.topic_word_distribution = self.lda_model.components_ |
|
|
|
|
|
|
|
|
doc_topic_dist = self.lda_model.transform(self.feature_matrix) |
|
|
|
|
|
|
|
|
self.topic_labels = np.argmax(doc_topic_dist, axis=1) |
|
|
|
|
|
|
|
|
print(" π Extracting topic keywords...") |
|
|
n_top_words = 15 |
|
|
for topic_idx in range(self.n_topics): |
|
|
top_word_indices = np.argsort(self.topic_word_distribution[topic_idx])[-n_top_words:][::-1] |
|
|
top_words = [feature_names[i] for i in top_word_indices] |
|
|
top_weights = [self.topic_word_distribution[topic_idx][i] for i in top_word_indices] |
|
|
|
|
|
|
|
|
topic_name = self._generate_topic_name(top_words) |
|
|
|
|
|
|
|
|
clause_count = np.sum(self.topic_labels == topic_idx) |
|
|
|
|
|
self.discovered_topics[topic_idx] = { |
|
|
'topic_id': topic_idx, |
|
|
'topic_name': topic_name, |
|
|
'top_words': top_words, |
|
|
'word_weights': top_weights, |
|
|
'clause_count': int(clause_count), |
|
|
'proportion': float(clause_count / len(clauses)) |
|
|
} |
|
|
|
|
|
|
|
|
perplexity = self.lda_model.perplexity(self.feature_matrix) |
|
|
log_likelihood = self.lda_model.score(self.feature_matrix) |
|
|
|
|
|
print(f"β
LDA discovery complete: {self.n_topics} topics found") |
|
|
print(f" Perplexity: {perplexity:.2f} (lower is better)") |
|
|
print(f" Log-likelihood: {log_likelihood:.2f}") |
|
|
|
|
|
return { |
|
|
'method': 'LDA_Topic_Modeling', |
|
|
'n_topics': self.n_topics, |
|
|
'discovered_topics': self.discovered_topics, |
|
|
'topic_labels': self.topic_labels, |
|
|
'doc_topic_distribution': doc_topic_dist, |
|
|
'perplexity': perplexity, |
|
|
'log_likelihood': log_likelihood, |
|
|
'quality_metrics': { |
|
|
'perplexity': perplexity, |
|
|
'avg_topic_diversity': self._compute_topic_diversity() |
|
|
} |
|
|
} |
|
|
|
|
|
def get_clause_topic_distribution(self, clause_idx: int) -> Dict[int, float]: |
|
|
"""Get probability distribution over topics for a specific clause""" |
|
|
if self.feature_matrix is None: |
|
|
return {} |
|
|
|
|
|
doc_topic_dist = self.lda_model.transform(self.feature_matrix) |
|
|
return {topic_id: float(prob) for topic_id, prob in enumerate(doc_topic_dist[clause_idx])} |
|
|
|
|
|
def _clean_text(self, text: str) -> str: |
|
|
"""Clean clause text""" |
|
|
if not isinstance(text, str): |
|
|
return "" |
|
|
text = re.sub(r'\s+', ' ', text) |
|
|
return text.strip() |
|
|
|
|
|
def _generate_topic_name(self, top_words: List[str]) -> str: |
|
|
"""Generate descriptive name from top words""" |
|
|
|
|
|
themes = { |
|
|
'liability': ['liability', 'liable', 'damages', 'loss', 'harm', 'injury'], |
|
|
'indemnity': ['indemnify', 'indemnification', 'hold', 'harmless', 'defend'], |
|
|
'termination': ['terminate', 'termination', 'cancel', 'end', 'expire'], |
|
|
'intellectual_property': ['intellectual', 'property', 'ip', 'patent', 'copyright', 'trademark'], |
|
|
'confidentiality': ['confidential', 'confidentiality', 'disclosure', 'nda', 'secret'], |
|
|
'payment': ['payment', 'pay', 'fee', 'price', 'cost', 'charge'], |
|
|
'compliance': ['comply', 'compliance', 'regulation', 'law', 'legal', 'regulatory'], |
|
|
'warranty': ['warranty', 'warrant', 'represent', 'guarantee', 'assure'] |
|
|
} |
|
|
|
|
|
|
|
|
theme_scores = defaultdict(int) |
|
|
for word in top_words[:10]: |
|
|
for theme, keywords in themes.items(): |
|
|
if any(keyword in word.lower() for keyword in keywords): |
|
|
theme_scores[theme] += 1 |
|
|
|
|
|
|
|
|
if theme_scores: |
|
|
best_theme = max(theme_scores.items(), key=lambda x: x[1])[0] |
|
|
return f"Topic_{best_theme.upper()}" |
|
|
else: |
|
|
return f"Topic_{top_words[0].upper()}_{top_words[1].upper()}" |
|
|
|
|
|
def _compute_topic_diversity(self) -> float: |
|
|
"""Compute average diversity of topics (entropy of word distribution)""" |
|
|
diversities = [] |
|
|
for topic_idx in range(self.n_topics): |
|
|
word_dist = self.topic_word_distribution[topic_idx] |
|
|
word_dist = word_dist / np.sum(word_dist) |
|
|
entropy = -np.sum(word_dist * np.log(word_dist + 1e-10)) |
|
|
diversities.append(entropy) |
|
|
return float(np.mean(diversities)) |
|
|
|
|
|
|
|
|
class HierarchicalRiskDiscovery: |
|
|
""" |
|
|
Risk discovery using Hierarchical Agglomerative Clustering. |
|
|
|
|
|
Discovers nested risk hierarchies where similar risks are grouped at multiple levels. |
|
|
Better for understanding relationships between risk types. |
|
|
|
|
|
Advantages: |
|
|
- Discovers hierarchical structure (parent-child risk relationships) |
|
|
- No need to specify number of clusters upfront |
|
|
- Deterministic results |
|
|
- Can cut dendrogram at different levels |
|
|
|
|
|
Disadvantages: |
|
|
- Slower for large datasets (O(nΒ²) or O(nΒ³)) |
|
|
- Memory intensive |
|
|
- Cannot handle very large datasets |
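
    Example (illustrative sketch; `clauses` is assumed to be a list of clause
    strings large enough for the TF-IDF min_df/max_df filters):

        hc = HierarchicalRiskDiscovery(n_clusters=7, linkage='ward')
        result = hc.discover_risk_patterns(clauses)
        result['discovered_clusters'][0]['top_terms']  # terms defining cluster 0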
|
|
""" |
|
|
|
|
|
def __init__(self, n_clusters: int = 7, linkage: str = 'ward', random_state: int = 42): |
|
|
self.n_clusters = n_clusters |
|
|
self.linkage = linkage |
|
|
self.random_state = random_state |
|
|
|
|
|
|
|
|
self.vectorizer = TfidfVectorizer( |
|
|
max_features=8000, |
|
|
ngram_range=(1, 3), |
|
|
stop_words='english', |
|
|
lowercase=True, |
|
|
min_df=2, |
|
|
max_df=0.90 |
|
|
) |
|
|
|
|
|
|
|
|
self.clustering_model = AgglomerativeClustering( |
|
|
n_clusters=n_clusters, |
|
|
linkage=linkage |
|
|
) |
|
|
|
|
|
self.discovered_clusters = {} |
|
|
self.cluster_labels = None |
|
|
self.feature_matrix = None |
|
|
|
|
|
def discover_risk_patterns(self, clauses: List[str]) -> Dict[str, Any]: |
|
|
""" |
|
|
Discover risk patterns using hierarchical clustering. |
|
|
|
|
|
Args: |
|
|
clauses: List of legal clause texts |
|
|
|
|
|
Returns: |
|
|
Dictionary with discovered clusters and hierarchy |
|
|
""" |
|
|
print(f"π Discovering risk patterns using Hierarchical Clustering (n_clusters={self.n_clusters})...") |
|
|
|
|
|
|
|
|
cleaned_clauses = [self._clean_text(c) for c in clauses] |
|
|
|
|
|
|
|
|
print(" π Creating TF-IDF feature matrix...") |
|
|
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses) |
|
|
feature_names = self.vectorizer.get_feature_names_out() |
|
|
|
|
|
|
|
|
print(f" π§ Fitting Hierarchical Clustering (linkage={self.linkage})...") |
|
|
self.cluster_labels = self.clustering_model.fit_predict(self.feature_matrix.toarray()) |
|
|
|
|
|
|
|
|
print(" π Analyzing discovered clusters...") |
|
|
for cluster_id in range(self.n_clusters): |
|
|
cluster_mask = self.cluster_labels == cluster_id |
|
|
cluster_indices = np.where(cluster_mask)[0] |
|
|
|
|
|
|
|
|
cluster_clauses = [clauses[i] for i in cluster_indices] |
|
|
|
|
|
|
|
|
cluster_tfidf = self.feature_matrix[cluster_mask].mean(axis=0) |
|
|
top_term_indices = np.argsort(np.asarray(cluster_tfidf).flatten())[-15:][::-1] |
|
|
top_terms = [feature_names[i] for i in top_term_indices] |
|
|
top_scores = [float(cluster_tfidf[0, i]) for i in top_term_indices] |
|
|
|
|
|
|
|
|
cluster_name = self._generate_cluster_name(top_terms) |
|
|
|
|
|
self.discovered_clusters[cluster_id] = { |
|
|
'cluster_id': cluster_id, |
|
|
'cluster_name': cluster_name, |
|
|
'top_terms': top_terms, |
|
|
'term_scores': top_scores, |
|
|
'clause_count': int(len(cluster_indices)), |
|
|
'proportion': float(len(cluster_indices) / len(clauses)), |
|
|
'sample_clauses': cluster_clauses[:3] |
|
|
} |
|
|
|
|
|
|
|
|
if len(clauses) < 10000: |
|
|
silhouette = silhouette_score(self.feature_matrix, self.cluster_labels) |
|
|
else: |
|
|
silhouette = None |
|
|
|
|
|
print(f"β
Hierarchical clustering complete: {self.n_clusters} clusters found") |
|
|
        if silhouette is not None:
|
|
print(f" Silhouette Score: {silhouette:.3f} (range: -1 to 1, higher is better)") |
|
|
|
|
|
return { |
|
|
'method': 'Hierarchical_Agglomerative_Clustering', |
|
|
'n_clusters': self.n_clusters, |
|
|
'linkage': self.linkage, |
|
|
'discovered_clusters': self.discovered_clusters, |
|
|
'cluster_labels': self.cluster_labels, |
|
|
'quality_metrics': { |
|
|
                'silhouette_score': silhouette if silhouette is not None else 'N/A',
|
|
'avg_cluster_size': float(np.mean([c['clause_count'] for c in self.discovered_clusters.values()])) |
|
|
} |
|
|
} |
|
|
|
|
|
def _clean_text(self, text: str) -> str: |
|
|
"""Clean clause text""" |
|
|
if not isinstance(text, str): |
|
|
return "" |
|
|
text = re.sub(r'\s+', ' ', text) |
|
|
return text.strip() |
|
|
|
|
|
def _generate_cluster_name(self, top_terms: List[str]) -> str: |
|
|
"""Generate descriptive name from top terms""" |
|
|
|
|
|
themes = { |
|
|
'LIABILITY': ['liability', 'liable', 'damages', 'loss'], |
|
|
'INDEMNITY': ['indemnify', 'indemnification', 'hold', 'harmless'], |
|
|
'TERMINATION': ['terminate', 'termination', 'cancel', 'expire'], |
|
|
'IP': ['intellectual', 'property', 'patent', 'copyright'], |
|
|
'CONFIDENTIAL': ['confidential', 'nda', 'disclosure', 'secret'], |
|
|
'PAYMENT': ['payment', 'pay', 'fee', 'price'], |
|
|
'COMPLIANCE': ['comply', 'compliance', 'regulation', 'law'], |
|
|
'WARRANTY': ['warranty', 'warrant', 'represent', 'guarantee'] |
|
|
} |
|
|
|
|
|
for theme, keywords in themes.items(): |
|
|
if any(keyword in term.lower() for term in top_terms[:5] for keyword in keywords): |
|
|
return f"RISK_{theme}" |
|
|
|
|
|
return f"RISK_{top_terms[0].upper()}_{top_terms[1].upper()}" |
|
|
|
|
|
|
|
|
class DensityBasedRiskDiscovery: |
|
|
""" |
|
|
Risk discovery using DBSCAN (Density-Based Spatial Clustering). |
|
|
|
|
|
Discovers risk clusters based on density, identifying core risks and outliers. |
|
|
Better for finding unusual/rare risk patterns and handling noise. |
|
|
|
|
|
Advantages: |
|
|
- Discovers clusters of arbitrary shapes |
|
|
- Identifies outliers/noise (rare risk patterns) |
|
|
- No need to specify number of clusters |
|
|
- Robust to outliers |
|
|
|
|
|
Disadvantages: |
|
|
- Sensitive to hyperparameters (eps, min_samples) |
|
|
- Struggles with varying density clusters |
|
|
- Can produce many small clusters |
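
    Example (illustrative sketch; `clauses` is assumed to be a list of clause
    strings; auto_tune picks eps from the k-distance curve):

        db = DensityBasedRiskDiscovery(min_samples=5)
        result = db.discover_risk_patterns(clauses, auto_tune=True)
        result['outlier_clauses']   # sample of clauses that fit no cluster
        db.get_outlier_analysis()   # summary of the noise points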
|
|
""" |
|
|
|
|
|
def __init__(self, eps: float = 0.5, min_samples: int = 5, random_state: int = 42): |
|
|
self.eps = eps |
|
|
self.min_samples = min_samples |
|
|
self.random_state = random_state |
|
|
|
|
|
|
|
|
self.vectorizer = TfidfVectorizer( |
|
|
max_features=6000, |
|
|
ngram_range=(1, 2), |
|
|
stop_words='english', |
|
|
lowercase=True, |
|
|
min_df=3, |
|
|
max_df=0.85 |
|
|
) |
|
|
|
|
|
|
|
|
self.dbscan_model = DBSCAN( |
|
|
eps=eps, |
|
|
min_samples=min_samples, |
|
|
metric='cosine', |
|
|
n_jobs=-1 |
|
|
) |
|
|
|
|
|
self.discovered_clusters = {} |
|
|
self.cluster_labels = None |
|
|
self.feature_matrix = None |
|
|
self.outlier_indices = [] |
|
|
|
|
|
def discover_risk_patterns(self, clauses: List[str], auto_tune: bool = True) -> Dict[str, Any]: |
|
|
""" |
|
|
Discover risk patterns using DBSCAN. |
|
|
|
|
|
Args: |
|
|
clauses: List of legal clause texts |
|
|
auto_tune: If True, automatically tune eps parameter |
|
|
|
|
|
Returns: |
|
|
Dictionary with discovered clusters and outliers |
|
|
""" |
|
|
print(f"π Discovering risk patterns using DBSCAN...") |
|
|
|
|
|
|
|
|
cleaned_clauses = [self._clean_text(c) for c in clauses] |
|
|
|
|
|
|
|
|
print(" π Creating TF-IDF feature matrix...") |
|
|
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses) |
|
|
feature_names = self.vectorizer.get_feature_names_out() |
|
|
|
|
|
|
|
|
if auto_tune: |
|
|
print(" π§ Auto-tuning eps parameter...") |
|
|
self.eps = self._auto_tune_eps(self.feature_matrix) |
|
|
self.dbscan_model.eps = self.eps |
|
|
print(f" Selected eps={self.eps:.3f}") |
|
|
|
|
|
|
|
|
print(f" π§ Fitting DBSCAN (eps={self.eps}, min_samples={self.min_samples})...") |
|
|
self.cluster_labels = self.dbscan_model.fit_predict(self.feature_matrix) |
|
|
|
|
|
|
|
|
unique_clusters = [c for c in np.unique(self.cluster_labels) if c != -1] |
|
|
n_clusters = len(unique_clusters) |
|
|
n_noise = np.sum(self.cluster_labels == -1) |
|
|
|
|
|
print(f" π Found {n_clusters} clusters and {n_noise} outliers/noise points") |
|
|
|
|
|
|
|
|
print(" π Analyzing discovered clusters...") |
|
|
for cluster_id in unique_clusters: |
|
|
cluster_mask = self.cluster_labels == cluster_id |
|
|
cluster_indices = np.where(cluster_mask)[0] |
|
|
|
|
|
|
|
|
cluster_clauses = [clauses[i] for i in cluster_indices] |
|
|
|
|
|
|
|
|
cluster_tfidf = self.feature_matrix[cluster_mask].mean(axis=0) |
|
|
top_term_indices = np.argsort(np.asarray(cluster_tfidf).flatten())[-15:][::-1] |
|
|
top_terms = [feature_names[i] for i in top_term_indices] |
|
|
top_scores = [float(cluster_tfidf[0, i]) for i in top_term_indices] |
|
|
|
|
|
|
|
|
cluster_name = self._generate_cluster_name(top_terms, cluster_id) |
|
|
|
|
|
self.discovered_clusters[cluster_id] = { |
|
|
'cluster_id': cluster_id, |
|
|
'cluster_name': cluster_name, |
|
|
'top_terms': top_terms, |
|
|
'term_scores': top_scores, |
|
|
'clause_count': int(len(cluster_indices)), |
|
|
'proportion': float(len(cluster_indices) / len(clauses)), |
|
|
'is_core_cluster': len(cluster_indices) >= self.min_samples * 3 |
|
|
} |
|
|
|
|
|
|
|
|
self.outlier_indices = np.where(self.cluster_labels == -1)[0] |
|
|
outlier_clauses = [clauses[i] for i in self.outlier_indices] |
|
|
|
|
|
print(f"β
DBSCAN discovery complete: {n_clusters} clusters, {n_noise} outliers") |
|
|
|
|
|
return { |
|
|
'method': 'DBSCAN_Density_Based_Clustering', |
|
|
'n_clusters': n_clusters, |
|
|
'n_outliers': int(n_noise), |
|
|
'eps': self.eps, |
|
|
'min_samples': self.min_samples, |
|
|
'discovered_clusters': self.discovered_clusters, |
|
|
'cluster_labels': self.cluster_labels, |
|
|
'outlier_indices': self.outlier_indices.tolist(), |
|
|
'outlier_clauses': outlier_clauses[:10], |
|
|
'quality_metrics': { |
|
|
'n_clusters': n_clusters, |
|
|
'outlier_ratio': float(n_noise / len(clauses)), |
|
|
'avg_cluster_size': float(np.mean([c['clause_count'] for c in self.discovered_clusters.values()])) if n_clusters > 0 else 0 |
|
|
} |
|
|
} |
|
|
|
|
|
def _clean_text(self, text: str) -> str: |
|
|
"""Clean clause text""" |
|
|
if not isinstance(text, str): |
|
|
return "" |
|
|
text = re.sub(r'\s+', ' ', text) |
|
|
return text.strip() |
|
|
|
|
|
def _auto_tune_eps(self, feature_matrix, sample_size: int = 1000) -> float: |
|
|
""" |
|
|
Auto-tune eps parameter using k-distance graph. |
|
|
|
|
|
Uses a sample of data to estimate optimal eps. |
|
|
""" |
|
|
from sklearn.neighbors import NearestNeighbors |
|
|
|
|
|
|
|
|
n_samples = min(sample_size, feature_matrix.shape[0]) |
|
|
if feature_matrix.shape[0] > sample_size: |
|
|
            rng = np.random.default_rng(self.random_state)
            indices = rng.choice(feature_matrix.shape[0], n_samples, replace=False)
|
|
sample_matrix = feature_matrix[indices] |
|
|
else: |
|
|
sample_matrix = feature_matrix |
|
|
|
|
|
|
|
|
k = self.min_samples |
|
|
nbrs = NearestNeighbors(n_neighbors=k, metric='cosine').fit(sample_matrix) |
|
|
distances, _ = nbrs.kneighbors(sample_matrix) |
|
|
|
|
|
|
|
|
k_distances = np.sort(distances[:, -1]) |
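
        # Choosing the 90th percentile below is a simple stand-in for locating
        # the "elbow" of the sorted k-distance curve (an assumption of this
        # sketch; a kneedle-style detector would be a more principled choice).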
|
|
|
|
|
|
|
|
|
|
|
eps = np.percentile(k_distances, 90) |
|
|
|
|
|
return float(eps) |
|
|
|
|
|
def _generate_cluster_name(self, top_terms: List[str], cluster_id: int) -> str: |
|
|
"""Generate descriptive name from top terms""" |
|
|
|
|
|
themes = { |
|
|
'LIABILITY': ['liability', 'liable', 'damages', 'loss'], |
|
|
'INDEMNITY': ['indemnify', 'indemnification', 'hold', 'harmless'], |
|
|
'TERMINATION': ['terminate', 'termination', 'cancel', 'expire'], |
|
|
'IP': ['intellectual', 'property', 'patent', 'copyright'], |
|
|
'CONFIDENTIAL': ['confidential', 'nda', 'disclosure', 'secret'], |
|
|
'PAYMENT': ['payment', 'pay', 'fee', 'price'], |
|
|
'COMPLIANCE': ['comply', 'compliance', 'regulation', 'law'], |
|
|
'WARRANTY': ['warranty', 'warrant', 'represent', 'guarantee'] |
|
|
} |
|
|
|
|
|
for theme, keywords in themes.items(): |
|
|
if any(keyword in term.lower() for term in top_terms[:5] for keyword in keywords): |
|
|
return f"RISK_{theme}_C{cluster_id}" |
|
|
|
|
|
return f"RISK_CLUSTER_{cluster_id}_{top_terms[0].upper()}" |
|
|
|
|
|
def get_outlier_analysis(self) -> Dict[str, Any]: |
|
|
""" |
|
|
Analyze outlier/noise points to identify rare risk patterns. |
|
|
|
|
|
Returns: |
|
|
Dictionary with outlier analysis |
|
|
""" |
|
|
if len(self.outlier_indices) == 0: |
|
|
return {'message': 'No outliers found'} |
|
|
|
|
|
return { |
|
|
'n_outliers': len(self.outlier_indices), |
|
|
'outlier_ratio': len(self.outlier_indices) / len(self.cluster_labels), |
|
|
'interpretation': 'Outliers may represent rare or unique risk patterns that do not fit common categories' |
|
|
} |
|
|
|
|
|
|
|
|
class NMFRiskDiscovery: |
|
|
""" |
|
|
Risk discovery using Non-negative Matrix Factorization (NMF). |
|
|
|
|
|
NMF decomposes the document-term matrix into interpretable parts-based representations. |
|
|
Different from clustering - learns additive combinations of basis patterns. |
|
|
|
|
|
Advantages: |
|
|
    - Parts-based decomposition (additive patterns)
    - Highly interpretable results
    - Non-negative weights (intuitive)
    - Fast convergence
    - Works well with TF-IDF
|
|
|
|
|
Disadvantages: |
|
|
    - Requires non-negative features
    - Sensitive to initialization
    - May not capture global structure
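
    Example (illustrative sketch; `clauses` is assumed to be a list of clause
    strings):

        nmf = NMFRiskDiscovery(n_components=7)
        result = nmf.discover_risk_patterns(clauses)
        nmf.get_clause_composition(0)  # additive component weights for clause 0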
|
|
""" |
|
|
|
|
|
def __init__(self, n_components: int = 7, random_state: int = 42): |
|
|
self.n_components = n_components |
|
|
self.random_state = random_state |
|
|
|
|
|
|
|
|
self.vectorizer = TfidfVectorizer( |
|
|
max_features=8000, |
|
|
ngram_range=(1, 2), |
|
|
stop_words='english', |
|
|
lowercase=True, |
|
|
min_df=3, |
|
|
max_df=0.85, |
|
|
norm='l2' |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import sklearn |
|
|
sklearn_version = tuple(map(int, sklearn.__version__.split('.')[:2])) |
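
        # scikit-learn 1.0 split NMF's single `alpha` regularization parameter
        # into separate `alpha_W` / `alpha_H` keywords, so the parameter dict
        # below is built per installed version.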
|
|
|
|
|
nmf_params = { |
|
|
'n_components': n_components, |
|
|
'random_state': random_state, |
|
|
'init': 'nndsvda', |
|
|
'max_iter': 500 |
|
|
} |
|
|
|
|
|
|
|
|
if sklearn_version >= (1, 0): |
|
|
|
|
|
nmf_params['alpha_W'] = 0.1 |
|
|
nmf_params['alpha_H'] = 0.1 |
|
|
nmf_params['l1_ratio'] = 0.5 |
|
|
elif sklearn_version >= (0, 19): |
|
|
|
|
|
nmf_params['alpha'] = 0.1 |
|
|
nmf_params['l1_ratio'] = 0.5 |
|
|
|
|
|
|
|
|
self.nmf_model = NMF(**nmf_params) |
|
|
|
|
|
self.discovered_components = {} |
|
|
self.component_labels = None |
|
|
self.feature_matrix = None |
|
|
self.W_matrix = None |
|
|
self.H_matrix = None |
|
|
|
|
|
def discover_risk_patterns(self, clauses: List[str]) -> Dict[str, Any]: |
|
|
""" |
|
|
Discover risk patterns using NMF decomposition. |
|
|
|
|
|
Args: |
|
|
clauses: List of legal clause texts |
|
|
|
|
|
Returns: |
|
|
Dictionary with discovered components and assignments |
|
|
""" |
|
|
print(f"π Discovering risk patterns using NMF (n_components={self.n_components})...") |
|
|
|
|
|
|
|
|
cleaned_clauses = [self._clean_text(c) for c in clauses] |
|
|
|
|
|
|
|
|
print(" π Creating TF-IDF feature matrix...") |
|
|
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses) |
|
|
feature_names = self.vectorizer.get_feature_names_out() |
|
|
|
|
|
|
|
|
print(" π§ Fitting NMF model...") |
|
|
self.W_matrix = self.nmf_model.fit_transform(self.feature_matrix) |
|
|
self.H_matrix = self.nmf_model.components_ |
|
|
|
|
|
|
|
|
self.component_labels = np.argmax(self.W_matrix, axis=1) |
|
|
|
|
|
|
|
|
print(" π Extracting component keywords...") |
|
|
n_top_words = 15 |
|
|
for component_idx in range(self.n_components): |
|
|
top_word_indices = np.argsort(self.H_matrix[component_idx])[-n_top_words:][::-1] |
|
|
top_words = [feature_names[i] for i in top_word_indices] |
|
|
top_weights = [self.H_matrix[component_idx][i] for i in top_word_indices] |
|
|
|
|
|
|
|
|
component_name = self._generate_component_name(top_words) |
|
|
|
|
|
|
|
|
clause_count = np.sum(self.component_labels == component_idx) |
|
|
|
|
|
|
|
|
avg_weight = np.mean(self.W_matrix[:, component_idx]) |
|
|
|
|
|
self.discovered_components[component_idx] = { |
|
|
'component_id': component_idx, |
|
|
'component_name': component_name, |
|
|
'top_words': top_words, |
|
|
'word_weights': top_weights, |
|
|
'clause_count': int(clause_count), |
|
|
'proportion': float(clause_count / len(clauses)), |
|
|
'avg_strength': float(avg_weight) |
|
|
} |
|
|
|
|
|
|
|
|
reconstruction_error = self.nmf_model.reconstruction_err_ |
|
|
|
|
|
|
|
|
sparsity = np.mean(self.W_matrix == 0) |
|
|
|
|
|
print(f"β
NMF discovery complete: {self.n_components} components found") |
|
|
print(f" Reconstruction error: {reconstruction_error:.2f}") |
|
|
print(f" Sparsity: {sparsity:.2%}") |
|
|
|
|
|
return { |
|
|
'method': 'NMF_Matrix_Factorization', |
|
|
'n_components': self.n_components, |
|
|
'discovered_components': self.discovered_components, |
|
|
'component_labels': self.component_labels, |
|
|
'component_strengths': self.W_matrix, |
|
|
'quality_metrics': { |
|
|
'reconstruction_error': float(reconstruction_error), |
|
|
'sparsity': float(sparsity), |
|
|
'avg_component_strength': float(np.mean(np.max(self.W_matrix, axis=1))) |
|
|
} |
|
|
} |
|
|
|
|
|
def get_clause_composition(self, clause_idx: int) -> Dict[int, float]: |
|
|
"""Get component composition for a specific clause""" |
|
|
if self.W_matrix is None: |
|
|
return {} |
|
|
|
|
|
return {comp_id: float(weight) for comp_id, weight in enumerate(self.W_matrix[clause_idx])} |
|
|
|
|
|
def _clean_text(self, text: str) -> str: |
|
|
"""Clean clause text""" |
|
|
if not isinstance(text, str): |
|
|
return "" |
|
|
text = re.sub(r'\s+', ' ', text) |
|
|
return text.strip() |
|
|
|
|
|
def _generate_component_name(self, top_words: List[str]) -> str: |
|
|
"""Generate descriptive name from top words""" |
|
|
themes = { |
|
|
'LIABILITY': ['liability', 'liable', 'damages', 'loss'], |
|
|
'INDEMNITY': ['indemnify', 'indemnification', 'hold', 'harmless'], |
|
|
'TERMINATION': ['terminate', 'termination', 'cancel', 'expire'], |
|
|
'IP': ['intellectual', 'property', 'patent', 'copyright'], |
|
|
'CONFIDENTIAL': ['confidential', 'nda', 'disclosure', 'secret'], |
|
|
'PAYMENT': ['payment', 'pay', 'fee', 'price'], |
|
|
'COMPLIANCE': ['comply', 'compliance', 'regulation', 'law'], |
|
|
'WARRANTY': ['warranty', 'warrant', 'represent', 'guarantee'] |
|
|
} |
|
|
|
|
|
for theme, keywords in themes.items(): |
|
|
if any(keyword in term.lower() for term in top_words[:5] for keyword in keywords): |
|
|
return f"COMPONENT_{theme}" |
|
|
|
|
|
return f"COMPONENT_{top_words[0].upper()}_{top_words[1].upper()}" |
|
|
|
|
|
|
|
|
class SpectralClusteringRiskDiscovery: |
|
|
""" |
|
|
Risk discovery using Spectral Clustering. |
|
|
|
|
|
Uses graph theory and eigenvalues to cluster data. Excellent for non-convex clusters |
|
|
that other methods miss. Based on similarity graph construction. |
|
|
|
|
|
Advantages: |
|
|
    - Handles non-convex clusters (arbitrary shapes)
    - Uses graph structure (captures relationships)
    - Theoretically sound (spectral graph theory)
    - Good for manifold-structured data
|
|
|
|
|
Disadvantages: |
|
|
    - Computationally expensive (eigenvalue decomposition)
    - Memory intensive for large datasets
    - Sensitive to similarity metric
    - Requires number of clusters
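
    Example (illustrative sketch; `clauses` is assumed to be a list of clause
    strings; large inputs are switched to nearest-neighbors affinity):

        sc = SpectralClusteringRiskDiscovery(n_clusters=7)
        result = sc.discover_risk_patterns(clauses)
        result['discovered_clusters'][0]['top_terms']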
|
|
""" |
|
|
|
|
|
def __init__(self, n_clusters: int = 7, affinity: str = 'rbf', random_state: int = 42): |
|
|
self.n_clusters = n_clusters |
|
|
self.affinity = affinity |
|
|
self.random_state = random_state |
|
|
|
|
|
|
|
|
self.vectorizer = TfidfVectorizer( |
|
|
max_features=6000, |
|
|
ngram_range=(1, 2), |
|
|
stop_words='english', |
|
|
lowercase=True, |
|
|
min_df=3, |
|
|
max_df=0.85 |
|
|
) |
|
|
|
|
|
|
|
|
from sklearn.cluster import SpectralClustering |
|
|
|
|
|
|
|
|
self.spectral_model = SpectralClustering( |
|
|
n_clusters=n_clusters, |
|
|
affinity=affinity, |
|
|
random_state=random_state, |
|
|
n_init=10, |
|
|
assign_labels='kmeans' |
|
|
) |
|
|
|
|
|
self.discovered_clusters = {} |
|
|
self.cluster_labels = None |
|
|
self.feature_matrix = None |
|
|
|
|
|
def discover_risk_patterns(self, clauses: List[str]) -> Dict[str, Any]: |
|
|
""" |
|
|
Discover risk patterns using Spectral Clustering. |
|
|
|
|
|
Args: |
|
|
clauses: List of legal clause texts |
|
|
|
|
|
Returns: |
|
|
Dictionary with discovered clusters |
|
|
""" |
|
|
print(f"π Discovering risk patterns using Spectral Clustering (n_clusters={self.n_clusters})...") |
|
|
|
|
|
|
|
|
cleaned_clauses = [self._clean_text(c) for c in clauses] |
|
|
|
|
|
|
|
|
print(" π Creating TF-IDF feature matrix...") |
|
|
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses) |
|
|
feature_names = self.vectorizer.get_feature_names_out() |
|
|
|
|
|
|
|
|
print(f" π§ Fitting Spectral Clustering (affinity={self.affinity})...") |
|
|
print(" (This may take a while for large datasets...)") |
|
|
|
|
|
|
|
|
if self.feature_matrix.shape[0] > 5000: |
|
|
print(f" Large dataset detected ({self.feature_matrix.shape[0]} clauses)") |
|
|
print(" Using nearest neighbors affinity for efficiency...") |
|
|
self.spectral_model.affinity = 'nearest_neighbors' |
|
|
self.spectral_model.n_neighbors = 10 |
|
|
|
|
|
self.cluster_labels = self.spectral_model.fit_predict(self.feature_matrix) |
|
|
|
|
|
|
|
|
print(" π Analyzing discovered clusters...") |
|
|
for cluster_id in range(self.n_clusters): |
|
|
cluster_mask = self.cluster_labels == cluster_id |
|
|
cluster_indices = np.where(cluster_mask)[0] |
|
|
|
|
|
if len(cluster_indices) == 0: |
|
|
continue |
|
|
|
|
|
|
|
|
cluster_clauses = [clauses[i] for i in cluster_indices] |
|
|
|
|
|
|
|
|
cluster_tfidf = self.feature_matrix[cluster_mask].mean(axis=0) |
|
|
top_term_indices = np.argsort(np.asarray(cluster_tfidf).flatten())[-15:][::-1] |
|
|
top_terms = [feature_names[i] for i in top_term_indices] |
|
|
top_scores = [float(cluster_tfidf[0, i]) for i in top_term_indices] |
|
|
|
|
|
|
|
|
cluster_name = self._generate_cluster_name(top_terms) |
|
|
|
|
|
self.discovered_clusters[cluster_id] = { |
|
|
'cluster_id': cluster_id, |
|
|
'cluster_name': cluster_name, |
|
|
'top_terms': top_terms, |
|
|
'term_scores': top_scores, |
|
|
'clause_count': int(len(cluster_indices)), |
|
|
'proportion': float(len(cluster_indices) / len(clauses)) |
|
|
} |
|
|
|
|
|
|
|
|
if len(clauses) < 10000: |
|
|
from sklearn.metrics import silhouette_score |
|
|
silhouette = silhouette_score(self.feature_matrix, self.cluster_labels) |
|
|
else: |
|
|
silhouette = None |
|
|
|
|
|
print(f"β
Spectral clustering complete: {len(self.discovered_clusters)} clusters found") |
|
|
        if silhouette is not None:
|
|
print(f" Silhouette Score: {silhouette:.3f}") |
|
|
|
|
|
return { |
|
|
'method': 'Spectral_Clustering', |
|
|
'n_clusters': self.n_clusters, |
|
|
'affinity': self.affinity, |
|
|
'discovered_clusters': self.discovered_clusters, |
|
|
'cluster_labels': self.cluster_labels, |
|
|
'quality_metrics': { |
|
|
                'silhouette_score': silhouette if silhouette is not None else 'N/A',
|
|
'n_clusters_found': len(self.discovered_clusters) |
|
|
} |
|
|
} |
|
|
|
|
|
def _clean_text(self, text: str) -> str: |
|
|
"""Clean clause text""" |
|
|
if not isinstance(text, str): |
|
|
return "" |
|
|
text = re.sub(r'\s+', ' ', text) |
|
|
return text.strip() |
|
|
|
|
|
def _generate_cluster_name(self, top_terms: List[str]) -> str: |
|
|
"""Generate descriptive name from top terms""" |
|
|
themes = { |
|
|
'LIABILITY': ['liability', 'liable', 'damages', 'loss'], |
|
|
'INDEMNITY': ['indemnify', 'indemnification', 'hold', 'harmless'], |
|
|
'TERMINATION': ['terminate', 'termination', 'cancel', 'expire'], |
|
|
'IP': ['intellectual', 'property', 'patent', 'copyright'], |
|
|
'CONFIDENTIAL': ['confidential', 'nda', 'disclosure', 'secret'], |
|
|
'PAYMENT': ['payment', 'pay', 'fee', 'price'], |
|
|
'COMPLIANCE': ['comply', 'compliance', 'regulation', 'law'], |
|
|
'WARRANTY': ['warranty', 'warrant', 'represent', 'guarantee'] |
|
|
} |
|
|
|
|
|
for theme, keywords in themes.items(): |
|
|
if any(keyword in term.lower() for term in top_terms[:5] for keyword in keywords): |
|
|
return f"SPECTRAL_{theme}" |
|
|
|
|
|
return f"SPECTRAL_{top_terms[0].upper()}_{top_terms[1].upper()}" |
|
|
|
|
|
|
|
|
class GaussianMixtureRiskDiscovery: |
|
|
""" |
|
|
Risk discovery using Gaussian Mixture Models (GMM). |
|
|
|
|
|
Probabilistic model that assumes data comes from mixture of Gaussian distributions. |
|
|
Provides soft clustering with probability estimates. |
|
|
|
|
|
Advantages: |
|
|
    - Probabilistic (soft clustering)
    - Provides uncertainty estimates
    - Can model elliptical clusters
    - Flexible covariance structures
    - Works with EM algorithm (handles missing data)
|
|
|
|
|
Disadvantages: |
|
|
    - Assumes Gaussian distributions
    - Sensitive to initialization
    - Can get stuck in local optima
    - Computationally intensive
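
    Example (illustrative sketch; `clauses` is assumed to be a list of clause
    strings; features are SVD-reduced internally before fitting):

        gmm = GaussianMixtureRiskDiscovery(n_components=7)
        result = gmm.discover_risk_patterns(clauses)
        gmm.get_clause_probabilities(0)  # soft membership for the first clause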
|
|
""" |
|
|
|
|
|
def __init__(self, n_components: int = 7, covariance_type: str = 'diag', random_state: int = 42): |
|
|
self.n_components = n_components |
|
|
self.covariance_type = covariance_type |
|
|
self.random_state = random_state |
|
|
|
|
|
|
|
|
self.vectorizer = TfidfVectorizer( |
|
|
max_features=5000, |
|
|
ngram_range=(1, 2), |
|
|
stop_words='english', |
|
|
lowercase=True, |
|
|
min_df=3, |
|
|
max_df=0.85 |
|
|
) |
|
|
|
|
|
|
|
|
from sklearn.mixture import GaussianMixture |
|
|
|
|
|
|
|
|
self.gmm_model = GaussianMixture( |
|
|
n_components=n_components, |
|
|
covariance_type=covariance_type, |
|
|
random_state=random_state, |
|
|
n_init=10, |
|
|
max_iter=200 |
|
|
) |
|
|
|
|
|
self.discovered_components = {} |
|
|
self.component_labels = None |
|
|
self.feature_matrix = None |
|
|
self.probabilities = None |
|
|
|
|
|
def discover_risk_patterns(self, clauses: List[str]) -> Dict[str, Any]: |
|
|
""" |
|
|
Discover risk patterns using Gaussian Mixture Model. |
|
|
|
|
|
Args: |
|
|
clauses: List of legal clause texts |
|
|
|
|
|
Returns: |
|
|
Dictionary with discovered components and probabilities |
|
|
""" |
|
|
print(f"π Discovering risk patterns using GMM (n_components={self.n_components})...") |
|
|
|
|
|
|
|
|
cleaned_clauses = [self._clean_text(c) for c in clauses] |
|
|
|
|
|
|
|
|
print(" π Creating TF-IDF feature matrix...") |
|
|
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses) |
|
|
feature_names = self.vectorizer.get_feature_names_out() |
|
|
|
|
|
|
|
|
print(" π Reducing dimensionality (GMM requires dense matrix)...") |
|
|
from sklearn.decomposition import TruncatedSVD |
|
|
svd = TruncatedSVD(n_components=min(100, self.feature_matrix.shape[1] - 1), random_state=self.random_state) |
|
|
X_reduced = svd.fit_transform(self.feature_matrix) |
|
|
|
|
|
|
|
|
print(f" π§ Fitting Gaussian Mixture Model (covariance={self.covariance_type})...") |
|
|
self.gmm_model.fit(X_reduced) |
|
|
|
|
|
|
|
|
self.component_labels = self.gmm_model.predict(X_reduced) |
|
|
self.probabilities = self.gmm_model.predict_proba(X_reduced) |
|
|
|
|
|
|
|
|
print(" π Analyzing discovered components...") |
|
|
for component_id in range(self.n_components): |
|
|
component_mask = self.component_labels == component_id |
|
|
component_indices = np.where(component_mask)[0] |
|
|
|
|
|
if len(component_indices) == 0: |
|
|
continue |
|
|
|
|
|
|
|
|
component_clauses = [clauses[i] for i in component_indices] |
|
|
|
|
|
|
|
|
component_tfidf = self.feature_matrix[component_mask].mean(axis=0) |
|
|
top_term_indices = np.argsort(np.asarray(component_tfidf).flatten())[-15:][::-1] |
|
|
top_terms = [feature_names[i] for i in top_term_indices] |
|
|
top_scores = [float(component_tfidf[0, i]) for i in top_term_indices] |
|
|
|
|
|
|
|
|
component_name = self._generate_component_name(top_terms) |
|
|
|
|
|
|
|
|
avg_probability = np.mean(self.probabilities[component_mask, component_id]) |
|
|
|
|
|
self.discovered_components[component_id] = { |
|
|
'component_id': component_id, |
|
|
'component_name': component_name, |
|
|
'top_terms': top_terms, |
|
|
'term_scores': top_scores, |
|
|
'clause_count': int(len(component_indices)), |
|
|
'proportion': float(len(component_indices) / len(clauses)), |
|
|
'avg_confidence': float(avg_probability) |
|
|
} |
|
|
|
|
|
|
|
|
bic = self.gmm_model.bic(X_reduced) |
|
|
aic = self.gmm_model.aic(X_reduced) |
|
|
|
|
|
print(f"β
GMM discovery complete: {len(self.discovered_components)} components found") |
|
|
print(f" BIC: {bic:.2f} (lower is better)") |
|
|
print(f" AIC: {aic:.2f} (lower is better)") |
|
|
|
|
|
return { |
|
|
'method': 'Gaussian_Mixture_Model', |
|
|
'n_components': self.n_components, |
|
|
'covariance_type': self.covariance_type, |
|
|
'discovered_components': self.discovered_components, |
|
|
'component_labels': self.component_labels, |
|
|
'probabilities': self.probabilities, |
|
|
'quality_metrics': { |
|
|
'bic': float(bic), |
|
|
'aic': float(aic), |
|
|
'avg_confidence': float(np.mean(np.max(self.probabilities, axis=1))) |
|
|
} |
|
|
} |
|
|
|
|
|
def get_clause_probabilities(self, clause_idx: int) -> Dict[int, float]: |
|
|
"""Get probability distribution over components for a specific clause""" |
|
|
if self.probabilities is None: |
|
|
return {} |
|
|
|
|
|
return {comp_id: float(prob) for comp_id, prob in enumerate(self.probabilities[clause_idx])} |
|
|
|
|
|
def _clean_text(self, text: str) -> str: |
|
|
"""Clean clause text""" |
|
|
if not isinstance(text, str): |
|
|
return "" |
|
|
text = re.sub(r'\s+', ' ', text) |
|
|
return text.strip() |
|
|
|
|
|
def _generate_component_name(self, top_terms: List[str]) -> str: |
|
|
"""Generate descriptive name from top terms""" |
|
|
themes = { |
|
|
'LIABILITY': ['liability', 'liable', 'damages', 'loss'], |
|
|
'INDEMNITY': ['indemnify', 'indemnification', 'hold', 'harmless'], |
|
|
'TERMINATION': ['terminate', 'termination', 'cancel', 'expire'], |
|
|
'IP': ['intellectual', 'property', 'patent', 'copyright'], |
|
|
'CONFIDENTIAL': ['confidential', 'nda', 'disclosure', 'secret'], |
|
|
'PAYMENT': ['payment', 'pay', 'fee', 'price'], |
|
|
'COMPLIANCE': ['comply', 'compliance', 'regulation', 'law'], |
|
|
'WARRANTY': ['warranty', 'warrant', 'represent', 'guarantee'] |
|
|
} |
|
|
|
|
|
for theme, keywords in themes.items(): |
|
|
if any(keyword in term.lower() for term in top_terms[:5] for keyword in keywords): |
|
|
return f"GMM_{theme}" |
|
|
|
|
|
return f"GMM_{top_terms[0].upper()}_{top_terms[1].upper()}" |
|
|
|
|
|
|
|
|
class MiniBatchKMeansRiskDiscovery: |
|
|
""" |
|
|
Risk discovery using Mini-Batch K-Means. |
|
|
|
|
|
Scalable version of K-Means that uses mini-batches for faster computation. |
|
|
Ideal for very large datasets (100K+ clauses). |
|
|
|
|
|
Advantages: |
|
|
    - Extremely fast (processes mini-batches)
    - Scalable to millions of samples
    - Low memory footprint
    - Online learning (can update incrementally)
    - Similar quality to standard K-Means
|
|
|
|
|
Disadvantages: |
|
|
    - Slightly less accurate than standard K-Means
    - Results vary with batch size
    - Still requires number of clusters
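
    Example (illustrative sketch; `clauses` is assumed to be a large list of
    clause strings, where the mini-batch speedup matters):

        mbk = MiniBatchKMeansRiskDiscovery(n_clusters=7, batch_size=1000)
        result = mbk.discover_risk_patterns(clauses)
        result['quality_metrics']['inertia']  # lower is better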
|
|
""" |
|
|
|
|
|
def __init__(self, n_clusters: int = 7, batch_size: int = 1000, random_state: int = 42): |
|
|
self.n_clusters = n_clusters |
|
|
self.batch_size = batch_size |
|
|
self.random_state = random_state |
|
|
|
|
|
|
|
|
self.vectorizer = TfidfVectorizer( |
|
|
max_features=10000, |
|
|
ngram_range=(1, 3), |
|
|
stop_words='english', |
|
|
lowercase=True, |
|
|
min_df=2, |
|
|
max_df=0.95 |
|
|
) |
|
|
|
|
|
|
|
|
from sklearn.cluster import MiniBatchKMeans |
|
|
|
|
|
|
|
|
self.kmeans_model = MiniBatchKMeans( |
|
|
n_clusters=n_clusters, |
|
|
random_state=random_state, |
|
|
batch_size=batch_size, |
|
|
n_init=10, |
|
|
max_iter=300, |
|
|
reassignment_ratio=0.01 |
|
|
) |
|
|
|
|
|
self.discovered_clusters = {} |
|
|
self.cluster_labels = None |
|
|
self.feature_matrix = None |
|
|
|
|
|
def discover_risk_patterns(self, clauses: List[str]) -> Dict[str, Any]: |
|
|
""" |
|
|
Discover risk patterns using Mini-Batch K-Means. |
|
|
|
|
|
Args: |
|
|
clauses: List of legal clause texts |
|
|
|
|
|
Returns: |
|
|
Dictionary with discovered clusters |
|
|
""" |
|
|
print(f"π Discovering risk patterns using Mini-Batch K-Means (n_clusters={self.n_clusters})...") |
|
|
|
|
|
|
|
|
cleaned_clauses = [self._clean_text(c) for c in clauses] |
|
|
|
|
|
|
|
|
print(" π Creating TF-IDF feature matrix...") |
|
|
self.feature_matrix = self.vectorizer.fit_transform(cleaned_clauses) |
|
|
feature_names = self.vectorizer.get_feature_names_out() |
|
|
|
|
|
|
|
|
print(f" π§ Fitting Mini-Batch K-Means (batch_size={self.batch_size})...") |
|
|
self.cluster_labels = self.kmeans_model.fit_predict(self.feature_matrix) |
|
|
|
|
|
|
|
|
print(" π Analyzing discovered clusters...") |
|
|
for cluster_id in range(self.n_clusters): |
|
|
cluster_mask = self.cluster_labels == cluster_id |
|
|
cluster_indices = np.where(cluster_mask)[0] |
|
|
|
|
|
if len(cluster_indices) == 0: |
|
|
continue |
|
|
|
|
|
|
|
|
cluster_center = self.kmeans_model.cluster_centers_[cluster_id] |
|
|
|
|
|
|
|
|
top_term_indices = np.argsort(cluster_center)[-15:][::-1] |
|
|
top_terms = [feature_names[i] for i in top_term_indices] |
|
|
top_scores = [float(cluster_center[i]) for i in top_term_indices] |
|
|
|
|
|
|
|
|
cluster_name = self._generate_cluster_name(top_terms) |
|
|
|
|
|
|
|
|
from scipy.spatial.distance import cdist |
|
|
distances = cdist( |
|
|
self.feature_matrix[cluster_mask].toarray(), |
|
|
[cluster_center], |
|
|
metric='euclidean' |
|
|
) |
|
|
avg_distance = np.mean(distances) |
|
|
|
|
|
self.discovered_clusters[cluster_id] = { |
|
|
'cluster_id': cluster_id, |
|
|
'cluster_name': cluster_name, |
|
|
'top_terms': top_terms, |
|
|
'term_scores': top_scores, |
|
|
'clause_count': int(len(cluster_indices)), |
|
|
'proportion': float(len(cluster_indices) / len(clauses)), |
|
|
'avg_distance_to_center': float(avg_distance) |
|
|
} |
|
|
|
|
|
|
|
|
inertia = self.kmeans_model.inertia_ |
|
|
|
|
|
print(f"β
Mini-Batch K-Means complete: {self.n_clusters} clusters found") |
|
|
print(f" Inertia: {inertia:.2f} (lower is better)") |
|
|
print(f" Speed boost vs standard K-Means: ~3-5x faster") |
|
|
|
|
|
return { |
|
|
'method': 'MiniBatch_KMeans', |
|
|
'n_clusters': self.n_clusters, |
|
|
'batch_size': self.batch_size, |
|
|
'discovered_clusters': self.discovered_clusters, |
|
|
'cluster_labels': self.cluster_labels, |
|
|
'quality_metrics': { |
|
|
'inertia': float(inertia), |
|
|
'avg_cluster_cohesion': float(np.mean([c['avg_distance_to_center'] for c in self.discovered_clusters.values()])) |
|
|
} |
|
|
} |
|
|
|
|
|
def _clean_text(self, text: str) -> str: |
|
|
"""Clean clause text""" |
|
|
if not isinstance(text, str): |
|
|
return "" |
|
|
text = re.sub(r'\s+', ' ', text) |
|
|
return text.strip() |
|
|
|
|
|
def _generate_cluster_name(self, top_terms: List[str]) -> str: |
|
|
"""Generate descriptive name from top terms""" |
|
|
themes = { |
|
|
'LIABILITY': ['liability', 'liable', 'damages', 'loss'], |
|
|
'INDEMNITY': ['indemnify', 'indemnification', 'hold', 'harmless'], |
|
|
'TERMINATION': ['terminate', 'termination', 'cancel', 'expire'], |
|
|
'IP': ['intellectual', 'property', 'patent', 'copyright'], |
|
|
'CONFIDENTIAL': ['confidential', 'nda', 'disclosure', 'secret'], |
|
|
'PAYMENT': ['payment', 'pay', 'fee', 'price'], |
|
|
'COMPLIANCE': ['comply', 'compliance', 'regulation', 'law'], |
|
|
'WARRANTY': ['warranty', 'warrant', 'represent', 'guarantee'] |
|
|
} |
|
|
|
|
|
for theme, keywords in themes.items(): |
|
|
if any(keyword in term.lower() for term in top_terms[:5] for keyword in keywords): |
|
|
return f"MB_{theme}" |
|
|
|
|
|
return f"MB_{top_terms[0].upper()}_{top_terms[1].upper()}" |
|
|
|
|
|
|
|
|
|
|
|
def compare_risk_discovery_methods(clauses: List[str], n_patterns: int = 7, |
|
|
include_advanced: bool = True) -> Dict[str, Any]: |
|
|
""" |
|
|
Compare all risk discovery methods on the same dataset. |
|
|
|
|
|
Args: |
|
|
clauses: List of legal clause texts |
|
|
n_patterns: Number of risk patterns/clusters to discover |
|
|
include_advanced: If True, includes advanced methods (slower but comprehensive) |
|
|
|
|
|
Returns: |
|
|
Comparison results with metrics for each method |
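
    Example (illustrative sketch; method 1 additionally needs the sibling
    `risk_discovery` module on the import path):

        report = compare_risk_discovery_methods(clauses, n_patterns=7,
                                                include_advanced=False)
        report['summary']['method_results']['lda']['quality_metrics']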
|
|
""" |
|
|
print("="*80) |
|
|
print("π¬ COMPARING RISK DISCOVERY METHODS") |
|
|
print(f" Methods to test: {9 if include_advanced else 4}") |
|
|
print("="*80) |
|
|
|
|
|
results = {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("\n" + "="*80) |
|
|
print("METHOD 1: K-Means Clustering (Original) - FAST") |
|
|
print("="*80) |
|
|
from risk_discovery import UnsupervisedRiskDiscovery |
|
|
kmeans_discovery = UnsupervisedRiskDiscovery(n_clusters=n_patterns) |
|
|
results['kmeans'] = kmeans_discovery.discover_risk_patterns(clauses) |
|
|
|
|
|
|
|
|
print("\n" + "="*80) |
|
|
print("METHOD 2: LDA Topic Modeling - PROBABILISTIC") |
|
|
print("="*80) |
|
|
lda_discovery = TopicModelingRiskDiscovery(n_topics=n_patterns) |
|
|
results['lda'] = lda_discovery.discover_risk_patterns(clauses) |
|
|
|
|
|
|
|
|
print("\n" + "="*80) |
|
|
print("METHOD 3: Hierarchical Clustering - STRUCTURE") |
|
|
print("="*80) |
|
|
hierarchical_discovery = HierarchicalRiskDiscovery(n_clusters=n_patterns) |
|
|
results['hierarchical'] = hierarchical_discovery.discover_risk_patterns(clauses) |
|
|
|
|
|
|
|
|
print("\n" + "="*80) |
|
|
print("METHOD 4: DBSCAN (Density-Based) - OUTLIERS") |
|
|
print("="*80) |
|
|
dbscan_discovery = DensityBasedRiskDiscovery(eps=0.3, min_samples=5) |
|
|
results['dbscan'] = dbscan_discovery.discover_risk_patterns(clauses, auto_tune=True) |
|
|
|
|
|
if include_advanced: |
|
|
|
|
|
|
|
|
|
|
|
print("\n" + "="*80) |
|
|
print("METHOD 5: NMF (Matrix Factorization) - PARTS-BASED") |
|
|
print("="*80) |
|
|
nmf_discovery = NMFRiskDiscovery(n_components=n_patterns) |
|
|
results['nmf'] = nmf_discovery.discover_risk_patterns(clauses) |
|
|
|
|
|
|
|
|
print("\n" + "="*80) |
|
|
print("METHOD 6: Spectral Clustering - GRAPH-BASED") |
|
|
print("="*80) |
|
|
spectral_discovery = SpectralClusteringRiskDiscovery(n_clusters=n_patterns) |
|
|
results['spectral'] = spectral_discovery.discover_risk_patterns(clauses) |
|
|
|
|
|
|
|
|
print("\n" + "="*80) |
|
|
print("METHOD 7: Gaussian Mixture Model - PROBABILISTIC SOFT") |
|
|
print("="*80) |
|
|
gmm_discovery = GaussianMixtureRiskDiscovery(n_components=n_patterns) |
|
|
results['gmm'] = gmm_discovery.discover_risk_patterns(clauses) |
|
|
|
|
|
|
|
|
print("\n" + "="*80) |
|
|
print("METHOD 8: Mini-Batch K-Means - ULTRA FAST") |
|
|
print("="*80) |
|
|
minibatch_discovery = MiniBatchKMeansRiskDiscovery(n_clusters=n_patterns) |
|
|
results['minibatch_kmeans'] = minibatch_discovery.discover_risk_patterns(clauses) |
|
|
|
|
|
|
|
|
print("\n" + "="*80) |
|
|
print("METHOD 9: Risk-o-meter (Doc2Vec + SVM) - PAPER BASELINE") |
|
|
print("="*80) |
|
|
print("π Based on: Chakrabarti et al., 2018") |
|
|
print(" Achievement: 91% accuracy on termination clauses") |
|
|
try: |
|
|
from risk_o_meter import RiskOMeterFramework |
|
|
risk_o_meter = RiskOMeterFramework( |
|
|
vector_size=100, |
|
|
epochs=30, |
|
|
verbose=True |
|
|
) |
|
|
results['risk_o_meter'] = risk_o_meter.discover_risk_patterns(clauses, n_patterns) |
|
|
except ImportError: |
|
|
print("β οΈ Risk-o-meter requires gensim. Install with: pip install gensim>=4.3.0") |
|
|
print(" Skipping Risk-o-meter comparison...") |
|
|
except Exception as e: |
|
|
print(f"β οΈ Risk-o-meter error: {e}") |
|
|
print(" Skipping Risk-o-meter comparison...") |
|
|
|
|
|
|
|
|
print("\n" + "="*80) |
|
|
print("π COMPARISON SUMMARY") |
|
|
print("="*80) |
|
|
|
|
|
summary = { |
|
|
'n_clauses': len(clauses), |
|
|
'target_patterns': n_patterns, |
|
|
        'methods_compared': len(results),
|
|
'method_results': {} |
|
|
} |
|
|
|
|
|
for method_name, method_results in results.items(): |
|
|
        n_discovered = (method_results.get('n_clusters')
                        or method_results.get('n_topics')
                        or method_results.get('n_components', 0))
|
|
|
|
|
print(f"\n{method_name.upper()}:") |
|
|
print(f" Patterns Discovered: {n_discovered}") |
|
|
|
|
|
if 'quality_metrics' in method_results: |
|
|
print(f" Quality Metrics: {method_results['quality_metrics']}") |
|
|
|
|
|
summary['method_results'][method_name] = { |
|
|
'n_patterns': n_discovered, |
|
|
'method': method_results['method'], |
|
|
'quality_metrics': method_results.get('quality_metrics', {}) |
|
|
} |
|
|
|
|
|
print("\n" + "="*80) |
|
|
print("β
COMPARISON COMPLETE") |
|
|
print("="*80) |
|
|
|
|
|
return { |
|
|
'summary': summary, |
|
|
'detailed_results': results |
|
|
} |
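

if __name__ == "__main__":
    # Minimal smoke test (a sketch, not a benchmark). The corpus below is
    # synthetic: seed clauses are repeated with a varying suffix so the
    # vectorizers' min_df/max_df filters keep a usable vocabulary.
    seed_clauses = [
        "The supplier shall indemnify and hold harmless the buyer from all damages.",
        "Either party may terminate this agreement with thirty days written notice.",
        "All intellectual property created under this agreement belongs to the client.",
        "The contractor shall keep all confidential information in strict confidence.",
        "Payment of all fees is due within thirty days of the invoice date.",
        "The vendor warrants that the services will comply with applicable law.",
        "Liability for indirect or consequential loss is excluded where permitted.",
    ]
    demo_clauses = [f"{c} Variant {i}." for i in range(20) for c in seed_clauses]

    # Run one self-contained method; compare_risk_discovery_methods()
    # additionally needs the sibling risk_discovery module for its K-Means baseline.
    demo = TopicModelingRiskDiscovery(n_topics=4)
    demo_result = demo.discover_risk_patterns(demo_clauses)
    for topic in demo_result['discovered_topics'].values():
        print(topic['topic_name'], topic['top_words'][:5])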
|
|
|