"""
🔄 Intelligent Entity Merge

Smart merging of per-chunk entity mapping tables into one global mapping.
"""

import logging
from typing import Dict, List, Optional, Tuple

from .normalizer import EntityNormalizer

logger = logging.getLogger(__name__)


class EntityMerger:
    """Merge per-chunk mapping tables into a single global mapping.

    Combines the mappings of several chunks using:
      - normalization,
      - exact matching,
      - fuzzy matching (companies only),
    and produces one global mapping plus a per-chunk remapping table.
    """

    def __init__(self, fuzzy_threshold: float = 0.75):
        """Initialize the merger.

        Args:
            fuzzy_threshold: threshold for fuzzy matching, in [0.0, 1.0].
                Default: 0.75 (conservative).

        Raises:
            ValueError: if fuzzy_threshold is outside [0.0, 1.0].
        """
        if not 0.0 <= fuzzy_threshold <= 1.0:
            raise ValueError("fuzzy_threshold باید بین 0.0 و 1.0 باشد")

        self.normalizer = EntityNormalizer()
        self.fuzzy_threshold = fuzzy_threshold

        logger.info(
            f"✅ EntityMerger initialized: "
            f"fuzzy_threshold={fuzzy_threshold}"
        )

    def merge_mappings(
        self,
        mapping_tables: List[Dict]
    ) -> Dict:
        """Merge multiple mapping tables into a single global mapping.

        Args:
            mapping_tables: list of per-chunk mapping tables, e.g.
                [
                    {
                        "chunk_id": "chunk_01",
                        "mapping": {"company-01": "شرکت پارس", ...}
                    },
                    ...
                ]

        Returns:
            {
                "global_mapping": {
                    "company-01": "شرکت پارس",
                    "person-01": "علی احمدی",
                    ...
                },
                "remapping": [
                    {
                        "chunk_id": "chunk_01",
                        "mapping": {}  # empty: the first chunk needs no remap
                    },
                    {
                        "chunk_id": "chunk_02",
                        "mapping": {
                            "company-01": "company-03",  # remapped
                            "amount-01": "amount-06",
                            ...
                        }
                    },
                    ...
                ]
            }
        """
        logger.info(f"🔄 شروع merge {len(mapping_tables)} mapping tables...")

        # Basic sanity checks.
        if not mapping_tables:
            logger.warning("⚠️ لیست mapping tables خالی است")
            return {"global_mapping": {}, "remapping": []}

        if len(mapping_tables) == 1:
            # Only one table - nothing to merge.
            logger.info("✅ فقط یک table - بدون merge")
            return {
                # Copy so callers cannot mutate the input table through the
                # returned global mapping (fix: the original aliased it,
                # unlike the multi-table path below which copies).
                "global_mapping": dict(mapping_tables[0]["mapping"]),
                "remapping": [
                    {
                        "chunk_id": mapping_tables[0]["chunk_id"],
                        "mapping": {}
                    }
                ]
            }

        # Start with the first table as the base.
        global_mapping = dict(mapping_tables[0]["mapping"])
        global_counters = self._extract_counters(global_mapping)

        logger.info(
            f"📊 Base mapping (chunk 1): "
            f"{len(global_mapping)} entities"
        )

        # The first chunk never needs remapping.
        remapping_list = [
            {
                "chunk_id": mapping_tables[0]["chunk_id"],
                "mapping": {}
            }
        ]

        # Merge the remaining tables one by one.
        for i, table_data in enumerate(mapping_tables[1:], start=2):
            chunk_id = table_data["chunk_id"]
            table_mapping = table_data["mapping"]

            logger.info(f"🔄 Merging {chunk_id} ({i}/{len(mapping_tables)})...")
            logger.info(f"   Entities in this chunk: {len(table_mapping)}")

            # Merge this table into the global mapping (updated in place).
            chunk_remapping = self._merge_single_table(
                table_mapping,
                global_mapping,
                global_counters
            )

            remapping_list.append({
                "chunk_id": chunk_id,
                "mapping": chunk_remapping
            })

            logger.info(f"   ✅ Remapped {len(chunk_remapping)} entities")

        logger.info(f"✅ Merge complete: {len(global_mapping)} total entities")

        # Summary log.
        self._log_summary(global_mapping, global_counters)

        return {
            "global_mapping": global_mapping,
            "remapping": remapping_list
        }

    def _merge_single_table(
        self,
        table: Dict,
        global_mapping: Dict,
        global_counters: Dict
    ) -> Dict:
        """Merge one table into the global mapping.

        Args:
            table: the new mapping table.
            global_mapping: the global mapping (updated in place).
            global_counters: current per-type counters (updated in place).

        Returns:
            Remapping dictionary: {old_placeholder: new_placeholder}.
        """
        remapping = {}

        # Matching statistics for this chunk.
        stats = {
            "exact": 0,
            "fuzzy": 0,
            "new": 0
        }

        # Group entities by type.
        entities_by_type = self._group_by_type(table)

        # NOTE(review): entities whose type is not in this list are silently
        # skipped (neither remapped nor added to the global mapping) —
        # confirm this is intentional.
        for entity_type in ["person", "company", "amount", "percent"]:
            if entity_type not in entities_by_type:
                continue

            entities = entities_by_type[entity_type]

            for old_placeholder, original_value in entities.items():
                # Normalization.
                normalized = self.normalizer.normalize(original_value, entity_type)

                # STEP 1: exact matching against the global mapping.
                matched_placeholder = self._exact_match(
                    entity_type,
                    normalized,
                    global_mapping
                )

                if matched_placeholder:
                    remapping[old_placeholder] = matched_placeholder
                    stats["exact"] += 1
                    continue

                # STEP 2: fuzzy matching (companies only).
                if entity_type == "company":
                    matched_placeholder, score = self._fuzzy_match(
                        normalized,
                        global_mapping
                    )

                    if matched_placeholder:
                        remapping[old_placeholder] = matched_placeholder
                        stats["fuzzy"] += 1
                        continue

                # STEP 3: genuinely new entity - allocate a fresh placeholder.
                new_placeholder = self._create_new_placeholder(
                    entity_type,
                    global_counters
                )
                remapping[old_placeholder] = new_placeholder
                global_mapping[new_placeholder] = original_value
                stats["new"] += 1

        # Log the matching statistics.
        logger.info(
            f"   📊 Matching stats: "
            f"exact={stats['exact']}, "
            f"fuzzy={stats['fuzzy']}, "
            f"new={stats['new']}"
        )

        return remapping

    def _exact_match(
        self,
        entity_type: str,
        normalized_value: str,
        global_mapping: Dict
    ) -> Optional[str]:
        """Exact match of a normalized value against the global mapping.

        Args:
            entity_type: entity type.
            normalized_value: the normalized value to look up.
            global_mapping: the global mapping.

        Returns:
            The matching placeholder if found, else None.
        """
        for placeholder, original in global_mapping.items():
            # Only consider placeholders of the same type.  Compare the
            # full type token instead of a prefix (fix: startswith() would
            # wrongly accept any future type name that is a prefix of
            # another type name).
            if placeholder.split('-')[0] != entity_type:
                continue

            # Normalize the global value before comparing.
            global_normalized = self.normalizer.normalize(original, entity_type)

            if global_normalized == normalized_value:
                logger.debug(
                    f"✅ EXACT MATCH: '{normalized_value}' → {placeholder}"
                )
                return placeholder

        return None

    def _fuzzy_match(
        self,
        normalized_value: str,
        global_mapping: Dict
    ) -> Tuple[Optional[str], float]:
        """Fuzzy match (companies only) via token-overlap Jaccard similarity.

        Args:
            normalized_value: normalized company name.
            global_mapping: the global mapping.

        Returns:
            (placeholder, score) on a match, else (None, 0.0).
        """
        best_match = None
        best_score = 0.0

        for placeholder, original in global_mapping.items():
            # Companies only (exact type token, see _exact_match).
            if placeholder.split('-')[0] != "company":
                continue

            # Normalize the global value.
            global_normalized = self.normalizer.normalize(original, "company")

            # Compute similarity.
            score = self._token_overlap_score(
                normalized_value,
                global_normalized
            )

            # Keep the best candidate at or above the threshold.
            if score > best_score and score >= self.fuzzy_threshold:
                best_score = score
                best_match = placeholder

        if best_match:
            logger.debug(
                f"✅ FUZZY MATCH: '{normalized_value}' → "
                f"{best_match} (score={best_score:.2f})"
            )

        return (best_match, best_score) if best_match else (None, 0.0)

    def _token_overlap_score(self, str1: str, str2: str) -> float:
        """Jaccard similarity between two whitespace-tokenized strings.

        Jaccard = |intersection| / |union|

        Args:
            str1, str2: the two strings to compare.

        Returns:
            Similarity score in [0.0, 1.0]; 0.0 when both are empty.

        Examples:
            >>> _token_overlap_score("شرکت ملی نفت", "شرکت ملی نفت ایران")
            0.75  # 3/4 = 0.75
        """
        # Split into word tokens.
        tokens1 = set(str1.split())
        tokens2 = set(str2.split())

        # Jaccard similarity.
        intersection = tokens1 & tokens2
        union = tokens1 | tokens2

        if not union:
            return 0.0

        return len(intersection) / len(union)

    def _group_by_type(self, mapping: Dict) -> Dict[str, Dict]:
        """Group entities by type.

        Args:
            mapping: one mapping table.

        Returns:
            {
                "person": {"person-01": "علی", ...},
                "company": {"company-01": "پارس", ...},
                ...
            }
        """
        groups = {}

        for placeholder, value in mapping.items():
            # Extract the type from the placeholder (part before '-').
            entity_type = placeholder.split('-')[0]

            if entity_type not in groups:
                groups[entity_type] = {}

            groups[entity_type][placeholder] = value

        return groups

    def _extract_counters(self, mapping: Dict) -> Dict[str, int]:
        """Extract the current per-type counters from a mapping.

        Args:
            mapping: a mapping table.

        Returns:
            {"person": 2, "company": 3, ...}
        """
        counters = {
            "person": 0,
            "company": 0,
            "amount": 0,
            "percent": 0
        }

        for placeholder in mapping.keys():
            parts = placeholder.split('-')
            if len(parts) != 2:
                # Malformed placeholder - ignore.
                continue

            entity_type = parts[0]
            try:
                num = int(parts[1])
            except ValueError:
                continue

            if entity_type in counters:
                counters[entity_type] = max(counters[entity_type], num)

        return counters

    def _create_new_placeholder(
        self,
        entity_type: str,
        counters: Dict[str, int]
    ) -> str:
        """Create a new placeholder.

        Args:
            entity_type: entity type.
            counters: current counters (updated in place).

        Returns:
            The new placeholder (e.g. "company-03").
        """
        counters[entity_type] += 1
        new_placeholder = f"{entity_type}-{counters[entity_type]:02d}"

        logger.debug(f"🆕 NEW ENTITY: {new_placeholder}")

        return new_placeholder

    def _log_summary(self, global_mapping: Dict, counters: Dict):
        """Log a summary of the merge results."""
        logger.info("=" * 60)
        logger.info("📊 Merge Summary:")
        logger.info(f"   Total entities: {len(global_mapping)}")
        logger.info(f"   - Persons: {counters['person']}")
        logger.info(f"   - Companies: {counters['company']}")
        logger.info(f"   - Amounts: {counters['amount']}")
        logger.info(f"   - Percents: {counters['percent']}")
        logger.info("=" * 60)


# ✅ Quick self-tests
if __name__ == "__main__":
    print("=" * 60)
    print("🧪 Testing Merger Module")
    print("=" * 60)

    merger = EntityMerger(fuzzy_threshold=0.75)

    # Test 1: simple merge
    print("\n📊 Test 1: Simple Merge")
    table1 = {
        "chunk_id": "chunk_01",
        "mapping": {
            "company-01": "شرکت پارس",
            "company-02": "شرکت صبا",
            "person-01": "علی احمدی",
            "amount-01": "50 میلیارد ریال"
        }
    }
    table2 = {
        "chunk_id": "chunk_02",
        "mapping": {
            "company-01": "شرکت پارس",  # same company - should exact-match
            "company-02": "شرکت جدید",
            "person-01": "علی احمدی",  # should match
            "person-02": "مریم کریمی",  # new
            "amount-01": "100 میلیارد ریال"  # new (different number)
        }
    }

    result = merger.merge_mappings([table1, table2])

    print("\nGlobal Mapping:")
    for k, v in sorted(result['global_mapping'].items()):
        print(f"  {k}: {v}")

    print("\nRemapping for chunk_02:")
    remap = result['remapping'][1]['mapping']
    for old, new in remap.items():
        print(f"  {old} → {new}")

    # Test 2: fuzzy matching
    print("\n📊 Test 2: Fuzzy Matching (Company)")
    table3 = {
        "chunk_id": "chunk_03",
        "mapping": {
            "company-01": "شرکت ملی نفت ایران",
        }
    }
    table4 = {
        "chunk_id": "chunk_04",
        "mapping": {
            "company-01": "شرکت ملی نفت",  # substring - should fuzzy-match
        }
    }

    result2 = merger.merge_mappings([table3, table4])

    print("\nGlobal Mapping:")
    for k, v in sorted(result2['global_mapping'].items()):
        print(f"  {k}: {v}")

    print("\nRemapping for chunk_04:")
    remap2 = result2['remapping'][1]['mapping']
    for old, new in remap2.items():
        print(f"  {old} → {new}")

    print("\n" + "=" * 60)
    print("✅ All tests completed!")
    print("=" * 60)