Spaces:
Paused
Paused
File size: 16,770 Bytes
399f3c6 2cb7544 a576aa9 90b33eb 94a7032 3141e61 90b33eb 399f3c6 a576aa9 399f3c6 8da7c51 2cb7544 8da7c51 2cb7544 a576aa9 2cb7544 a576aa9 8da7c51 399f3c6 2cb7544 399f3c6 2cb7544 399f3c6 2cb7544 399f3c6 2cb7544 399f3c6 2cb7544 399f3c6 a576aa9 8da7c51 a576aa9 8da7c51 a576aa9 8da7c51 a576aa9 8da7c51 a576aa9 8da7c51 a576aa9 8da7c51 a576aa9 2cb7544 399f3c6 a576aa9 399f3c6 2cb7544 399f3c6 a576aa9 2cb7544 399f3c6 2cb7544 399f3c6 2cb7544 399f3c6 25e7f71 399f3c6 25e7f71 399f3c6 25e7f71 399f3c6 25e7f71 399f3c6 25e7f71 399f3c6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 |
"""
实体和关系提取模块
使用LLM从文档中提取实体、关系和属性,构建知识图谱的基础
"""
from typing import List, Dict, Tuple
import time
import asyncio
import aiohttp
import json
try:
from langchain_core.prompts import PromptTemplate
except ImportError:
try:
from langchain_core.prompts import PromptTemplate
except ImportError:
from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser
from config import LOCAL_LLM
class EntityExtractor:
    """Entity extractor — uses an LLM to pull entities/relations from text (supports async batch processing)."""

    def __init__(self, timeout: int = 180, max_retries: int = 3, enable_async: bool = True):
        """Initialize the entity extractor.

        Args:
            timeout: LLM call timeout in seconds — defaults to 180 to cover the
                first (cold) model load in Ollama.
            max_retries: number of attempts before a call is given up.
            enable_async: whether async processing is enabled (default True).
        """
        self.llm = ChatOllama(
            model=LOCAL_LLM,
            format="json",
            temperature=0,
            timeout=timeout  # per-call timeout for the sync LangChain path
        )
        self.max_retries = max_retries
        self.enable_async = enable_async
        # Raw Ollama HTTP endpoint used by the async path (bypasses LangChain).
        self.ollama_url = "http://localhost:11434/api/generate"
        self.timeout = timeout  # saved so the async client reuses the same timeout
        # Entity-extraction prompt: asks the model for JSON with an "entities" list.
        self.entity_prompt = PromptTemplate(
            template="""你是一个专业的实体识别专家。从以下文本中提取所有重要的实体。
实体类型包括:
- PERSON: 人物、作者、研究者
- ORGANIZATION: 组织、机构、公司
- CONCEPT: 技术概念、算法、方法论
- TECHNOLOGY: 具体技术、工具、框架
- PAPER: 论文、出版物
- EVENT: 事件、会议
文本内容:
{text}
请以JSON格式返回,包含以下字段:
{{
"entities": [
{{
"name": "实体名称",
"type": "实体类型",
"description": "简短描述"
}}
]
}}
不要包含前言或解释,只返回JSON。
""",
            input_variables=["text"]
        )
        # Relation-extraction prompt: given the entity names plus the text,
        # asks the model for JSON with a "relations" list.
        self.relation_prompt = PromptTemplate(
            template="""你是一个关系抽取专家。从文本中识别实体之间的关系。
已识别的实体:
{entities}
文本内容:
{text}
请识别实体之间的关系,以JSON格式返回:
{{
"relations": [
{{
"source": "源实体名称",
"target": "目标实体名称",
"relation_type": "关系类型",
"description": "关系描述"
}}
]
}}
关系类型包括: AUTHOR_OF, USES, BASED_ON, RELATED_TO, PART_OF, APPLIES_TO, IMPROVES, CITES
不要包含前言或解释,只返回JSON。
""",
            input_variables=["text", "entities"]
        )
        # LCEL pipelines: prompt -> LLM -> JSON parser.
        self.entity_chain = self.entity_prompt | self.llm | JsonOutputParser()
        self.relation_chain = self.relation_prompt | self.llm | JsonOutputParser()

    def extract_entities(self, text: str) -> List[Dict]:
        """
        Extract entities from text (with retry).

        Args:
            text: input text (only the first 2000 chars are sent to the model)
        Returns:
            List of entity dicts; empty list after all retries fail.
        """
        for attempt in range(self.max_retries):
            try:
                print(f" 🔄 提取实体 (尝试 {attempt + 1}/{self.max_retries})...", end="")
                result = self.entity_chain.invoke({"text": text[:2000]})  # cap prompt length
                entities = result.get("entities", [])
                print(f" ✅ 提取到 {len(entities)} 个实体")
                return entities
            except TimeoutError as e:
                print(f" ⏱️ 超时")
                if attempt < self.max_retries - 1:
                    # Linear backoff: 2s, 4s, ...
                    wait_time = (attempt + 1) * 2
                    print(f" ⏳ 等待 {wait_time} 秒后重试...")
                    time.sleep(wait_time)
                else:
                    print(f" ❌ 实体提取最终失败: 超时")
                    return []
            except Exception as e:
                print(f" ❌ 错误: {str(e)[:100]}")
                if attempt < self.max_retries - 1:
                    time.sleep(1)
                else:
                    print(f" ❌ 实体提取最终失败: {e}")
                    return []
        return []

    def extract_relations(self, text: str, entities: List[Dict]) -> List[Dict]:
        """
        Extract relations between known entities from text (with retry).

        Args:
            text: input text (only the first 2000 chars are sent to the model)
            entities: previously extracted entity dicts (must have "name")
        Returns:
            List of relation dicts; empty list if no entities or all retries fail.
        """
        if not entities:
            print(" ⚠️ 无实体,跳过关系提取")
            return []
        for attempt in range(self.max_retries):
            try:
                print(f" 🔄 提取关系 (尝试 {attempt + 1}/{self.max_retries})...", end="")
                entity_names = [e["name"] for e in entities]
                result = self.relation_chain.invoke({
                    "text": text[:2000],
                    "entities": ", ".join(entity_names)
                })
                relations = result.get("relations", [])
                print(f" ✅ 提取到 {len(relations)} 个关系")
                return relations
            except TimeoutError as e:
                print(f" ⏱️ 超时")
                if attempt < self.max_retries - 1:
                    # Linear backoff: 2s, 4s, ...
                    wait_time = (attempt + 1) * 2
                    print(f" ⏳ 等待 {wait_time} 秒后重试...")
                    time.sleep(wait_time)
                else:
                    print(f" ❌ 关系提取最终失败: 超时")
                    return []
            except Exception as e:
                print(f" ❌ 错误: {str(e)[:100]}")
                if attempt < self.max_retries - 1:
                    time.sleep(1)
                else:
                    print(f" ❌ 关系提取最终失败: {e}")
                    return []
        return []

    async def _async_llm_call(self, prompt: str, session: aiohttp.ClientSession, attempt: int = 0) -> Dict:
        """Asynchronously call the Ollama /api/generate endpoint.

        Args:
            prompt: fully formatted prompt string
            session: shared aiohttp session
            attempt: current retry index (the recursive retries advance it)
        Returns:
            Dict parsed from the model's JSON response.
        Raises:
            Exception: once all retries are exhausted.

        NOTE(review): this method retries internally by recursing, while its
        callers (_extract_entities_async / _extract_relations_async) also loop
        up to max_retries times and pass their own attempt index in — the
        worst case is therefore more than max_retries total calls; confirm
        this compounding is intended.
        """
        try:
            timeout = aiohttp.ClientTimeout(
                total=self.timeout,  # overall timeout
                connect=30,  # connect timeout: 30 seconds
                sock_read=self.timeout  # read timeout
            )
            async with session.post(
                self.ollama_url,
                json={
                    "model": LOCAL_LLM,
                    "prompt": prompt,
                    "format": "json",
                    "stream": False,
                    "options": {"temperature": 0}
                },
                timeout=timeout
            ) as response:
                if response.status == 200:
                    result = await response.json()
                    # Ollama returns the model output as a string under "response".
                    return json.loads(result.get('response', '{}'))
                else:
                    raise Exception(f"API返回错误: {response.status}")
        except (asyncio.TimeoutError, aiohttp.ClientError) as e:
            # Network/timeout errors: back off 3s, 6s, ... then retry recursively.
            if attempt < self.max_retries - 1:
                wait_time = (attempt + 1) * 3
                await asyncio.sleep(wait_time)
                return await self._async_llm_call(prompt, session, attempt + 1)
            raise Exception(f"连接失败: {str(e)}")
        except Exception as e:
            # Any other error (e.g. malformed JSON): fixed 2s backoff, then retry.
            if attempt < self.max_retries - 1:
                await asyncio.sleep(2)
                return await self._async_llm_call(prompt, session, attempt + 1)
            raise

    async def _extract_entities_async(self, text: str, doc_index: int, session: aiohttp.ClientSession) -> List[Dict]:
        """Async entity extraction for one document; returns [] on failure."""
        prompt = self.entity_prompt.format(text=text[:2000])
        for attempt in range(self.max_retries):
            try:
                print(f" [文档 #{doc_index + 1}] 🔄 提取实体 (尝试 {attempt + 1}/{self.max_retries})...", end="")
                result = await self._async_llm_call(prompt, session, attempt)
                entities = result.get("entities", [])
                print(f" ✅ {len(entities)} 个实体")
                return entities
            except Exception as e:
                print(f" ❌ {str(e)[:50]}")
                if attempt == self.max_retries - 1:
                    return []
        return []

    async def _extract_relations_async(self, text: str, entities: List[Dict], doc_index: int, session: aiohttp.ClientSession) -> List[Dict]:
        """Async relation extraction for one document; returns [] on failure or no entities."""
        if not entities:
            return []
        entity_names = [e["name"] for e in entities]
        prompt = self.relation_prompt.format(
            text=text[:2000],
            entities=", ".join(entity_names)
        )
        for attempt in range(self.max_retries):
            try:
                print(f" [文档 #{doc_index + 1}] 🔄 提取关系 (尝试 {attempt + 1}/{self.max_retries})...", end="")
                result = await self._async_llm_call(prompt, session, attempt)
                relations = result.get("relations", [])
                print(f" ✅ {len(relations)} 个关系")
                return relations
            except Exception as e:
                print(f" ❌ {str(e)[:50]}")
                if attempt == self.max_retries - 1:
                    return []
        return []

    async def _extract_from_document_async(self, document_text: str, doc_index: int, session: aiohttp.ClientSession) -> Dict:
        """Process one document asynchronously: entities first, then relations."""
        print(f"\n🔍 [文档 #{doc_index + 1}] 开始异步提取...")
        # Sequential within a document (relations need the entity list);
        # concurrency happens across documents in extract_batch_async.
        entities = await self._extract_entities_async(document_text, doc_index, session)
        relations = await self._extract_relations_async(document_text, entities, doc_index, session)
        print(f"📊 [文档 #{doc_index + 1}] 完成: {len(entities)} 实体, {len(relations)} 关系")
        return {
            "entities": entities,
            "relations": relations
        }

    async def extract_batch_async(self, documents: List[Tuple[str, int]]) -> List[Dict]:
        """Asynchronously process a batch of documents.

        Args:
            documents: list of (document_text, doc_index) tuples
        Returns:
            One {"entities": [...], "relations": [...]} dict per input, in order.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [
                self._extract_from_document_async(doc_text, doc_idx, session)
                for doc_text, doc_idx in documents
            ]
            # Run all documents concurrently; exceptions come back as results.
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # Replace failed documents with empty results so output stays aligned.
            processed_results = []
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    print(f"⚠️ 文档 #{documents[i][1] + 1} 处理失败: {result}")
                    processed_results.append({"entities": [], "relations": []})
                else:
                    processed_results.append(result)
            return processed_results

    def extract_from_document(self, document_text: str, doc_index: int = 0) -> Dict:
        """
        Extract entities and relations from one document (synchronous,
        kept for backward compatibility).

        Args:
            document_text: document text
            doc_index: document index (used only for log messages)
        Returns:
            {"entities": [...], "relations": [...]}
        """
        # Synchronous path (backward compatible).
        print(f"\n🔍 文档 #{doc_index + 1}: 开始提取...")
        entities = self.extract_entities(document_text)
        relations = self.extract_relations(document_text, entities)
        print(f"📊 文档 #{doc_index + 1} 完成: {len(entities)} 实体, {len(relations)} 关系")
        return {
            "entities": entities,
            "relations": relations
        }
class EntityDeduplicator:
    """Entity deduplication and merging.

    Exact (case-insensitive) name matches are merged directly; substring
    matches are confirmed with an LLM call before merging.
    """

    def __init__(self):
        # JSON-mode LLM used only for the pairwise "same entity?" judgement.
        self.llm = ChatOllama(model=LOCAL_LLM, format="json", temperature=0)
        self.merge_prompt = PromptTemplate(
            template="""判断以下两个实体是否指向同一个对象:
实体1: {entity1_name} - {entity1_desc}
实体2: {entity2_name} - {entity2_desc}
如果是同一个对象,返回:
{{
"is_same": true,
"canonical_name": "标准名称",
"reason": "原因"
}}
如果不是,返回:
{{
"is_same": false,
"reason": "原因"
}}
只返回JSON,不要其他内容。
""",
            input_variables=["entity1_name", "entity1_desc", "entity2_name", "entity2_desc"]
        )
        self.merge_chain = self.merge_prompt | self.llm | JsonOutputParser()

    def _is_same_entity(self, entity1: Dict, entity2: Dict) -> bool:
        """
        Ask the LLM whether two entities refer to the same object.

        Args:
            entity1: first entity dict (needs "name"; "description" optional)
            entity2: second entity dict
        Returns:
            bool: True if the LLM judges them identical; False on any error.
        """
        try:
            input_data = {
                "entity1_name": entity1["name"],
                "entity1_desc": entity1.get("description", "无描述"),
                "entity2_name": entity2["name"],
                "entity2_desc": entity2.get("description", "无描述")
            }
            result = self.merge_chain.invoke(input_data)
            # A missing/malformed "is_same" field counts as "not the same".
            return result.get("is_same", False)
        except Exception as e:
            print(f" ⚠️ LLM判重失败 ({entity1['name']} vs {entity2['name']}): {e}")
            return False

    def deduplicate_entities(self, entities: List[Dict]) -> Dict:
        """
        Deduplicate an entity list.

        Args:
            entities: raw entity dicts (typically parsed from LLM JSON output,
                so individual records may be malformed)
        Returns:
            {"entities": unique entity list,
             "mapping": original name -> canonical name}
        """
        # Fix: LLM output is unreliable — records missing a usable string
        # "name" used to raise KeyError/AttributeError mid-loop and lose all
        # progress. Drop them up front instead.
        entities = [
            e for e in entities
            if isinstance(e, dict) and isinstance(e.get("name"), str) and e["name"].strip()
        ]
        if len(entities) <= 1:
            # Keep the return shape consistent for the trivial cases.
            entity_mapping = {entity["name"]: entity["name"] for entity in entities} if entities else {}
            return {
                "entities": entities,
                "mapping": entity_mapping
            }
        print(f"🔄 开始去重 {len(entities)} 个实体...")
        # Name- and LLM-based smart dedup.
        unique_entities = {}
        entity_mapping = {}  # maps aliases to the canonical name
        for entity in entities:
            name = entity["name"].lower().strip()
            merged = False
            for canonical_name, canonical_entity in unique_entities.items():
                if name == canonical_name:
                    # Exact match (case-insensitive): merge without asking the LLM.
                    entity_mapping[entity["name"]] = canonical_entity["name"]
                    merged = True
                    break
                if name in canonical_name or canonical_name in name:
                    # Substring match is only a candidate — confirm with the LLM.
                    # e.g. "Python" vs "Python Programming Language" -> merge,
                    #      "Java" vs "JavaScript" -> do not merge.
                    if self._is_same_entity(entity, canonical_entity):
                        print(f" ✨ 合并: {entity['name']} -> {canonical_entity['name']}")
                        entity_mapping[entity["name"]] = canonical_entity["name"]
                        merged = True
                        break
            if not merged:
                unique_entities[name] = entity
                entity_mapping[entity["name"]] = entity["name"]
        print(f"✅ 去重完成,剩余 {len(unique_entities)} 个唯一实体")
        return {
            "entities": list(unique_entities.values()),
            "mapping": entity_mapping
        }
def initialize_entity_extractor():
    """Factory helper: build an EntityExtractor with default settings."""
    extractor = EntityExtractor()
    return extractor
|