File size: 9,684 Bytes
a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c a968971 cfc197c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 | import json
import os
import numpy as np
from typing import List, Optional
from pydantic import BaseModel, Field, ValidationError
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
# Gestione Multi-Backend (Locale vs Cloud)
from langchain_ollama import ChatOllama
from langchain_huggingface import HuggingFaceEmbeddings, ChatHuggingFace, HuggingFaceEndpoint
from sklearn.metrics.pairwise import cosine_similarity
# --- 1. DEFINIZIONE DELLO SCHEMA ---
class GraphTriple(BaseModel):
subject: str = Field(..., description="Entità sorgente (Canonical).")
predicate: str = Field(..., description="Relazione (snake_case).")
object: str = Field(..., description="Entità target.")
confidence: float = Field(..., description="Confidenza (0.0 - 1.0).")
source: Optional[str] = Field(None, description="ID del documento o chunk.")
class KnowledgeGraphExtraction(BaseModel):
reasoning: Optional[str] = Field(None, description="Breve ragionamento logico.")
triples: List[GraphTriple]
# --- 2. ESTRATTORE DINAMICO (Dynamic Few-Shot) ---
class NeuroSymbolicExtractor:
def __init__(self, model_name="llama3", temperature=0, gold_standard_path=None):
hf_token = os.getenv("HF_TOKEN")
if hf_token:
print("☁️ Rilevato ambiente Cloud (HF Spaces). Utilizzo HuggingFace Inference API.")
repo_id = "meta-llama/Meta-Llama-3-8B-Instruct"
try:
endpoint = HuggingFaceEndpoint(
repo_id=repo_id,
task="text-generation",
max_new_tokens=1024,
temperature=0.1,
huggingfacehub_api_token=hf_token
)
self.llm = ChatHuggingFace(llm=endpoint)
print(f"✅ Connesso a {repo_id} via API.")
except Exception as e:
print(f"❌ Errore connessione HF API: {e}. Fallback su CPU locale (sconsigliato).")
raise e
else:
print(f"🏠 Ambiente Locale rilevato. Inizializzazione Ollama: {model_name}...")
try:
self.llm = ChatOllama(
model=model_name,
temperature=temperature,
format="json",
base_url="http://localhost:11434"
)
except Exception as e:
print(f"⚠️ Errore Ollama: {e}")
# 2. Modello Embedding per la selezione dinamica
print("🧠 Caricamento modello embedding per Dynamic Selection...")
self.embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
# 3. Caricamento e Indicizzazione Gold Standard
self.examples = []
self.example_embeddings = None
if gold_standard_path and os.path.exists(gold_standard_path):
print(f"🌟 Indicizzazione vettoriale Gold Standard da: {gold_standard_path}")
self._index_examples(gold_standard_path)
else:
# Crea una lista vuota per evitare crash se il path non esiste
print("⚠️ Nessun Gold Standard trovato. Modalità Zero-Shot.")
# Template Specializzato (Prompt Engineering)
self.system_template_base = """Sei l'Agente Cognitivo (AC) del sistema Canusium xCH.
Il tuo compito è trasformare il testo non strutturato in un Digital Twin Graph (RDF).
SCHEMA JSON RICHIESTO:
{{
"reasoning": "Spiega brevemente perché hai scelto queste classi/relazioni...",
"triples": [
{{"subject": "Entità", "predicate": "prefix:Relazione", "object": "Entità", "confidence": 0.95}}
]
}}
ONTOLOGIA DI RIFERIMENTO (Usa questi prefissi):
- xchh: (Heritage) -> Per oggetti fisici, siti, reperti (es. xchh:HeritageObject, xchh:Site).
- crm: (CIDOC-CRM) -> Per relazioni standard (es. crm:P55_has_current_location, crm:P4_has_time-span).
- xche: (Experience) -> Per sessioni AR/VR, visitatori, interazioni (es. xche:ExperienceSession).
- xcha: (Agents) -> Per agenti umani o artificiali.
- skos: -> Per concetti generici o gerarchie.
ESEMPI CONTESTUALI (Dynamic Few-Shot):
{selected_examples}
REGOLE DI CONFIDENZA (Trust Layer):
- 1.0 (Fatto Curato): Informazione esplicita e certa nel testo.
- 0.8 - 0.9 (Inferenza): Deduzione logica forte ma non esplicita.
- < 0.7 (Ipotesi): Associazione probabile ma incerta (da marcare per revisione umana).
Canonicalizza i nomi (es. "Il Parco" -> "Parco Archeologico di Canne").
Rispondi ESCLUSIVAMENTE con un JSON valido.
"""
def _index_examples(self, path: str):
"""Carica il JSON e calcola i vettori per ogni esempio."""
try:
with open(path, 'r', encoding='utf-8') as f:
self.examples = json.load(f)
# Estraiamo solo il testo di input per calcolare l'embedding
texts = [ex['text'] for ex in self.examples]
self.example_embeddings = self.embedding_model.embed_documents(texts)
print(f"✅ Indicizzati {len(self.examples)} esempi di Gold Standard.")
except Exception as e:
print(f"❌ Errore indicizzazione Gold Standard: {e}")
self.examples = []
def _get_relevant_examples(self, query_text: str, k=2) -> str:
"""
Trova i k esempi più simili semanticamente al chunk attuale.
"""
if not self.examples or self.example_embeddings is None:
return "Nessun esempio disponibile."
# 1. Embed del chunk attuale
query_embedding = self.embedding_model.embed_query(query_text)
# 2. Calcolo similarità coseno
similarities = cosine_similarity([query_embedding], self.example_embeddings)[0]
# 3. Selezione dei top-k
top_k_indices = np.argsort(similarities)[-k:][::-1]
formatted_text = ""
for i, idx in enumerate(top_k_indices):
ex = self.examples[idx]
sim_score = similarities[idx]
formatted_text += f"\n--- ESEMPIO RILEVANTE #{i+1} (Sim: {sim_score:.2f}) ---\n"
formatted_text += f"INPUT: {ex['text']}\n"
# Gestione sicura nel caso triples manchi
triples_out = ex.get('triples', [])
formatted_text += f"OUTPUT: {json.dumps({'triples': triples_out}, ensure_ascii=False)}\n"
return formatted_text
def extract(self, text_chunk: str, source_id: str = "unknown", max_retries=3) -> KnowledgeGraphExtraction:
print(f"🧠 Processing {source_id} (Dynamic Mode)...")
# Selezione Esempi
relevant_examples_str = self._get_relevant_examples(text_chunk, k=2)
# Costruzione Prompt Finale
final_sys_text = self.system_template_base.format(selected_examples=relevant_examples_str)
sys_msg = SystemMessage(content=final_sys_text)
prompt = ChatPromptTemplate.from_messages([
sys_msg,
("human", "{text}")
])
chain = prompt | self.llm
for attempt in range(max_retries):
try:
response = chain.invoke({"text": text_chunk})
# Parsing della risposta (diversa tra Ollama e HF)
content = response.content
# Pulizia base se il modello chiacchiera prima del JSON
if "```json" in content:
content = content.split("```json")[1].split("```")[0].strip()
elif "```" in content:
content = content.split("```")[1].split("```")[0].strip()
data = json.loads(content)
# Normalizzazione output
if isinstance(data, list):
validated_data = KnowledgeGraphExtraction(triples=data, reasoning="Direct list output")
else:
# Filtra campi extra che il modello potrebbe inventare
triples = [GraphTriple(**t) for t in data.get("triples", [])]
validated_data = KnowledgeGraphExtraction(
reasoning=data.get("reasoning", "N/A"),
triples=triples
)
for t in validated_data.triples:
t.source = source_id
return validated_data
except (json.JSONDecodeError, ValidationError) as e:
print(f"⚠️ Errore Validazione (Tentativo {attempt+1}/{max_retries}): {e}")
# SELF-CORRECTION LOOP
prev_content = locals().get('content', 'No content')
correction_prompt = ChatPromptTemplate.from_messages([
sys_msg,
HumanMessage(content=text_chunk),
AIMessage(content=prev_content),
HumanMessage(content=f"Errore nel JSON precedente: {e}. Correggi e restituisci SOLO JSON valido senza markdown.")
])
chain = correction_prompt | self.llm
except Exception as e:
print(f"❌ Errore critico: {e}")
break
return KnowledgeGraphExtraction(triples=[]) |