"""
Entity Extractor Service - LLM-based NER.

Uses the Cerebras API with Qwen 3 235B for intelligent entity and
relationship extraction.
"""

import json
import logging
import re
from typing import Dict, List, Optional, Any
from dataclasses import dataclass

import httpx

from app.config import settings

logger = logging.getLogger(__name__)


@dataclass
class ExtractedEntity:
    """Represents an extracted entity."""

    name: str
    type: str  # person, organization, location, event
    role: Optional[str] = None
    aliases: Optional[List[str]] = None
    description: Optional[str] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    event_date: Optional[str] = None  # Date in ISO format (YYYY-MM-DD)


@dataclass
class ExtractedRelationship:
    """Represents a relationship between entities."""

    source: str
    target: str
    relationship_type: str
    context: Optional[str] = None
    event_date: Optional[str] = None  # Date in ISO format (YYYY-MM-DD)


@dataclass
class ExtractedEvent:
    """Represents an extracted event."""

    description: str
    event_type: Optional[str] = None
    date: Optional[str] = None
    location: Optional[str] = None
    participants: Optional[List[str]] = None


@dataclass
class ExtractionResult:
    """Complete extraction result."""

    entities: List[ExtractedEntity]
    relationships: List[ExtractedRelationship]
    events: List[ExtractedEvent]
    raw_response: Optional[str] = None


# Prompt template sent as the user message. Literal braces of the JSON example
# are doubled because the template is rendered with ``str.format(text=...)``.
EXTRACTION_PROMPT = """Você é um especialista em extração de informações estruturadas de textos.
Analise o texto fornecido e extraia TODAS as entidades, relacionamentos e eventos mencionados.

## Regras:
1. Identifique entidades: pessoas, organizações, locais, eventos
2. Para PESSOAS: inclua nome completo (se mencionado ou conhecido), cargo/função
3. Para ORGANIZAÇÕES: inclua nome oficial e siglas
4. Para LOCAIS: seja específico (cidade, país, endereço)
5. Identifique RELACIONAMENTOS entre entidades (quem trabalha onde, quem conhece quem, etc.)
6. Identifique EVENTOS mencionados (reuniões, anúncios, eleições, etc.)
7. EXTRAIA DATAS sempre que mencionadas (formato YYYY-MM-DD ou YYYY se só o ano)

## Formato de resposta (JSON válido):
```json
{{
  "entities": [
    {{
      "name": "Nome Completo",
      "type": "person|organization|location|event",
      "role": "cargo ou função (opcional)",
      "aliases": ["apelidos", "siglas"],
      "description": "breve descrição se relevante",
      "event_date": "YYYY-MM-DD ou YYYY (data relevante como nascimento, fundação, etc)"
    }}
  ],
  "relationships": [
    {{
      "source": "Nome da Entidade 1",
      "target": "Nome da Entidade 2",
      "relationship_type": "tipo de relação (trabalha em, preside, fundou, reuniu-se com, etc.)",
      "context": "contexto da relação",
      "event_date": "YYYY-MM-DD ou YYYY (quando o relacionamento aconteceu/iniciou)"
    }}
  ],
  "events": [
    {{
      "description": "O que aconteceu",
      "event_type": "meeting|announcement|election|crime|etc",
      "date": "YYYY-MM-DD ou YYYY",
      "location": "local se mencionado",
      "participants": ["lista de participantes"]
    }}
  ]
}}
```

Retorne APENAS o JSON, sem texto adicional.

## Texto para análise:
{text}
"""


class EntityExtractor:
    """LLM-based entity extractor using the Cerebras chat-completions API."""

    def __init__(self):
        self.api_key = settings.cerebras_api_key
        self.base_url = "https://api.cerebras.ai/v1"
        self.model = "qwen-3-235b-a22b-instruct-2507"
        self.timeout = 60.0

    async def extract(self, text: str) -> ExtractionResult:
        """
        Extract entities, relationships, and events from text using the LLM.

        Args:
            text: The text to analyze.

        Returns:
            ExtractionResult with all extracted information. Text that is
            empty or shorter than 10 characters after stripping yields an
            empty result without calling the API.

        Raises:
            ValueError: If the API key is not configured, the request times
                out or fails at the transport level, the API answers with a
                non-200 status, or the response body does not have the
                expected chat-completions shape.
        """
        if not self.api_key:
            raise ValueError(
                "CEREBRAS_API_KEY not configured. "
                "Please set the environment variable."
            )

        if not text or len(text.strip()) < 10:
            return ExtractionResult(entities=[], relationships=[], events=[])

        # Prepare the prompt
        prompt = EXTRACTION_PROMPT.format(text=text)

        payload = {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": "Você é um assistente especialista em extração de entidades e relacionamentos. Sempre responda em JSON válido."
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "temperature": 0.1,  # Low temperature for consistent extraction
            "max_tokens": 4096
        }

        # Only the network call lives in this try block so the handlers
        # below cannot mask errors raised by later processing.
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.base_url}/chat/completions",
                    headers={
                        "Authorization": f"Bearer {self.api_key}",
                        "Content-Type": "application/json"
                    },
                    json=payload
                )
        except httpx.TimeoutException as e:
            # Must precede RequestError: timeouts are a subclass of it.
            logger.error("Cerebras API timeout")
            raise ValueError(
                "API timeout - please try again with shorter text"
            ) from e
        except httpx.RequestError as e:
            logger.error("Cerebras API request error: %s", e)
            raise ValueError(f"API connection error: {str(e)}") from e

        if response.status_code != 200:
            logger.error(
                "Cerebras API error: %s - %s",
                response.status_code,
                response.text,
            )
            raise ValueError(f"Cerebras API error: {response.status_code}")

        # The payload comes from a remote service: a missing key, an empty
        # "choices" list, a non-dict level, or a non-JSON body are all
        # reported as the same format error.
        try:
            raw_content = response.json()["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError, ValueError) as e:
            logger.error("Unexpected API response format: %r", e)
            raise ValueError("Unexpected API response format") from e

        if not isinstance(raw_content, str):
            # e.g. "content": null — nothing to parse.
            logger.error(
                "Unexpected API response format: content is %s",
                type(raw_content).__name__,
            )
            raise ValueError("Unexpected API response format")

        return self._parse_response(raw_content)

    @staticmethod
    def _records(data: Dict[str, Any], key: str) -> List[Dict[str, Any]]:
        """Return the dict items of the list stored under ``key``, else ``[]``."""
        value = data.get(key)
        if not isinstance(value, list):
            return []
        return [item for item in value if isinstance(item, dict)]

    def _parse_response(self, content: str) -> ExtractionResult:
        """
        Parse the LLM response into structured data.

        Parsing is best-effort: content that is not valid JSON, or whose
        top-level value is not a JSON object, produces an empty result that
        still carries ``raw_response``. Sections that are missing or not
        lists are treated as empty, and non-object items are skipped.

        Args:
            content: Raw text returned by the model.

        Returns:
            ExtractionResult built from the parsed JSON.
        """
        # Try to extract JSON from the response
        # Sometimes the model wraps it in ```json ... ```
        fenced = re.search(r'```json\s*(.*?)\s*```', content, re.DOTALL)
        if fenced:
            json_str = fenced.group(1)
        else:
            # Try to find raw JSON (greedy: first "{" through last "}")
            braces = re.search(r'\{.*\}', content, re.DOTALL)
            json_str = braces.group(0) if braces else content

        try:
            data = json.loads(json_str)
        except json.JSONDecodeError as e:
            logger.error("Failed to parse LLM response: %s", e)
            logger.error("Raw content: %s", content)
            return ExtractionResult(
                entities=[],
                relationships=[],
                events=[],
                raw_response=content
            )

        if not isinstance(data, dict):
            logger.error(
                "LLM response is not a JSON object: %s", type(data).__name__
            )
            return ExtractionResult(
                entities=[],
                relationships=[],
                events=[],
                raw_response=content
            )

        # Parse entities
        entities = [
            ExtractedEntity(
                name=e.get("name", ""),
                type=e.get("type", "unknown"),
                role=e.get("role"),
                aliases=e.get("aliases") or [],
                description=e.get("description"),
                event_date=e.get("event_date")
            )
            for e in self._records(data, "entities")
        ]

        # Parse relationships
        relationships = [
            ExtractedRelationship(
                source=r.get("source", ""),
                target=r.get("target", ""),
                relationship_type=r.get("relationship_type", "related_to"),
                context=r.get("context"),
                event_date=r.get("event_date")
            )
            for r in self._records(data, "relationships")
        ]

        # Parse events
        events = [
            ExtractedEvent(
                description=ev.get("description", ""),
                event_type=ev.get("event_type"),
                date=ev.get("date"),
                location=ev.get("location"),
                participants=ev.get("participants") or []
            )
            for ev in self._records(data, "events")
        ]

        return ExtractionResult(
            entities=entities,
            relationships=relationships,
            events=events,
            raw_response=content
        )

    def extract_sync(self, text: str) -> ExtractionResult:
        """
        Synchronous version of extract for non-async contexts.

        Runs ``extract`` via ``asyncio.run``, which raises ``RuntimeError``
        when called from a thread that already has a running event loop;
        use ``await extract(...)`` there instead.
        """
        import asyncio
        return asyncio.run(self.extract(text))


# Singleton instance
entity_extractor = EntityExtractor()