import difflib
import uuid
from typing import Optional

from sqlalchemy import or_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select

from packages.core.models import EntityMapping, EntityReviewPool
from packages.normalizers.company import clean_company_name


class EntityResolutionEngine:
    """
    Company entity-resolution engine.

    Cleans raw company names, maps them to a standard entity (exactly or via
    fuzzy matching), and queues borderline matches in a review pool for
    manual confirmation.

    All writes go through the caller-supplied ``AsyncSession``. This class
    only flushes (inside SAVEPOINTs) and never commits the outer
    transaction, so the caller owns the transaction boundary.
    """

    # Number of leading characters of the cleaned name used to pre-filter
    # fuzzy-match candidates; shorter names skip fuzzy matching entirely.
    CANDIDATE_PREFIX_LEN = 3

    def __init__(
        self,
        session: AsyncSession,
        *,
        auto_match_threshold: float = 0.92,
        review_threshold: float = 0.85,
    ):
        """
        Args:
            session: Async SQLAlchemy session used for all reads and writes.
            auto_match_threshold: Similarity at or above which a name is
                mapped to the best candidate automatically.
            review_threshold: Similarity at or above which (but below
                ``auto_match_threshold``) a manual review task is queued.

        Raises:
            ValueError: If the thresholds do not satisfy
                ``0 <= review_threshold <= auto_match_threshold <= 1``.
        """
        if not 0.0 <= review_threshold <= auto_match_threshold <= 1.0:
            raise ValueError(
                "thresholds must satisfy "
                "0 <= review_threshold <= auto_match_threshold <= 1"
            )
        self.session = session
        # Scores >= this are mapped automatically.
        self.AUTO_MATCH_THRESHOLD = auto_match_threshold
        # Scores in [REVIEW_THRESHOLD, AUTO_MATCH_THRESHOLD) go to manual review.
        self.REVIEW_THRESHOLD = review_threshold

    def calculate_similarity(self, a: str, b: str) -> float:
        """
        Return the similarity ratio of two strings, in [0.0, 1.0].

        Uses the standard-library ``difflib.SequenceMatcher``. A production
        deployment might substitute TF-IDF, Jaro-Winkler, or embedding-based
        similarity.
        """
        return difflib.SequenceMatcher(None, a, b).ratio()

    async def resolve_company_name(self, raw_name: str) -> str:
        """
        Resolve a raw company name to its standard entity name.

        - If the cleaned name already has a mapping, return its standard name.
        - If the best fuzzy candidate scores >= AUTO_MATCH_THRESHOLD, map the
          cleaned name onto that candidate and return the candidate's name.
        - If it scores in [REVIEW_THRESHOLD, AUTO_MATCH_THRESHOLD), queue a
          review task and, until a reviewer confirms, register the cleaned
          name as its own new entity.
        - Otherwise register the cleaned name as a new entity.

        Returns:
            The standard name, or "" when ``raw_name`` is empty or cleans to
            an empty string.
        """
        if not raw_name:
            return ""

        cleaned_name = clean_company_name(raw_name)
        if not cleaned_name:
            return ""

        # 1. Exact mapping lookup.
        mapping = await self._find_mapping(cleaned_name)
        if mapping is not None:
            return mapping.standard_name

        # 2. No exact match: compare in memory against standard names sharing
        #    the first CANDIDATE_PREFIX_LEN characters. At real scale this
        #    should be an Elasticsearch fuzzy query or a vector search.
        prefix = cleaned_name[: self.CANDIDATE_PREFIX_LEN]
        if len(prefix) < self.CANDIDATE_PREFIX_LEN:
            # Name too short to pre-filter meaningfully: new entity.
            return await self._create_new_entity(cleaned_name)

        # autoescape=True escapes LIKE wildcards ("%", "_") that may occur in
        # the name itself, so the prefix is matched literally.
        stmt = (
            select(EntityMapping.standard_name, EntityMapping.standard_entity_id)
            .where(EntityMapping.standard_name.startswith(prefix, autoescape=True))
            .distinct()
        )
        result = await self.session.execute(stmt)
        candidates = result.all()

        best_match = None
        best_entity_id = None
        best_score = 0.0
        for cand_name, entity_id in candidates:
            score = self.calculate_similarity(cleaned_name, cand_name)
            if score > best_score:  # strict ">" keeps the first of equal scores
                best_match, best_entity_id, best_score = cand_name, entity_id, score

        if best_match is None or best_score < self.REVIEW_THRESHOLD:
            # No candidate, or score too low: new entity.
            return await self._create_new_entity(cleaned_name)

        if best_score >= self.AUTO_MATCH_THRESHOLD:
            # Automatic mapping onto the existing entity.
            return await self._create_mapping(
                cleaned_name, best_entity_id, best_match, best_score
            )

        # Borderline score: queue for manual review, and until it is
        # confirmed treat the name as an independent entity.
        await self._create_review_task(
            cleaned_name, best_match, best_entity_id, best_score
        )
        return await self._create_new_entity(cleaned_name)

    async def _find_mapping(self, original_name: str) -> Optional[EntityMapping]:
        """Return one mapping whose original_name equals ``original_name``, or None."""
        # limit(1) + first() instead of scalar_one_or_none(): if duplicate rows
        # ever slip in, resolution keeps working instead of raising
        # MultipleResultsFound.
        stmt = (
            select(EntityMapping)
            .where(EntityMapping.original_name == original_name)
            .limit(1)
        )
        result = await self.session.execute(stmt)
        return result.scalars().first()

    async def _create_new_entity(self, cleaned_name: str) -> str:
        """
        Register ``cleaned_name`` as a brand-new entity with a random UUID.

        Returns:
            The standard name now mapped to ``cleaned_name`` (normally
            ``cleaned_name`` itself; see ``_create_mapping`` for conflicts).
        """
        entity_id = str(uuid.uuid4())
        return await self._create_mapping(cleaned_name, entity_id, cleaned_name, 1.0)

    async def _create_mapping(
        self, original_name: str, entity_id: str, standard_name: str, score: float
    ) -> str:
        """
        Insert the mapping ``original_name -> (entity_id, standard_name)``.

        The insert runs inside a SAVEPOINT, so an IntegrityError (e.g. a
        concurrent worker inserted the same name first) rolls back only this
        insert and leaves the caller's transaction intact.

        Returns:
            The standard name now stored for ``original_name``. After a
            conflict this is the existing row's name if it can be read back;
            otherwise ``standard_name``.
        """
        mapping = EntityMapping(
            id=str(uuid.uuid4()),
            original_name=original_name,
            standard_entity_id=entity_id,
            standard_name=standard_name,
            confidence_score=score,
        )
        try:
            async with self.session.begin_nested():
                self.session.add(mapping)
                await self.session.flush()
        except IntegrityError:
            # NOTE(review): assumes the conflict is the unique key on
            # original_name — confirm against the schema. Return the winner's
            # standard name so callers agree with what is actually stored.
            existing = await self._find_mapping(original_name)
            if existing is not None:
                return existing.standard_name
        return standard_name

    async def _create_review_task(
        self,
        source_name: str,
        target_name: str,
        target_entity_id: str,
        score: float,
    ) -> None:
        """
        Queue a PENDING review task unless one already exists for the
        (source_name, target_entity_id) pair.

        The insert runs inside a SAVEPOINT; an IntegrityError from a
        concurrently inserted duplicate is ignored without touching the
        caller's transaction.
        """
        stmt = (
            select(EntityReviewPool)
            .where(
                EntityReviewPool.source_name == source_name,
                EntityReviewPool.target_entity_id == target_entity_id,
            )
            .limit(1)
        )
        result = await self.session.execute(stmt)
        if result.scalars().first() is not None:
            return

        task = EntityReviewPool(
            id=str(uuid.uuid4()),
            source_name=source_name,
            target_name=target_name,
            target_entity_id=target_entity_id,
            similarity_score=score,
            status="PENDING",
        )
        try:
            async with self.session.begin_nested():
                self.session.add(task)
                await self.session.flush()
        except IntegrityError:
            # Another worker queued the same task between our check and insert.
            pass