Spaces:
Running
Running
| """ | |
| Knowledge Graph Comparator | |
| This module provides functionality to compare two knowledge graphs from the database | |
| and generate comprehensive comparison metrics including structural similarity, | |
| semantic similarity, and entity/relationship overlap analysis. | |
| """ | |
| import json | |
| import logging | |
| import numpy as np | |
| from typing import Dict, List, Any, Tuple, Set, Optional | |
| from dataclasses import dataclass | |
| import os | |
| import openai | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from scipy.optimize import linear_sum_assignment | |
| import time | |
| import hashlib | |
| import pickle | |
# Configure OpenAI for embeddings.
# The API key is read from the OPENAI_API_KEY environment variable when this
# module is imported; os.environ.get yields None if the variable is not set.
openai.api_key = os.environ.get("OPENAI_API_KEY")
@dataclass
class GraphComparisonMetrics:
    """Comprehensive metrics for comparing two knowledge graphs.

    Declared as a dataclass so that an instance can be created with one
    keyword argument per field; without the decorator the annotations below
    are only class-level annotations and keyword construction raises
    ``TypeError``.
    """

    # Entity comparison metrics
    entity_overlap_count: int
    entity_unique_to_graph1: int
    entity_unique_to_graph2: int
    entity_overlap_ratio: float
    entity_semantic_similarity: float

    # Relation comparison metrics
    relation_overlap_count: int
    relation_unique_to_graph1: int
    relation_unique_to_graph2: int
    relation_overlap_ratio: float
    relation_semantic_similarity: float

    # Structural metrics
    graph1_density: float
    graph2_density: float
    density_difference: float
    common_patterns_count: int

    # Type distribution metrics
    entity_type_similarity: float
    relation_type_similarity: float

    # Overall similarity scores
    structural_similarity: float
    content_similarity: float
    overall_similarity: float

    # Additional statistics (one free-form stats dictionary per graph)
    graph1_stats: Dict[str, Any]
    graph2_stats: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Convert metrics to a nested dictionary for JSON serialization.

        Returns:
            A dictionary with the sections ``entity_metrics``,
            ``relation_metrics``, ``structural_metrics``,
            ``type_distribution_metrics``, ``overall_metrics`` and
            ``graph_statistics``.
        """
        return {
            "entity_metrics": {
                "overlap_count": self.entity_overlap_count,
                "unique_to_graph1": self.entity_unique_to_graph1,
                "unique_to_graph2": self.entity_unique_to_graph2,
                "overlap_ratio": self.entity_overlap_ratio,
                "semantic_similarity": self.entity_semantic_similarity
            },
            "relation_metrics": {
                "overlap_count": self.relation_overlap_count,
                "unique_to_graph1": self.relation_unique_to_graph1,
                "unique_to_graph2": self.relation_unique_to_graph2,
                "overlap_ratio": self.relation_overlap_ratio,
                "semantic_similarity": self.relation_semantic_similarity
            },
            "structural_metrics": {
                "graph1_density": self.graph1_density,
                "graph2_density": self.graph2_density,
                "density_difference": self.density_difference,
                "common_patterns_count": self.common_patterns_count
            },
            "type_distribution_metrics": {
                "entity_type_similarity": self.entity_type_similarity,
                "relation_type_similarity": self.relation_type_similarity
            },
            "overall_metrics": {
                "structural_similarity": self.structural_similarity,
                "content_similarity": self.content_similarity,
                "overall_similarity": self.overall_similarity
            },
            "graph_statistics": {
                "graph1_stats": self.graph1_stats,
                "graph2_stats": self.graph2_stats
            }
        }
| class KnowledgeGraphComparator: | |
| """Main class for comparing two knowledge graphs""" | |
def __init__(self, similarity_threshold: float = 0.7, semantic_threshold: float = 0.75, use_cache: bool = True):
    """
    Set up a comparator and, when requested, its embedding cache.

    Args:
        similarity_threshold: Threshold for semantic similarity matching
            (0.7 = 70%). Stored on the instance.
        semantic_threshold: Minimum cosine similarity for two items to be
            counted as an overlap in the semantic comparison (0.75 = 75%).
            Higher values mean stricter matching. Rough guide:
                0.9+      very high similarity (almost identical)
                0.8-0.9   high similarity (very likely same concept)
                0.75-0.8  good similarity (same with minor variations)
                0.65-0.75 moderate similarity (related, possibly different)
                0.5-0.65  low similarity (loosely related)
        use_cache: Whether to use the embedding cache (default: True).
    """
    self.similarity_threshold = similarity_threshold
    self.semantic_threshold = semantic_threshold
    self.use_cache = use_cache

    # The in-memory cache always exists; it is only populated from disk
    # when caching is switched on.
    self.embedding_cache = {}
    self.cache_file = "cache/embeddings_cache.pkl"

    if not self.use_cache:
        logging.info("Cache disabled for this comparison")
        return
    self._load_embedding_cache()
def _load_embedding_cache(self):
    """Load the embedding cache from ``self.cache_file`` into memory.

    The cache directory is created if the path has a directory component.
    A missing file, an unreadable file, or a payload that is not a dict
    all leave ``self.embedding_cache`` as an empty dict; errors are logged
    and never raised.
    """
    try:
        # os.path.dirname() is '' for a bare filename and os.makedirs('')
        # raises, which would otherwise prevent an existing cache from loading.
        cache_dir = os.path.dirname(self.cache_file)
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)

        if not os.path.exists(self.cache_file):
            self.embedding_cache = {}
            return

        # NOTE(security): pickle.load can execute arbitrary code from the
        # file it reads. Only point cache_file at a location this process
        # controls; never at user-supplied or downloaded data.
        with open(self.cache_file, 'rb') as f:
            loaded = pickle.load(f)

        if not isinstance(loaded, dict):
            # Every other method indexes the cache as a dict keyed by hash.
            logging.error(
                f"Error loading embedding cache: expected dict, got {type(loaded).__name__}"
            )
            self.embedding_cache = {}
            return

        self.embedding_cache = loaded
        logging.info(f"Loaded {len(self.embedding_cache)} cached embeddings")
    except Exception as e:
        logging.error(f"Error loading embedding cache: {e}")
        self.embedding_cache = {}
def _save_embedding_cache(self):
    """Persist ``self.embedding_cache`` to ``self.cache_file`` with pickle.

    The data is written to a temporary sibling file and then moved into
    place with ``os.replace`` so an interrupted write cannot leave a
    truncated cache file behind. Errors are logged and never raised.
    """
    tmp_path = f"{self.cache_file}.tmp"
    try:
        # os.makedirs('') raises, so only create a directory when the path
        # actually has a directory component.
        cache_dir = os.path.dirname(self.cache_file)
        if cache_dir:
            os.makedirs(cache_dir, exist_ok=True)

        with open(tmp_path, 'wb') as f:
            pickle.dump(self.embedding_cache, f)
        os.replace(tmp_path, self.cache_file)
        logging.debug(f"Saved {len(self.embedding_cache)} embeddings to cache")
    except Exception as e:
        logging.error(f"Error saving embedding cache: {e}")
        # Best effort: do not leave a partial temporary file lying around.
        try:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        except OSError:
            pass
def _get_text_hash(self, text: str) -> str:
    """Return the hex MD5 digest of the UTF-8 encoded text (used as cache key)."""
    encoded = text.encode('utf-8')
    digest = hashlib.md5(encoded)
    return digest.hexdigest()
def clear_embedding_cache(self):
    """Drop every cached embedding, in memory and on disk.

    Returns:
        True when the cache was cleared, False if an error occurred
        (the error is logged, not raised).
    """
    try:
        self.embedding_cache = {}
        cache_path = self.cache_file
        if os.path.exists(cache_path):
            os.remove(cache_path)
        logging.info("Embedding cache cleared successfully")
    except Exception as e:
        logging.error(f"Error clearing embedding cache: {e}")
        return False
    return True
def get_cache_info(self) -> Dict[str, Any]:
    """Describe the current cache state.

    Returns:
        A dictionary with the number of in-memory entries, whether the cache
        file exists, and its size in bytes and in megabytes (rounded to two
        decimals; 0 when the file is missing, empty or unreadable).
    """
    entry_count = len(self.embedding_cache)
    path = self.cache_file
    on_disk = os.path.exists(path)

    size_bytes = 0
    if on_disk:
        try:
            size_bytes = os.path.getsize(path)
        except Exception:
            # The file may vanish between the existence check and here.
            size_bytes = 0

    if size_bytes > 0:
        size_mb = round(size_bytes / (1024 * 1024), 2)
    else:
        size_mb = 0

    return {
        "cache_entries": entry_count,
        "cache_file_exists": on_disk,
        "cache_file_size_bytes": size_bytes,
        "cache_file_size_mb": size_mb
    }
def get_embedding(self, text: str) -> np.ndarray:
    """Return the embedding of ``text`` from OpenAI text-embedding-3-small.

    Blank input and any failure while calling the API both yield a
    1536-element zero vector. When caching is enabled the stripped text's
    hash is looked up first, new embeddings are stored, and the cache is
    written to disk whenever its size reaches a multiple of ten.
    """
    cleaned = text.strip() if text else ""
    if not cleaned:
        return np.zeros(1536)

    key = self._get_text_hash(cleaned)
    if self.use_cache and key in self.embedding_cache:
        return self.embedding_cache[key]

    try:
        response = openai.embeddings.create(
            model="text-embedding-3-small",
            input=cleaned
        )
        vector = np.array(response.data[0].embedding)
        if self.use_cache:
            self.embedding_cache[key] = vector
            # Periodic flush so a long run does not lose all new entries.
            if len(self.embedding_cache) % 10 == 0:
                self._save_embedding_cache()
        return vector
    except Exception as e:
        logging.error(f"Error getting embedding for '{text}': {e}")
        # Zero vector fallback; 1536 is the text-embedding-3-small dimension.
        return np.zeros(1536)
def _get_embeddings_batch(self, texts: List[str], batch_name: str = "texts") -> List[np.ndarray]:
    """Get embeddings for many texts, using the cache and batched API calls.

    Args:
        texts: Texts to embed. Blank or ``None`` entries are not sent to the
            API.
        batch_name: Label used only in log messages.

    Returns:
        A list aligned with ``texts``. Each slot holds the embedding, or
        ``None`` when the text was blank or its batch request failed.

    Identical texts are requested from the API only once and the result is
    written to every position where that text occurs, so duplicates never
    end up with a missing embedding.
    """
    start_time = time.time()
    embeddings = [None] * len(texts)
    # stripped text -> every index in `texts` that still needs its embedding
    pending = {}
    cache_hits = 0

    for i, text in enumerate(texts):
        cleaned = text.strip() if text else ""
        if not cleaned:
            continue
        if self.use_cache:
            text_hash = self._get_text_hash(cleaned)
            if text_hash in self.embedding_cache:
                embeddings[i] = self.embedding_cache[text_hash]
                cache_hits += 1
                continue
        pending.setdefault(cleaned, []).append(i)

    # Dict keys keep insertion order, so requests follow first appearance.
    texts_to_fetch = list(pending)

    cache_status = f"cache {'enabled' if self.use_cache else 'disabled'}"
    logging.info(f"Computing embeddings for {len(texts)} {batch_name} ({cache_status}): {cache_hits} cache hits, {len(texts_to_fetch)} API calls needed")

    if not texts_to_fetch:
        logging.info(f"All embeddings found in cache!")
        return embeddings

    batch_size = 10
    total_batches = (len(texts_to_fetch) + batch_size - 1) // batch_size
    for batch_num, offset in enumerate(range(0, len(texts_to_fetch), batch_size), start=1):
        batch_start = time.time()
        batch = texts_to_fetch[offset:offset + batch_size]
        logging.info(f" Processing batch {batch_num}/{total_batches} ({len(batch)} texts)")
        try:
            api_start = time.time()
            response = openai.embeddings.create(
                model="text-embedding-3-small",
                input=batch
            )
            api_time = time.time() - api_start

            # Parse the whole response before committing anything, so a
            # short or malformed response leaves the entire batch as None.
            vectors = [np.array(response.data[j].embedding) for j in range(len(batch))]
            for text, vector in zip(batch, vectors):
                if self.use_cache:
                    self.embedding_cache[self._get_text_hash(text)] = vector
                for index in pending[text]:
                    embeddings[index] = vector

            batch_time = time.time() - batch_start
            logging.info(f" Batch {batch_num} completed in {batch_time:.2f}s (API: {api_time:.2f}s)")
        except Exception as e:
            # Slots of this batch were never filled and therefore stay None.
            logging.error(f"Error getting embeddings for batch {batch_num}: {e}")

    # Save cache after batch processing (if caching is enabled)
    if self.use_cache:
        self._save_embedding_cache()

    total_time = time.time() - start_time
    successful_count = len([e for e in embeddings if e is not None])
    logging.info(f"Completed embedding computation for {batch_name}: {successful_count}/{len(embeddings)} successful in {total_time:.2f}s ({cache_hits} from cache)")
    return embeddings
def _calculate_similarity_from_embeddings(self, emb1: np.ndarray, emb2: np.ndarray) -> float:
    """Cosine similarity of two precomputed embedding vectors.

    Returns 0.0 when either embedding is ``None`` or when the similarity
    computation raises (the error is logged).
    """
    if emb1 is not None and emb2 is not None:
        try:
            # sklearn expects 2-D inputs: one row per sample.
            row1 = emb1.reshape(1, -1)
            row2 = emb2.reshape(1, -1)
            score_matrix = cosine_similarity(row1, row2)
            return float(score_matrix[0][0])
        except Exception as e:
            logging.error(f"Error calculating similarity from embeddings: {e}")
    return 0.0
def calculate_similarity(self, text1: str, text2: str) -> float:
    """Cosine similarity between the embeddings of two texts."""
    # cosine_similarity works on 2-D arrays, hence one row per text.
    first = self.get_embedding(text1).reshape(1, -1)
    second = self.get_embedding(text2).reshape(1, -1)
    score_matrix = cosine_similarity(first, second)
    return float(score_matrix[0][0])
def compare_graphs(self, graph1_data: Dict[str, Any], graph2_data: Dict[str, Any]) -> GraphComparisonMetrics:
    """
    Compare two knowledge graphs and generate comprehensive metrics.

    Entities and relations are compared semantically (embedding based
    overlap detection) when both graphs carry the same non-empty
    ``graph_info.trace_id`` but different ``graph_info.id``; otherwise exact
    signature matching is used.

    Args:
        graph1_data: First knowledge graph data. Reads the keys
            ``entities``, ``relations`` and ``graph_info``; missing or
            ``None`` values are treated as empty.
        graph2_data: Second knowledge graph data, same layout.

    Returns:
        Comprehensive comparison metrics
    """
    start_time = time.time()
    logging.info(f"Starting graph comparison at {time.strftime('%H:%M:%S')}")

    # Extract entities and relations. `or []` also covers keys that are
    # present with a null value, which `.get(key, [])` would pass through.
    entities1 = graph1_data.get('entities') or []
    relations1 = graph1_data.get('relations') or []
    entities2 = graph2_data.get('entities') or []
    relations2 = graph2_data.get('relations') or []

    # Check if graphs are from the same trace
    info1 = graph1_data.get('graph_info') or {}
    info2 = graph2_data.get('graph_info') or {}
    trace_id1 = info1.get('trace_id')
    trace_id2 = info2.get('trace_id')
    kg_id1 = info1.get('id')
    kg_id2 = info2.get('id')
    # bool() so the flag is never a trace id string or None.
    is_same_trace = bool(trace_id1 and trace_id2 and trace_id1 == trace_id2 and kg_id1 != kg_id2)

    logging.info(f"Graph comparison debug:")
    logging.info(f" Graph 1 - trace_id: {trace_id1}, kg_id: {kg_id1}")
    logging.info(f" Graph 2 - trace_id: {trace_id2}, kg_id: {kg_id2}")
    logging.info(f" Same trace detected: {is_same_trace}")
    logging.info(f" Will use {'SEMANTIC' if is_same_trace else 'EXACT'} comparison")

    # Calculate entity metrics (use semantic comparison for same-trace graphs)
    entity_start = time.time()
    if is_same_trace:
        logging.info("Using semantic entity comparison...")
        entity_metrics = self._compare_entities_semantic(entities1, entities2)
    else:
        logging.info("Using exact entity comparison...")
        entity_metrics = self._compare_entities(entities1, entities2)
    entity_time = time.time() - entity_start

    # Calculate relation metrics (use semantic comparison for same-trace graphs)
    relation_start = time.time()
    if is_same_trace:
        logging.info("Using semantic relation comparison...")
        relation_metrics = self._compare_relations_semantic(relations1, relations2)
    else:
        logging.info("Using exact relation comparison...")
        relation_metrics = self._compare_relations(relations1, relations2)
    relation_time = time.time() - relation_start

    logging.info(f"Entity comparison results: overlap={entity_metrics['overlap_count']}, unique1={entity_metrics['unique_to_graph1']}, unique2={entity_metrics['unique_to_graph2']}")
    logging.info(f"Relation comparison results: overlap={relation_metrics['overlap_count']}, unique1={relation_metrics['unique_to_graph1']}, unique2={relation_metrics['unique_to_graph2']}")

    # Calculate structural metrics
    structural_start = time.time()
    structural_metrics = self._calculate_structural_metrics(entities1, relations1, entities2, relations2)
    structural_time = time.time() - structural_start

    # Calculate type distribution metrics
    type_start = time.time()
    type_metrics = self._calculate_type_distribution_metrics(entities1, relations1, entities2, relations2)
    type_time = time.time() - type_start

    # Calculate overall similarity scores
    overall_start = time.time()
    overall_metrics = self._calculate_overall_similarity(
        entity_metrics, relation_metrics, structural_metrics, type_metrics
    )
    overall_time = time.time() - overall_start

    # Generate graph statistics
    stats_start = time.time()
    graph1_stats = self._generate_graph_stats(entities1, relations1, "Graph 1")
    graph2_stats = self._generate_graph_stats(entities2, relations2, "Graph 2")
    stats_time = time.time() - stats_start

    total_time = time.time() - start_time

    def share(part: float) -> float:
        # A coarse or adjusted wall clock can report a zero total for very
        # fast runs; never let the timing log raise ZeroDivisionError.
        return part / total_time * 100 if total_time > 0 else 0.0

    logging.info(f"Graph comparison timing breakdown:")
    logging.info(f" Entity comparison: {entity_time:.2f}s ({share(entity_time):.1f}%)")
    logging.info(f" Relation comparison: {relation_time:.2f}s ({share(relation_time):.1f}%)")
    logging.info(f" Structural metrics: {structural_time:.2f}s ({share(structural_time):.1f}%)")
    logging.info(f" Type distribution: {type_time:.2f}s ({share(type_time):.1f}%)")
    logging.info(f" Overall metrics: {overall_time:.2f}s ({share(overall_time):.1f}%)")
    logging.info(f" Graph statistics: {stats_time:.2f}s ({share(stats_time):.1f}%)")
    logging.info(f" TOTAL TIME: {total_time:.2f}s")

    return GraphComparisonMetrics(
        # Entity metrics
        entity_overlap_count=entity_metrics['overlap_count'],
        entity_unique_to_graph1=entity_metrics['unique_to_graph1'],
        entity_unique_to_graph2=entity_metrics['unique_to_graph2'],
        entity_overlap_ratio=entity_metrics['overlap_ratio'],
        entity_semantic_similarity=entity_metrics['semantic_similarity'],
        # Relation metrics
        relation_overlap_count=relation_metrics['overlap_count'],
        relation_unique_to_graph1=relation_metrics['unique_to_graph1'],
        relation_unique_to_graph2=relation_metrics['unique_to_graph2'],
        relation_overlap_ratio=relation_metrics['overlap_ratio'],
        relation_semantic_similarity=relation_metrics['semantic_similarity'],
        # Structural metrics
        graph1_density=structural_metrics['graph1_density'],
        graph2_density=structural_metrics['graph2_density'],
        density_difference=structural_metrics['density_difference'],
        common_patterns_count=structural_metrics['common_patterns_count'],
        # Type distribution metrics
        entity_type_similarity=type_metrics['entity_type_similarity'],
        relation_type_similarity=type_metrics['relation_type_similarity'],
        # Overall similarity scores
        structural_similarity=overall_metrics['structural_similarity'],
        content_similarity=overall_metrics['content_similarity'],
        overall_similarity=overall_metrics['overall_similarity'],
        # Additional statistics
        graph1_stats=graph1_stats,
        graph2_stats=graph2_stats
    )
def _compare_entities(self, entities1: List[Dict], entities2: List[Dict]) -> Dict[str, Any]:
    """Exact-match comparison of the entities of two graphs.

    Entities are reduced to a lowercase "type name" signature; overlap and
    uniqueness are counted on the sets of signatures, and the overlap ratio
    is the intersection size divided by the union size. The semantic
    similarity comes from ``_calculate_entity_semantic_similarity``.
    """
    def signature_of(entity):
        kind = entity.get('type', '')
        label = entity.get('name', '')
        return f"{kind} {label}".strip().lower()

    first = set(map(signature_of, entities1))
    second = set(map(signature_of, entities2))

    shared = first.intersection(second)
    only_first = first.difference(second)
    only_second = second.difference(first)

    union_size = len(first.union(second))
    if union_size > 0:
        ratio = len(shared) / union_size
    else:
        ratio = 0.0

    semantic = self._calculate_entity_semantic_similarity(entities1, entities2)

    return {
        'overlap_count': len(shared),
        'unique_to_graph1': len(only_first),
        'unique_to_graph2': len(only_second),
        'overlap_ratio': ratio,
        'semantic_similarity': semantic
    }
def _compare_relations(self, relations1: List[Dict], relations2: List[Dict]) -> Dict[str, Any]:
    """Exact-match comparison of the relations of two graphs.

    Relations are reduced to a lowercase "type description" signature;
    overlap and uniqueness are counted on the sets of signatures, and the
    overlap ratio is the intersection size divided by the union size. The
    semantic similarity comes from ``_calculate_relation_semantic_similarity``.
    """
    def signature_of(relation):
        kind = relation.get('type', '')
        detail = relation.get('description', '')
        return f"{kind} {detail}".strip().lower()

    first = set(map(signature_of, relations1))
    second = set(map(signature_of, relations2))

    shared = first.intersection(second)
    only_first = first.difference(second)
    only_second = second.difference(first)

    union_size = len(first.union(second))
    if union_size > 0:
        ratio = len(shared) / union_size
    else:
        ratio = 0.0

    semantic = self._calculate_relation_semantic_similarity(relations1, relations2)

    return {
        'overlap_count': len(shared),
        'unique_to_graph1': len(only_first),
        'unique_to_graph2': len(only_second),
        'overlap_ratio': ratio,
        'semantic_similarity': semantic
    }
def _calculate_entity_semantic_similarity(self, entities1: List[Dict], entities2: List[Dict]) -> float:
    """Calculate semantic similarity between entity sets using embeddings.

    Every entity is rendered as "type name description". For each entity of
    the first graph the best cosine similarity against all entities of the
    second graph is taken (never below 0.0); the result is the mean of
    those best scores.

    Each text is embedded exactly once (``len(entities1) + len(entities2)``
    lookups) instead of twice per compared pair, which matters most when
    the embedding cache is disabled and every lookup is an API request.

    Returns:
        Mean best-match similarity as a Python float; 0.0 if either list is
        empty.
    """
    if not entities1 or not entities2:
        return 0.0

    def describe(entity):
        return f"{entity.get('type', '')} {entity.get('name', '')} {entity.get('description', '')}".strip()

    embeddings1 = [self.get_embedding(describe(entity)) for entity in entities1]
    embeddings2 = [self.get_embedding(describe(entity)) for entity in entities2]

    best_scores = []
    for emb1 in embeddings1:
        best = 0.0
        for emb2 in embeddings2:
            best = max(best, self._calculate_similarity_from_embeddings(emb1, emb2))
        best_scores.append(best)

    return float(np.mean(best_scores)) if best_scores else 0.0
def _calculate_relation_semantic_similarity(self, relations1: List[Dict], relations2: List[Dict]) -> float:
    """Calculate semantic similarity between relation sets using embeddings.

    Every relation is rendered as "type description". For each relation of
    the first graph the best cosine similarity against all relations of the
    second graph is taken (never below 0.0); the result is the mean of
    those best scores.

    Each text is embedded exactly once (``len(relations1) + len(relations2)``
    lookups) instead of twice per compared pair, which matters most when
    the embedding cache is disabled and every lookup is an API request.

    Returns:
        Mean best-match similarity as a Python float; 0.0 if either list is
        empty.
    """
    if not relations1 or not relations2:
        return 0.0

    def describe(relation):
        return f"{relation.get('type', '')} {relation.get('description', '')}".strip()

    embeddings1 = [self.get_embedding(describe(relation)) for relation in relations1]
    embeddings2 = [self.get_embedding(describe(relation)) for relation in relations2]

    best_scores = []
    for emb1 in embeddings1:
        best = 0.0
        for emb2 in embeddings2:
            best = max(best, self._calculate_similarity_from_embeddings(emb1, emb2))
        best_scores.append(best)

    return float(np.mean(best_scores)) if best_scores else 0.0
def _calculate_structural_metrics(self, entities1: List[Dict], relations1: List[Dict],
                                  entities2: List[Dict], relations2: List[Dict]) -> Dict[str, Any]:
    """Compute density figures and shared relation patterns for two graphs.

    Density is ``2 * relations / (entities * (entities - 1))`` and 0.0 for
    graphs with fewer than two entities. Common patterns are the distinct
    values returned by ``_extract_patterns`` that occur in both graphs.
    """
    def density_of(node_count, edge_count):
        if node_count > 1:
            return (2 * edge_count) / (node_count * (node_count - 1))
        return 0.0

    first_density = density_of(len(entities1), len(relations1))
    second_density = density_of(len(entities2), len(relations2))

    # Simple heuristic: a "pattern" is whatever _extract_patterns yields.
    first_patterns = set(self._extract_patterns(relations1))
    second_patterns = set(self._extract_patterns(relations2))
    shared_patterns = first_patterns.intersection(second_patterns)

    return {
        'graph1_density': first_density,
        'graph2_density': second_density,
        'density_difference': abs(first_density - second_density),
        'common_patterns_count': len(shared_patterns)
    }
def _extract_patterns(self, relations: List[Dict]) -> List[str]:
    """Return the relation type of every relation as a string.

    Relations without a ``type`` key contribute ``"UNKNOWN"``; order and
    duplicates are preserved.
    """
    return [f"{item.get('type', 'UNKNOWN')}" for item in relations]
def _calculate_type_distribution_metrics(self, entities1: List[Dict], relations1: List[Dict],
                                         entities2: List[Dict], relations2: List[Dict]) -> Dict[str, Any]:
    """Compare how entity types and relation types are distributed.

    Occurrences of each ``type`` value (``'Unknown'`` when the key is
    absent) are counted per graph, and the count dictionaries are compared
    with ``_calculate_distribution_similarity``.
    """
    def tally(items):
        counts = {}
        for item in items:
            label = item.get('type', 'Unknown')
            counts[label] = counts.get(label, 0) + 1
        return counts

    entity_counts_1 = tally(entities1)
    entity_counts_2 = tally(entities2)
    relation_counts_1 = tally(relations1)
    relation_counts_2 = tally(relations2)

    return {
        'entity_type_similarity': self._calculate_distribution_similarity(entity_counts_1, entity_counts_2),
        'relation_type_similarity': self._calculate_distribution_similarity(relation_counts_1, relation_counts_2)
    }
def _calculate_distribution_similarity(self, dist1: Dict[str, int], dist2: Dict[str, int]) -> float:
    """Cosine similarity between two count distributions.

    Two empty distributions are identical (1.0); exactly one empty
    distribution, or a distribution whose counts sum to zero, gives 0.0.
    Otherwise the counts are laid out over the union of keys and compared
    with cosine similarity.
    """
    if not dist1:
        return 0.0 if dist2 else 1.0
    if not dist2:
        return 0.0

    # Both vectors are built from the same key iteration so positions align.
    labels = set(dist1.keys()) | set(dist2.keys())
    counts_a = np.array([dist1.get(label, 0) for label in labels])
    counts_b = np.array([dist2.get(label, 0) for label in labels])

    if np.sum(counts_a) == 0 or np.sum(counts_b) == 0:
        return 0.0

    score_matrix = cosine_similarity(counts_a.reshape(1, -1), counts_b.reshape(1, -1))
    return float(score_matrix[0][0])
def _calculate_overall_similarity(self, entity_metrics: Dict, relation_metrics: Dict,
                                  structural_metrics: Dict, type_metrics: Dict) -> Dict[str, Any]:
    """Calculate overall similarity scores.

    Weights:
        structural = 0.30 * density term + 0.35 * entity type similarity
                     + 0.35 * relation type similarity
        content    = 0.40 * entity overlap ratio + 0.30 * relation overlap
                     ratio + 0.15 * entity semantic similarity
                     + 0.15 * relation semantic similarity
        overall    = 0.40 * structural + 0.60 * content

    The density term ``1 - density_difference`` is clamped to [0, 1]: the
    density formula used in this module can exceed 1 when a graph has many
    relations, and an unclamped negative term would penalise the score
    beyond its 30% weight. The overall score is built from the clamped
    component scores, so it always agrees with the values returned next to
    it.

    Returns:
        Dict with ``structural_similarity``, ``content_similarity`` and
        ``overall_similarity``, each a float in [0, 1].
    """
    def clamp(value):
        return max(0.0, min(1.0, float(value)))

    density_term = clamp(1 - structural_metrics['density_difference'])

    # Structural similarity (combination of density and type distribution)
    structural_sim = clamp(
        density_term * 0.3 +
        type_metrics['entity_type_similarity'] * 0.35 +
        type_metrics['relation_type_similarity'] * 0.35
    )

    # Content similarity (combination of entity and relation overlaps)
    content_sim = clamp(
        entity_metrics['overlap_ratio'] * 0.4 +
        relation_metrics['overlap_ratio'] * 0.3 +
        entity_metrics['semantic_similarity'] * 0.15 +
        relation_metrics['semantic_similarity'] * 0.15
    )

    # Overall similarity (weighted combination of the clamped components)
    overall_sim = clamp(structural_sim * 0.4 + content_sim * 0.6)

    return {
        'structural_similarity': structural_sim,
        'content_similarity': content_sim,
        'overall_similarity': overall_sim
    }
def _generate_graph_stats(self, entities: List[Dict], relations: List[Dict], graph_name: str) -> Dict[str, Any]:
    """Summarise one graph: sizes, density and per-type counts.

    Items without a ``type`` key are counted under ``'Unknown'``. Density is
    ``2 * relations / (entities * (entities - 1))`` (0.0 for fewer than two
    entities); the average relations per entity is 0.0 for an empty graph.
    """
    entity_types = {}
    for entity in entities:
        label = entity.get('type', 'Unknown')
        entity_types.setdefault(label, 0)
        entity_types[label] += 1

    relation_types = {}
    for relation in relations:
        label = relation.get('type', 'Unknown')
        relation_types.setdefault(label, 0)
        relation_types[label] += 1

    n_entities, n_relations = len(entities), len(relations)

    if n_entities > 1:
        density = (2 * n_relations) / (n_entities * (n_entities - 1))
    else:
        density = 0.0

    if n_entities > 0:
        avg_relations = n_relations / n_entities
    else:
        avg_relations = 0.0

    return {
        'name': graph_name,
        'entity_count': n_entities,
        'relation_count': n_relations,
        'density': density,
        'entity_types': entity_types,
        'relation_types': relation_types,
        'avg_relations_per_entity': avg_relations
    }
def _compare_entities_semantic(self, entities1: List[Dict], entities2: List[Dict]) -> Dict[str, Any]:
    """Compare entities using semantic similarity for overlap detection.

    Each entity is rendered as "type name description" (empty or ``None``
    fields are left out) and embedded through ``_get_embeddings_batch``.
    Entities of the first graph are then matched greedily, in order: each
    one takes the not-yet-matched entity of the second graph with the
    highest cosine similarity, provided it reaches
    ``self.semantic_threshold``. Entities with no text or no embedding are
    never matched.

    Args:
        entities1: Entities of the first graph.
        entities2: Entities of the second graph.

    Returns:
        Dict with ``overlap_count``, ``unique_to_graph1``,
        ``unique_to_graph2``, ``overlap_ratio`` (matches divided by the
        number of distinct entities after merging matches) and
        ``semantic_similarity`` (from
        ``_calculate_entity_semantic_similarity``).
    """
    if not entities1 or not entities2:
        return {
            'overlap_count': 0,
            'unique_to_graph1': len(entities1),
            'unique_to_graph2': len(entities2),
            'overlap_ratio': 0.0,
            'semantic_similarity': 0.0
        }

    logging.info(f"Starting semantic entity comparison: {len(entities1)} entities in graph1, {len(entities2)} entities in graph2")
    logging.info(f"Total potential comparisons: {len(entities1)} x {len(entities2)} = {len(entities1) * len(entities2)}")

    def build_texts(entities, graph_label):
        """Render every entity as one text line, logging progress."""
        texts = []
        for i, entity in enumerate(entities):
            parts = []
            for key in ('type', 'name', 'description'):
                value = entity.get(key)
                # A key that is present with a None value must not crash
                # the comparison; treat it like a missing field.
                if value is None:
                    continue
                piece = str(value).strip()
                if piece:
                    parts.append(piece)
            texts.append(' '.join(parts))
            if i % 5 == 0 or i == len(entities) - 1:
                logging.info(f" Processed {i+1}/{len(entities)} entities from {graph_label}")
        return texts

    # Pre-compute all text representations
    logging.info("Pre-computing text representations for entities...")
    texts1 = build_texts(entities1, "graph1")
    texts2 = build_texts(entities2, "graph2")

    # Batch compute embeddings
    logging.info("Computing embeddings in batches...")
    embeddings1 = self._get_embeddings_batch(texts1, "graph1 entities")
    embeddings2 = self._get_embeddings_batch(texts2, "graph2 entities")

    # Find semantic matches using similarity threshold
    logging.info(f"Finding semantic matches with threshold {self.semantic_threshold}...")
    matched_entities2 = set()
    overlap_count = 0
    total_comparisons = 0

    for i, (text1, emb1) in enumerate(zip(texts1, embeddings1)):
        # Entities with no meaningful text or a failed embedding cannot match.
        if text1 and emb1 is not None:
            best_match_idx = None
            best_similarity = 0.0
            for j, (text2, emb2) in enumerate(zip(texts2, embeddings2)):
                if j in matched_entities2 or not text2 or emb2 is None:
                    continue
                similarity = self._calculate_similarity_from_embeddings(emb1, emb2)
                total_comparisons += 1
                if similarity >= self.semantic_threshold and similarity > best_similarity:
                    best_similarity = similarity
                    best_match_idx = j

            if best_match_idx is not None:
                # Each graph2 entity may be claimed by one graph1 entity only.
                matched_entities2.add(best_match_idx)
                overlap_count += 1
                logging.info(f" Match found: entity {i} ('{texts1[i][:50]}...') -> entity {best_match_idx} ('{texts2[best_match_idx][:50]}...'), similarity: {best_similarity:.3f}")

            if i % 5 == 0 or i == len(entities1) - 1:
                logging.info(f" Processed {i+1}/{len(entities1)} entities from graph1, found {overlap_count} matches so far")

    unique_to_1 = len(entities1) - overlap_count
    unique_to_2 = len(entities2) - overlap_count

    # Calculate overlap ratio
    total_unique = len(entities1) + len(entities2) - overlap_count
    overlap_ratio = overlap_count / total_unique if total_unique > 0 else 0.0

    # Calculate semantic similarity using existing method
    semantic_similarity = self._calculate_entity_semantic_similarity(entities1, entities2)

    logging.info(f"Entity semantic comparison completed:")
    logging.info(f" Total comparisons made: {total_comparisons}")
    logging.info(f" Overlaps found: {overlap_count}")
    logging.info(f" Unique to graph1: {unique_to_1}")
    logging.info(f" Unique to graph2: {unique_to_2}")
    logging.info(f" Overlap ratio: {overlap_ratio:.3f}")

    return {
        'overlap_count': overlap_count,
        'unique_to_graph1': unique_to_1,
        'unique_to_graph2': unique_to_2,
        'overlap_ratio': overlap_ratio,
        'semantic_similarity': semantic_similarity
    }
| def _compare_relations_semantic(self, relations1: List[Dict], relations2: List[Dict]) -> Dict[str, Any]: | |
| """Compare relations using semantic similarity for overlap detection""" | |
| if not relations1 or not relations2: | |
| return { | |
| 'overlap_count': 0, | |
| 'unique_to_graph1': len(relations1), | |
| 'unique_to_graph2': len(relations2), | |
| 'overlap_ratio': 0.0, | |
| 'semantic_similarity': 0.0 | |
| } | |
| logging.info(f"Starting semantic relation comparison: {len(relations1)} relations in graph1, {len(relations2)} relations in graph2") | |
| logging.info(f"Total potential comparisons: {len(relations1)} x {len(relations2)} = {len(relations1) * len(relations2)}") | |
| # Pre-compute all text representations | |
| logging.info("Pre-computing text representations for relations...") | |
| texts1 = [] | |
| texts2 = [] | |
| for i, relation1 in enumerate(relations1): | |
| type1 = relation1.get('type', '').strip() | |
| desc1 = relation1.get('description', '').strip() | |
| source1 = relation1.get('source', '').strip() | |
| target1 = relation1.get('target', '').strip() | |
| text1_parts = [type1] | |
| if desc1: | |
| text1_parts.append(desc1) | |
| if source1 and target1: | |
| text1_parts.append(f"from {source1} to {target1}") | |
| elif source1: | |
| text1_parts.append(f"from {source1}") | |
| elif target1: | |
| text1_parts.append(f"to {target1}") | |
| text1 = ' '.join(filter(None, text1_parts)).strip() | |
| texts1.append(text1) | |
| if i % 5 == 0 or i == len(relations1) - 1: | |
| logging.info(f" Processed {i+1}/{len(relations1)} relations from graph1") | |
| for i, relation2 in enumerate(relations2): | |
| type2 = relation2.get('type', '').strip() | |
| desc2 = relation2.get('description', '').strip() | |
| source2 = relation2.get('source', '').strip() | |
| target2 = relation2.get('target', '').strip() | |
| text2_parts = [type2] | |
| if desc2: | |
| text2_parts.append(desc2) | |
| if source2 and target2: | |
| text2_parts.append(f"from {source2} to {target2}") | |
| elif source2: | |
| text2_parts.append(f"from {source2}") | |
| elif target2: | |
| text2_parts.append(f"to {target2}") | |
| text2 = ' '.join(filter(None, text2_parts)).strip() | |
| texts2.append(text2) | |
| if i % 5 == 0 or i == len(relations2) - 1: | |
| logging.info(f" Processed {i+1}/{len(relations2)} relations from graph2") | |
| # Batch compute embeddings | |
| logging.info("Computing embeddings in batches...") | |
| embeddings1 = self._get_embeddings_batch(texts1, "graph1 relations") | |
| embeddings2 = self._get_embeddings_batch(texts2, "graph2 relations") | |
| # Find semantic matches using similarity threshold | |
| logging.info(f"Finding semantic matches with threshold {self.semantic_threshold}...") | |
| matched_relations1 = set() | |
| matched_relations2 = set() | |
| overlap_count = 0 | |
| total_comparisons = 0 | |
| for i, (relation1, text1, emb1) in enumerate(zip(relations1, texts1, embeddings1)): | |
| if not text1 or emb1 is None: # Skip relations with no meaningful text or failed embeddings | |
| continue | |
| best_match_idx = None | |
| best_similarity = 0.0 | |
| for j, (relation2, text2, emb2) in enumerate(zip(relations2, texts2, embeddings2)): | |
| if j in matched_relations2 or not text2 or emb2 is None: | |
| continue | |
| # Calculate similarity using precomputed embeddings | |
| similarity = self._calculate_similarity_from_embeddings(emb1, emb2) | |
| total_comparisons += 1 | |
| if similarity >= self.semantic_threshold and similarity > best_similarity: | |
| best_similarity = similarity | |
| best_match_idx = j | |
| if best_match_idx is not None: | |
| matched_relations1.add(i) | |
| matched_relations2.add(best_match_idx) | |
| overlap_count += 1 | |
| logging.info(f" Match found: relation {i} ('{texts1[i][:50]}...') -> relation {best_match_idx} ('{texts2[best_match_idx][:50]}...'), similarity: {best_similarity:.3f}") | |
| if i % 5 == 0 or i == len(relations1) - 1: | |
| logging.info(f" Processed {i+1}/{len(relations1)} relations from graph1, found {overlap_count} matches so far") | |
| unique_to_1 = len(relations1) - overlap_count | |
| unique_to_2 = len(relations2) - overlap_count | |
| # Calculate overlap ratio | |
| total_unique = len(relations1) + len(relations2) - overlap_count | |
| overlap_ratio = overlap_count / total_unique if total_unique > 0 else 0.0 | |
| # Calculate semantic similarity using existing method | |
| semantic_similarity = self._calculate_relation_semantic_similarity(relations1, relations2) | |
| logging.info(f"Relation semantic comparison completed:") | |
| logging.info(f" Total comparisons made: {total_comparisons}") | |
| logging.info(f" Overlaps found: {overlap_count}") | |
| logging.info(f" Unique to graph1: {unique_to_1}") | |
| logging.info(f" Unique to graph2: {unique_to_2}") | |
| logging.info(f" Overlap ratio: {overlap_ratio:.3f}") | |
| return { | |
| 'overlap_count': overlap_count, | |
| 'unique_to_graph1': unique_to_1, | |
| 'unique_to_graph2': unique_to_2, | |
| 'overlap_ratio': overlap_ratio, | |
| 'semantic_similarity': semantic_similarity | |
| } | |
def compare_knowledge_graphs(graph1_data: Dict[str, Any], graph2_data: Dict[str, Any],
                             similarity_threshold: float = 0.7, semantic_threshold: float = 0.75,
                             use_cache: bool = True) -> GraphComparisonMetrics:
    """
    Compare two knowledge graphs in a single call.

    Builds a KnowledgeGraphComparator from the given settings and returns the
    result of its compare_graphs() method for the two graphs.

    Args:
        graph1_data: First knowledge graph data
        graph2_data: Second knowledge graph data
        similarity_threshold: Threshold for semantic similarity matching (0.7 = 70%)
        semantic_threshold: Threshold for semantic overlap detection (0.75 = 75%)
        use_cache: Whether to use embedding cache (default: True)

    Returns:
        Comprehensive comparison metrics
    """
    # Gather the comparator configuration first, then delegate the actual work.
    settings = {
        "similarity_threshold": similarity_threshold,
        "semantic_threshold": semantic_threshold,
        "use_cache": use_cache,
    }
    return KnowledgeGraphComparator(**settings).compare_graphs(graph1_data, graph2_data)