File size: 7,140 Bytes
d1564d4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#!/usr/bin/env python3
"""
Test Entity Resolution

Quick test script to validate the entity resolution system on existing
Summit Digital Solutions data before rebuilding the full knowledge graph.
"""

import sys
import json
from pathlib import Path
from typing import Dict, List, Any

# Put the parent of this script's directory (presumably the project root) at
# the front of sys.path so that the `app` package imported below can be found
# when this file is executed directly as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))

# NOTE: these project imports must stay below the sys.path tweak above.
from app.core.entity_resolution import EntityResolver
from app.core.logging import setup_logging

# Module-level logger named after this script, created at INFO level; it is
# used by test_entity_resolution() to report a failed run.
logger = setup_logging("test_entity_resolution", log_level="INFO")

def load_existing_entities(store_name: str = "summit-digital-solutions-inc") -> Dict[str, List[Dict]]:
    """Load the previously extracted entities for one store from disk.

    The entities are read from
    ``data/search_indexes/knowledge_graphs/<store_name>_entities.json``,
    resolved relative to the parent of the directory containing this script.

    Args:
        store_name: Store identifier used as the prefix of the JSON file name.

    Returns:
        A dict with exactly the keys ``companies``, ``people``,
        ``financial_metrics`` and ``documents``. Each maps to the value stored
        under that key in the file, or to an empty list when the key is
        missing or null.

    Raises:
        FileNotFoundError: If the entities file does not exist.
    """
    entities_file = (
        Path(__file__).parent.parent
        / "data"
        / "search_indexes"
        / "knowledge_graphs"
        / f"{store_name}_entities.json"
    )

    if not entities_file.exists():
        raise FileNotFoundError(f"Entities file not found: {entities_file}")

    # Decode explicitly as UTF-8: without it the platform's locale encoding is
    # used, which mis-reads non-ASCII entity names on some systems.
    with open(entities_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # `or []` also covers keys that are present but null, so callers can
    # always take len() of / slice the returned values.
    entity_types = ('companies', 'people', 'financial_metrics', 'documents')
    return {entity_type: data.get(entity_type) or [] for entity_type in entity_types}

def analyze_sample_entities(entities: Dict[str, List[Dict]], sample_size: int = 20):
    """Print a sample of each entity type so duplicates can be spotted by eye.

    For every non-empty entity type, the first ``sample_size`` entities are
    printed with their name, confidence, source and context. Contexts longer
    than 100 characters are cut to 100 characters followed by ``...``.

    Args:
        entities: Mapping of entity type to a list of entity dicts. The keys
            ``name``, ``confidence``, ``source`` and ``context`` are read from
            each entity; all of them are optional.
        sample_size: Maximum number of entities to print per type.

    Returns:
        None. All output goes to stdout.
    """
    print("\n🔍 Sample Entity Analysis:")
    print("=" * 50)

    for entity_type, entity_list in entities.items():
        if not entity_list:
            continue

        print(f"\n{entity_type.upper()} (showing first {sample_size}):")
        print("-" * 30)

        # Show sample entities with their key attributes
        for i, entity in enumerate(entity_list[:sample_size], 1):
            name = entity.get('name', 'N/A')
            source = entity.get('source', 'N/A')
            # `or` covers keys that exist but hold null, which would otherwise
            # break the float formatting / len() below.
            confidence = entity.get('confidence') or 0.0
            context = str(entity.get('context') or '')
            if len(context) > 100:
                context = context[:100] + "..."

            print(f"{i:2d}. {name}")
            print(f"    Confidence: {confidence:.3f}")
            print(f"    Source: {source}")
            print(f"    Context: {context}")
            print()

def find_potential_duplicates(entities: Dict[str, List[Dict]]) -> Dict[str, List[List[str]]]:
    """Group entity names that collapse to the same normalized form.

    Normalization lower-cases the name, drops periods, treats commas as
    whitespace, collapses runs of whitespace and removes the stand-alone
    words ``inc`` and ``corp``. Only whole words are removed, so names that
    merely contain those letters (e.g. "Vincent", "Corporate") are left
    intact.

    Args:
        entities: Mapping of entity type to a list of entity dicts; only the
            ``name`` key of each entity is read.

    Returns:
        Mapping of entity type to a list of groups, where each group lists the
        original names (two or more) sharing one normalized form. Entity types
        with fewer than two entities or with no such groups are omitted, and
        entities whose name is missing or normalizes to nothing are ignored.
    """
    # Legal-form words that should not distinguish two otherwise equal names.
    ignored_tokens = {'inc', 'corp'}
    potential_duplicates = {}

    for entity_type, entity_list in entities.items():
        if len(entity_list) < 2:
            continue

        # Group original names by their normalized form
        name_groups = {}
        for entity in entity_list:
            original_name = entity.get('name') or ''
            cleaned = original_name.lower().replace(',', ' ').replace('.', '')
            normalized = ' '.join(
                token for token in cleaned.split() if token not in ignored_tokens
            )
            if not normalized:
                # Nameless entities carry no evidence of being duplicates.
                continue
            name_groups.setdefault(normalized, []).append(original_name)

        # Keep only the groups with more than one entity
        duplicates = [names for names in name_groups.values() if len(names) > 1]
        if duplicates:
            potential_duplicates[entity_type] = duplicates

    return potential_duplicates

def test_entity_resolution():
    """Run an end-to-end smoke test of the entity resolution system.

    Steps, all reported on stdout:
      1. Load the existing entities and print the count per entity type.
      2. Print a sample of the entities and the potential duplicates found by
         simple string matching.
      3. Run ``EntityResolver.resolve_entities`` on the first 10 entities of
         each type and print the statistics returned by
         ``EntityResolver.get_resolution_stats``.
      4. Print up to three merged entities (``cluster_size`` > 1) per type.

    Returns:
        True when every step completed, False when any exception was raised
        (the error is logged and the traceback printed).
    """
    print("🧪 Testing Entity Resolution System")
    print("=" * 40)

    try:
        # Load existing entities
        print("📥 Loading existing entities...")
        entities = load_existing_entities()

        # Show original counts
        print("\n📊 Original Entity Counts:")
        total_original = 0
        for entity_type, entity_list in entities.items():
            count = len(entity_list)
            total_original += count
            print(f"  {entity_type}: {count}")
        print(f"  TOTAL: {total_original}")

        # Analyze sample entities
        analyze_sample_entities(entities)

        # Find potential duplicates using simple string matching
        print("\n🔍 Potential Duplicates (simple string matching):")
        potential_duplicates = find_potential_duplicates(entities)
        for entity_type, duplicate_groups in potential_duplicates.items():
            print(f"\n{entity_type}:")
            for i, group in enumerate(duplicate_groups[:5], 1):  # Show first 5 groups
                print(f"  {i}. {group}")

        # Test entity resolution with a smaller sample first:
        # the first 10 entities of each type keep the run fast.
        print("\n🔬 Testing Entity Resolution (sample):")
        sample_entities = {
            entity_type: entity_list[:10]
            for entity_type, entity_list in entities.items()
        }

        # Initialize resolver and test
        resolver = EntityResolver()

        print("🚀 Running entity resolution...")
        resolved_entities = resolver.resolve_entities(sample_entities)

        # Show results
        print("\n📈 Resolution Results (sample):")
        stats = resolver.get_resolution_stats(sample_entities, resolved_entities)

        print(f"Overall: {stats['total_before']} → {stats['total_after']} entities "
              f"({stats['overall_reduction_percentage']:.1f}% reduction)")

        # Only entity types where something was actually merged are listed.
        for entity_type, type_stats in stats['by_type'].items():
            if type_stats['duplicates_removed'] > 0:
                print(f"  {entity_type}: {type_stats['before']} → {type_stats['after']} "
                      f"({type_stats['duplicates_removed']} duplicates, "
                      f"{type_stats['reduction_percentage']:.1f}% reduction)")

        # Show some examples of resolved entities
        print("\n✨ Example Resolved Entities:")
        for entity_type, entity_list in resolved_entities.items():
            merged_entities = [e for e in entity_list if e.get('cluster_size', 1) > 1]
            if merged_entities:
                print(f"\n{entity_type} (showing merged entities):")
                for entity in merged_entities[:3]:  # Show first 3 merged entities
                    print(f"  • {entity['name']} (merged {entity['cluster_size']} entities)")
                    if entity.get('sources'):
                        print(f"    Sources: {len(entity['sources'])} documents")
                    # Compare against None so a confidence of 0.0 is still shown.
                    if entity.get('merged_confidence') is not None:
                        print(f"    Avg confidence: {entity['merged_confidence']:.3f}")

        print("\n✅ Entity resolution test completed successfully!")

    except Exception as e:
        # Top-level boundary of the script: report the failure and signal it
        # through the return value instead of letting the exception escape.
        logger.error(f"Entity resolution test failed: {e}")
        import traceback
        traceback.print_exc()
        return False

    return True

if __name__ == "__main__":
    # Report the outcome to the shell: exit status 0 on success, 1 on failure.
    exit_code = 0 if test_entity_resolution() else 1
    sys.exit(exit_code)