Spaces:
Build error
Build error
"""
🔄 Intelligent Entity Merge
Intelligently combines the mapping tables produced for different chunks.
"""
| import logging | |
| from typing import Dict, List, Optional, Tuple | |
| from .normalizer import EntityNormalizer | |
logger = logging.getLogger(__name__)


class EntityMerger:
    """
    Intelligently merge entity mapping tables.

    Combines the mappings of several chunks into one global mapping using:
    - normalization,
    - exact matching,
    - fuzzy matching (companies only, Jaccard token overlap),
    and produces, per chunk, the placeholder remapping needed to rewrite
    that chunk's placeholders in terms of the global mapping.
    """

    # Known entity types, merged first and in this fixed order.  Any other
    # type that appears in a table (e.g. "date-01") is still merged after
    # these, in alphabetical order.  (A previous version iterated only over
    # these four types and silently dropped everything else.)
    KNOWN_TYPES = ("person", "company", "amount", "percent")

    def __init__(self, fuzzy_threshold: float = 0.75):
        """
        Initialize the merger.

        Args:
            fuzzy_threshold: threshold for fuzzy matching (0.0 to 1.0).
                Default: 0.75 (conservative).

        Raises:
            ValueError: if fuzzy_threshold is outside [0.0, 1.0].
        """
        if not 0.0 <= fuzzy_threshold <= 1.0:
            raise ValueError("fuzzy_threshold باید بین 0.0 و 1.0 باشد")
        self.normalizer = EntityNormalizer()
        self.fuzzy_threshold = fuzzy_threshold
        logger.info(
            f"✅ EntityMerger initialized: "
            f"fuzzy_threshold={fuzzy_threshold}"
        )

    def merge_mappings(
        self,
        mapping_tables: List[Dict]
    ) -> Dict:
        """
        Merge several mapping tables into one global mapping.

        Args:
            mapping_tables: list of mapping tables, each of the form
                {"chunk_id": "chunk_01",
                 "mapping": {"company-01": "شرکت پارس", ...}}

        Returns:
            {
                "global_mapping": {placeholder: original_value, ...},
                "remapping": [
                    {"chunk_id": ..., "mapping": {old_ph: new_ph, ...}},
                    ...
                ]
            }
            The first chunk is used as the base, so its remapping entry is
            always empty.
        """
        logger.info(f"🔄 شروع merge {len(mapping_tables)} mapping tables...")

        if not mapping_tables:
            logger.warning("⚠️ لیست mapping tables خالی است")
            return {"global_mapping": {}, "remapping": []}

        if len(mapping_tables) == 1:
            # Only one table - nothing to merge.  Copy the mapping so the
            # result never aliases the caller's input dict.
            logger.info("✅ فقط یک table - بدون merge")
            return {
                "global_mapping": dict(mapping_tables[0]["mapping"]),
                "remapping": [
                    {
                        "chunk_id": mapping_tables[0]["chunk_id"],
                        "mapping": {}
                    }
                ]
            }

        # Start with the first table as the base (copied, not aliased).
        global_mapping = dict(mapping_tables[0]["mapping"])
        global_counters = self._extract_counters(global_mapping)
        logger.info(
            f"📊 Base mapping (chunk 1): "
            f"{len(global_mapping)} entities"
        )

        # The first chunk needs no remapping.
        remapping_list = [
            {
                "chunk_id": mapping_tables[0]["chunk_id"],
                "mapping": {}
            }
        ]

        # Merge the remaining tables one by one into the global mapping.
        for i, table_data in enumerate(mapping_tables[1:], start=2):
            chunk_id = table_data["chunk_id"]
            table_mapping = table_data["mapping"]
            logger.info(f"🔄 Merging {chunk_id} ({i}/{len(mapping_tables)})...")
            logger.info(f" Entities in this chunk: {len(table_mapping)}")
            chunk_remapping = self._merge_single_table(
                table_mapping,
                global_mapping,
                global_counters
            )
            remapping_list.append({
                "chunk_id": chunk_id,
                "mapping": chunk_remapping
            })
            logger.info(f" ✅ Remapped {len(chunk_remapping)} entities")

        logger.info(f"✅ Merge complete: {len(global_mapping)} total entities")
        self._log_summary(global_mapping, global_counters)
        return {
            "global_mapping": global_mapping,
            "remapping": remapping_list
        }

    def _merge_single_table(
        self,
        table: Dict,
        global_mapping: Dict,
        global_counters: Dict
    ) -> Dict:
        """
        Merge one table into the global mapping.

        Args:
            table: the new chunk's mapping table.
            global_mapping: accumulated global mapping (mutated in place).
            global_counters: current per-type counters (mutated in place).

        Returns:
            Remapping dictionary: {old_placeholder: new_placeholder}.
        """
        remapping = {}
        # Matching statistics for this chunk.
        stats = {
            "exact": 0,
            "fuzzy": 0,
            "new": 0
        }

        entities_by_type = self._group_by_type(table)

        # Known types first (stable, deterministic numbering), then any
        # other type the extractor produced.  Iterating over every present
        # type fixes the old behavior of silently dropping unknown types.
        ordered_types = [t for t in self.KNOWN_TYPES if t in entities_by_type]
        ordered_types += sorted(
            t for t in entities_by_type if t not in self.KNOWN_TYPES
        )

        for entity_type in ordered_types:
            for old_placeholder, original_value in entities_by_type[entity_type].items():
                normalized = self.normalizer.normalize(original_value, entity_type)

                # STEP 1: exact match against the global mapping.
                matched_placeholder = self._exact_match(
                    entity_type,
                    normalized,
                    global_mapping
                )
                if matched_placeholder:
                    remapping[old_placeholder] = matched_placeholder
                    stats["exact"] += 1
                    continue

                # STEP 2: fuzzy match (companies only).
                if entity_type == "company":
                    matched_placeholder, score = self._fuzzy_match(
                        normalized,
                        global_mapping
                    )
                    if matched_placeholder:
                        remapping[old_placeholder] = matched_placeholder
                        stats["fuzzy"] += 1
                        continue

                # STEP 3: genuinely new entity - allocate a fresh placeholder.
                new_placeholder = self._create_new_placeholder(
                    entity_type,
                    global_counters
                )
                remapping[old_placeholder] = new_placeholder
                global_mapping[new_placeholder] = original_value
                stats["new"] += 1

        logger.info(
            f" 📊 Matching stats: "
            f"exact={stats['exact']}, "
            f"fuzzy={stats['fuzzy']}, "
            f"new={stats['new']}"
        )
        return remapping

    def _exact_match(
        self,
        entity_type: str,
        normalized_value: str,
        global_mapping: Dict
    ) -> Optional[str]:
        """
        Exact match of a normalized value against the global mapping.

        Args:
            entity_type: entity type ("person", "company", ...).
            normalized_value: normalized value to look up.
            global_mapping: accumulated global mapping.

        Returns:
            The matching placeholder, or None if no exact match exists.
        """
        for placeholder, original in global_mapping.items():
            # Only consider placeholders of the same type.  Compare the
            # full type token (before "-") rather than a string prefix, so
            # entity types sharing a prefix can never collide.
            if placeholder.partition('-')[0] != entity_type:
                continue
            # Normalize the stored global value before comparing.
            global_normalized = self.normalizer.normalize(original, entity_type)
            if global_normalized == normalized_value:
                logger.debug(
                    f"✅ EXACT MATCH: '{normalized_value}' → {placeholder}"
                )
                return placeholder
        return None

    def _fuzzy_match(
        self,
        normalized_value: str,
        global_mapping: Dict
    ) -> Tuple[Optional[str], float]:
        """
        Fuzzy match - used for company names only.

        Uses token overlap (Jaccard similarity) against every company
        already in the global mapping; accepts the best score that reaches
        ``self.fuzzy_threshold``.

        Args:
            normalized_value: normalized company name.
            global_mapping: accumulated global mapping.

        Returns:
            (placeholder, score) on a match, otherwise (None, 0.0).
        """
        best_match = None
        best_score = 0.0
        for placeholder, original in global_mapping.items():
            # Companies only (full type-token comparison, see _exact_match).
            if placeholder.partition('-')[0] != "company":
                continue
            global_normalized = self.normalizer.normalize(original, "company")
            score = self._token_overlap_score(
                normalized_value,
                global_normalized
            )
            # Keep the best candidate that clears the threshold.
            if score > best_score and score >= self.fuzzy_threshold:
                best_score = score
                best_match = placeholder
        if best_match:
            logger.debug(
                f"✅ FUZZY MATCH: '{normalized_value}' → "
                f"{best_match} (score={best_score:.2f})"
            )
        return (best_match, best_score) if best_match else (None, 0.0)

    def _token_overlap_score(self, str1: str, str2: str) -> float:
        """
        Compute the Jaccard similarity between two strings.

        Jaccard = |intersection| / |union| over whitespace-split tokens.

        Args:
            str1, str2: strings to compare.

        Returns:
            Similarity score in [0.0, 1.0]; 0.0 when both are empty.

        Examples:
            >>> _token_overlap_score("شرکت ملی نفت", "شرکت ملی نفت ایران")
            0.75  # 3/4
        """
        tokens1 = set(str1.split())
        tokens2 = set(str2.split())
        intersection = tokens1 & tokens2
        union = tokens1 | tokens2
        if not union:
            return 0.0
        return len(intersection) / len(union)

    def _group_by_type(self, mapping: Dict) -> Dict[str, Dict]:
        """
        Group a mapping table's entities by their type.

        Args:
            mapping: one mapping table ({placeholder: value}).

        Returns:
            {
                "person": {"person-01": "علی", ...},
                "company": {"company-01": "پارس", ...},
                ...
            }
        """
        groups = {}
        for placeholder, value in mapping.items():
            # The type is the token before the first "-".
            entity_type = placeholder.partition('-')[0]
            groups.setdefault(entity_type, {})[placeholder] = value
        return groups

    def _extract_counters(self, mapping: Dict) -> Dict[str, int]:
        """
        Extract the highest used index per entity type from a mapping.

        Args:
            mapping: mapping table ({placeholder: value}).

        Returns:
            {"person": 2, "company": 3, ...} - known types are always
            present (0 when unused); other types appear when encountered.
        """
        counters = {entity_type: 0 for entity_type in self.KNOWN_TYPES}
        for placeholder in mapping.keys():
            entity_type, sep, num_str = placeholder.partition('-')
            if not sep:
                continue
            try:
                num = int(num_str)
            except ValueError:
                # Malformed placeholder (e.g. extra "-"); ignore it.
                continue
            counters[entity_type] = max(counters.get(entity_type, 0), num)
        return counters

    def _create_new_placeholder(
        self,
        entity_type: str,
        counters: Dict[str, int]
    ) -> str:
        """
        Allocate a new placeholder for a previously unseen entity.

        Args:
            entity_type: entity type (any type string is accepted).
            counters: current counters (mutated in place).

        Returns:
            New placeholder (e.g. "company-03").
        """
        # .get() so types outside KNOWN_TYPES also number from 01.
        counters[entity_type] = counters.get(entity_type, 0) + 1
        new_placeholder = f"{entity_type}-{counters[entity_type]:02d}"
        logger.debug(f"🆕 NEW ENTITY: {new_placeholder}")
        return new_placeholder

    def _log_summary(self, global_mapping: Dict, counters: Dict):
        """Log a summary of the merge result."""
        logger.info("=" * 60)
        logger.info("📊 Merge Summary:")
        logger.info(f" Total entities: {len(global_mapping)}")
        logger.info(f" - Persons: {counters.get('person', 0)}")
        logger.info(f" - Companies: {counters.get('company', 0)}")
        logger.info(f" - Amounts: {counters.get('amount', 0)}")
        logger.info(f" - Percents: {counters.get('percent', 0)}")
        logger.info("=" * 60)
| # ✅ تستهای سریع | |
| if __name__ == "__main__": | |
| print("=" * 60) | |
| print("🧪 Testing Merger Module") | |
| print("=" * 60) | |
| merger = EntityMerger(fuzzy_threshold=0.75) | |
| # تست 1: Merge ساده | |
| print("\n📊 Test 1: Simple Merge") | |
| table1 = { | |
| "chunk_id": "chunk_01", | |
| "mapping": { | |
| "company-01": "شرکت پارس", | |
| "company-02": "شرکت صبا", | |
| "person-01": "علی احمدی", | |
| "amount-01": "50 میلیارد ریال" | |
| } | |
| } | |
| table2 = { | |
| "chunk_id": "chunk_02", | |
| "mapping": { | |
| "company-01": "شرکت پارس", # فاصله اضافی - باید match بشه | |
| "company-02": "شرکت جدید", | |
| "person-01": "علی احمدی", # باید match بشه | |
| "person-02": "مریم کریمی", # جدید | |
| "amount-01": "100 میلیارد ریال" # جدید (عدد متفاوت) | |
| } | |
| } | |
| result = merger.merge_mappings([table1, table2]) | |
| print("\nGlobal Mapping:") | |
| for k, v in sorted(result['global_mapping'].items()): | |
| print(f" {k}: {v}") | |
| print("\nRemapping for chunk_02:") | |
| remap = result['remapping'][1]['mapping'] | |
| for old, new in remap.items(): | |
| print(f" {old} → {new}") | |
| # تست 2: Fuzzy matching | |
| print("\n📊 Test 2: Fuzzy Matching (Company)") | |
| table3 = { | |
| "chunk_id": "chunk_03", | |
| "mapping": { | |
| "company-01": "شرکت ملی نفت ایران", | |
| } | |
| } | |
| table4 = { | |
| "chunk_id": "chunk_04", | |
| "mapping": { | |
| "company-01": "شرکت ملی نفت", # substring - باید match بشه | |
| } | |
| } | |
| result2 = merger.merge_mappings([table3, table4]) | |
| print("\nGlobal Mapping:") | |
| for k, v in sorted(result2['global_mapping'].items()): | |
| print(f" {k}: {v}") | |
| print("\nRemapping for chunk_04:") | |
| remap2 = result2['remapping'][1]['mapping'] | |
| for old, new in remap2.items(): | |
| print(f" {old} → {new}") | |
| print("\n" + "=" * 60) | |
| print("✅ All tests completed!") | |
| print("=" * 60) | |