| import json |
| import os |
| import numpy as np |
| from typing import List, Optional |
| from pydantic import BaseModel, Field, ValidationError |
| from langchain_core.prompts import ChatPromptTemplate |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage |
|
|
| |
| from langchain_ollama import ChatOllama |
| from langchain_huggingface import HuggingFaceEmbeddings, ChatHuggingFace, HuggingFaceEndpoint |
| from sklearn.metrics.pairwise import cosine_similarity |
|
|
| |
| class GraphTriple(BaseModel): |
| subject: str = Field(..., description="Entità sorgente (Canonical).") |
| predicate: str = Field(..., description="Relazione (snake_case).") |
| object: str = Field(..., description="Entità target.") |
| confidence: float = Field(..., description="Confidenza (0.0 - 1.0).") |
| source: Optional[str] = Field(None, description="ID del documento o chunk.") |
|
|
| class KnowledgeGraphExtraction(BaseModel): |
| reasoning: Optional[str] = Field(None, description="Breve ragionamento logico.") |
| triples: List[GraphTriple] |
|
|
| |
| class NeuroSymbolicExtractor: |
| def __init__(self, model_name="llama3", temperature=0, gold_standard_path=None): |
| |
| hf_token = os.getenv("HF_TOKEN") |
| |
| if hf_token: |
| print("☁️ Rilevato ambiente Cloud (HF Spaces). Utilizzo HuggingFace Inference API.") |
| repo_id = "meta-llama/Meta-Llama-3-8B-Instruct" |
| |
| try: |
| endpoint = HuggingFaceEndpoint( |
| repo_id=repo_id, |
| task="text-generation", |
| max_new_tokens=1024, |
| temperature=0.1, |
| huggingfacehub_api_token=hf_token |
| ) |
| self.llm = ChatHuggingFace(llm=endpoint) |
| print(f"✅ Connesso a {repo_id} via API.") |
| except Exception as e: |
| print(f"❌ Errore connessione HF API: {e}. Fallback su CPU locale (sconsigliato).") |
| raise e |
| else: |
| print(f"🏠 Ambiente Locale rilevato. Inizializzazione Ollama: {model_name}...") |
| try: |
| self.llm = ChatOllama( |
| model=model_name, |
| temperature=temperature, |
| format="json", |
| base_url="http://localhost:11434" |
| ) |
| except Exception as e: |
| print(f"⚠️ Errore Ollama: {e}") |
|
|
| |
| print("🧠 Caricamento modello embedding per Dynamic Selection...") |
| self.embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2") |
|
|
| |
| self.examples = [] |
| self.example_embeddings = None |
| |
| if gold_standard_path and os.path.exists(gold_standard_path): |
| print(f"🌟 Indicizzazione vettoriale Gold Standard da: {gold_standard_path}") |
| self._index_examples(gold_standard_path) |
| else: |
| |
| print("⚠️ Nessun Gold Standard trovato. Modalità Zero-Shot.") |
|
|
| |
| self.system_template_base = """Sei l'Agente Cognitivo (AC) del sistema Canusium xCH. |
| Il tuo compito è trasformare il testo non strutturato in un Digital Twin Graph (RDF). |
| |
| SCHEMA JSON RICHIESTO: |
| {{ |
| "reasoning": "Spiega brevemente perché hai scelto queste classi/relazioni...", |
| "triples": [ |
| {{"subject": "Entità", "predicate": "prefix:Relazione", "object": "Entità", "confidence": 0.95}} |
| ] |
| }} |
| |
| ONTOLOGIA DI RIFERIMENTO (Usa questi prefissi): |
| - xchh: (Heritage) -> Per oggetti fisici, siti, reperti (es. xchh:HeritageObject, xchh:Site). |
| - crm: (CIDOC-CRM) -> Per relazioni standard (es. crm:P55_has_current_location, crm:P4_has_time-span). |
| - xche: (Experience) -> Per sessioni AR/VR, visitatori, interazioni (es. xche:ExperienceSession). |
| - xcha: (Agents) -> Per agenti umani o artificiali. |
| - skos: -> Per concetti generici o gerarchie. |
| |
| ESEMPI CONTESTUALI (Dynamic Few-Shot): |
| {selected_examples} |
| |
| REGOLE DI CONFIDENZA (Trust Layer): |
| - 1.0 (Fatto Curato): Informazione esplicita e certa nel testo. |
| - 0.8 - 0.9 (Inferenza): Deduzione logica forte ma non esplicita. |
| - < 0.7 (Ipotesi): Associazione probabile ma incerta (da marcare per revisione umana). |
| |
| Canonicalizza i nomi (es. "Il Parco" -> "Parco Archeologico di Canne"). |
| Rispondi ESCLUSIVAMENTE con un JSON valido. |
| """ |
|
|
| def _index_examples(self, path: str): |
| """Carica il JSON e calcola i vettori per ogni esempio.""" |
| try: |
| with open(path, 'r', encoding='utf-8') as f: |
| self.examples = json.load(f) |
| |
| |
| texts = [ex['text'] for ex in self.examples] |
| self.example_embeddings = self.embedding_model.embed_documents(texts) |
| print(f"✅ Indicizzati {len(self.examples)} esempi di Gold Standard.") |
| except Exception as e: |
| print(f"❌ Errore indicizzazione Gold Standard: {e}") |
| self.examples = [] |
|
|
| def _get_relevant_examples(self, query_text: str, k=2) -> str: |
| """ |
| Trova i k esempi più simili semanticamente al chunk attuale. |
| """ |
| if not self.examples or self.example_embeddings is None: |
| return "Nessun esempio disponibile." |
|
|
| |
| query_embedding = self.embedding_model.embed_query(query_text) |
| |
| |
| similarities = cosine_similarity([query_embedding], self.example_embeddings)[0] |
| |
| |
| top_k_indices = np.argsort(similarities)[-k:][::-1] |
| |
| formatted_text = "" |
| for i, idx in enumerate(top_k_indices): |
| ex = self.examples[idx] |
| sim_score = similarities[idx] |
| formatted_text += f"\n--- ESEMPIO RILEVANTE #{i+1} (Sim: {sim_score:.2f}) ---\n" |
| formatted_text += f"INPUT: {ex['text']}\n" |
| |
| triples_out = ex.get('triples', []) |
| formatted_text += f"OUTPUT: {json.dumps({'triples': triples_out}, ensure_ascii=False)}\n" |
| |
| return formatted_text |
|
|
| def extract(self, text_chunk: str, source_id: str = "unknown", max_retries=3) -> KnowledgeGraphExtraction: |
| print(f"🧠 Processing {source_id} (Dynamic Mode)...") |
| |
| |
| relevant_examples_str = self._get_relevant_examples(text_chunk, k=2) |
| |
| |
| final_sys_text = self.system_template_base.format(selected_examples=relevant_examples_str) |
| |
| sys_msg = SystemMessage(content=final_sys_text) |
| |
| prompt = ChatPromptTemplate.from_messages([ |
| sys_msg, |
| ("human", "{text}") |
| ]) |
| |
| chain = prompt | self.llm |
| |
| for attempt in range(max_retries): |
| try: |
| response = chain.invoke({"text": text_chunk}) |
| |
| |
| content = response.content |
| |
| |
| if "```json" in content: |
| content = content.split("```json")[1].split("```")[0].strip() |
| elif "```" in content: |
| content = content.split("```")[1].split("```")[0].strip() |
|
|
| data = json.loads(content) |
| |
| |
| if isinstance(data, list): |
| validated_data = KnowledgeGraphExtraction(triples=data, reasoning="Direct list output") |
| else: |
| |
| triples = [GraphTriple(**t) for t in data.get("triples", [])] |
| validated_data = KnowledgeGraphExtraction( |
| reasoning=data.get("reasoning", "N/A"), |
| triples=triples |
| ) |
| |
| for t in validated_data.triples: |
| t.source = source_id |
| |
| return validated_data |
|
|
| except (json.JSONDecodeError, ValidationError) as e: |
| print(f"⚠️ Errore Validazione (Tentativo {attempt+1}/{max_retries}): {e}") |
| |
| |
| prev_content = locals().get('content', 'No content') |
| |
| correction_prompt = ChatPromptTemplate.from_messages([ |
| sys_msg, |
| HumanMessage(content=text_chunk), |
| AIMessage(content=prev_content), |
| HumanMessage(content=f"Errore nel JSON precedente: {e}. Correggi e restituisci SOLO JSON valido senza markdown.") |
| ]) |
| |
| chain = correction_prompt | self.llm |
| |
| except Exception as e: |
| print(f"❌ Errore critico: {e}") |
| break |
| |
| return KnowledgeGraphExtraction(triples=[]) |