""" MODULE: RDF MANAGER (TRACEABILITY ENABLED) ========================================== """ import rdflib from rdflib import Graph, Literal, Namespace from rdflib.namespace import RDF, RDFS, XSD import re import sys import os sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../'))) from src.Analytics.pipeline_builder import PipelineBuilder VORTEX = Namespace("http://vortex.ai/ontology#") class RDFStore: def __init__(self): self.g = Graph() self.g.bind("vortex", VORTEX) self.pipeline = PipelineBuilder() def ingest_networkx_graph(self, G, ontology_manager): print("⏳ [RDF] Ingestion (Mode Diagnostic)...") self.g = Graph() self.g.bind("vortex", VORTEX) for node_id, data in G.nodes(data=True): if data.get('group') == 'Prop': continue # URI sécurisée safe_id = re.sub(r'[^a-zA-Z0-9_-]', '_', str(node_id)) subject_uri = VORTEX[safe_id] group = data.get('group', 'Entity') self.g.add((subject_uri, RDF.type, VORTEX[group])) self.g.add((subject_uri, RDFS.label, Literal(str(data.get('label', node_id))))) for key, value in data.items(): if key in ['group', 'label', 'title', 'x', 'y', 'color', 'shape', 'size', 'hidden']: continue pred_name = ontology_manager.get_predicate_for_column(group, key) if not pred_name: continue target_type = ontology_manager.get_type_for_predicate(pred_name) # --- APPEL DIAGNOSTIC --- clean_value = self.pipeline.enforce_contract( value, target_type, row_id=str(node_id), col_name=str(key) ) if clean_value is not None: rdf_dtype = getattr(XSD, target_type.split(':')[1]) if ':' in target_type else None self.g.add((subject_uri, VORTEX[pred_name], Literal(clean_value, datatype=rdf_dtype))) # Relations (Version simplifiée pour brièveté, appliquez la même logique si besoin) for u, v, data in G.edges(data=True): if G.nodes[u].get('group') == 'Prop': continue safe_u = re.sub(r'[^a-zA-Z0-9_-]', '_', str(u)) rel_label = data.get('label', 'relatedTo') pred_clean = re.sub(r'[^a-zA-Z0-9_]', '', str(rel_label)) if G.nodes[v].get('group') != 'Prop': safe_v = re.sub(r'[^a-zA-Z0-9_-]', '_', str(v)) self.g.add((VORTEX[safe_u], VORTEX[pred_clean], VORTEX[safe_v])) else: raw_val = G.nodes[v].get('label', str(v)) target_type = ontology_manager.get_type_for_predicate(pred_clean) clean_val = self.pipeline.enforce_contract(raw_val, target_type, row_id=str(u), col_name=str(rel_label)) if clean_val: rdf_dtype = getattr(XSD, target_type.split(':')[1]) if ':' in target_type else None self.g.add((VORTEX[safe_u], VORTEX[pred_clean], Literal(clean_val, datatype=rdf_dtype))) print(f"✅ [RDF] Ingestion terminée. {len(self.g)} triplets.") def execute_sparql(self, query): try: full_query = f""" PREFIX vortex: PREFIX rdf: PREFIX rdfs: PREFIX xsd: {query} """ results = self.g.query(full_query) output = [] for row in results: line = [] for item in row: val = str(item).replace("http://vortex.ai/ontology#", "") line.append(val) output.append(" | ".join(line)) if not output: return "Aucun résultat trouvé." return "\n".join(output[:100]) except Exception as e: return f"❌ Erreur SPARQL : {str(e)}" def get_full_schema(self): if len(self.g) == 0: return {"classes": {}, "predicates": [], "triples": 0} schema = {"classes": {}, "predicates": [], "triples": len(self.g)} q_classes = "SELECT ?type (COUNT(?s) as ?count) WHERE { ?s a ?type } GROUP BY ?type" for row in self.g.query(q_classes): cls_name = str(row[0]).split('#')[-1] schema["classes"][cls_name] = int(row[1]) q_preds = "SELECT DISTINCT ?p WHERE { ?s ?p ?o FILTER(STRSTARTS(STR(?p), 'http://vortex.ai/ontology#')) }" for row in self.g.query(q_preds): pred = str(row[0]).split('#')[-1] schema["predicates"].append(pred) return schema