import sys import os import time import glob sys.path.append(os.path.dirname(os.path.abspath(__file__))) from src.ingestion.semantic_splitter import ActivaSemanticSplitter from src.extraction.extractor import NeuroSymbolicExtractor from src.validation.validator import SemanticValidator from src.graph.graph_loader import KnowledgeGraphPersister from src.graph.entity_resolver import EntityResolver def pipeline_execution(): print("\n🚀 AVVIO PIPELINE AUTOMATED DISCOVERY\n" + "="*50) raw_text = load_raw_documents() if not raw_text: print("⚠️ Nessun file trovato in data/raw/. Uso testo di default.") raw_text = """ La Basilica di San Marco a Venezia è il principale luogo di culto della città. È uno degli esempi più noti di architettura italo-bizantina. """ # --- FASE 1: INGESTION --- print("\n[FASE 1] Ingestion & Semantic Chunking...") try: # Usa un modello piccolo per lo splitting veloce splitter = ActivaSemanticSplitter(model_name="all-MiniLM-L6-v2") # percentile_threshold=90 significa: taglia solo quando la similarità scende molto chunks, _, _ = splitter.create_chunks(raw_text, percentile_threshold=90) save_chunks_to_processed(chunks) print(f"✅ Testo diviso in {len(chunks)} segmenti semantici.") except Exception as e: print(f"❌ Errore in Fase 1: {e}") return # --- FASE 2: EXTRACTION --- print("\n[FASE 2] Init Neuro-Symbolic Core (Llama 3)...") gold_path = os.path.join("data", "gold_standard", "examples.json") try: # Assicurati che Ollama sia attivo! extractor = NeuroSymbolicExtractor(model_name="llama3", gold_standard_path=gold_path) except Exception as e: print(f"❌ Errore connessione Ollama: {e}") return all_triples = [] print(f"🔄 Avvio estrazione su {len(chunks)} chunk...") for i, chunk in enumerate(chunks): chunk_id = f"doc_sample_chunk_{i+1}" print(f"\n Processing {chunk_id} ({len(chunk)} chars)...") # Invoca Llama 3 extraction_result = extractor.extract(chunk, source_id=chunk_id) if extraction_result and extraction_result.triples: count = len(extraction_result.triples) print(f" -> Estratte {count} triple.") # Aggiungiamo le triple alla lista totale all_triples.extend(extraction_result.triples) else: print(" -> Nessuna tripla trovata (o errore parsing).") print(f"\n✅ Totale triple raccolte: {len(all_triples)}") if not all_triples: print("⚠️ Nessuna tripla da salvare. Pipeline terminata.") return # --- FASE 2.5: SYMBOLIC RESOLUTION & CANONICALIZATION --- # Implementazione Sezione 4.1 del Documento print("\n[FASE 2.5] Entity Resolution & Canonicalization (DBSCAN)...") try: resolver = EntityResolver(similarity_threshold=0.85) # Sovrascriviamo le triple con quelle pulite all_triples = resolver.resolve_entities(all_triples) print("✅ Risoluzione entità completata.") except Exception as e: print(f"⚠️ Errore nel resolver (skip): {e}") print("\n[FASE 2.6] Validazione Semantica (SHACL)...") validator = SemanticValidator() is_valid, report, _ = validator.validate_batch(all_triples) if is_valid: print("✅ Validazione passata. I dati rispettano l'ontologia.") else: print("⚠️ Warning: Rilevate violazioni SHACL.") print(" (In produzione, queste triple verrebbero scartate o mandate in Human Review)") # Per ora procediamo, ma in un sistema reale fermeremmo qui le triple corrotte. print(report) # --- FASE 3: PERSISTENCE --- print("\n[FASE 3] Graph Construction & Persistence (Neo4j)...") try: persister = KnowledgeGraphPersister() persister.save_triples(all_triples) persister.close() print("\n🎉 PIPELINE COMPLETATA CON SUCCESSO!") print("👉 Vai su http://localhost:7474 ed esegui: MATCH (n)-[r]->(m) RETURN n,r,m") except Exception as e: print(f"❌ Errore in Fase 3 (Neo4j): {e}") def load_raw_documents(directory="data/raw"): """Legge tutti i file .txt nella cartella raw.""" texts = [] files = glob.glob(os.path.join(directory, "*.txt")) print(f"📂 Trovati {len(files)} documenti in {directory}") for f_path in files: with open(f_path, 'r', encoding='utf-8') as f: texts.append(f.read()) return "\n\n".join(texts) def save_chunks_to_processed(chunks, directory="data/processed"): """Salva i chunk su disco per debug.""" os.makedirs(directory, exist_ok=True) with open(os.path.join(directory, "chunks_debug.txt"), "w", encoding="utf-8") as f: for i, c in enumerate(chunks): f.write(f"--- CHUNK {i} ---\n{c}\n\n") print(f"💾 Chunk salvati in {directory}/chunks_debug.txt") if __name__ == "__main__": start_time = time.time() pipeline_execution() print(f"\n⏱️ Tempo totale esecuzione: {time.time() - start_time:.2f} secondi")