# chunking_test/modules/merger.py
# Uploaded by leilaghomashchi ("Upload 5 files", commit dfbf6c3, verified)
"""
🔄 Intelligent Entity Merge
ترکیب هوشمند mapping tables از chunks مختلف
"""
import logging
from typing import Dict, List, Optional, Tuple
from .normalizer import EntityNormalizer
logger = logging.getLogger(__name__)
class EntityMerger:
    """Merge per-chunk placeholder mapping tables into one global mapping.

    Every input table maps placeholders of the form ``<type>-<NN>`` (for
    example ``company-01``) to the original text they stand for.  Tables are
    folded into a single global mapping using, in order:

    - normalization of each value through ``self.normalizer``,
    - exact matching of normalized values within the same entity type,
    - fuzzy (token-overlap) matching, applied to ``company`` entities only.

    Values that match nothing receive a fresh global placeholder.
    """

    # Entity types that are merged, in processing order.
    _KNOWN_TYPES = ("person", "company", "amount", "percent")

    def __init__(self, fuzzy_threshold: float = 0.75):
        """Create a merger.

        Args:
            fuzzy_threshold: Minimum Jaccard score (0.0 to 1.0 inclusive) for
                two company names to be treated as the same entity.

        Raises:
            ValueError: If ``fuzzy_threshold`` is outside ``[0.0, 1.0]``.
        """
        if not 0.0 <= fuzzy_threshold <= 1.0:
            raise ValueError("fuzzy_threshold باید بین 0.0 و 1.0 باشد")

        self.normalizer = EntityNormalizer()
        self.fuzzy_threshold = fuzzy_threshold

        logger.info(
            f"✅ EntityMerger initialized: "
            f"fuzzy_threshold={fuzzy_threshold}"
        )

    @staticmethod
    def _placeholder_type(placeholder: str) -> str:
        """Return the entity type of a placeholder (the text before the first ``-``)."""
        return placeholder.split('-')[0]

    def merge_mappings(self, mapping_tables: List[Dict]) -> Dict:
        """Combine several chunk mapping tables into one global mapping.

        Args:
            mapping_tables: List of ``{"chunk_id": str, "mapping": {placeholder:
                value}}`` dictionaries.  The first table is used as the base of
                the global mapping.

        Returns:
            A dictionary with two keys:

            - ``"global_mapping"``: ``{placeholder: value}`` for all merged
              entities.  It is always a new dict, never the caller's object.
            - ``"remapping"``: one ``{"chunk_id": ..., "mapping": {old: new}}``
              entry per input table, in input order.  The first table's
              remapping is always empty because its placeholders are kept.
        """
        logger.info(f"🔄 شروع merge {len(mapping_tables)} mapping tables...")

        if not mapping_tables:
            logger.warning("⚠️ لیست mapping tables خالی است")
            return {"global_mapping": {}, "remapping": []}

        base_table = mapping_tables[0]
        # Copy the base mapping so the result never aliases (and later
        # mutations never leak into) the caller's dictionary.
        global_mapping = dict(base_table["mapping"])
        # The base chunk keeps its placeholders, hence the empty remapping.
        remapping_list = [{"chunk_id": base_table["chunk_id"], "mapping": {}}]

        if len(mapping_tables) == 1:
            logger.info("✅ فقط یک table - بدون merge")
            return {
                "global_mapping": global_mapping,
                "remapping": remapping_list
            }

        global_counters = self._extract_counters(global_mapping)
        logger.info(
            f"📊 Base mapping (chunk 1): "
            f"{len(global_mapping)} entities"
        )

        total = len(mapping_tables)
        for position, table_data in enumerate(mapping_tables[1:], start=2):
            chunk_id = table_data["chunk_id"]
            table_mapping = table_data["mapping"]

            logger.info(f"🔄 Merging {chunk_id} ({position}/{total})...")
            logger.info(f" Entities in this chunk: {len(table_mapping)}")

            # Updates global_mapping and global_counters in place.
            chunk_remapping = self._merge_single_table(
                table_mapping,
                global_mapping,
                global_counters
            )
            remapping_list.append({
                "chunk_id": chunk_id,
                "mapping": chunk_remapping
            })
            logger.info(f" ✅ Remapped {len(chunk_remapping)} entities")

        logger.info(f"✅ Merge complete: {len(global_mapping)} total entities")
        self._log_summary(global_mapping, global_counters)

        return {
            "global_mapping": global_mapping,
            "remapping": remapping_list
        }

    def _merge_single_table(
        self,
        table: Dict[str, str],
        global_mapping: Dict[str, str],
        global_counters: Dict[str, int]
    ) -> Dict[str, str]:
        """Merge one chunk's mapping into the global mapping.

        Args:
            table: The chunk's ``{placeholder: value}`` mapping.
            global_mapping: Global mapping; new entities are added in place.
            global_counters: Highest index used per type; updated in place.

        Returns:
            ``{old_placeholder: global_placeholder}`` for every entity of a
            known type.  Entities of any other type are skipped (with a
            warning) and do not appear in the result.
        """
        remapping: Dict[str, str] = {}
        stats = {"exact": 0, "fuzzy": 0, "new": 0}

        entities_by_type = self._group_by_type(table)

        # Entities of unsupported types are not merged; report them instead
        # of dropping them without a trace.
        unsupported = [t for t in entities_by_type if t not in self._KNOWN_TYPES]
        if unsupported:
            skipped = sum(len(entities_by_type[t]) for t in unsupported)
            logger.warning(
                f"⚠️ Skipping {skipped} entities with unsupported types: "
                f"{sorted(unsupported)}"
            )

        for entity_type in self._KNOWN_TYPES:
            entities = entities_by_type.get(entity_type)
            if not entities:
                continue

            for old_placeholder, original_value in entities.items():
                normalized = self.normalizer.normalize(original_value, entity_type)

                # Step 1: exact match on the normalized value.
                matched = self._exact_match(entity_type, normalized, global_mapping)
                if matched:
                    remapping[old_placeholder] = matched
                    stats["exact"] += 1
                    continue

                # Step 2: fuzzy match, only for companies.
                if entity_type == "company":
                    matched, _score = self._fuzzy_match(normalized, global_mapping)
                    if matched:
                        remapping[old_placeholder] = matched
                        stats["fuzzy"] += 1
                        continue

                # Step 3: nothing matched, so register a new global entity.
                new_placeholder = self._create_new_placeholder(
                    entity_type,
                    global_counters
                )
                remapping[old_placeholder] = new_placeholder
                global_mapping[new_placeholder] = original_value
                stats["new"] += 1

        logger.info(
            f" 📊 Matching stats: "
            f"exact={stats['exact']}, "
            f"fuzzy={stats['fuzzy']}, "
            f"new={stats['new']}"
        )
        return remapping

    def _exact_match(
        self,
        entity_type: str,
        normalized_value: str,
        global_mapping: Dict[str, str]
    ) -> Optional[str]:
        """Find a global placeholder whose normalized value equals the input.

        Args:
            entity_type: Entity type to search within.
            normalized_value: Already-normalized value to look for.
            global_mapping: Global ``{placeholder: value}`` mapping.

        Returns:
            The first matching placeholder of that type, or ``None``.
        """
        for placeholder, original in global_mapping.items():
            # Compare the full type token so that e.g. "personnel-01" is not
            # mistaken for a "person" placeholder.
            if self._placeholder_type(placeholder) != entity_type:
                continue

            global_normalized = self.normalizer.normalize(original, entity_type)
            if global_normalized == normalized_value:
                logger.debug(
                    f"✅ EXACT MATCH: '{normalized_value}' → {placeholder}"
                )
                return placeholder
        return None

    def _fuzzy_match(
        self,
        normalized_value: str,
        global_mapping: Dict[str, str]
    ) -> Tuple[Optional[str], float]:
        """Find the most similar company in the global mapping.

        Similarity is the token-overlap (Jaccard) score computed by
        ``_token_overlap_score``.

        Args:
            normalized_value: Already-normalized company name.
            global_mapping: Global ``{placeholder: value}`` mapping.

        Returns:
            ``(placeholder, score)`` for the best candidate whose score is at
            least ``self.fuzzy_threshold`` (and greater than zero), otherwise
            ``(None, 0.0)``.  On ties the earliest candidate wins.
        """
        best_match: Optional[str] = None
        best_score = 0.0

        for placeholder, original in global_mapping.items():
            if self._placeholder_type(placeholder) != "company":
                continue

            global_normalized = self.normalizer.normalize(original, "company")
            score = self._token_overlap_score(normalized_value, global_normalized)

            # Strict ">" keeps the first of several equally good candidates.
            if score > best_score and score >= self.fuzzy_threshold:
                best_score = score
                best_match = placeholder

        if best_match:
            logger.debug(
                f"✅ FUZZY MATCH: '{normalized_value}' → "
                f"{best_match} (score={best_score:.2f})"
            )
        # best_score is only ever raised together with best_match, so it is
        # still 0.0 when nothing matched.
        return best_match, best_score

    def _token_overlap_score(self, str1: str, str2: str) -> float:
        """Return the Jaccard similarity of two whitespace-tokenized strings.

        Jaccard = |intersection| / |union| of the two token sets.

        Args:
            str1: First string.
            str2: Second string.

        Returns:
            A score between 0.0 and 1.0; 0.0 when both strings have no tokens.
            For example, three shared tokens out of four distinct tokens
            overall gives 3/4 = 0.75.
        """
        tokens1 = set(str1.split())
        tokens2 = set(str2.split())

        union = tokens1 | tokens2
        if not union:
            return 0.0
        return len(tokens1 & tokens2) / len(union)

    def _group_by_type(self, mapping: Dict[str, str]) -> Dict[str, Dict[str, str]]:
        """Group a mapping's entries by entity type.

        Args:
            mapping: A ``{placeholder: value}`` mapping.

        Returns:
            ``{entity_type: {placeholder: value}}`` where the type is the part
            of the placeholder before the first ``-``.
        """
        groups: Dict[str, Dict[str, str]] = {}
        for placeholder, value in mapping.items():
            groups.setdefault(self._placeholder_type(placeholder), {})[placeholder] = value
        return groups

    def _extract_counters(self, mapping: Dict[str, str]) -> Dict[str, int]:
        """Return the highest numeric index in use for each known type.

        Placeholders that are not exactly ``<type>-<int>``, or whose type is
        not one of the known types, are ignored.

        Args:
            mapping: A ``{placeholder: value}`` mapping.

        Returns:
            ``{"person": n, "company": n, "amount": n, "percent": n}`` with 0
            for types that do not occur.
        """
        counters = {entity_type: 0 for entity_type in self._KNOWN_TYPES}

        for placeholder in mapping:
            parts = placeholder.split('-')
            if len(parts) != 2:
                continue
            entity_type, raw_index = parts
            if entity_type not in counters:
                continue
            try:
                index = int(raw_index)
            except ValueError:
                continue
            counters[entity_type] = max(counters[entity_type], index)
        return counters

    def _create_new_placeholder(
        self,
        entity_type: str,
        counters: Dict[str, int]
    ) -> str:
        """Allocate the next placeholder for an entity type.

        Args:
            entity_type: Entity type to allocate for.
            counters: Highest index used per type; incremented in place.  A
                type that is not present yet starts counting from 1.

        Returns:
            The new placeholder, zero-padded to two digits (e.g. ``company-03``).
        """
        next_index = counters.get(entity_type, 0) + 1
        counters[entity_type] = next_index
        new_placeholder = f"{entity_type}-{next_index:02d}"
        logger.debug(f"🆕 NEW ENTITY: {new_placeholder}")
        return new_placeholder

    def _log_summary(self, global_mapping: Dict[str, str], counters: Dict[str, int]):
        """Log the merge result: total entity count and the per-type counters.

        The per-type figures are the counter values (highest index allocated),
        as tracked by ``_extract_counters`` and ``_create_new_placeholder``.
        """
        logger.info("=" * 60)
        logger.info("📊 Merge Summary:")
        logger.info(f" Total entities: {len(global_mapping)}")
        logger.info(f" - Persons: {counters.get('person', 0)}")
        logger.info(f" - Companies: {counters.get('company', 0)}")
        logger.info(f" - Amounts: {counters.get('amount', 0)}")
        logger.info(f" - Percents: {counters.get('percent', 0)}")
        logger.info("=" * 60)
# ✅ تست‌های سریع
if __name__ == "__main__":
    # Manual smoke test: merges two pairs of hand-written tables and prints
    # the resulting global mapping and the second chunk's remapping.
    print("=" * 60)
    print("🧪 Testing Merger Module")
    print("=" * 60)

    merger = EntityMerger(fuzzy_threshold=0.75)

    # Test 1: simple merge
    print("\n📊 Test 1: Simple Merge")
    table1 = {
        "chunk_id": "chunk_01",
        "mapping": {
            "company-01": "شرکت پارس",
            "company-02": "شرکت صبا",
            "person-01": "علی احمدی",
            "amount-01": "50 میلیارد ریال"
        }
    }
    table2 = {
        "chunk_id": "chunk_02",
        "mapping": {
            # Intended as a whitespace variant of chunk_01's name - should match
            "company-01": "شرکت پارس",
            "company-02": "شرکت جدید",
            "person-01": "علی احمدی",  # should match
            "person-02": "مریم کریمی",  # new
            "amount-01": "100 میلیارد ریال"  # new (different number)
        }
    }
    result = merger.merge_mappings([table1, table2])

    print("\nGlobal Mapping:")
    for k, v in sorted(result['global_mapping'].items()):
        print(f" {k}: {v}")

    print("\nRemapping for chunk_02:")
    for old, new in result['remapping'][1]['mapping'].items():
        # Separator added: without it the two placeholders ran together.
        print(f" {old} → {new}")

    # Test 2: fuzzy matching
    print("\n📊 Test 2: Fuzzy Matching (Company)")
    table3 = {
        "chunk_id": "chunk_03",
        "mapping": {
            "company-01": "شرکت ملی نفت ایران",
        }
    }
    table4 = {
        "chunk_id": "chunk_04",
        "mapping": {
            # Token subset of chunk_03's name - expected to fuzzy-match,
            # assuming the normalizer keeps every token (3/4 = 0.75).
            "company-01": "شرکت ملی نفت",
        }
    }
    result2 = merger.merge_mappings([table3, table4])

    print("\nGlobal Mapping:")
    for k, v in sorted(result2['global_mapping'].items()):
        print(f" {k}: {v}")

    print("\nRemapping for chunk_04:")
    for old, new in result2['remapping'][1]['mapping'].items():
        print(f" {old} → {new}")

    print("\n" + "=" * 60)
    print("✅ All tests completed!")
    print("=" * 60)