File size: 11,908 Bytes
a968971
b70d82f
9fb3deb
b70d82f
9cbbfac
 
9fb3deb
cc3f780
a968971
b70d82f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc3f780
b70d82f
 
 
 
 
 
 
 
 
 
 
 
 
 
a968971
b70d82f
 
 
 
 
 
 
 
a968971
 
 
 
c1b1880
a968971
9fb3deb
b70d82f
cfc197c
9fb3deb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b70d82f
9fb3deb
 
b70d82f
cfc197c
b70d82f
 
 
 
 
 
 
 
 
 
 
cfc197c
b70d82f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9fb3deb
 
 
 
b70d82f
cfc197c
b70d82f
 
cfc197c
9fb3deb
 
 
 
 
 
 
 
 
b70d82f
 
 
 
 
 
 
a968971
b70d82f
 
 
 
2e93420
fe271ee
b70d82f
 
fe271ee
 
 
 
 
 
 
 
b70d82f
 
 
 
cfc197c
b70d82f
 
 
 
 
 
 
 
 
 
 
 
2e93420
b70d82f
a968971
b70d82f
 
 
 
 
 
 
 
 
a968971
b70d82f
a968971
b70d82f
 
a968971
b70d82f
 
c1b1880
b70d82f
 
a968971
b70d82f
 
 
2e93420
cfc197c
b70d82f
 
cc3f780
b70d82f
 
 
 
fe271ee
b70d82f
 
 
 
 
 
 
 
 
 
a968971
b70d82f
 
a968971
b70d82f
 
 
 
 
 
 
 
 
fe271ee
b70d82f
fe271ee
 
 
 
b70d82f
 
 
 
 
 
 
 
 
 
 
 
cfc197c
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
import os
import json
import time
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_groq import ChatGroq
from dotenv import load_dotenv

load_dotenv()

# --- MODELLI PYDANTIC (Contratti Formali) ---

# PASS 1 - Livello 1
class MacroCategoryCandidate(BaseModel):
    category: str = Field(description="URI della macro-categoria (es. arco:CulturalProperty)")
    reasoning: str = Field(description="Perché questa macro-categoria è appropriata per l'entità")

class EntityMacroClassification(BaseModel):
    name: str = Field(description="Nome dell'entità come appare nel testo")
    candidates: List[MacroCategoryCandidate] = Field(
        description="1-2 macro-categorie candidate, ordinate per preferenza (la prima è la più probabile)",
        min_length=1,
        max_length=2
    )

class MacroClassificationResult(BaseModel):
    """Output del Livello 1"""
    entities: List[EntityMacroClassification]

# PASS 1 - Livello 2
class TypedEntity(BaseModel):
    name: str = Field(description="Nome dell'entità come appare nel testo")
    type: str = Field(description="URI del tipo ontologico finale (es. arco:ArchaeologicalProperty)")

class TypeInferenceResult(BaseModel):
    """Output del Livello 2"""
    entities: List[TypedEntity]

# PASS 2 - Extraction
class GraphTriple(BaseModel):
    subject: str
    subject_type: str = Field(description="Tipo ontologico del soggetto (da Pass 1)")
    predicate: str
    object: str
    object_type: str = Field(description="Tipo ontologico dell'oggetto (da Pass 1)")
    evidence: str = Field(description="Span testuale esatto dal chunk da cui la relazione è estratta")
    reasoning: str = Field(description="Perché questo predicato è stato scelto per questa coppia di entità")
    source: Optional[str] = Field(None) # Mantenuto per compatibilità con il batching Neo4j a valle

class KnowledgeGraphExtraction(BaseModel):
    triples: List[GraphTriple]


class NeuroSymbolicExtractor:
    def __init__(self, index_path="./ontology/domain_index.json"):
        print("🧠 Inizializzazione TDDT Extractor (Type-Driven Domain Traversal)...")
        
        # google_api_key = os.getenv("GOOGLE_API_KEY")
        # if not google_api_key:
        #     raise ValueError("❌ GOOGLE_API_KEY mancante. Richiesta per Gemini 2.0 Flash.")

        # # Inizializzo l'LLM primario. Temperatura 0 per massimizzare il determinismo.
        # self.llm = ChatGoogleGenerativeAI(
        #     model="gemini-2.0-flash", 
        #     temperature=0,
        #     api_key=google_api_key
        # )

        groq_api_key = os.getenv("GROQ_API_KEY")
        if not groq_api_key:
            raise ValueError("❌ GROQ_API_KEY mancante nel file .env.")

        # Inizializzo l'LLM primario su Groq. 
        self.llm = ChatGroq(
            model="meta-llama/llama-4-scout-17b-16e-instruct", 
            temperature=0,
            api_key=groq_api_key,
            max_retries=5 # Aumentiamo i retry interni di LangChain
        )
        
        # Inizializzo le chain con structured output
        self.chain_pass1_l1 = self.llm.with_structured_output(MacroClassificationResult)
        self.chain_pass1_l2 = self.llm.with_structured_output(TypeInferenceResult)
        self.chain_pass2 = self.llm.with_structured_output(KnowledgeGraphExtraction)

        # Caricamento del Domain Index in RAM
        self.domain_index = {"classes": {}, "properties_by_domain": {}}
        if os.path.exists(index_path):
            with open(index_path, 'r', encoding='utf-8') as f:
                self.domain_index = json.load(f)
            print(f"✅ Domain Index caricato: {len(self.domain_index['classes'])} classi disponibili.")
        else:
            print(f"⚠️ Attenzione: Domain Index non trovato al percorso {index_path}")

        self.root_classes = self._extract_root_classes()

    def _extract_root_classes(self) -> Dict[str, Any]:
        """Estrae il primo livello ontologico per la macro-categorizzazione."""
        roots = {}
        for uri, data in self.domain_index["classes"].items():
            # Consideriamo root le classi senza padri o figlie dirette di owl:Thing / l0:Entity
            if not data["parents"] or "owl:Thing" in data["parents"] or "l0:Entity" in data["parents"]:
                roots[uri] = data
        return roots

    def _get_subclasses(self, parent_uris: List[str]) -> Dict[str, Any]:
        """Recupera tutte le sottoclassi dirette (e se stesse) dai rami indicati."""
        subclasses = {}
        for uri, data in self.domain_index["classes"].items():
            if uri in parent_uris or any(p in parent_uris for p in data["parents"]):
                subclasses[uri] = data
        return subclasses

    def _execute_with_retry(self, chain, prompt_messages, max_retries=4):
        """Self-correction loop con Exponential Backoff per Rate Limits."""
        base_delay = 5 
        
        for attempt in range(max_retries):
            try:
                result = chain.invoke(prompt_messages)
                return result
            except Exception as e:
                error_msg = str(e).upper()
                print(error_msg)
                if "429" in error_msg or "RESOURCE_EXHAUSTED" in error_msg:
                    wait_time = base_delay * (2 ** attempt) 
                    print(f"⏳ [Rate Limit] Quota superata. Attendo {wait_time}s prima di riprovare (Tentativo {attempt+1}/{max_retries})...")
                    time.sleep(wait_time)
                else:
                    print(f"⚠️ Errore (Tentativo {attempt+1}/{max_retries}): {e}")
                    
                if attempt == max_retries - 1:
                    print("❌ Fallimento critico del task LLM.")
                    return None
        return None

    def extract(self, text_chunk: str, source_id: str = "unknown") -> KnowledgeGraphExtraction:
        print(f"\n🧩 Processing {source_id} (TDDT Mode)...")
        
        # ==========================================
        # PASS 1 - LIVELLO 1: Macro-Categorizzazione
        # ==========================================
        roots_text = "\n".join([f"- {uri} — \"{data['label']}: {data['description']}\"" for uri, data in self.root_classes.items()])
        
        sys_l1 = f"""Sei un estrattore esperto di entità semantiche per il dominio dei Beni Culturali. Il tuo unico compito è individuare le entità rilevanti nel testo e classificarle.

MACRO-CATEGORIE DISPONIBILI:
{roots_text}

REGOLE DI ESTRAZIONE (TASSATIVE E OBBLIGATORIE):
1. DIVIETO DI ALLUCINAZIONE URI: Usa ESCLUSIVAMENTE gli URI esatti elencati sopra. È severamente vietato usare etichette inventate come "Person", "Location" o "Group". Se devi categorizzare una persona, usa l'URI corrispondente agli Agenti (es. core:Agent o l0:Agent).
2. RUMORE EDITORIALE: IGNORA e non estrarre MAI riferimenti alla struttura del libro o alle immagini. È vietato estrarre entità che contengono o sono composte da: "Capitolo", "Sezione", "Tavola", "Fig.", "Figura", "Pagina", "Pag.".
3. Estrai SOLO veri monumenti storici, luoghi geografici reali, personaggi storici, popoli e concetti architettonici.
4. Puoi assegnare fino a 2 candidati per entità, ordinandoli per confidenza logica.
"""
        res_l1: MacroClassificationResult = self._execute_with_retry(
            self.chain_pass1_l1, 
            [SystemMessage(content=sys_l1), HumanMessage(content=text_chunk)]
        )
        
        if not res_l1 or not res_l1.entities:
            print("   -> Nessuna entità trovata al Livello 1.")
            return KnowledgeGraphExtraction(triples=[])

        # ==========================================
        # PASS 1 - LIVELLO 2: Specializzazione
        # ==========================================
        # Raccogliamo tutti i rami candidati da esplorare
        candidate_uris = set()
        for ent in res_l1.entities:
            for cand in ent.candidates:
                candidate_uris.add(cand.category)
        
        subclasses = self._get_subclasses(list(candidate_uris))
        
        # Raggruppo le sottoclassi per visualizzarle ordinate nel prompt
        subs_text_blocks = []
        for parent in candidate_uris:
            subs_text_blocks.append(f"\n[{parent} →]")
            children = {k: v for k, v in subclasses.items() if parent in v["parents"] or k == parent}
            for uri, data in children.items():
                subs_text_blocks.append(f"- {uri} — \"{data['label']}: {data['description']}\"")
                
        subs_text = "\n".join(subs_text_blocks)
        
        ent_text = "\n".join([f"- '{e.name}': " + ", ".join([f"{c.category}" for c in e.candidates]) for e in res_l1.entities])
        
        sys_l2 = f"""Per ciascuna entità identificata, scegli il sotto-tipo più specifico tra quelli elencati.
Se non c'è un sotto-tipo rilevante per un'entità, conferma la sua macro-categoria.

ENTITÀ IDENTIFICATE (con macro-categorie candidate):
{ent_text}

SOTTO-TIPI DISPONIBILI:
{subs_text}"""
        
        res_l2: TypeInferenceResult = self._execute_with_retry(
            self.chain_pass1_l2, 
            [SystemMessage(content=sys_l2), HumanMessage(content=text_chunk)]
        )

        if not res_l2 or not res_l2.entities:
            return KnowledgeGraphExtraction(triples=[])

        # ==========================================
        # PASS 2: Estrazione Relazionale
        # ==========================================
        # Mappa dei tipi finali
        typed_entities_map = {e.name: e.type.strip() for e in res_l2.entities}
        
        # Recupero deterministico delle proprietà
        valid_properties = []
        seen_props = set()
        for ent_type in typed_entities_map.values():
            props = self.domain_index["properties_by_domain"].get(ent_type, [])
            for p in props:
                if p["id"] not in seen_props:
                    valid_properties.append(f"- {p['id']}: {p['inherited_from']}{p['range']} (Label: {p['label']})")
                    seen_props.add(p["id"])
                    
        props_text = "\n".join(valid_properties) if valid_properties else "- (Nessuna proprietà specifica trovata. Usa skos:related)"
        ent_final_text = "\n".join([f"- {name} ({uri_type})" for name, uri_type in typed_entities_map.items()])

        sys_ext = f"""Estrai le relazioni semantiche tra le entità presenti nel testo.

ENTITÀ IDENTIFICATE (con il loro tipo):
{ent_final_text}

PROPRIETÀ CONSENTITE (con vincoli domain → range):
{props_text}
- skos:related: Qualsiasi → Qualsiasi (Usa SOLO se nessuna proprietà sopra descrive accuratamente il legame)

REGOLE CRITICHE E OBBLIGATORIE:
1. Usa SOLO le proprietà elencate sopra.
2. Usa ESCLUSIVAMENTE le entità presenti nella lista "ENTITÀ IDENTIFICATE". È severamente vietato inventare o aggiungere entità non presenti in questo elenco.
3. I campi 'subject_type' e 'object_type' sono OBBLIGATORI. Devi sempre compilarli copiando esattamente il tipo indicato tra parentesi nella lista delle entità.
4. Rispetta rigorosamente i vincoli ontologici: il tipo del 'subject' DEVE essere compatibile con il domain, e il tipo dell''object' con il range.
5. Compila sempre i campi 'evidence' citando esattamente il testo, e 'reasoning' spiegando la scelta logica.
"""
        
        final_res: KnowledgeGraphExtraction = self._execute_with_retry(
            self.chain_pass2,
            [SystemMessage(content=sys_ext), HumanMessage(content=text_chunk)]
        )
        
        if final_res and final_res.triples:
            # Propago il source_id prima di inviare l'output
            for t in final_res.triples:
                t.source = source_id
            return final_res
            
        return KnowledgeGraphExtraction(triples=[])