Vortex-Flux / src /Algorithms /rdf_manager.py
klydekushy's picture
Update src/Algorithms/rdf_manager.py
f42a19f verified
"""
MODULE: RDF MANAGER (TRACEABILITY ENABLED)
==========================================
"""
import rdflib
from rdflib import Graph, Literal, Namespace
from rdflib.namespace import RDF, RDFS, XSD
import re
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../')))
from src.Analytics.pipeline_builder import PipelineBuilder
VORTEX = Namespace("http://vortex.ai/ontology#")
class RDFStore:
def __init__(self):
self.g = Graph()
self.g.bind("vortex", VORTEX)
self.pipeline = PipelineBuilder()
def ingest_networkx_graph(self, G, ontology_manager):
print("⏳ [RDF] Ingestion (Mode Diagnostic)...")
self.g = Graph()
self.g.bind("vortex", VORTEX)
for node_id, data in G.nodes(data=True):
if data.get('group') == 'Prop': continue
# URI sécurisée
safe_id = re.sub(r'[^a-zA-Z0-9_-]', '_', str(node_id))
subject_uri = VORTEX[safe_id]
group = data.get('group', 'Entity')
self.g.add((subject_uri, RDF.type, VORTEX[group]))
self.g.add((subject_uri, RDFS.label, Literal(str(data.get('label', node_id)))))
for key, value in data.items():
if key in ['group', 'label', 'title', 'x', 'y', 'color', 'shape', 'size', 'hidden']: continue
pred_name = ontology_manager.get_predicate_for_column(group, key)
if not pred_name: continue
target_type = ontology_manager.get_type_for_predicate(pred_name)
# --- APPEL DIAGNOSTIC ---
clean_value = self.pipeline.enforce_contract(
value,
target_type,
row_id=str(node_id),
col_name=str(key)
)
if clean_value is not None:
rdf_dtype = getattr(XSD, target_type.split(':')[1]) if ':' in target_type else None
self.g.add((subject_uri, VORTEX[pred_name], Literal(clean_value, datatype=rdf_dtype)))
# Relations (Version simplifiée pour brièveté, appliquez la même logique si besoin)
for u, v, data in G.edges(data=True):
if G.nodes[u].get('group') == 'Prop': continue
safe_u = re.sub(r'[^a-zA-Z0-9_-]', '_', str(u))
rel_label = data.get('label', 'relatedTo')
pred_clean = re.sub(r'[^a-zA-Z0-9_]', '', str(rel_label))
if G.nodes[v].get('group') != 'Prop':
safe_v = re.sub(r'[^a-zA-Z0-9_-]', '_', str(v))
self.g.add((VORTEX[safe_u], VORTEX[pred_clean], VORTEX[safe_v]))
else:
raw_val = G.nodes[v].get('label', str(v))
target_type = ontology_manager.get_type_for_predicate(pred_clean)
clean_val = self.pipeline.enforce_contract(raw_val, target_type, row_id=str(u), col_name=str(rel_label))
if clean_val:
rdf_dtype = getattr(XSD, target_type.split(':')[1]) if ':' in target_type else None
self.g.add((VORTEX[safe_u], VORTEX[pred_clean], Literal(clean_val, datatype=rdf_dtype)))
print(f"✅ [RDF] Ingestion terminée. {len(self.g)} triplets.")
def execute_sparql(self, query):
try:
full_query = f"""
PREFIX vortex: <http://vortex.ai/ontology#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
{query}
"""
results = self.g.query(full_query)
output = []
for row in results:
line = []
for item in row:
val = str(item).replace("http://vortex.ai/ontology#", "")
line.append(val)
output.append(" | ".join(line))
if not output: return "Aucun résultat trouvé."
return "\n".join(output[:100])
except Exception as e:
return f"❌ Erreur SPARQL : {str(e)}"
def get_full_schema(self):
if len(self.g) == 0: return {"classes": {}, "predicates": [], "triples": 0}
schema = {"classes": {}, "predicates": [], "triples": len(self.g)}
q_classes = "SELECT ?type (COUNT(?s) as ?count) WHERE { ?s a ?type } GROUP BY ?type"
for row in self.g.query(q_classes):
cls_name = str(row[0]).split('#')[-1]
schema["classes"][cls_name] = int(row[1])
q_preds = "SELECT DISTINCT ?p WHERE { ?s ?p ?o FILTER(STRSTARTS(STR(?p), 'http://vortex.ai/ontology#')) }"
for row in self.g.query(q_preds):
pred = str(row[0]).split('#')[-1]
schema["predicates"].append(pred)
return schema