import os import json import time from typing import List, Optional, Dict, Any from pydantic import BaseModel, Field from langchain_core.messages import SystemMessage, HumanMessage from langchain_groq import ChatGroq from dotenv import load_dotenv load_dotenv() # --- MODELLI PYDANTIC (Contratti Formali) --- # PASS 1 - Livello 1 class MacroCategoryCandidate(BaseModel): category: str = Field(description="URI della macro-categoria (es. arco:CulturalProperty)") reasoning: str = Field(description="Perché questa macro-categoria è appropriata per l'entità") class EntityMacroClassification(BaseModel): name: str = Field(description="Nome dell'entità come appare nel testo") candidates: List[MacroCategoryCandidate] = Field( description="1-2 macro-categorie candidate, ordinate per preferenza (la prima è la più probabile)", min_length=1, max_length=2 ) class MacroClassificationResult(BaseModel): """Output del Livello 1""" entities: List[EntityMacroClassification] # PASS 1 - Livello 2 class TypedEntity(BaseModel): name: str = Field(description="Nome dell'entità come appare nel testo") type: str = Field(description="URI del tipo ontologico finale (es. arco:ArchaeologicalProperty)") class TypeInferenceResult(BaseModel): """Output del Livello 2""" entities: List[TypedEntity] # PASS 2 - Extraction class GraphTriple(BaseModel): subject: str subject_type: str = Field(description="Tipo ontologico del soggetto (da Pass 1)") predicate: str object: str object_type: str = Field(description="Tipo ontologico dell'oggetto (da Pass 1)") evidence: str = Field(description="Span testuale esatto dal chunk da cui la relazione è estratta") reasoning: str = Field(description="Perché questo predicato è stato scelto per questa coppia di entità") source: Optional[str] = Field(None) # Mantenuto per compatibilità con il batching Neo4j a valle class KnowledgeGraphExtraction(BaseModel): triples: List[GraphTriple] class NeuroSymbolicExtractor: def __init__(self, index_path="./ontology/domain_index.json"): print("🧠 Inizializzazione TDDT Extractor (Type-Driven Domain Traversal)...") # google_api_key = os.getenv("GOOGLE_API_KEY") # if not google_api_key: # raise ValueError("❌ GOOGLE_API_KEY mancante. Richiesta per Gemini 2.0 Flash.") # # Inizializzo l'LLM primario. Temperatura 0 per massimizzare il determinismo. # self.llm = ChatGoogleGenerativeAI( # model="gemini-2.0-flash", # temperature=0, # api_key=google_api_key # ) groq_api_key = os.getenv("GROQ_API_KEY") if not groq_api_key: raise ValueError("❌ GROQ_API_KEY mancante nel file .env.") # Inizializzo l'LLM primario su Groq. self.llm = ChatGroq( model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0, api_key=groq_api_key, max_retries=5 # Aumentiamo i retry interni di LangChain ) # Inizializzo le chain con structured output self.chain_pass1_l1 = self.llm.with_structured_output(MacroClassificationResult) self.chain_pass1_l2 = self.llm.with_structured_output(TypeInferenceResult) self.chain_pass2 = self.llm.with_structured_output(KnowledgeGraphExtraction) # Caricamento del Domain Index in RAM self.domain_index = {"classes": {}, "properties_by_domain": {}} if os.path.exists(index_path): with open(index_path, 'r', encoding='utf-8') as f: self.domain_index = json.load(f) print(f"✅ Domain Index caricato: {len(self.domain_index['classes'])} classi disponibili.") else: print(f"⚠️ Attenzione: Domain Index non trovato al percorso {index_path}") self.root_classes = self._extract_root_classes() def _extract_root_classes(self) -> Dict[str, Any]: """Estrae il primo livello ontologico per la macro-categorizzazione.""" roots = {} for uri, data in self.domain_index["classes"].items(): # Consideriamo root le classi senza padri o figlie dirette di owl:Thing / l0:Entity if not data["parents"] or "owl:Thing" in data["parents"] or "l0:Entity" in data["parents"]: roots[uri] = data return roots def _get_subclasses(self, parent_uris: List[str]) -> Dict[str, Any]: """Recupera tutte le sottoclassi dirette (e se stesse) dai rami indicati.""" subclasses = {} for uri, data in self.domain_index["classes"].items(): if uri in parent_uris or any(p in parent_uris for p in data["parents"]): subclasses[uri] = data return subclasses def _execute_with_retry(self, chain, prompt_messages, max_retries=4): """Self-correction loop con Exponential Backoff per Rate Limits.""" base_delay = 5 for attempt in range(max_retries): try: result = chain.invoke(prompt_messages) return result except Exception as e: error_msg = str(e).upper() print(error_msg) if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg: wait_time = base_delay * (2 ** attempt) print(f"⏳ [Rate Limit] Quota superata. Attendo {wait_time}s prima di riprovare (Tentativo {attempt+1}/{max_retries})...") time.sleep(wait_time) else: print(f"⚠️ Errore (Tentativo {attempt+1}/{max_retries}): {e}") if attempt == max_retries - 1: print("❌ Fallimento critico del task LLM.") return None return None def extract(self, text_chunk: str, source_id: str = "unknown") -> KnowledgeGraphExtraction: print(f"\n🧩 Processing {source_id} (TDDT Mode)...") # ========================================== # PASS 1 - LIVELLO 1: Macro-Categorizzazione # ========================================== roots_text = "\n".join([f"- {uri} — \"{data['label']}: {data['description']}\"" for uri, data in self.root_classes.items()]) sys_l1 = f"""Sei un estrattore esperto di entità semantiche per il dominio dei Beni Culturali. Il tuo unico compito è individuare le entità rilevanti nel testo e classificarle. MACRO-CATEGORIE DISPONIBILI: {roots_text} REGOLE DI ESTRAZIONE (TASSATIVE E OBBLIGATORIE): 1. DIVIETO DI ALLUCINAZIONE URI: Usa ESCLUSIVAMENTE gli URI esatti elencati sopra. È severamente vietato usare etichette inventate come "Person", "Location" o "Group". Se devi categorizzare una persona, usa l'URI corrispondente agli Agenti (es. core:Agent o l0:Agent). 2. RUMORE EDITORIALE: IGNORA e non estrarre MAI riferimenti alla struttura del libro o alle immagini. È vietato estrarre entità che contengono o sono composte da: "Capitolo", "Sezione", "Tavola", "Fig.", "Figura", "Pagina", "Pag.". 3. Estrai SOLO veri monumenti storici, luoghi geografici reali, personaggi storici, popoli e concetti architettonici. 4. Puoi assegnare fino a 2 candidati per entità, ordinandoli per confidenza logica. """ res_l1: MacroClassificationResult = self._execute_with_retry( self.chain_pass1_l1, [SystemMessage(content=sys_l1), HumanMessage(content=text_chunk)] ) if not res_l1 or not res_l1.entities: print(" -> Nessuna entità trovata al Livello 1.") return KnowledgeGraphExtraction(triples=[]) # ========================================== # PASS 1 - LIVELLO 2: Specializzazione # ========================================== # Raccogliamo tutti i rami candidati da esplorare candidate_uris = set() for ent in res_l1.entities: for cand in ent.candidates: candidate_uris.add(cand.category) subclasses = self._get_subclasses(list(candidate_uris)) # Raggruppo le sottoclassi per visualizzarle ordinate nel prompt subs_text_blocks = [] for parent in candidate_uris: subs_text_blocks.append(f"\n[{parent} →]") children = {k: v for k, v in subclasses.items() if parent in v["parents"] or k == parent} for uri, data in children.items(): subs_text_blocks.append(f"- {uri} — \"{data['label']}: {data['description']}\"") subs_text = "\n".join(subs_text_blocks) ent_text = "\n".join([f"- '{e.name}': " + ", ".join([f"{c.category}" for c in e.candidates]) for e in res_l1.entities]) sys_l2 = f"""Per ciascuna entità identificata, scegli il sotto-tipo più specifico tra quelli elencati. Se non c'è un sotto-tipo rilevante per un'entità, conferma la sua macro-categoria. ENTITÀ IDENTIFICATE (con macro-categorie candidate): {ent_text} SOTTO-TIPI DISPONIBILI: {subs_text}""" res_l2: TypeInferenceResult = self._execute_with_retry( self.chain_pass1_l2, [SystemMessage(content=sys_l2), HumanMessage(content=text_chunk)] ) if not res_l2 or not res_l2.entities: return KnowledgeGraphExtraction(triples=[]) # ========================================== # PASS 2: Estrazione Relazionale # ========================================== # Mappa dei tipi finali typed_entities_map = {e.name: e.type.strip() for e in res_l2.entities} # Recupero deterministico delle proprietà valid_properties = [] seen_props = set() for ent_type in typed_entities_map.values(): props = self.domain_index["properties_by_domain"].get(ent_type, []) for p in props: if p["id"] not in seen_props: valid_properties.append(f"- {p['id']}: {p['inherited_from']} → {p['range']} (Label: {p['label']})") seen_props.add(p["id"]) props_text = "\n".join(valid_properties) if valid_properties else "- (Nessuna proprietà specifica trovata. Usa skos:related)" ent_final_text = "\n".join([f"- {name} ({uri_type})" for name, uri_type in typed_entities_map.items()]) sys_ext = f"""Estrai le relazioni semantiche tra le entità presenti nel testo. ENTITÀ IDENTIFICATE (con il loro tipo): {ent_final_text} PROPRIETÀ CONSENTITE (con vincoli domain → range): {props_text} - skos:related: Qualsiasi → Qualsiasi (Usa SOLO se nessuna proprietà sopra descrive accuratamente il legame) REGOLE CRITICHE E OBBLIGATORIE: 1. Usa SOLO le proprietà elencate sopra. 2. Usa ESCLUSIVAMENTE le entità presenti nella lista "ENTITÀ IDENTIFICATE". È severamente vietato inventare o aggiungere entità non presenti in questo elenco. 3. I campi 'subject_type' e 'object_type' sono OBBLIGATORI. Devi sempre compilarli copiando esattamente il tipo indicato tra parentesi nella lista delle entità. 4. Rispetta rigorosamente i vincoli ontologici: il tipo del 'subject' DEVE essere compatibile con il domain, e il tipo dell''object' con il range. 5. Compila sempre i campi 'evidence' citando esattamente il testo, e 'reasoning' spiegando la scelta logica. """ final_res: KnowledgeGraphExtraction = self._execute_with_retry( self.chain_pass2, [SystemMessage(content=sys_ext), HumanMessage(content=text_chunk)] ) if final_res and final_res.triples: # Propago il source_id prima di inviare l'output for t in final_res.triples: t.source = source_id return final_res return KnowledgeGraphExtraction(triples=[])