Spaces:
Sleeping
Sleeping
| import uuid | |
| import difflib | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy.future import select | |
| from sqlalchemy import or_ | |
| from packages.core.models import EntityMapping, EntityReviewPool | |
| from packages.normalizers.company import clean_company_name | |
class EntityResolutionEngine:
    """Company entity resolution engine.

    Cleans raw company names, maps them onto existing standard entities
    when a sufficiently similar one is found, and queues borderline matches
    in a review pool for manual confirmation.
    """

    def __init__(
        self,
        session: "AsyncSession",
        *,
        auto_match_threshold: float = 0.92,
        review_threshold: float = 0.85,
    ):
        """Bind the engine to a database session.

        Args:
            session: Async SQLAlchemy session used for all reads and writes.
            auto_match_threshold: Scores at or above this value are mapped
                to the matched entity automatically.
            review_threshold: Scores at or above this value (but below
                ``auto_match_threshold``) are sent to the review pool.

        Raises:
            ValueError: If the thresholds are not ordered as
                ``0 <= review_threshold <= auto_match_threshold <= 1``.
        """
        if not 0.0 <= review_threshold <= auto_match_threshold <= 1.0:
            raise ValueError(
                "thresholds must satisfy "
                "0 <= review_threshold <= auto_match_threshold <= 1"
            )
        self.session = session
        # Attribute names are kept in upper case for backward compatibility
        # with callers that read or override them on the instance.
        self.AUTO_MATCH_THRESHOLD = auto_match_threshold
        self.REVIEW_THRESHOLD = review_threshold

    def calculate_similarity(self, a: str, b: str) -> float:
        """Return a similarity ratio in ``[0, 1]`` for two strings.

        Uses the standard library's ``difflib.SequenceMatcher``. A production
        deployment could swap in TF-IDF, Jaro-Winkler or an embedding-based
        similarity instead.
        """
        return difflib.SequenceMatcher(None, a, b).ratio()

    async def resolve_company_name(self, raw_name: str) -> str:
        """Resolve a raw company name to its standard name.

        Resolution order:

        1. Empty input, or a name that cleans down to nothing, yields ``""``.
        2. An existing mapping whose ``original_name`` equals the cleaned
           name yields that mapping's ``standard_name``.
        3. Otherwise existing standard names sharing the first three
           characters are compared in memory:

           * score >= ``AUTO_MATCH_THRESHOLD``: a mapping to the matched
             entity is recorded and the matched standard name is returned;
           * score >= ``REVIEW_THRESHOLD``: a review task is recorded, and
             the cleaned name is registered as its own entity until reviewed;
           * otherwise the cleaned name is registered as a new entity.

        Args:
            raw_name: The unprocessed company name.

        Returns:
            The standard name for the company, or ``""`` when there is
            nothing to resolve.
        """
        if not raw_name:
            return ""

        cleaned_name = clean_company_name(raw_name)
        if not cleaned_name:
            return ""

        # 1. Exact match against already-known original names.
        exact_stmt = select(EntityMapping).where(
            EntityMapping.original_name == cleaned_name
        )
        mapping = (await self.session.execute(exact_stmt)).scalar_one_or_none()
        if mapping:
            return mapping.standard_name

        # 2. Fuzzy match. For demonstration purposes candidates are limited
        #    to standard names sharing the first three characters and are
        #    compared in memory; at real scale this belongs in a fuzzy or
        #    vector search engine.
        prefix = cleaned_name[:3]
        if len(prefix) < 3:
            # Too short to build a meaningful prefix: treat as a new entity.
            return await self._create_new_entity(cleaned_name)

        # autoescape=True keeps '%' and '_' inside the name from acting as
        # LIKE wildcards.
        candidate_stmt = (
            select(EntityMapping.standard_name, EntityMapping.standard_entity_id)
            .where(EntityMapping.standard_name.startswith(prefix, autoescape=True))
            .distinct()
        )
        candidates = (await self.session.execute(candidate_stmt)).all()

        scored = [
            (self.calculate_similarity(cleaned_name, cand_name), cand_name, entity_id)
            for cand_name, entity_id in candidates
        ]
        # max() keeps the first of equally scored candidates, like a strict
        # ``>`` comparison in a manual loop would.
        best_score, best_match, best_entity_id = max(
            scored, key=lambda item: item[0], default=(0.0, None, None)
        )

        if best_match is not None:
            if best_score >= self.AUTO_MATCH_THRESHOLD:
                # Confident match: map this spelling onto the existing entity.
                await self._create_mapping(
                    cleaned_name, best_entity_id, best_match, best_score
                )
                return best_match
            if best_score >= self.REVIEW_THRESHOLD:
                # Borderline match: queue for manual confirmation and, until
                # it is reviewed, fall through to register a separate entity.
                await self._create_review_task(
                    cleaned_name, best_match, best_entity_id, best_score
                )

        # No candidate, score too low, or pending review: new entity.
        return await self._create_new_entity(cleaned_name)

    async def _create_new_entity(self, cleaned_name: str) -> str:
        """Register ``cleaned_name`` as its own standard entity and return it."""
        entity_id = str(uuid.uuid4())
        await self._create_mapping(cleaned_name, entity_id, cleaned_name, 1.0)
        return cleaned_name

    async def _create_mapping(
        self, original_name: str, entity_id: str, standard_name: str, score: float
    ):
        """Record that ``original_name`` maps to the given standard entity.

        An integrity conflict (presumably a unique constraint hit by a
        concurrent writer - confirm against the model) is ignored.
        """
        mapping = EntityMapping(
            id=str(uuid.uuid4()),
            original_name=original_name,
            standard_entity_id=entity_id,
            standard_name=standard_name,
            confidence_score=score,
        )
        await self._add_in_savepoint(mapping)

    async def _create_review_task(
        self, source_name: str, target_name: str, target_entity_id: str, score: float
    ):
        """Queue a ``PENDING`` review task unless one already exists.

        A task counts as existing when the pool already holds a row with the
        same ``source_name`` and ``target_entity_id``.
        """
        existing_stmt = select(EntityReviewPool).where(
            EntityReviewPool.source_name == source_name,
            EntityReviewPool.target_entity_id == target_entity_id,
        )
        result = await self.session.execute(existing_stmt)
        # Only existence matters here, so tolerate duplicates instead of
        # raising on them.
        if result.scalars().first() is not None:
            return

        task = EntityReviewPool(
            id=str(uuid.uuid4()),
            source_name=source_name,
            target_name=target_name,
            target_entity_id=target_entity_id,
            similarity_score=score,
            status="PENDING",
        )
        await self._add_in_savepoint(task)

    async def _add_in_savepoint(self, obj) -> bool:
        """Add and flush ``obj`` inside a SAVEPOINT.

        Only the savepoint is rolled back when the insert violates an
        integrity constraint, so earlier work in the caller's transaction
        is preserved. Any other error propagates to the caller.

        Returns:
            ``True`` if the row was flushed, ``False`` if it was dropped
            because of an integrity conflict.
        """
        # Imported locally so the block does not depend on a module-level
        # import being added elsewhere in the file.
        from sqlalchemy.exc import IntegrityError

        try:
            async with self.session.begin_nested():
                self.session.add(obj)
                await self.session.flush()
        except IntegrityError:
            return False
        return True