# customs-data/packages/core/entity_resolution.py
# Author: 3v324v23 · commit 8ca8917 · 5.35 kB
# Commit message: feat: 完成多国家海关数据源接入与核心功能全量迭代
#   (feat: complete multi-country customs data source integration and full iteration of core features)
import difflib
import uuid

from sqlalchemy import or_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select

from packages.core.models import EntityMapping, EntityReviewPool
from packages.normalizers.company import clean_company_name
class EntityResolutionEngine:
    """
    Company entity-resolution engine.

    Cleans raw company names, maps them onto canonical ("standard") entities
    stored in ``EntityMapping``, and files borderline fuzzy matches into the
    ``EntityReviewPool`` for manual confirmation.

    Writes are only flushed (inside SAVEPOINTs), never committed: committing
    or rolling back the surrounding transaction is the caller's job.
    """

    def __init__(
        self,
        session: AsyncSession,
        auto_match_threshold: float = 0.92,
        review_threshold: float = 0.85,
        prefix_length: int = 3,
    ):
        """
        Args:
            session: Async SQLAlchemy session used for every lookup and insert.
            auto_match_threshold: Similarity at or above which a name is mapped
                onto the best existing entity automatically.
            review_threshold: Similarity at or above which (but below
                ``auto_match_threshold``) the match is queued for manual review.
            prefix_length: Number of leading characters a stored standard name
                must share with the input to be a fuzzy-match candidate. Names
                shorter than this skip fuzzy matching and become new entities.

        Raises:
            ValueError: If ``0 <= review_threshold <= auto_match_threshold <= 1``
                does not hold, or ``prefix_length`` is less than 1.
        """
        if not 0.0 <= review_threshold <= auto_match_threshold <= 1.0:
            raise ValueError(
                "thresholds must satisfy 0 <= review_threshold <= auto_match_threshold <= 1"
            )
        if prefix_length < 1:
            raise ValueError("prefix_length must be >= 1")
        self.session = session
        # Upper-case attribute names are kept for backward compatibility.
        self.AUTO_MATCH_THRESHOLD = auto_match_threshold  # >= this: auto-map
        self.REVIEW_THRESHOLD = review_threshold  # [review, auto): manual review pool
        self.PREFIX_LENGTH = prefix_length

    def calculate_similarity(self, a: str, b: str) -> float:
        """
        Return the similarity of two strings as a ratio in ``[0.0, 1.0]``.

        Uses the standard library's ``difflib.SequenceMatcher``. In production,
        TF-IDF, Jaro-Winkler or embedding-based similarity could be swapped in
        by overriding this method.
        """
        return difflib.SequenceMatcher(None, a, b).ratio()

    async def resolve_company_name(self, raw_name: str) -> str:
        """
        Resolve a raw company name to its standard entity name.

        - Exact mapping for the cleaned name exists: return its standard name.
        - Best fuzzy candidate scores >= ``AUTO_MATCH_THRESHOLD``: map the name
          onto that entity and return the entity's standard name.
        - Best score in ``[REVIEW_THRESHOLD, AUTO_MATCH_THRESHOLD)``: queue a
          review task, then register the cleaned name as its own entity until
          a reviewer decides.
        - Otherwise: register the cleaned name as a brand-new entity.

        Returns ``""`` when ``raw_name`` is empty or cleans to nothing. If a
        concurrent writer inserted a mapping for the same name first, that
        mapping's standard name is returned.
        """
        if not raw_name:
            return ""
        cleaned_name = clean_company_name(raw_name)
        if not cleaned_name:
            return ""

        # 1. Exact match on an existing mapping.
        mapping = await self._find_mapping(cleaned_name)
        if mapping is not None:
            return mapping.standard_name

        # 2. Too short to select meaningful prefix candidates: treat as new.
        if len(cleaned_name) < self.PREFIX_LENGTH:
            return await self._create_new_entity(cleaned_name)

        # 3. Fuzzy match against existing standard names sharing the prefix.
        best_match, best_entity_id, best_score = await self._find_best_candidate(cleaned_name)

        if best_entity_id is not None and best_score >= self.AUTO_MATCH_THRESHOLD:
            return await self._create_mapping(cleaned_name, best_entity_id, best_match, best_score)
        if best_entity_id is not None and best_score >= self.REVIEW_THRESHOLD:
            await self._create_review_task(cleaned_name, best_match, best_entity_id, best_score)
        # Until a reviewer confirms (or when the score is too low), the name
        # stands as its own entity.
        return await self._create_new_entity(cleaned_name)

    async def _find_mapping(self, original_name: str):
        """Return the first ``EntityMapping`` whose ``original_name`` matches, or ``None``."""
        stmt = (
            select(EntityMapping)
            .where(EntityMapping.original_name == original_name)
            .limit(1)
        )
        result = await self.session.execute(stmt)
        # .first() instead of scalar_one_or_none(): never raises if duplicate
        # rows slipped in (e.g. when no unique constraint is enforced).
        return result.scalars().first()

    async def _find_best_candidate(self, cleaned_name: str):
        """
        Score stored standard names that share ``cleaned_name``'s prefix.

        Returns ``(best_name, best_entity_id, best_score)``; name and id are
        ``None`` and the score is ``0.0`` when there is no candidate. Ties keep
        the first candidate returned by the query.

        For large datasets this in-memory scan should be replaced by a
        search-engine fuzzy query or vector search.
        """
        prefix = cleaned_name[: self.PREFIX_LENGTH]
        stmt = (
            select(EntityMapping.standard_name, EntityMapping.standard_entity_id)
            # autoescape: '%' / '_' inside company names must not act as LIKE wildcards.
            .where(EntityMapping.standard_name.startswith(prefix, autoescape=True))
            .distinct()
        )
        result = await self.session.execute(stmt)

        best_name = None
        best_entity_id = None
        best_score = 0.0
        for cand_name, entity_id in result.all():
            score = self.calculate_similarity(cleaned_name, cand_name)
            if score > best_score:
                best_name, best_entity_id, best_score = cand_name, entity_id, score
        return best_name, best_entity_id, best_score

    async def _create_new_entity(self, cleaned_name: str) -> str:
        """Register ``cleaned_name`` as a new entity mapped to itself; return its standard name."""
        entity_id = str(uuid.uuid4())
        return await self._create_mapping(cleaned_name, entity_id, cleaned_name, 1.0)

    async def _create_mapping(
        self, original_name: str, entity_id: str, standard_name: str, score: float
    ) -> str:
        """
        Insert an ``original_name -> entity`` mapping and return the standard name in effect.

        The insert runs inside a SAVEPOINT so a unique-key conflict (another
        writer mapped the same ``original_name`` first) rolls back only this
        row, not the caller's whole transaction. On such a conflict the
        existing mapping wins and its standard name is returned.

        Raises:
            IntegrityError: If the insert conflicts but no mapping for
                ``original_name`` can be found (i.e. some other constraint failed).
        """
        mapping = EntityMapping(
            id=str(uuid.uuid4()),
            original_name=original_name,
            standard_entity_id=entity_id,
            standard_name=standard_name,
            confidence_score=score,
        )
        try:
            async with self.session.begin_nested():
                self.session.add(mapping)
                await self.session.flush()
        except IntegrityError:
            existing = await self._find_mapping(original_name)
            if existing is None:
                raise
            return existing.standard_name
        return standard_name

    async def _create_review_task(
        self, source_name: str, target_name: str, target_entity_id: str, score: float
    ) -> None:
        """
        Queue a PENDING review task linking ``source_name`` to a candidate entity.

        Does nothing if a task for the same ``(source_name, target_entity_id)``
        pair already exists. A unique-key conflict from a concurrent insert is
        rolled back via SAVEPOINT and ignored, leaving the outer transaction intact.
        """
        stmt = (
            select(EntityReviewPool)
            .where(
                EntityReviewPool.source_name == source_name,
                EntityReviewPool.target_entity_id == target_entity_id,
            )
            .limit(1)
        )
        result = await self.session.execute(stmt)
        if result.scalars().first() is not None:
            return

        task = EntityReviewPool(
            id=str(uuid.uuid4()),
            source_name=source_name,
            target_name=target_name,
            target_entity_id=target_entity_id,
            similarity_score=score,
            status="PENDING",
        )
        try:
            async with self.session.begin_nested():
                self.session.add(task)
                await self.session.flush()
        except IntegrityError:
            # A concurrent writer queued the same task first; nothing to do.
            pass