Spaces:
Running
Running
| """ | |
| MODULE: RDF MANAGER (TRACEABILITY ENABLED) | |
| ========================================== | |
| """ | |
| import rdflib | |
| from rdflib import Graph, Literal, Namespace | |
| from rdflib.namespace import RDF, RDFS, XSD | |
| import re | |
| import sys | |
| import os | |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../'))) | |
| from src.Analytics.pipeline_builder import PipelineBuilder | |
| VORTEX = Namespace("http://vortex.ai/ontology#") | |
class RDFStore:
    """In-memory RDF store (rdflib ``Graph``) populated from a NetworkX graph.

    Every literal value written to the store first goes through
    ``PipelineBuilder.enforce_contract`` together with the originating row id
    and column name; only values for which it returns something other than
    ``None`` are stored.
    """

    # Node attributes that never go through the ontology mapping: 'group' and
    # 'label' are emitted explicitly as rdf:type / rdfs:label, the others are
    # presumably visualisation-only attributes (TODO confirm with the caller).
    _SKIPPED_NODE_KEYS = frozenset(
        {'group', 'label', 'title', 'x', 'y', 'color', 'shape', 'size', 'hidden'}
    )

    # Prefix declarations prepended to every user-supplied SPARQL query.
    _SPARQL_PREFIXES = (
        "PREFIX vortex: <http://vortex.ai/ontology#>\n"
        "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n"
        "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>\n"
        "PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>\n"
    )

    # Maximum number of result rows returned by ``execute_sparql``.
    _MAX_RESULT_ROWS = 100

    def __init__(self):
        """Create an empty graph bound to the ``vortex`` prefix and a pipeline."""
        self.g = Graph()
        self.g.bind("vortex", VORTEX)
        self.pipeline = PipelineBuilder()

    @staticmethod
    def _safe_fragment(raw_id):
        """Return ``raw_id`` as a string that is safe to use as a URI fragment.

        Every character outside ``[a-zA-Z0-9_-]`` is replaced by ``_``.
        """
        return re.sub(r'[^a-zA-Z0-9_-]', '_', str(raw_id))

    @staticmethod
    def _xsd_datatype(target_type):
        """Map a prefixed type name such as ``'xsd:integer'`` to an XSD term.

        Returns ``None`` (i.e. an untyped literal) when ``target_type`` is
        missing, empty, or carries no ``prefix:`` part, so that a predicate
        without a declared type does not abort the whole ingestion.
        """
        if not target_type or ':' not in target_type:
            return None
        return getattr(XSD, target_type.split(':')[1])

    def _add_checked_literal(self, subject_uri, pred_name, raw_value,
                             target_type, row_id, col_name):
        """Validate ``raw_value`` through the pipeline and store it as a literal.

        Nothing is added when ``enforce_contract`` returns ``None``. Any other
        result is stored, including falsy values such as ``0`` or ``False``.
        """
        checked = self.pipeline.enforce_contract(
            raw_value,
            target_type,
            row_id=row_id,
            col_name=col_name,
        )
        if checked is None:
            return
        literal = Literal(checked, datatype=self._xsd_datatype(target_type))
        self.g.add((subject_uri, VORTEX[pred_name], literal))

    def _ingest_node(self, node_id, data, ontology_manager):
        """Emit the type, label and mapped attribute triples for one node."""
        subject_uri = VORTEX[self._safe_fragment(node_id)]
        group = data.get('group', 'Entity')
        self.g.add((subject_uri, RDF.type, VORTEX[group]))
        self.g.add((subject_uri, RDFS.label, Literal(str(data.get('label', node_id)))))

        for key, value in data.items():
            if key in self._SKIPPED_NODE_KEYS:
                continue
            pred_name = ontology_manager.get_predicate_for_column(group, key)
            if not pred_name:
                # Column has no predicate in the ontology: nothing to emit.
                continue
            target_type = ontology_manager.get_type_for_predicate(pred_name)
            self._add_checked_literal(
                subject_uri, pred_name, value, target_type,
                row_id=str(node_id), col_name=str(key),
            )

    def _ingest_edge(self, G, u, v, data, ontology_manager):
        """Emit the triple for one edge whose source ``u`` is a real entity.

        An edge to a non-'Prop' node becomes an object relation between the
        two node URIs. An edge to a 'Prop' node becomes a literal whose value
        is the target node's label (or its id when it has no label).
        """
        subject_uri = VORTEX[self._safe_fragment(u)]
        rel_label = data.get('label', 'relatedTo')
        pred_clean = re.sub(r'[^a-zA-Z0-9_]', '', str(rel_label))

        if G.nodes[v].get('group') != 'Prop':
            self.g.add((subject_uri, VORTEX[pred_clean], VORTEX[self._safe_fragment(v)]))
            return

        raw_val = G.nodes[v].get('label', str(v))
        target_type = ontology_manager.get_type_for_predicate(pred_clean)
        # Same "is not None" rule as node attributes: a plain truthiness test
        # here used to drop legitimate values such as 0 or False.
        self._add_checked_literal(
            subject_uri, pred_clean, raw_val, target_type,
            row_id=str(u), col_name=str(rel_label),
        )

    def ingest_networkx_graph(self, G, ontology_manager):
        """Rebuild the RDF graph from the NetworkX graph ``G``.

        The current graph is discarded and replaced. Nodes whose ``group`` is
        ``'Prop'`` are never subjects: they are skipped as nodes, and edges
        starting from them are ignored.

        Args:
            G: NetworkX graph; node data may carry ``group`` and ``label``,
                edge data may carry ``label`` (defaults to ``'relatedTo'``).
            ontology_manager: object providing
                ``get_predicate_for_column(group, key)`` and
                ``get_type_for_predicate(pred_name)``.
        """
        print("⏳ [RDF] Ingestion (Mode Diagnostic)...")
        self.g = Graph()
        self.g.bind("vortex", VORTEX)

        for node_id, data in G.nodes(data=True):
            if data.get('group') == 'Prop':
                continue
            self._ingest_node(node_id, data, ontology_manager)

        for u, v, data in G.edges(data=True):
            if G.nodes[u].get('group') == 'Prop':
                continue
            self._ingest_edge(G, u, v, data, ontology_manager)

        print(f"✅ [RDF] Ingestion terminée. {len(self.g)} triplets.")

    def execute_sparql(self, query):
        """Run ``query`` against the store and return the result as text.

        The standard prefixes (vortex, rdf, rdfs, xsd) are prepended to the
        query. Each result row becomes one line with its cells joined by
        ``" | "`` and the vortex namespace stripped from the values. At most
        ``_MAX_RESULT_ROWS`` lines are returned.

        Returns:
            The formatted rows, ``"Aucun résultat trouvé."`` when there are
            none, or an error message starting with ``"❌ Erreur SPARQL"`` if
            anything fails (this method does not raise).
        """
        try:
            full_query = f"{self._SPARQL_PREFIXES}{query}"
            results = self.g.query(full_query)
            output = [
                " | ".join(
                    str(item).replace("http://vortex.ai/ontology#", "")
                    for item in row
                )
                for row in results
            ]
            if not output:
                return "Aucun résultat trouvé."
            return "\n".join(output[:self._MAX_RESULT_ROWS])
        except Exception as e:
            # Deliberate catch-all: callers expect a displayable string,
            # never an exception, whatever the query contains.
            return f"❌ Erreur SPARQL : {str(e)}"

    def get_full_schema(self):
        """Summarise the store's content.

        Returns:
            A dict with ``"classes"`` (class local name -> instance count),
            ``"predicates"`` (local names of the predicates in the vortex
            namespace) and ``"triples"`` (total number of triples).
        """
        triple_count = len(self.g)
        schema = {"classes": {}, "predicates": [], "triples": triple_count}
        if triple_count == 0:
            return schema

        q_classes = "SELECT ?type (COUNT(?s) as ?count) WHERE { ?s a ?type } GROUP BY ?type"
        for row in self.g.query(q_classes):
            cls_name = str(row[0]).split('#')[-1]
            schema["classes"][cls_name] = int(row[1])

        q_preds = "SELECT DISTINCT ?p WHERE { ?s ?p ?o FILTER(STRSTARTS(STR(?p), 'http://vortex.ai/ontology#')) }"
        for row in self.g.query(q_preds):
            schema["predicates"].append(str(row[0]).split('#')[-1])
        return schema