GaetanoParente's picture
Upload 3 files
4c86dc7 verified
raw
history blame
5.21 kB
import sys
import os
import time
import glob
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from src.ingestion.semantic_splitter import ActivaSemanticSplitter
from src.extraction.extractor import NeuroSymbolicExtractor
from src.validation.validator import SemanticValidator
from src.graph.graph_loader import KnowledgeGraphPersister
from src.graph.entity_resolver import EntityResolver
def pipeline_execution():
print("\n🚀 AVVIO PIPELINE AUTOMATED DISCOVERY\n" + "="*50)
raw_text = load_raw_documents()
if not raw_text:
print("⚠️ Nessun file trovato in data/raw/. Uso testo di default.")
raw_text = """
La Basilica di San Marco a Venezia è il principale luogo di culto della città.
È uno degli esempi più noti di architettura italo-bizantina.
"""
# --- FASE 1: INGESTION ---
print("\n[FASE 1] Ingestion & Semantic Chunking...")
try:
# Usa un modello piccolo per lo splitting veloce
splitter = ActivaSemanticSplitter(model_name="all-MiniLM-L6-v2")
# percentile_threshold=90 significa: taglia solo quando la similarità scende molto
chunks, _, _ = splitter.create_chunks(raw_text, percentile_threshold=90)
save_chunks_to_processed(chunks)
print(f"✅ Testo diviso in {len(chunks)} segmenti semantici.")
except Exception as e:
print(f"❌ Errore in Fase 1: {e}")
return
# --- FASE 2: EXTRACTION ---
print("\n[FASE 2] Init Neuro-Symbolic Core (Llama 3)...")
gold_path = os.path.join("data", "gold_standard", "examples.json")
try:
# Assicurati che Ollama sia attivo!
extractor = NeuroSymbolicExtractor(model_name="llama3", gold_standard_path=gold_path)
except Exception as e:
print(f"❌ Errore connessione Ollama: {e}")
return
all_triples = []
print(f"🔄 Avvio estrazione su {len(chunks)} chunk...")
for i, chunk in enumerate(chunks):
chunk_id = f"doc_sample_chunk_{i+1}"
print(f"\n Processing {chunk_id} ({len(chunk)} chars)...")
# Invoca Llama 3
extraction_result = extractor.extract(chunk, source_id=chunk_id)
if extraction_result and extraction_result.triples:
count = len(extraction_result.triples)
print(f" -> Estratte {count} triple.")
# Aggiungiamo le triple alla lista totale
all_triples.extend(extraction_result.triples)
else:
print(" -> Nessuna tripla trovata (o errore parsing).")
print(f"\n✅ Totale triple raccolte: {len(all_triples)}")
if not all_triples:
print("⚠️ Nessuna tripla da salvare. Pipeline terminata.")
return
# --- FASE 2.5: SYMBOLIC RESOLUTION & CANONICALIZATION ---
# Implementazione Sezione 4.1 del Documento
print("\n[FASE 2.5] Entity Resolution & Canonicalization (DBSCAN)...")
try:
resolver = EntityResolver(similarity_threshold=0.85)
# Sovrascriviamo le triple con quelle pulite
all_triples = resolver.resolve_entities(all_triples)
print("✅ Risoluzione entità completata.")
except Exception as e:
print(f"⚠️ Errore nel resolver (skip): {e}")
print("\n[FASE 2.6] Validazione Semantica (SHACL)...")
validator = SemanticValidator()
is_valid, report, _ = validator.validate_batch(all_triples)
if is_valid:
print("✅ Validazione passata. I dati rispettano l'ontologia.")
else:
print("⚠️ Warning: Rilevate violazioni SHACL.")
print(" (In produzione, queste triple verrebbero scartate o mandate in Human Review)")
# Per ora procediamo, ma in un sistema reale fermeremmo qui le triple corrotte.
print(report)
# --- FASE 3: PERSISTENCE ---
print("\n[FASE 3] Graph Construction & Persistence (Neo4j)...")
try:
persister = KnowledgeGraphPersister()
persister.save_triples(all_triples)
persister.close()
print("\n🎉 PIPELINE COMPLETATA CON SUCCESSO!")
print("👉 Vai su http://localhost:7474 ed esegui: MATCH (n)-[r]->(m) RETURN n,r,m")
except Exception as e:
print(f"❌ Errore in Fase 3 (Neo4j): {e}")
def load_raw_documents(directory="data/raw"):
"""Legge tutti i file .txt nella cartella raw."""
texts = []
files = glob.glob(os.path.join(directory, "*.txt"))
print(f"📂 Trovati {len(files)} documenti in {directory}")
for f_path in files:
with open(f_path, 'r', encoding='utf-8') as f:
texts.append(f.read())
return "\n\n".join(texts)
def save_chunks_to_processed(chunks, directory="data/processed"):
"""Salva i chunk su disco per debug."""
os.makedirs(directory, exist_ok=True)
with open(os.path.join(directory, "chunks_debug.txt"), "w", encoding="utf-8") as f:
for i, c in enumerate(chunks):
f.write(f"--- CHUNK {i} ---\n{c}\n\n")
print(f"💾 Chunk salvati in {directory}/chunks_debug.txt")
if __name__ == "__main__":
start_time = time.time()
pipeline_execution()
print(f"\n⏱️ Tempo totale esecuzione: {time.time() - start_time:.2f} secondi")