"""Unsupervised Risk Discovery System - No Hardcoded Categories! |
|
|
""" |
|
|
import re |
|
|
from typing import Dict, List, Tuple, Any |
|
|
from collections import Counter |
|
|
import numpy as np |
|
|
from sklearn.feature_extraction.text import TfidfVectorizer |
|
|
from sklearn.cluster import KMeans |
|
|
from sklearn.decomposition import LatentDirichletAllocation |
|
|
|
|
|
class UnsupervisedRiskDiscovery: |
|
|
""" |
|
|
Discovers risk patterns in legal contracts using unsupervised learning. |
|
|
NO hardcoded risk categories - learns everything from text! |
|
|
""" |
|
|
|
|
|
def __init__(self, n_clusters: int = 7, random_state: int = 42): |
|
|
self.n_clusters = n_clusters |
|
|
self.random_state = random_state |
|
|
|
|
|
|
|
|
self.tfidf_vectorizer = TfidfVectorizer( |
|
|
max_features=10000, |
|
|
ngram_range=(1, 3), |
|
|
stop_words='english', |
|
|
lowercase=True, |
|
|
min_df=2, |
|
|
max_df=0.95 |
|
|
) |
|
|
|
|
|
self.kmeans = KMeans( |
|
|
n_clusters=n_clusters, |
|
|
random_state=random_state, |
|
|
n_init=10 |
|
|
) |
|
|
|
|
|
|
|
|
self.discovered_patterns = {} |
|
|
self.risk_features = {} |
|
|
self.cluster_labels = None |
|
|
self.feature_matrix = None |
|
|
|
|
|
|
|
|
self.legal_indicators = { |
|
|
'obligation_strength': r'\b(?:shall|must|required|mandatory|obligated|bound)\b', |
|
|
'prohibition_terms': r'\b(?:shall not|must not|prohibited|forbidden|restricted)\b', |
|
|
'conditional_risk': r'\b(?:if|unless|provided|subject to|in the event|failure to)\b', |
|
|
'liability_terms': r'\b(?:liable|responsibility|damages|penalty|loss|harm)\b', |
|
|
'temporal_urgency': r'\b(?:immediately|within|before|after|deadline|expir)\b', |
|
|
'monetary_terms': r'\$|USD|dollar|payment|fee|cost|expense|fine', |
|
|
'parties': r'\b(?:Party|Parties|Company|Corporation|Licensor|Licensee|Vendor|Customer)\b', |
|
|
'dates': r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}|\d{1,2}[/-]\d{1,2}[/-]\d{2,4}' |
|
|
} |
|
|
|
|
|
|
|
|
self.complexity_indicators = { |
|
|
'modal_verbs': r'\b(?:shall|must|may|should|will|might|could|would)\b', |
|
|
'conditional_terms': r'\b(?:if|unless|provided|subject to|in the event|notwithstanding)\b', |
|
|
'legal_conjunctions': r'\b(?:whereas|therefore|furthermore|moreover|however)\b', |
|
|
'obligation_terms': r'\b(?:agrees?|undertakes?|covenants?|warrants?|represents?)\b' |
|
|
} |
|
|
|
|
|

    def clean_clause_text(self, text: str) -> str:
        """Clean and normalize clause text"""
        if not isinstance(text, str):
            return ""

        # Collapse all whitespace runs to single spaces.
        text = re.sub(r'\s+', ' ', text)

        # Drop characters outside a conservative legal-text alphabet.
        text = re.sub(r'[^\w\s.,;:()"-]', ' ', text)

        # Trim leading/trailing whitespace.
        text = text.strip()

        return text
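
    # Example (sketch): clean_clause_text("  §12.3  The   Party\tshall…comply  ")
    # returns "12.3 The Party shall comply" - whitespace is collapsed first,
    # then unsupported symbols such as "§" and "…" become spaces, and the
    # result is stripped.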

    def extract_risk_features(self, clause_text: str) -> Dict[str, float]:
        """
        Extract numerical features that indicate risk levels (domain-agnostic)
        """
        text_lower = clause_text.lower()
        words = text_lower.split()

        features = {}

        # Basic length/shape statistics.
        features['clause_length'] = len(words)
        features['sentence_count'] = len(re.split(r'[.!?]+', clause_text))
        features['avg_word_length'] = np.mean([len(word) for word in words]) if words else 0

        # Raw counts and per-word densities for each legal indicator.
        for pattern_name, pattern in self.legal_indicators.items():
            matches = len(re.findall(pattern, text_lower))
            features[f'{pattern_name}_count'] = matches
            features[f'{pattern_name}_density'] = matches / len(words) if words else 0

        # Per-word densities for each complexity indicator.
        for pattern_name, pattern in self.complexity_indicators.items():
            matches = len(re.findall(pattern, text_lower))
            features[f'{pattern_name}_complexity'] = matches / len(words) if words else 0

        # Composite scores: weighted combinations of the densities above.
        features['obligation_strength'] = (
            features.get('obligation_strength_density', 0) * 2 +
            features.get('modal_verbs_complexity', 0)
        )

        features['legal_complexity'] = (
            features.get('conditional_terms_complexity', 0) +
            features.get('legal_conjunctions_complexity', 0) +
            features.get('obligation_terms_complexity', 0)
        )

        features['risk_intensity'] = (
            features.get('liability_terms_density', 0) * 2 +
            features.get('prohibition_terms_density', 0) +
            features.get('conditional_risk_density', 0)
        )

        return features
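
    # Worked example (hypothetical clause; the values follow directly from
    # the formulas above):
    #
    #   "The Contractor shall be liable for all damages."  -> 8 words
    #   liability_terms: 2 matches ("liable", "damages") -> density 2/8 = 0.25
    #   obligation_strength_density: 1 match ("shall")   -> 1/8 = 0.125
    #   modal_verbs_complexity: 1 match ("shall")        -> 1/8 = 0.125
    #   obligation_strength = 0.125 * 2 + 0.125 = 0.375
    #   risk_intensity      = 0.25 * 2 + 0 + 0  = 0.50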

    def discover_risk_patterns(self, clause_texts: List[str]) -> Dict[str, Any]:
        """
        Discover risk patterns using unsupervised clustering.
        Returns discovered risk types and their characteristics.
        """
        print(f"Discovering risk patterns from {len(clause_texts)} clauses...")

        # Normalize the raw clause texts.
        cleaned_texts = [self.clean_clause_text(text) for text in clause_texts]

        # Vectorize with TF-IDF.
        print("Extracting TF-IDF features...")
        self.feature_matrix = self.tfidf_vectorizer.fit_transform(cleaned_texts)

        # Cluster the vectorized clauses.
        print(f"Clustering into {self.n_clusters} risk patterns...")
        self.cluster_labels = self.kmeans.fit_predict(self.feature_matrix)

        # Extract the hand-crafted legal risk features per clause.
        print("Extracting legal risk features...")
        risk_features_list = [self.extract_risk_features(text) for text in clause_texts]

        # Summarize and name each cluster.
        self.discovered_patterns = self._analyze_clusters(
            cleaned_texts, self.cluster_labels, risk_features_list
        )

        print("Risk pattern discovery complete!")
        print(f"Discovered {len(self.discovered_patterns)} risk patterns:")

        for i, (pattern_name, details) in enumerate(self.discovered_patterns.items()):
            print(f"  {i+1}. {pattern_name}: {details['clause_count']} clauses")
            print(f"     Key terms: {', '.join(details['key_terms'][:5])}")
            print(f"     Risk intensity: {details['avg_risk_intensity']:.3f}")

        # Silhouette score as a rough clustering-quality metric; it can fail
        # on degenerate clusterings, in which case fall back to 0.0.
        try:
            silhouette = silhouette_score(self.feature_matrix, self.cluster_labels)
        except Exception:
            silhouette = 0.0

        return {
            'method': 'K-Means_Clustering',
            'n_clusters': self.n_clusters,
            'discovered_patterns': self.discovered_patterns,
            'cluster_labels': self.cluster_labels,
            'quality_metrics': {
                'silhouette_score': silhouette,
                'n_patterns': len(self.discovered_patterns)
            }
        }

    def _analyze_clusters(self, texts: List[str], labels: np.ndarray,
                          risk_features: List[Dict]) -> Dict[str, Any]:
        """Analyze and name discovered clusters"""
        patterns = {}

        # Vocabulary terms aligned with the columns of the TF-IDF matrix.
        feature_names = self.tfidf_vectorizer.get_feature_names_out()

        for cluster_id in range(self.n_clusters):
            # Gather the texts and feature dicts assigned to this cluster.
            cluster_mask = labels == cluster_id
            cluster_texts = [texts[i] for i in range(len(texts)) if cluster_mask[i]]
            cluster_features = [risk_features[i] for i in range(len(risk_features)) if cluster_mask[i]]

            # Top TF-IDF terms by weight in the cluster centroid.
            cluster_center = self.kmeans.cluster_centers_[cluster_id]
            top_indices = cluster_center.argsort()[-20:][::-1]
            top_terms = [feature_names[i] for i in top_indices]

            # Average the hand-crafted risk features over the cluster.
            avg_features = {}
            if cluster_features:
                for key in cluster_features[0].keys():
                    avg_features[key] = np.mean([f.get(key, 0) for f in cluster_features])

            # Derive a human-readable name; disambiguate collisions so one
            # cluster cannot silently overwrite another in the dict.
            cluster_name = self._generate_cluster_name(top_terms, avg_features)
            if cluster_name in patterns:
                cluster_name = f"{cluster_name}_{cluster_id}"

            patterns[cluster_name] = {
                'cluster_id': cluster_id,
                'clause_count': len(cluster_texts),
                'key_terms': top_terms,
                'avg_risk_intensity': avg_features.get('risk_intensity', 0),
                'avg_legal_complexity': avg_features.get('legal_complexity', 0),
                'avg_obligation_strength': avg_features.get('obligation_strength', 0),
                'sample_clauses': cluster_texts[:3],
                'risk_features': avg_features
            }

        return patterns

    def _generate_cluster_name(self, top_terms: List[str], avg_features: Dict[str, float]) -> str:
        """Generate meaningful names for discovered clusters"""
        # Thematic keyword lists used only for naming, not for clustering.
        term_analysis = {
            'liability': ['liable', 'liability', 'damages', 'loss', 'harm', 'injury'],
            'obligation': ['shall', 'must', 'required', 'obligation', 'duty'],
            'indemnity': ['indemnify', 'indemnification', 'defend', 'hold harmless'],
            'termination': ['terminate', 'termination', 'end', 'expire', 'breach'],
            'intellectual_property': ['intellectual', 'property', 'patent', 'copyright', 'trademark'],
            'confidentiality': ['confidential', 'confidentiality', 'non-disclosure', 'proprietary'],
            'compliance': ['comply', 'compliance', 'regulation', 'law', 'legal']
        }

        # Score each theme by how many of the top-10 terms contain one of
        # its keywords.
        theme_scores = {}
        for theme, keywords in term_analysis.items():
            score = sum(1 for term in top_terms[:10] if any(kw in term.lower() for kw in keywords))
            theme_scores[theme] = score

        # Fall back to 'general' when no theme keyword appears at all
        # (theme_scores itself is never empty, so check the scores).
        best_theme = max(theme_scores, key=theme_scores.get) if max(theme_scores.values()) > 0 else 'general'

        # Bucket the cluster by its average risk intensity.
        risk_intensity = avg_features.get('risk_intensity', 0)
        if risk_intensity > 0.1:
            intensity = 'high_risk'
        elif risk_intensity > 0.05:
            intensity = 'moderate_risk'
        else:
            intensity = 'low_risk'

        return f"{intensity}_{best_theme}_pattern"
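
    # Example (sketch): a cluster whose top terms include "indemnify",
    # "indemnification", and "hold harmless" scores highest on the
    # 'indemnity' theme; with an average risk_intensity of, say, 0.12
    # (> 0.1) the generated name is "high_risk_indemnity_pattern".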

    def get_risk_labels(self, clause_texts: List[str]) -> List[int]:
        """Get risk cluster labels for new clause texts"""
        if self.cluster_labels is None:
            raise ValueError("Must discover patterns first using discover_risk_patterns()")

        # Reuse the fitted vectorizer, then assign each clause to its
        # nearest K-Means centroid.
        cleaned_texts = [self.clean_clause_text(text) for text in clause_texts]
        feature_matrix = self.tfidf_vectorizer.transform(cleaned_texts)

        return self.kmeans.predict(feature_matrix).tolist()

    def get_discovered_risk_names(self) -> List[str]:
        """Get list of discovered risk pattern names"""
        if not self.discovered_patterns:
            raise ValueError("Must discover patterns first using discover_risk_patterns()")

        return list(self.discovered_patterns.keys())
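

# Example usage (a minimal sketch; `contract_clauses` stands in for whatever
# list of clause strings the caller supplies):
#
#   discovery = UnsupervisedRiskDiscovery(n_clusters=7)
#   results = discovery.discover_risk_patterns(contract_clauses)
#   print(results['quality_metrics']['silhouette_score'])
#   labels = discovery.get_risk_labels(["The Licensee shall indemnify ..."])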


class LDARiskDiscovery:
    """
    LDA-based risk discovery system - wrapper around TopicModelingRiskDiscovery.
    Provides a compatible interface with UnsupervisedRiskDiscovery while using LDA underneath.

    LDA (Latent Dirichlet Allocation) is superior for legal text because it:
    - Discovers overlapping risk categories (clauses can belong to multiple topics)
    - Provides probability distributions over risk types
    - Balances clauses more evenly across discovered patterns
    - Yields more interpretable topic-word distributions
    """

    def __init__(self, n_clusters: int = 7, doc_topic_prior: float = 0.1,
                 topic_word_prior: float = 0.01, max_iter: int = 20,
                 max_features: int = 5000, learning_method: str = 'batch',
                 random_state: int = 42):
        """
        Initialize LDA risk discovery system.

        Args:
            n_clusters: Number of risk topics to discover
            doc_topic_prior: Alpha parameter (document-topic concentration, lower = more focused)
            topic_word_prior: Beta parameter (topic-word concentration, lower = more focused)
            max_iter: Maximum iterations for LDA training
            max_features: Vocabulary size for feature extraction
            learning_method: 'batch' (more accurate) or 'online' (faster for large datasets)
            random_state: Random seed for reproducibility
        """
        # Imported here rather than at module top so the K-Means-based class
        # stays usable when the optional backend module is missing.
        from risk_discovery_alternatives import TopicModelingRiskDiscovery

        self.n_clusters = n_clusters
        self.random_state = random_state

        # Delegate the actual topic modeling to the LDA backend.
        self.lda_backend = TopicModelingRiskDiscovery(
            n_topics=n_clusters,
            random_state=random_state
        )

        # Override the backend's estimator hyperparameters. This must happen
        # before fitting; scikit-learn reads these attributes at fit time.
        self.lda_backend.lda_model.doc_topic_prior = doc_topic_prior
        self.lda_backend.lda_model.topic_word_prior = topic_word_prior
        self.lda_backend.lda_model.max_iter = max_iter
        self.lda_backend.lda_model.learning_method = learning_method
        self.lda_backend.vectorizer.max_features = max_features

        # Fitted state, populated by discover_risk_patterns().
        self.discovered_patterns = {}
        self.cluster_labels = None
        self.feature_matrix = None

        # Same regex indicator sets as UnsupervisedRiskDiscovery, kept so the
        # trainer-facing feature extraction stays interface-compatible.
        self.legal_indicators = {
            'obligation_strength': r'\b(?:shall|must|required|mandatory|obligated|bound)\b',
            'prohibition_terms': r'\b(?:shall not|must not|prohibited|forbidden|restricted)\b',
            'conditional_risk': r'\b(?:if|unless|provided|subject to|in the event|failure to)\b',
            'liability_terms': r'\b(?:liable|responsibility|damages|penalty|loss|harm)\b',
            'temporal_urgency': r'\b(?:immediately|within|before|after|deadline|expir\w*)\b',
            'monetary_terms': r'\$|USD|dollar|payment|fee|cost|expense|fine',
            'parties': r'\b(?:Party|Parties|Company|Corporation|Licensor|Licensee|Vendor|Customer)\b',
            'dates': r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}|\d{1,2}[/-]\d{1,2}[/-]\d{2,4}'
        }

        self.complexity_indicators = {
            'modal_verbs': r'\b(?:shall|must|may|should|will|might|could|would)\b',
            'conditional_terms': r'\b(?:if|unless|provided|subject to|in the event|notwithstanding)\b',
            'legal_conjunctions': r'\b(?:whereas|therefore|furthermore|moreover|however)\b',
            'obligation_terms': r'\b(?:agrees?|undertakes?|covenants?|warrants?|represents?)\b'
        }

    def discover_risk_patterns(self, clause_texts: List[str]) -> Dict[str, Any]:
        """
        Discover risk patterns using LDA topic modeling.
        Compatible with the UnsupervisedRiskDiscovery interface.

        Args:
            clause_texts: List of legal clause texts

        Returns:
            Dictionary with discovered patterns and quality metrics
        """
        print(f"Discovering risk patterns using LDA (n_topics={self.n_clusters})...")
        print("  LDA provides balanced, overlapping risk categories")
        print("  Best for legal text with multi-faceted risks")

        # Run the backend's topic discovery.
        results = self.lda_backend.discover_risk_patterns(clause_texts)

        # Mirror the backend's outputs onto this wrapper's attributes so the
        # K-Means and LDA classes expose the same state.
        self.discovered_patterns = results.get('discovered_topics', {})
        self.cluster_labels = results.get('topic_labels', None)
        self.feature_matrix = self.lda_backend.feature_matrix

        # Normalize the key name: callers expect 'keywords'.
        for topic_name, topic_info in self.discovered_patterns.items():
            if 'keywords' not in topic_info and 'top_words' in topic_info:
                topic_info['keywords'] = topic_info['top_words']

        print(f"LDA discovery complete: {len(self.discovered_patterns)} risk topics found")

        return results

    def get_risk_labels(self, clause_texts: List[str]) -> List[int]:
        """
        Get dominant topic labels for new clause texts.
        Returns the most probable topic for each clause.

        Args:
            clause_texts: List of legal clause texts

        Returns:
            List of topic IDs (0 to n_clusters-1)
        """
        if self.cluster_labels is None:
            raise ValueError("Must discover patterns first using discover_risk_patterns()")

        # Clean and vectorize with the backend's fitted vectorizer.
        cleaned_texts = [self.lda_backend._clean_text(text) for text in clause_texts]
        feature_matrix = self.lda_backend.vectorizer.transform(cleaned_texts)

        # Per-clause probability distribution over topics.
        doc_topic_dist = self.lda_backend.lda_model.transform(feature_matrix)

        # The label is the most probable topic.
        labels = doc_topic_dist.argmax(axis=1).tolist()

        return labels

    def get_discovered_risk_names(self) -> List[str]:
        """Get list of discovered risk topic names"""
        if not self.discovered_patterns:
            raise ValueError("Must discover patterns first using discover_risk_patterns()")

        return list(self.discovered_patterns.keys())

    def get_topic_distribution(self, clause_texts: List[str]) -> np.ndarray:
        """
        Get the full probability distribution over topics for clauses.
        This is unique to LDA - it shows membership in ALL topics with probabilities.

        Args:
            clause_texts: List of legal clause texts

        Returns:
            Array of shape (n_clauses, n_topics) with probability distributions
        """
        cleaned = [self.lda_backend._clean_text(c) for c in clause_texts]
        feature_matrix = self.lda_backend.vectorizer.transform(cleaned)
        return self.lda_backend.lda_model.transform(feature_matrix)
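
    # Example (sketch): a clause mixing indemnity and termination language
    # might come back as [0.05, 0.45, 0.40, ...] across the n_clusters
    # topics - argmax gives the dominant topic (as in get_risk_labels),
    # while the full vector preserves the secondary memberships that hard
    # cluster labels discard.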

    def clean_clause_text(self, text: str) -> str:
        """Clean and normalize clause text - for compatibility with trainer"""
        if not isinstance(text, str):
            return ""

        # Same normalization as UnsupervisedRiskDiscovery.clean_clause_text:
        # collapse whitespace, drop unsupported symbols, and strip.
        text = re.sub(r'\s+', ' ', text)
        text = re.sub(r'[^\w\s.,;:()"-]', ' ', text)
        text = text.strip()

        return text

    def extract_risk_features(self, clause_text: str) -> Dict[str, float]:
        """
        Extract numerical features that indicate risk levels.
        Required by trainer for generating synthetic severity/importance scores.
        """
        text_lower = clause_text.lower()
        words = text_lower.split()

        features = {}

        # Basic length/shape statistics.
        features['clause_length'] = len(words)
        features['sentence_count'] = len(re.split(r'[.!?]+', clause_text))
        features['avg_word_length'] = np.mean([len(word) for word in words]) if words else 0

        # Raw counts and per-word densities for each legal indicator.
        for pattern_name, pattern in self.legal_indicators.items():
            matches = len(re.findall(pattern, text_lower))
            features[f'{pattern_name}_count'] = matches
            features[f'{pattern_name}_density'] = matches / len(words) if words else 0

        # Per-word densities for each complexity indicator.
        for pattern_name, pattern in self.complexity_indicators.items():
            matches = len(re.findall(pattern, text_lower))
            features[f'{pattern_name}_complexity'] = matches / len(words) if words else 0

        # Composite scores, identical to UnsupervisedRiskDiscovery.
        features['obligation_strength'] = (
            features.get('obligation_strength_density', 0) * 2 +
            features.get('modal_verbs_complexity', 0)
        )

        features['legal_complexity'] = (
            features.get('conditional_terms_complexity', 0) +
            features.get('legal_conjunctions_complexity', 0) +
            features.get('obligation_terms_complexity', 0)
        )

        features['risk_intensity'] = (
            features.get('liability_terms_density', 0) * 2 +
            features.get('prohibition_terms_density', 0) +
            features.get('conditional_risk_density', 0)
        )

        return features
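

if __name__ == "__main__":
    # Minimal smoke test - a sketch with hypothetical clauses. Real use would
    # pass hundreds of clauses; n_clusters is lowered here so K-Means has
    # enough samples per cluster.
    demo_clauses = [
        "The Contractor shall be liable for all damages arising from breach.",
        "The Supplier shall be liable for any loss or penalty incurred.",
        "Either party may terminate this agreement upon thirty days notice.",
        "This agreement shall terminate immediately upon material breach.",
        "The Licensee shall keep all proprietary information confidential.",
        "Confidential information shall not be disclosed to third parties.",
        "All fees and payment of expenses are due within thirty days.",
        "The Customer shall pay the fee and any cost of collection.",
    ]

    discovery = UnsupervisedRiskDiscovery(n_clusters=3)
    results = discovery.discover_risk_patterns(demo_clauses)
    print("Patterns:", discovery.get_discovered_risk_names())
    print("Labels:", discovery.get_risk_labels(demo_clauses))

    # The LDA variant requires the optional risk_discovery_alternatives
    # module, so it is only sketched here:
    #   lda = LDARiskDiscovery(n_clusters=3)
    #   lda.discover_risk_patterns(demo_clauses)
    #   print(lda.get_topic_distribution(demo_clauses).shape)  # (8, 3)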