# Numidium/app/services/nlp/entity_extractor.py
# Uploaded by Madras1 ("Upload 59 files", commit 3ecc1d1, verified)
"""
Entity Extractor Service - LLM-based NER
Uses Cerebras API with Qwen 3 235B for intelligent entity and relationship extraction
"""
import json
import re
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import httpx
from app.config import settings
@dataclass
class ExtractedEntity:
    """A single entity returned by the extraction model.

    Only ``name`` and ``type`` are required; every other field defaults to
    ``None`` when it is not supplied.
    """

    # Entity name as reported by the model.
    name: str
    # person, organization, location, event
    type: str
    # Job title or function, when one is given.
    role: Optional[str] = None
    # Alternative names such as nicknames or acronyms.
    aliases: Optional[List[str]] = None
    # Short free-text description.
    description: Optional[str] = None
    # Geographic coordinates.
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    # Date in ISO format (YYYY-MM-DD)
    event_date: Optional[str] = None
@dataclass
class ExtractedRelationship:
    """A directed link between two extracted entities.

    ``source`` and ``target`` hold entity names; ``relationship_type`` is a
    free-text label for the link.
    """

    # Name of the entity the relationship starts from.
    source: str
    # Name of the entity the relationship points to.
    target: str
    # Label describing the kind of relationship.
    relationship_type: str
    # Free-text context for the relationship, if any.
    context: Optional[str] = None
    # Date in ISO format (YYYY-MM-DD)
    event_date: Optional[str] = None
@dataclass
class ExtractedEvent:
    """An event mentioned in the analyzed text.

    Only ``description`` is required; the remaining fields default to
    ``None`` when they are not supplied.
    """

    # What happened.
    description: str
    # Category label for the event (for example "meeting" or "election").
    event_type: Optional[str] = None
    # When the event happened, as a string.
    date: Optional[str] = None
    # Where the event happened, if mentioned.
    location: Optional[str] = None
    # Names of the participants, if any.
    participants: Optional[List[str]] = None
@dataclass
class ExtractionResult:
    """Everything produced by one extraction call.

    Bundles the three extracted lists together with the unparsed model
    output, when that output is available.
    """

    # Entities found in the text.
    entities: List[ExtractedEntity]
    # Relationships between those entities.
    relationships: List[ExtractedRelationship]
    # Events found in the text.
    events: List[ExtractedEvent]
    # Raw text returned by the model; None when no response was recorded.
    raw_response: Optional[str] = None
# Prompt template sent as the user message by EntityExtractor.extract().
# It is rendered with str.format(text=...), so `{text}` is the only
# placeholder and every literal brace in the JSON example is doubled
# (`{{` / `}}`) so it survives formatting. The prompt is written in
# Portuguese and asks the model to return only a JSON object with
# "entities", "relationships" and "events" lists.
EXTRACTION_PROMPT = """Você é um especialista em extração de informações estruturadas de textos.
Analise o texto fornecido e extraia TODAS as entidades, relacionamentos e eventos mencionados.
## Regras:
1. Identifique entidades: pessoas, organizações, locais, eventos
2. Para PESSOAS: inclua nome completo (se mencionado ou conhecido), cargo/função
3. Para ORGANIZAÇÕES: inclua nome oficial e siglas
4. Para LOCAIS: seja específico (cidade, país, endereço)
5. Identifique RELACIONAMENTOS entre entidades (quem trabalha onde, quem conhece quem, etc.)
6. Identifique EVENTOS mencionados (reuniões, anúncios, eleições, etc.)
7. EXTRAIA DATAS sempre que mencionadas (formato YYYY-MM-DD ou YYYY se só o ano)
## Formato de resposta (JSON válido):
```json
{{
"entities": [
{{
"name": "Nome Completo",
"type": "person|organization|location|event",
"role": "cargo ou função (opcional)",
"aliases": ["apelidos", "siglas"],
"description": "breve descrição se relevante",
"event_date": "YYYY-MM-DD ou YYYY (data relevante como nascimento, fundação, etc)"
}}
],
"relationships": [
{{
"source": "Nome da Entidade 1",
"target": "Nome da Entidade 2",
"relationship_type": "tipo de relação (trabalha em, preside, fundou, reuniu-se com, etc.)",
"context": "contexto da relação",
"event_date": "YYYY-MM-DD ou YYYY (quando o relacionamento aconteceu/iniciou)"
}}
],
"events": [
{{
"description": "O que aconteceu",
"event_type": "meeting|announcement|election|crime|etc",
"date": "YYYY-MM-DD ou YYYY",
"location": "local se mencionado",
"participants": ["lista de participantes"]
}}
]
}}
```
Retorne APENAS o JSON, sem texto adicional.
## Texto para análise:
{text}
"""
class EntityExtractor:
    """
    LLM-based entity extractor using the Cerebras chat-completions API.

    The input text is wrapped in EXTRACTION_PROMPT, sent to the configured
    model, and the JSON answer is converted into an ExtractionResult.
    """

    def __init__(self):
        # The API key comes from application settings; extract() refuses to
        # run when it is empty.
        self.api_key = settings.cerebras_api_key
        self.base_url = "https://api.cerebras.ai/v1"
        self.model = "qwen-3-235b-a22b-instruct-2507"
        # Passed to httpx.AsyncClient(timeout=...).
        self.timeout = 60.0

    async def extract(self, text: str) -> ExtractionResult:
        """
        Extract entities, relationships, and events from text using the LLM.

        Args:
            text: The text to analyze.

        Returns:
            ExtractionResult with all extracted information. Text that is
            empty, or shorter than 10 characters once stripped, yields an
            empty result without calling the API.

        Raises:
            ValueError: If the API key is not configured, the request times
                out or fails, the API answers with a non-200 status, or the
                response body does not have the expected shape.
        """
        if not self.api_key:
            raise ValueError("CEREBRAS_API_KEY not configured. Please set the environment variable.")
        if not text or len(text.strip()) < 10:
            return ExtractionResult(entities=[], relationships=[], events=[])

        # Prepare the prompt and request body
        prompt = EXTRACTION_PROMPT.format(text=text)
        payload = {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": "Você é um assistente especialista em extração de entidades e relacionamentos. Sempre responda em JSON válido."
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "temperature": 0.1,  # Low temperature for consistent extraction
            "max_tokens": 4096
        }

        try:
            # Call Cerebras API
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/chat/completions",
                    headers={
                        "Authorization": f"Bearer {self.api_key}",
                        "Content-Type": "application/json"
                    },
                    json=payload
                )
                if response.status_code != 200:
                    error_text = response.text
                    print(f"Cerebras API error: {response.status_code} - {error_text}")
                    raise ValueError(f"Cerebras API error: {response.status_code}")
                try:
                    data = response.json()
                except ValueError as e:
                    # A 200 response whose body is not JSON at all.
                    print(f"Unexpected API response format: {e}")
                    raise ValueError("Unexpected API response format") from e
        except httpx.TimeoutException as e:
            print("Cerebras API timeout")
            raise ValueError("API timeout - please try again with shorter text") from e
        except httpx.RequestError as e:
            print(f"Cerebras API request error: {e}")
            raise ValueError(f"API connection error: {str(e)}") from e

        # Pull the completion text out of the payload. An empty "choices"
        # list raises IndexError and a non-dict payload raises TypeError, so
        # both are handled alongside KeyError.
        try:
            raw_content = data["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError) as e:
            print(f"Unexpected API response format: {e}")
            raise ValueError("Unexpected API response format") from e
        if not isinstance(raw_content, str):
            # e.g. "content": null -- the regex-based parser needs a string.
            print(f"Unexpected API response format: content is {type(raw_content).__name__}")
            raise ValueError("Unexpected API response format")

        return self._parse_response(raw_content)

    @staticmethod
    def _extract_json_text(content: str) -> str:
        """Return the part of the model output that should contain JSON."""
        # Sometimes the model wraps it in ```json ... ```
        fenced = re.search(r'```json\s*(.*?)\s*```', content, re.DOTALL)
        if fenced:
            return fenced.group(1)
        # Otherwise take everything from the first "{" to the last "}"
        braces = re.search(r'\{.*\}', content, re.DOTALL)
        if braces:
            return braces.group(0)
        return content

    @staticmethod
    def _dict_items(data: Dict[str, Any], key: str) -> List[Dict[str, Any]]:
        """Return the dict elements of data[key], or [] if it is not a list."""
        items = data.get(key)
        if not isinstance(items, list):
            return []
        return [item for item in items if isinstance(item, dict)]

    def _parse_response(self, content: str) -> ExtractionResult:
        """
        Parse the LLM response into structured data.

        Args:
            content: Raw text returned by the model.

        Returns:
            ExtractionResult built from the JSON found in ``content``. If no
            valid JSON object can be decoded, the result has empty lists.
            ``raw_response`` always carries ``content``.
        """
        json_str = self._extract_json_text(content)
        try:
            data = json.loads(json_str)
        except json.JSONDecodeError as e:
            print(f"Failed to parse LLM response: {e}")
            print(f"Raw content: {content}")
            return ExtractionResult(
                entities=[],
                relationships=[],
                events=[],
                raw_response=content
            )
        if not isinstance(data, dict):
            # Valid JSON, but not the object the prompt asks for.
            print(f"Failed to parse LLM response: expected a JSON object, got {type(data).__name__}")
            print(f"Raw content: {content}")
            return ExtractionResult(
                entities=[],
                relationships=[],
                events=[],
                raw_response=content
            )

        # `x.get(k) or default` is used instead of `x.get(k, default)` so an
        # explicit JSON null falls back to the default as well.
        entities = [
            ExtractedEntity(
                name=e.get("name") or "",
                type=e.get("type") or "unknown",
                role=e.get("role"),
                aliases=e.get("aliases") or [],
                description=e.get("description"),
                event_date=e.get("event_date")
            )
            for e in self._dict_items(data, "entities")
        ]
        relationships = [
            ExtractedRelationship(
                source=r.get("source") or "",
                target=r.get("target") or "",
                relationship_type=r.get("relationship_type") or "related_to",
                context=r.get("context"),
                event_date=r.get("event_date")
            )
            for r in self._dict_items(data, "relationships")
        ]
        events = [
            ExtractedEvent(
                description=ev.get("description") or "",
                event_type=ev.get("event_type"),
                date=ev.get("date"),
                location=ev.get("location"),
                participants=ev.get("participants") or []
            )
            for ev in self._dict_items(data, "events")
        ]
        return ExtractionResult(
            entities=entities,
            relationships=relationships,
            events=events,
            raw_response=content
        )

    def extract_sync(self, text: str) -> ExtractionResult:
        """
        Synchronous version of extract for non-async contexts.

        Raises:
            RuntimeError: If called while an event loop is already running in
                this thread; use ``await extract(text)`` there instead.
            ValueError: Propagated from extract().
        """
        import asyncio

        # Check for a running loop before creating the coroutine, so that a
        # misuse does not leave a never-awaited coroutine behind.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_running = False
        else:
            loop_running = True
        if loop_running:
            raise RuntimeError(
                "extract_sync() cannot be called from a running event loop; "
                "await extract() instead"
            )
        return asyncio.run(self.extract(text))
# Shared module-level instance. It is constructed at import time, which reads
# settings.cerebras_api_key once in EntityExtractor.__init__.
entity_extractor = EntityExtractor()