Spaces:
Sleeping
Sleeping
| # src/knowledge_graph.py | |
| import networkx as nx | |
| from pyvis.network import Network | |
| import json | |
| from typing import Dict, List, Any, Optional, Set, Tuple | |
| import matplotlib.pyplot as plt | |
| import matplotlib.colors as mcolors | |
| from collections import defaultdict | |
class KnowledgeGraph:
    """
    Handles the construction and visualization of knowledge graphs
    based on the ontology data.

    The wrapped graph is taken from ``ontology_manager.graph``. The methods
    below call ``successors``/``predecessors`` on it, so it is expected to be a
    directed NetworkX graph whose nodes carry a ``"type"`` attribute
    (``"class"`` or ``"instance"``) and whose edges carry a ``"type"``
    attribute naming the relationship.
    """

    # Edge types that describe ontology structure rather than domain relations.
    _STRUCTURAL_RELATIONS = ("subClassOf", "instanceOf")

    # Maximum number of example edges reported per relationship pattern.
    _MAX_PATTERN_EXAMPLES = 3

    def __init__(self, ontology_manager=None):
        """
        Initialize the knowledge graph handler.

        Args:
            ontology_manager: Optional ontology manager instance. When given
                (and truthy), its ``graph`` attribute becomes ``self.graph``;
                otherwise ``self.graph`` is ``None``.
        """
        self.ontology_manager = ontology_manager
        self.graph = ontology_manager.graph if ontology_manager else None

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    def _edge_attribute_dicts(self, source, target) -> List[Dict]:
        """Return one attribute dict per edge from ``source`` to ``target``.

        Works for both multigraphs (where ``get_edge_data`` returns a
        ``{key: attrs}`` mapping) and simple graphs (where it returns the
        attribute dict itself). Returns an empty list when there is no edge.
        """
        edge_data = self.graph.get_edge_data(source, target)
        if edge_data is None:
            return []
        if self.graph.is_multigraph():
            return list(edge_data.values())
        return [edge_data]

    @staticmethod
    def _display_name(node_id, node_data: Dict):
        """Return the instance's ``name`` property if present, else the node id."""
        if node_data.get("type") == "instance" and "properties" in node_data:
            return node_data["properties"].get("name", node_id)
        return node_id

    @staticmethod
    def _pattern_type(node_data: Dict):
        """Return ``class_type`` for instance nodes and ``type`` for everything else."""
        if node_data.get("type") == "instance":
            return node_data.get("class_type")
        return node_data.get("type")

    def _nodes_within_distance(self, start, max_distance) -> Set:
        """Collect nodes reachable from ``start`` in at most ``max_distance`` hops.

        Edge direction is ignored: both successors and predecessors are
        followed. Only newly discovered nodes are expanded in the next round.
        """
        reached = {start}
        frontier = {start}
        depth = 0
        while depth < max_distance and frontier:
            discovered = set()
            for node in frontier:
                discovered.update(self.graph.successors(node))
                discovered.update(self.graph.predecessors(node))
            frontier = discovered - reached
            reached |= frontier
            depth += 1
        return reached

    # ------------------------------------------------------------------
    # Visualization
    # ------------------------------------------------------------------
    def build_visualization_graph(
        self,
        include_classes: bool = True,
        include_instances: bool = True,
        central_entity: Optional[str] = None,
        max_distance: int = 2,
        include_properties: bool = False
    ) -> nx.Graph:
        """
        Build a simplified graph for visualization purposes.

        Args:
            include_classes: Whether to include class nodes
            include_instances: Whether to include instance nodes
            central_entity: Optional central entity to focus the graph on
            max_distance: Maximum distance from central entity to include
            include_properties: Whether to include property nodes

        Returns:
            An undirected NetworkX graph suitable for visualization. It is
            empty when no graph is loaded.
        """
        if not self.graph:
            return nx.Graph()

        viz_graph = nx.Graph()

        # Restrict to the neighbourhood of the central entity when one is given.
        if central_entity and central_entity in self.graph:
            subgraph = self.graph.subgraph(
                self._nodes_within_distance(central_entity, max_distance)
            )
        else:
            subgraph = self.graph

        # Synthetic instance<->property edges created below. Tracked exactly so
        # that the edge pass can skip a source edge duplicating one of them
        # without guessing from id prefixes (which dropped genuine edges between
        # ids such as "Product" and "Product_1").
        property_edges = set()

        for node, data in subgraph.nodes(data=True):
            node_type = data.get("type")

            if node_type == "class" and not include_classes:
                continue
            if node_type == "instance" and not include_instances:
                continue

            has_instance_properties = node_type == "instance" and "properties" in data

            viz_attrs = {
                "id": node,
                "label": self._display_name(node, data),
                "title": self._get_node_tooltip(node, data),
                "group": data.get("class_type", node_type),
                "shape": "dot" if node_type == "instance" else "diamond"
            }

            # Highlight the central entity: orange and larger than the rest.
            if central_entity and node == central_entity:
                viz_attrs["color"] = "#ff7f0e"
                viz_attrs["size"] = 25

            viz_graph.add_node(node, **viz_attrs)

            if include_properties and has_instance_properties:
                for prop_name, prop_value in data["properties"].items():
                    prop_node_id = f"{node}_{prop_name}"
                    # Keep labels short; the full value stays in the tooltip.
                    prop_value_str = str(prop_value)
                    if len(prop_value_str) > 20:
                        prop_value_str = prop_value_str[:17] + "..."
                    viz_graph.add_node(
                        prop_node_id,
                        id=prop_node_id,
                        label=f"{prop_name}: {prop_value_str}",
                        title=f"{prop_name}: {prop_value}",
                        group="property",
                        shape="ellipse",
                        size=5
                    )
                    viz_graph.add_edge(node, prop_node_id, label="has_property", dashes=True)
                    property_edges.add(frozenset((node, prop_node_id)))

        for source, target, data in subgraph.edges(data=True):
            # Only connect nodes that survived the filtering above.
            if source not in viz_graph or target not in viz_graph:
                continue
            # Do not overwrite a manually created property edge.
            if frozenset((source, target)) in property_edges:
                continue

            edge_type = data.get("type", "unknown")
            # Structural relationships are hidden together with class nodes.
            if edge_type in self._STRUCTURAL_RELATIONS and not include_classes:
                continue

            viz_graph.add_edge(source, target, label=edge_type, title=edge_type)

        return viz_graph

    def _get_node_tooltip(self, node_id: str, data: Dict) -> str:
        """Generate an HTML tooltip for a node.

        Values are interpolated verbatim (no HTML escaping is applied).

        Args:
            node_id: Identifier of the node.
            data: Attribute dict of the node.

        Returns:
            An HTML fragment using ``<strong>`` and ``<br>`` markup.
        """
        parts = [f"<strong>ID:</strong> {node_id}<br>"]

        node_type = data.get("type")
        if node_type:
            parts.append(f"<strong>Type:</strong> {node_type}<br>")

        if node_type == "instance":
            parts.append(f"<strong>Class:</strong> {data.get('class_type', 'unknown')}<br>")
            if "properties" in data:
                parts.append("<strong>Properties:</strong><br>")
                parts.extend(
                    f"- {key}: {value}<br>" for key, value in data["properties"].items()
                )
        elif node_type == "class":
            parts.append(f"<strong>Description:</strong> {data.get('description', '')}<br>")
            if "properties" in data:
                # str() guards against non-string property names.
                listed = ", ".join(str(prop) for prop in data["properties"])
                parts.append("<strong>Properties:</strong> " + listed + "<br>")

        return "".join(parts)

    @staticmethod
    def _physics_options(layout_algorithm: str) -> Dict[str, Any]:
        """Return the vis.js ``physics`` options for a layout algorithm.

        Unknown algorithm names fall back to the force-directed settings.
        """
        stabilization = {"enabled": True, "iterations": 100}

        if layout_algorithm == "hierarchical":
            return {
                "enabled": True,
                "hierarchicalRepulsion": {
                    "centralGravity": 0.0,
                    "springLength": 100,
                    "springConstant": 0.01,
                    "nodeDistance": 120
                },
                "solver": "hierarchicalRepulsion",
                "stabilization": stabilization
            }
        if layout_algorithm == "radial":
            return {
                "enabled": True,
                "solver": "repulsion",
                "repulsion": {
                    "nodeDistance": 120,
                    "centralGravity": 0.2,
                    "springLength": 200,
                    "springConstant": 0.05
                },
                "stabilization": stabilization
            }
        if layout_algorithm == "circular":
            # Positions are fixed by the caller, so physics is switched off.
            return {"enabled": False}

        return {
            "enabled": True,
            "solver": "forceAtlas2Based",
            "forceAtlas2Based": {
                "gravitationalConstant": -50,
                "centralGravity": 0.01,
                "springLength": 100,
                "springConstant": 0.08
            },
            "stabilization": stabilization
        }

    def generate_html_visualization(
        self,
        include_classes: bool = True,
        include_instances: bool = True,
        central_entity: Optional[str] = None,
        max_distance: int = 2,
        include_properties: bool = False,
        height: str = "600px",
        width: str = "100%",
        bgcolor: str = "#ffffff",
        font_color: str = "#000000",
        layout_algorithm: str = "force-directed"
    ) -> str:
        """
        Generate an HTML visualization of the knowledge graph.

        Args:
            include_classes: Whether to include class nodes
            include_instances: Whether to include instance nodes
            central_entity: Optional central entity to focus the graph on
            max_distance: Maximum distance from central entity to include
            include_properties: Whether to include property nodes
            height: Height of the visualization
            width: Width of the visualization
            bgcolor: Background color
            font_color: Font color
            layout_algorithm: Algorithm for layout ('force-directed',
                'hierarchical', 'radial', 'circular'). Any other value is
                treated as 'force-directed'.

        Returns:
            HTML string containing the visualization
        """
        viz_graph = self.build_visualization_graph(
            include_classes=include_classes,
            include_instances=include_instances,
            central_entity=central_entity,
            max_distance=max_distance,
            include_properties=include_properties
        )

        net = Network(height=height, width=width, bgcolor=bgcolor, font_color=font_color, directed=True)

        options = {
            "nodes": {
                "font": {"size": 12},
                "scaling": {"min": 10, "max": 30}
            },
            "edges": {
                "color": {"inherit": True},
                "smooth": {"enabled": True, "type": "dynamic"},
                "arrows": {"to": {"enabled": True, "scaleFactor": 0.5}},
                "font": {"size": 10, "align": "middle"}
            },
            "physics": self._physics_options(layout_algorithm),
            "interaction": {
                "hover": True,
                "navigationButtons": True,
                "keyboard": True,
                "tooltipDelay": 100
            }
        }

        if layout_algorithm == "hierarchical":
            # The layout section must live in the same options object that is
            # assigned to the network below; setting it separately beforehand
            # would be overwritten by that assignment.
            options["layout"] = {
                "hierarchical": {
                    "enabled": True,
                    "direction": "UD",
                    "sortMethod": "directed",
                    "nodeSpacing": 150,
                    "treeSpacing": 200
                }
            }
        elif layout_algorithm == "circular":
            # Pin every node to a precomputed position on a circle.
            for node_id, (x, y) in nx.circular_layout(viz_graph).items():
                node_attrs = viz_graph.nodes[node_id]
                node_attrs['x'] = float(x) * 500
                node_attrs['y'] = float(y) * 500
                node_attrs['physics'] = False

        net.options = options
        net.from_nx(viz_graph)

        custom_css = """
        <style>
            .vis-network {
                border: 1px solid #ddd;
                border-radius: 5px;
            }
            .vis-tooltip {
                position: absolute;
                background-color: #f5f5f5;
                border: 1px solid #ccc;
                border-radius: 4px;
                padding: 10px;
                font-family: Arial, sans-serif;
                font-size: 12px;
                color: #333;
                max-width: 300px;
                z-index: 9999;
                box-shadow: 0 2px 4px rgba(0,0,0,0.1);
            }
        </style>
        """

        page = net.generate_html()
        # Inject the CSS once, in front of the first <style> tag only.
        page = page.replace("<style>", custom_css + "<style>", 1)

        legend_html = self._generate_legend_html(viz_graph)
        page = page.replace("</body>", legend_html + "</body>")

        return page

    def _generate_legend_html(self, graph: nx.Graph) -> str:
        """Generate an HTML legend listing the node groups present in ``graph``.

        Args:
            graph: Visualization graph whose nodes may carry a ``"group"`` attribute.

        Returns:
            An absolutely positioned ``<div>`` with one entry per distinct,
            non-``None`` group.
        """
        groups = {
            attrs["group"]
            for _, attrs in graph.nodes(data=True)
            if attrs.get("group") is not None
        }
        # Sort by string form so groups of mixed types cannot break the ordering.
        sorted_groups = sorted(groups, key=str)

        legend_html = """
        <div id="graph-legend" style="position: absolute; top: 10px; right: 10px; background-color: rgba(255,255,255,0.8);
        padding: 10px; border-radius: 5px; border: 1px solid #ddd; max-width: 200px;">
            <strong>Legend:</strong>
            <ul style="list-style-type: none; padding-left: 0; margin-top: 5px;">
        """

        for group in sorted_groups:
            if group == "property":
                color = "#ffcc99"
            elif group == "class":
                color = "#a1d3a2"
            else:
                color = "#97c2fc"  # Default color

            legend_html += f"""
                <li style="margin-bottom: 5px;">
                    <span style="display: inline-block; width: 12px; height: 12px; border-radius: 50%;
                    background-color: {color}; margin-right: 5px;"></span>
                    {group}
                </li>
            """

        legend_html += """
            </ul>
            <div style="font-size: 10px; margin-top: 5px; color: #666;">
                Double-click to zoom, drag to pan, scroll to zoom in/out
            </div>
        </div>
        """

        return legend_html

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------
    def get_graph_statistics(self) -> Dict[str, Any]:
        """
        Calculate statistics about the knowledge graph.

        Structural metrics (degree, path length, diameter, density, clustering)
        and centrality rankings are best-effort: if their computation fails,
        the metrics are reported as ``0`` and ``central_nodes`` as ``{}``.

        Returns:
            A dictionary containing graph statistics, or ``{}`` when no graph
            is loaded.
        """
        if not self.graph:
            return {}

        # Single pass over the nodes for all per-node tallies.
        class_count = 0
        instance_count = 0
        property_count = 0
        class_counts = defaultdict(int)
        for _, data in self.graph.nodes(data=True):
            node_type = data.get("type")
            if node_type == "class":
                class_count += 1
            elif node_type == "instance":
                instance_count += 1
                class_counts[data.get("class_type", "unknown")] += 1
                if "properties" in data:
                    property_count += len(data["properties"])

        relationship_counts = defaultdict(int)
        for _, _, data in self.graph.edges(data=True):
            relationship_counts[data.get("type", "unknown")] += 1

        try:
            # Some metrics only work on undirected graphs.
            undirected = nx.Graph(self.graph)
            avg_degree = sum(dict(undirected.degree()).values()) / undirected.number_of_nodes()

            # Path metrics are only defined on a connected graph; otherwise
            # they are computed on the largest connected component.
            if nx.is_connected(undirected):
                metric_graph = undirected
            else:
                largest_cc = max(nx.connected_components(undirected), key=len)
                metric_graph = undirected.subgraph(largest_cc)
            avg_path_length = nx.average_shortest_path_length(metric_graph)
            diameter = nx.diameter(metric_graph)

            density = nx.density(self.graph)
            clustering = nx.average_clustering(undirected)
        except Exception:
            # Best-effort: a failing metric must not break the whole report.
            avg_degree = 0
            avg_path_length = 0
            diameter = 0
            density = 0
            clustering = 0

        try:
            betweenness = nx.betweenness_centrality(self.graph)
            degree = nx.degree_centrality(self.graph)

            top_betweenness = sorted(betweenness.items(), key=lambda x: x[1], reverse=True)[:5]
            top_degree = sorted(degree.items(), key=lambda x: x[1], reverse=True)[:5]

            central_nodes = {
                "betweenness": [{"node": node, "centrality": round(cent, 3)} for node, cent in top_betweenness],
                "degree": [{"node": node, "centrality": round(cent, 3)} for node, cent in top_degree]
            }
        except Exception:
            central_nodes = {}

        return {
            "node_count": self.graph.number_of_nodes(),
            "edge_count": self.graph.number_of_edges(),
            "class_count": class_count,
            "instance_count": instance_count,
            "property_count": property_count,
            "relationship_counts": dict(relationship_counts),
            "class_instance_counts": dict(class_counts),
            "average_degree": avg_degree,
            "average_path_length": avg_path_length,
            "diameter": diameter,
            "density": density,
            "clustering_coefficient": clustering,
            "central_nodes": central_nodes
        }

    # ------------------------------------------------------------------
    # Paths and neighbourhoods
    # ------------------------------------------------------------------
    def find_paths_between_entities(
        self,
        source_entity: str,
        target_entity: str,
        max_length: int = 3
    ) -> List[List[Dict]]:
        """
        Find all paths between two entities up to a maximum length.

        Paths made up solely of structural relationships (``subClassOf``,
        ``instanceOf``) are omitted. Each distinct node sequence is reported
        once, with one entry per edge between consecutive nodes.

        Args:
            source_entity: Starting entity ID
            target_entity: Target entity ID
            max_length: Maximum path length

        Returns:
            A list of paths, where each path is a list of edge dictionaries
            (``source``, ``target``, ``type``), shortest first.
        """
        if not self.graph or source_entity not in self.graph or target_entity not in self.graph:
            return []

        try:
            node_paths = list(nx.all_simple_paths(
                self.graph, source_entity, target_entity, cutoff=max_length
            ))
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            return []

        paths = []
        seen_paths = set()
        for path in node_paths:
            # With parallel edges the same node sequence is produced more than
            # once; all parallel edges are already listed below, so keep one.
            path_key = tuple(path)
            if path_key in seen_paths:
                continue
            seen_paths.add(path_key)

            edge_sequence = [
                {
                    "source": source,
                    "target": target,
                    "type": data.get("type", "unknown")
                }
                for source, target in zip(path, path[1:])
                for data in self._edge_attribute_dicts(source, target)
            ]

            # Keep the path only if it has at least one non-structural edge.
            if any(edge["type"] not in self._STRUCTURAL_RELATIONS for edge in edge_sequence):
                paths.append(edge_sequence)

        paths.sort(key=len)
        return paths

    def _direct_relations(self, entity_id, node, include_classes: bool) -> List[Dict]:
        """List direct edges between the central entity and ``node`` in both directions."""
        relations = []
        endpoints = (
            ("outgoing", entity_id, node),
            ("incoming", node, entity_id),
        )
        for direction, source, target in endpoints:
            for data in self._edge_attribute_dicts(source, target):
                rel_type = data.get("type", "unknown")
                # Skip structural relationships if not including classes.
                if not include_classes and rel_type in self._STRUCTURAL_RELATIONS:
                    continue
                relations.append({"type": rel_type, "direction": direction})
        return relations

    def _indirect_relation(self, entity_id, node, max_distance: int) -> Optional[Dict]:
        """Return one shortest multi-hop (>= 2 edges) outgoing path to ``node``, if any."""
        for path_length in range(2, max_distance + 1):
            try:
                candidates = nx.all_simple_paths(
                    self.graph, entity_id, node, cutoff=path_length
                )
                # ``cutoff`` is only an upper bound, so filter for the exact length.
                path = next(
                    (p for p in candidates if len(p) - 1 == path_length), None
                )
            except (nx.NetworkXNoPath, nx.NodeNotFound):
                continue
            if path is not None:
                return {
                    "type": "indirect_connection",
                    "direction": "outgoing",
                    "path_length": path_length,
                    "intermediates": path[1:-1]
                }
        return None

    def get_entity_neighborhood(
        self,
        entity_id: str,
        max_distance: int = 1,
        include_classes: bool = True
    ) -> Dict[str, Any]:
        """
        Get the neighborhood of an entity.

        Neighbours are discovered breadth-first, ignoring edge direction. A
        neighbour is reported only if it has a direct relation to the central
        entity or, failing that, an outgoing multi-hop path from it.

        Args:
            entity_id: The central entity ID
            max_distance: Maximum distance from the central entity
            include_classes: Whether to include class relationships

        Returns:
            A dictionary containing the neighborhood information, or ``{}``
            when the entity is not in the graph.
        """
        if not self.graph or entity_id not in self.graph:
            return {}

        # BFS; insertion order of ``distance_of`` is the discovery order.
        distance_of = {entity_id: 0}
        frontier = [entity_id]
        for distance in range(1, max_distance + 1):
            next_frontier = []
            for node in frontier:
                adjacent = list(self.graph.successors(node)) + list(self.graph.predecessors(node))
                for neighbor in adjacent:
                    if neighbor in distance_of:
                        continue
                    # Skip class nodes if not including classes.
                    neighbor_data = self.graph.nodes.get(neighbor, {})
                    if not include_classes and neighbor_data.get("type") == "class":
                        continue
                    distance_of[neighbor] = distance
                    next_frontier.append(neighbor)
            frontier = next_frontier

        neighbors = []
        for node, distance in distance_of.items():
            if node == entity_id:
                continue

            relations = self._direct_relations(entity_id, node, include_classes)
            if not relations:
                # Only look for an indirect path when nothing direct exists.
                indirect = self._indirect_relation(entity_id, node, max_distance)
                if indirect is not None:
                    relations.append(indirect)

            # Only include neighbors with relations.
            if not relations:
                continue

            node_data = self.graph.nodes[node]
            neighbors.append({
                "id": node,
                "type": node_data.get("type"),
                "class_type": node_data.get("class_type"),
                "properties": node_data.get("properties", {}),
                "relations": relations,
                "distance": distance
            })

        neighbors_by_distance = defaultdict(list)
        for neighbor in neighbors:
            neighbors_by_distance[neighbor["distance"]].append(neighbor)

        central_data = self.graph.nodes[entity_id]

        return {
            "central_entity": {
                "id": entity_id,
                "type": central_data.get("type"),
                "class_type": central_data.get("class_type", ""),
                "properties": central_data.get("properties", {})
            },
            "neighbors": neighbors,
            "neighbors_by_distance": dict(neighbors_by_distance),
            "total_neighbors": len(neighbors)
        }

    # ------------------------------------------------------------------
    # Pattern discovery
    # ------------------------------------------------------------------
    def find_common_patterns(self) -> List[Dict[str, Any]]:
        """
        Find common patterns and structures in the knowledge graph.

        Combines relationship patterns, hub entities and property patterns,
        in that order.

        Returns:
            A list of pattern dictionaries (empty when no graph is loaded)
        """
        if not self.graph:
            return []

        patterns = list(self._find_relationship_patterns())

        hub_entities = self._find_hub_entities()
        if hub_entities:
            patterns.append({
                "type": "hub_entities",
                "description": "Entities with high connectivity serving as knowledge hubs",
                "entities": hub_entities
            })

        patterns.extend(self._find_property_patterns())
        return patterns

    def _find_relationship_patterns(self) -> List[Dict[str, Any]]:
        """Find (source_type, relation, target_type) triplets that occur more than once.

        Structural relationships are ignored. Each pattern carries up to three
        example edges, taken in edge-iteration order.
        """
        triplet_counts = defaultdict(int)
        triplet_examples = defaultdict(list)

        # Count triplets and gather their examples in the same pass.
        for source, target, data in self.graph.edges(data=True):
            rel_type = data.get("type", "unknown")
            if rel_type in self._STRUCTURAL_RELATIONS:
                continue

            source_data = self.graph.nodes[source]
            target_data = self.graph.nodes[target]
            source_type = self._pattern_type(source_data)
            target_type = self._pattern_type(target_data)
            if not (source_type and target_type):
                continue

            triplet = (source_type, rel_type, target_type)
            triplet_counts[triplet] += 1

            examples = triplet_examples[triplet]
            if len(examples) < self._MAX_PATTERN_EXAMPLES:
                examples.append({
                    "source": source,
                    "source_name": self._display_name(source, source_data),
                    "target": target,
                    "target_name": self._display_name(target, target_data),
                    "relationship": rel_type
                })

        # Keep only patterns seen more than once.
        patterns = [
            {
                "type": "relationship_pattern",
                "description": f"{source_type} {rel_type} {target_type}",
                "source_type": source_type,
                "relationship": rel_type,
                "target_type": target_type,
                "count": count,
                "examples": triplet_examples[(source_type, rel_type, target_type)]
            }
            for (source_type, rel_type, target_type), count in triplet_counts.items()
            if count > 1
        ]

        patterns.sort(key=lambda x: x["count"], reverse=True)
        return patterns

    def _incident_edge_data(self, node):
        """Yield the attribute dict of every edge touching ``node``.

        For directed graphs both outgoing and incoming edges are included;
        a self-loop is yielded once.
        """
        if not self.graph.is_directed():
            for _, _, data in self.graph.edges(node, data=True):
                yield data
            return

        for _, _, data in self.graph.out_edges(node, data=True):
            yield data
        for source, _, data in self.graph.in_edges(node, data=True):
            if source != node:  # self-loops were already yielded above
                yield data

    def _find_hub_entities(self) -> List[Dict[str, Any]]:
        """Find instance nodes that serve as hubs (many connections).

        Takes the ten nodes with the highest degree centrality, keeps the
        instances among them and tallies their non-structural relationships
        (incoming and outgoing).

        Returns:
            Hub descriptions sorted by ``total_connections``, highest first.
        """
        degree = nx.degree_centrality(self.graph)
        top_entities = sorted(degree.items(), key=lambda x: x[1], reverse=True)[:10]

        hub_entities = []
        for node, centrality in top_entities:
            node_data = self.graph.nodes[node]
            # Only consider instance nodes.
            if node_data.get("type") != "instance":
                continue

            relationships = defaultdict(int)
            for data in self._incident_edge_data(node):
                rel_type = data.get("type", "unknown")
                if rel_type not in self._STRUCTURAL_RELATIONS:
                    relationships[rel_type] += 1

            hub_entities.append({
                "id": node,
                "name": self._display_name(node, node_data),
                "type": node_data.get("class_type", "unknown"),
                "centrality": centrality,
                "relationships": dict(relationships),
                "total_connections": sum(relationships.values())
            })

        hub_entities.sort(key=lambda x: x["total_connections"], reverse=True)
        return hub_entities

    def _find_property_patterns(self) -> List[Dict[str, Any]]:
        """Find properties shared by several instances of the same class.

        Instances without a ``class_type`` are grouped under ``"unknown"``.
        A class is reported when it has more than one instance and at least
        one property that appears on more than one of them.
        """
        instances_per_class = defaultdict(int)
        properties_by_class = defaultdict(lambda: defaultdict(int))

        for _, data in self.graph.nodes(data=True):
            if data.get("type") != "instance":
                continue
            class_type = data.get("class_type", "unknown")
            instances_per_class[class_type] += 1
            if "properties" in data:
                for prop in data["properties"]:
                    properties_by_class[class_type][prop] += 1

        patterns = []
        for class_type, props in properties_by_class.items():
            class_instances = instances_per_class[class_type]
            # Only include classes with multiple instances.
            if class_instances <= 1:
                continue

            # Most frequent properties first.
            sorted_props = sorted(props.items(), key=lambda x: x[1], reverse=True)
            common_props = [prop for prop, count in sorted_props if count > 1]
            if not common_props:
                continue

            patterns.append({
                "type": "property_pattern",
                "description": f"Common properties for {class_type} instances",
                "class_type": class_type,
                "instance_count": class_instances,
                "common_properties": common_props,
                "property_frequencies": dict(sorted_props)
            })

        return patterns