File size: 11,908 Bytes
a968971 b70d82f 9fb3deb b70d82f 9cbbfac 9fb3deb cc3f780 a968971 b70d82f cc3f780 b70d82f a968971 b70d82f a968971 c1b1880 a968971 9fb3deb b70d82f cfc197c 9fb3deb b70d82f 9fb3deb b70d82f cfc197c b70d82f cfc197c b70d82f 9fb3deb b70d82f cfc197c b70d82f cfc197c 9fb3deb b70d82f a968971 b70d82f 2e93420 fe271ee b70d82f fe271ee b70d82f cfc197c b70d82f 2e93420 b70d82f a968971 b70d82f a968971 b70d82f a968971 b70d82f a968971 b70d82f c1b1880 b70d82f a968971 b70d82f 2e93420 cfc197c b70d82f cc3f780 b70d82f fe271ee b70d82f a968971 b70d82f a968971 b70d82f fe271ee b70d82f fe271ee b70d82f cfc197c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 | import os
import json
import time
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_groq import ChatGroq
from dotenv import load_dotenv
load_dotenv()
# --- MODELLI PYDANTIC (Contratti Formali) ---
# PASS 1 - Livello 1
class MacroCategoryCandidate(BaseModel):
category: str = Field(description="URI della macro-categoria (es. arco:CulturalProperty)")
reasoning: str = Field(description="Perché questa macro-categoria è appropriata per l'entità")
class EntityMacroClassification(BaseModel):
name: str = Field(description="Nome dell'entità come appare nel testo")
candidates: List[MacroCategoryCandidate] = Field(
description="1-2 macro-categorie candidate, ordinate per preferenza (la prima è la più probabile)",
min_length=1,
max_length=2
)
class MacroClassificationResult(BaseModel):
"""Output del Livello 1"""
entities: List[EntityMacroClassification]
# PASS 1 - Livello 2
class TypedEntity(BaseModel):
name: str = Field(description="Nome dell'entità come appare nel testo")
type: str = Field(description="URI del tipo ontologico finale (es. arco:ArchaeologicalProperty)")
class TypeInferenceResult(BaseModel):
"""Output del Livello 2"""
entities: List[TypedEntity]
# PASS 2 - Extraction
class GraphTriple(BaseModel):
subject: str
subject_type: str = Field(description="Tipo ontologico del soggetto (da Pass 1)")
predicate: str
object: str
object_type: str = Field(description="Tipo ontologico dell'oggetto (da Pass 1)")
evidence: str = Field(description="Span testuale esatto dal chunk da cui la relazione è estratta")
reasoning: str = Field(description="Perché questo predicato è stato scelto per questa coppia di entità")
source: Optional[str] = Field(None) # Mantenuto per compatibilità con il batching Neo4j a valle
class KnowledgeGraphExtraction(BaseModel):
triples: List[GraphTriple]
class NeuroSymbolicExtractor:
def __init__(self, index_path="./ontology/domain_index.json"):
print("🧠 Inizializzazione TDDT Extractor (Type-Driven Domain Traversal)...")
# google_api_key = os.getenv("GOOGLE_API_KEY")
# if not google_api_key:
# raise ValueError("❌ GOOGLE_API_KEY mancante. Richiesta per Gemini 2.0 Flash.")
# # Inizializzo l'LLM primario. Temperatura 0 per massimizzare il determinismo.
# self.llm = ChatGoogleGenerativeAI(
# model="gemini-2.0-flash",
# temperature=0,
# api_key=google_api_key
# )
groq_api_key = os.getenv("GROQ_API_KEY")
if not groq_api_key:
raise ValueError("❌ GROQ_API_KEY mancante nel file .env.")
# Inizializzo l'LLM primario su Groq.
self.llm = ChatGroq(
model="meta-llama/llama-4-scout-17b-16e-instruct",
temperature=0,
api_key=groq_api_key,
max_retries=5 # Aumentiamo i retry interni di LangChain
)
# Inizializzo le chain con structured output
self.chain_pass1_l1 = self.llm.with_structured_output(MacroClassificationResult)
self.chain_pass1_l2 = self.llm.with_structured_output(TypeInferenceResult)
self.chain_pass2 = self.llm.with_structured_output(KnowledgeGraphExtraction)
# Caricamento del Domain Index in RAM
self.domain_index = {"classes": {}, "properties_by_domain": {}}
if os.path.exists(index_path):
with open(index_path, 'r', encoding='utf-8') as f:
self.domain_index = json.load(f)
print(f"✅ Domain Index caricato: {len(self.domain_index['classes'])} classi disponibili.")
else:
print(f"⚠️ Attenzione: Domain Index non trovato al percorso {index_path}")
self.root_classes = self._extract_root_classes()
def _extract_root_classes(self) -> Dict[str, Any]:
"""Estrae il primo livello ontologico per la macro-categorizzazione."""
roots = {}
for uri, data in self.domain_index["classes"].items():
# Consideriamo root le classi senza padri o figlie dirette di owl:Thing / l0:Entity
if not data["parents"] or "owl:Thing" in data["parents"] or "l0:Entity" in data["parents"]:
roots[uri] = data
return roots
def _get_subclasses(self, parent_uris: List[str]) -> Dict[str, Any]:
"""Recupera tutte le sottoclassi dirette (e se stesse) dai rami indicati."""
subclasses = {}
for uri, data in self.domain_index["classes"].items():
if uri in parent_uris or any(p in parent_uris for p in data["parents"]):
subclasses[uri] = data
return subclasses
def _execute_with_retry(self, chain, prompt_messages, max_retries=4):
"""Self-correction loop con Exponential Backoff per Rate Limits."""
base_delay = 5
for attempt in range(max_retries):
try:
result = chain.invoke(prompt_messages)
return result
except Exception as e:
error_msg = str(e).upper()
print(error_msg)
if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg:
wait_time = base_delay * (2 ** attempt)
print(f"⏳ [Rate Limit] Quota superata. Attendo {wait_time}s prima di riprovare (Tentativo {attempt+1}/{max_retries})...")
time.sleep(wait_time)
else:
print(f"⚠️ Errore (Tentativo {attempt+1}/{max_retries}): {e}")
if attempt == max_retries - 1:
print("❌ Fallimento critico del task LLM.")
return None
return None
def extract(self, text_chunk: str, source_id: str = "unknown") -> KnowledgeGraphExtraction:
print(f"\n🧩 Processing {source_id} (TDDT Mode)...")
# ==========================================
# PASS 1 - LIVELLO 1: Macro-Categorizzazione
# ==========================================
roots_text = "\n".join([f"- {uri} — \"{data['label']}: {data['description']}\"" for uri, data in self.root_classes.items()])
sys_l1 = f"""Sei un estrattore esperto di entità semantiche per il dominio dei Beni Culturali. Il tuo unico compito è individuare le entità rilevanti nel testo e classificarle.
MACRO-CATEGORIE DISPONIBILI:
{roots_text}
REGOLE DI ESTRAZIONE (TASSATIVE E OBBLIGATORIE):
1. DIVIETO DI ALLUCINAZIONE URI: Usa ESCLUSIVAMENTE gli URI esatti elencati sopra. È severamente vietato usare etichette inventate come "Person", "Location" o "Group". Se devi categorizzare una persona, usa l'URI corrispondente agli Agenti (es. core:Agent o l0:Agent).
2. RUMORE EDITORIALE: IGNORA e non estrarre MAI riferimenti alla struttura del libro o alle immagini. È vietato estrarre entità che contengono o sono composte da: "Capitolo", "Sezione", "Tavola", "Fig.", "Figura", "Pagina", "Pag.".
3. Estrai SOLO veri monumenti storici, luoghi geografici reali, personaggi storici, popoli e concetti architettonici.
4. Puoi assegnare fino a 2 candidati per entità, ordinandoli per confidenza logica.
"""
res_l1: MacroClassificationResult = self._execute_with_retry(
self.chain_pass1_l1,
[SystemMessage(content=sys_l1), HumanMessage(content=text_chunk)]
)
if not res_l1 or not res_l1.entities:
print(" -> Nessuna entità trovata al Livello 1.")
return KnowledgeGraphExtraction(triples=[])
# ==========================================
# PASS 1 - LIVELLO 2: Specializzazione
# ==========================================
# Raccogliamo tutti i rami candidati da esplorare
candidate_uris = set()
for ent in res_l1.entities:
for cand in ent.candidates:
candidate_uris.add(cand.category)
subclasses = self._get_subclasses(list(candidate_uris))
# Raggruppo le sottoclassi per visualizzarle ordinate nel prompt
subs_text_blocks = []
for parent in candidate_uris:
subs_text_blocks.append(f"\n[{parent} →]")
children = {k: v for k, v in subclasses.items() if parent in v["parents"] or k == parent}
for uri, data in children.items():
subs_text_blocks.append(f"- {uri} — \"{data['label']}: {data['description']}\"")
subs_text = "\n".join(subs_text_blocks)
ent_text = "\n".join([f"- '{e.name}': " + ", ".join([f"{c.category}" for c in e.candidates]) for e in res_l1.entities])
sys_l2 = f"""Per ciascuna entità identificata, scegli il sotto-tipo più specifico tra quelli elencati.
Se non c'è un sotto-tipo rilevante per un'entità, conferma la sua macro-categoria.
ENTITÀ IDENTIFICATE (con macro-categorie candidate):
{ent_text}
SOTTO-TIPI DISPONIBILI:
{subs_text}"""
res_l2: TypeInferenceResult = self._execute_with_retry(
self.chain_pass1_l2,
[SystemMessage(content=sys_l2), HumanMessage(content=text_chunk)]
)
if not res_l2 or not res_l2.entities:
return KnowledgeGraphExtraction(triples=[])
# ==========================================
# PASS 2: Estrazione Relazionale
# ==========================================
# Mappa dei tipi finali
typed_entities_map = {e.name: e.type.strip() for e in res_l2.entities}
# Recupero deterministico delle proprietà
valid_properties = []
seen_props = set()
for ent_type in typed_entities_map.values():
props = self.domain_index["properties_by_domain"].get(ent_type, [])
for p in props:
if p["id"] not in seen_props:
valid_properties.append(f"- {p['id']}: {p['inherited_from']} → {p['range']} (Label: {p['label']})")
seen_props.add(p["id"])
props_text = "\n".join(valid_properties) if valid_properties else "- (Nessuna proprietà specifica trovata. Usa skos:related)"
ent_final_text = "\n".join([f"- {name} ({uri_type})" for name, uri_type in typed_entities_map.items()])
sys_ext = f"""Estrai le relazioni semantiche tra le entità presenti nel testo.
ENTITÀ IDENTIFICATE (con il loro tipo):
{ent_final_text}
PROPRIETÀ CONSENTITE (con vincoli domain → range):
{props_text}
- skos:related: Qualsiasi → Qualsiasi (Usa SOLO se nessuna proprietà sopra descrive accuratamente il legame)
REGOLE CRITICHE E OBBLIGATORIE:
1. Usa SOLO le proprietà elencate sopra.
2. Usa ESCLUSIVAMENTE le entità presenti nella lista "ENTITÀ IDENTIFICATE". È severamente vietato inventare o aggiungere entità non presenti in questo elenco.
3. I campi 'subject_type' e 'object_type' sono OBBLIGATORI. Devi sempre compilarli copiando esattamente il tipo indicato tra parentesi nella lista delle entità.
4. Rispetta rigorosamente i vincoli ontologici: il tipo del 'subject' DEVE essere compatibile con il domain, e il tipo dell''object' con il range.
5. Compila sempre i campi 'evidence' citando esattamente il testo, e 'reasoning' spiegando la scelta logica.
"""
final_res: KnowledgeGraphExtraction = self._execute_with_retry(
self.chain_pass2,
[SystemMessage(content=sys_ext), HumanMessage(content=text_chunk)]
)
if final_res and final_res.triples:
# Propago il source_id prima di inviare l'output
for t in final_res.triples:
t.source = source_id
return final_res
return KnowledgeGraphExtraction(triples=[]) |