| import streamlit as st |
| from rdflib import Graph, RDFS, Namespace, URIRef, RDF, Literal |
| from pyvis.network import Network |
| import streamlit.components.v1 as components |
|
|
| |
| g = Graph() |
| g.parse("../KnowledgeGraph/gdpr_policy_graph.ttl", format="ttl") |
|
|
| BASE_URI = "http://example.org/gdpr#" |
| EX = Namespace(BASE_URI) |
|
|
| |
| st.set_page_config(layout="wide") |
| st.title("๐ธ๏ธ GDPR Knowledge Graph Visualizer") |
|
|
| |
| articles = sorted({str(o).split(" ")[1] for s, p, o in g.triples((None, RDFS.label, None)) if "Article" in str(o)}) |
| sections = sorted({ |
| str(label) |
| for sec in g.subjects(RDF.type, EX.PolicySection) |
| for label in g.objects(sec, RDFS.label) |
| }) |
|
|
| col1, col2 = st.columns([1, 3]) |
| selected_article = col1.selectbox("๐ Filter by Article Number", ["All"] + articles) |
| selected_section = col2.selectbox("๐ Filter by Policy Section", ["All"] + sections) |
|
|
| |
| def get_related_nodes_for_section(section_label): |
| |
| matching_sections = [ |
| s for s, p, o in g.triples((None, RDFS.label, Literal(section_label))) |
| if (s, RDF.type, EX.PolicySection) in g |
| ] |
| if not matching_sections: |
| return set() |
| sec_node = matching_sections[0] |
|
|
| clause_nodes = set(o for _, _, o in g.triples((sec_node, EX.relatesToClause, None))) |
| article_nodes = set(o for c in clause_nodes for _, _, o in g.triples((c, EX.partOf, None))) |
| return {sec_node} | clause_nodes | article_nodes |
|
|
| def get_related_nodes_for_article(article_number): |
| |
| article_nodes = set( |
| s for s, p, o in g.triples((None, RDFS.label, None)) |
| if (s, RDF.type, EX.Article) in g and f"Article {article_number}" in str(o) |
| ) |
| if not article_nodes: |
| return set() |
| article_node = list(article_nodes)[0] |
|
|
| |
| clause_nodes = set(o for _, _, o in g.triples((None, EX.partOf, article_node))) |
|
|
| |
| policy_sections = set( |
| s for s, p, o in g.triples((None, EX.relatesToClause, None)) if o in clause_nodes |
| ) |
| return {article_node} | clause_nodes | policy_sections |
|
|
| def expand_with_neighbors(graph, nodes): |
| expanded = set(nodes) |
| for node in nodes: |
| |
| for _, _, o in graph.triples((node, None, None)): |
| expanded.add(o) |
| |
| for s, _, _ in graph.triples((None, None, node)): |
| expanded.add(s) |
| return expanded |
|
|
| if selected_section != "All" and selected_article != "All": |
| |
| nodes_for_section = get_related_nodes_for_section(selected_section) |
| nodes_for_article = get_related_nodes_for_article(selected_article) |
| base_nodes = nodes_for_section & nodes_for_article |
| elif selected_section != "All": |
| base_nodes = get_related_nodes_for_section(selected_section) |
| elif selected_article != "All": |
| base_nodes = get_related_nodes_for_article(selected_article) |
| else: |
| |
| base_nodes = set() |
| for s, p, o in g: |
| base_nodes.add(s) |
| base_nodes.add(o) |
|
|
| nodes_to_show = expand_with_neighbors(g, base_nodes) |
| |
| net = Network(height="750px", width="100%", bgcolor="#ffffff", font_color="black") |
| net.force_atlas_2based(gravity=-30) |
|
|
| added_nodes = set() |
|
|
| def get_type(uri): |
| for s, p, o in g.triples((uri, RDFS.label, None)): |
| label = str(o) |
| if label.startswith("Article"): |
| return "article" |
| elif label.startswith("Art."): |
| return "clause" |
| elif label.startswith("Section"): |
| return "section" |
| return "unknown" |
|
|
| def get_label(g, node): |
| for _, _, label in g.triples((node, RDFS.label, None)): |
| return str(label) |
| return None |
|
|
| def color_by_type(node_type): |
| return { |
| "article": "#77B5FE", |
| "clause": "#81C784", |
| "section": "#FFB74D", |
| }.get(node_type, "#D3D3D3") |
|
|
| |
| for node in nodes_to_show: |
| label = get_label(g, node) or (str(node).split("#")[-1] if isinstance(node, URIRef) else str(node)) |
| n_type = get_type(node) |
| tooltip = label |
| for val in g.objects(node, EX.similarityScore): |
| tooltip += f"\nSimilarity: {val}" |
| for val in g.objects(node, RDFS.comment): |
| tooltip += f"\n{val}" |
| net.add_node(str(node), label=label, title=tooltip, color=color_by_type(n_type)) |
| added_nodes.add(str(node)) |
|
|
| |
| for s, p, o in g: |
| if s in nodes_to_show and o in nodes_to_show: |
| pred_label = p.split("#")[-1] if isinstance(p, URIRef) else str(p) |
| net.add_edge(str(s), str(o), label=pred_label) |
|
|
| |
| net.save_graph("graph.html") |
| with open("graph.html", "r", encoding="utf-8") as f: |
| html = f.read() |
|
|
| components.html(html, height=780, scrolling=True) |