import ast
import hashlib

# Integer codes used as the first component of a node vector.
_CATEGORY_IDS = {
    'unknown': 0,
    'import': 1,
    'function': 2,
    'class': 3,
    'if': 4,
    'while': 5,
    'for': 6,
    'try': 7,
    'expression': 8,
    'spacer': 9,
    'elif': 10,
    'else': 11,
    'except': 12,
    'return': 13,
    'assigned_variable': 14,
    'variable_def': 15,
}

# Ordered (AST node types, category, fixed name) rules; the first match wins.
# A fixed name of None means the node's own ``name`` attribute is used.
_CATEGORY_RULES = (
    ((ast.Import, ast.ImportFrom), 'import', "import"),
    ((ast.FunctionDef, ast.AsyncFunctionDef), 'function', None),
    ((ast.ClassDef,), 'class', None),
    ((ast.If,), 'if', "if"),
    ((ast.For, ast.AsyncFor), 'for', "for"),
    ((ast.While,), 'while', "while"),
    ((ast.Return,), 'return', "return"),
    ((ast.Assign, ast.AnnAssign), 'assigned_variable', "assignment"),
    ((ast.Expr,), 'expression', "expr"),
    ((ast.Try,), 'try', "try"),
    ((ast.ExceptHandler,), 'except', "except"),
)


def get_category_id(category):
    """Map a category name to its integer code; unrecognised names map to 0."""
    return _CATEGORY_IDS.get(category, 0)


def create_vector(category, level, location, total_lines, parent_path):
    """Build the 6-component positional vector for a node.

    The components are, in order:
    [category id, depth, relative center, density, parent depth,
    ancestry weight].

    Category id, depth and parent depth are raw integers. Relative center
    and density are fractions of ``total_lines`` rounded to four decimals,
    and ancestry weight is a value in [0.0, 0.99].

    Args:
        category: Category name, translated with ``get_category_id``.
        level: Nesting depth of the node.
        location: ``(start_line, end_line)`` pair, 1-based and inclusive.
        total_lines: Number of lines in the source; values below 1 are
            treated as 1 so the divisions cannot fail.
        parent_path: List of ancestor node ids, outermost first.

    Returns:
        A list of six numbers as described above.
    """
    cat_id = get_category_id(category)
    start, end = location
    total_lines = max(1, total_lines)

    span = (end - start + 1) / total_lines
    center = ((start + end) / 2) / total_lines
    parent_depth = len(parent_path)

    # Ancestry weight: the MD5 of the concatenated ancestor ids, reduced
    # modulo 100, gives a deterministic fingerprint of the path.
    path_str = "".join(parent_path)
    digest = hashlib.md5(path_str.encode()).hexdigest()
    parent_weight = (int(digest, 16) % 100) / 100.0

    return [
        cat_id,
        level,
        float(f"{center:.4f}"),
        float(f"{span:.4f}"),
        parent_depth,
        float(f"{parent_weight:.4f}"),
    ]


def _classify(node):
    """Return ``(category, name)`` for an AST node; category is 'other' if no rule matches."""
    own_name = getattr(node, 'name', None)
    for node_types, category, fixed_name in _CATEGORY_RULES:
        if isinstance(node, node_types):
            return category, own_name if fixed_name is None else fixed_name
    return 'other', own_name


def _label_for(node, category, name):
    """Return the display label: ``"<target> ="`` for simple assignments, else name or category."""
    if category == 'assigned_variable':
        # ast.Assign has ``targets`` (a list); ast.AnnAssign has a single ``target``.
        targets = getattr(node, 'targets', []) or [getattr(node, 'target', None)]
        if targets and isinstance(targets[0], ast.Name):
            return f"{targets[0].id} ="
    return name if name else category


def parse_source_to_graph(code):
    """Parse Python source into structural nodes and hierarchy connections.

    Args:
        code: Python source text.

    Returns:
        ``{"nodes": [...], "connections": [...]}`` on success, with nodes
        sorted by line number. Each node is a dict with the keys ``id``,
        ``label``, ``type``, ``source``, ``vector``, ``level``, ``lineno``
        and ``parent_id``. If the source does not parse, returns
        ``{"error": "Syntax Error on line N: ..."}`` instead.
    """
    try:
        tree = ast.parse(code)
    except SyntaxError as e:
        return {"error": f"Syntax Error on line {e.lineno}: {e.msg}"}

    lines = code.splitlines(keepends=True)
    total_lines = len(lines)
    nodes = []

    def traverse(node, parent_path=None, level=0, parent_id=None):
        """Record ``node`` if it is structural, then visit its children."""
        if parent_path is None:
            parent_path = []

        lineno = getattr(node, 'lineno', 0)
        if not lineno:
            # Position-less nodes (e.g. ast.match_case, ast.arguments) are
            # never emitted themselves, but they can contain statements, so
            # keep descending with the same ancestry instead of dropping
            # the whole subtree.
            for child in ast.iter_child_nodes(node):
                traverse(child, parent_path, level, parent_id)
            return

        category, name = _classify(node)

        if category == 'other':
            # Non-structural nodes are transparent: their children attach
            # to the nearest structural ancestor at the same level.
            for child in ast.iter_child_nodes(node):
                traverse(child, parent_path, level, parent_id)
            return

        end_lineno = getattr(node, 'end_lineno', lineno)
        # Unique node id based on position to ensure consistency.
        node_id = f"{type(node).__name__}_{lineno}_{getattr(node, 'col_offset', 0)}"
        source_segment = "".join(lines[lineno - 1:end_lineno])

        nodes.append({
            "id": node_id,
            "label": _label_for(node, category, name),
            "type": category,
            "source": source_segment.strip(),
            "vector": create_vector(
                category, level, (lineno, end_lineno), total_lines, parent_path
            ),
            "level": level,
            "lineno": lineno,
            "parent_id": parent_id,
        })

        child_path = parent_path + [node_id]
        for child in ast.iter_child_nodes(node):
            traverse(child, child_path, level + 1, node_id)

    for node in tree.body:
        traverse(node)

    # Sort by line number for linear visual flow (stable, so nodes on the
    # same line keep their traversal order).
    nodes.sort(key=lambda x: x['lineno'])

    return {"nodes": nodes, "connections": generate_connections(nodes)}


def generate_connections(nodes):
    """Build parent-to-child "hierarchy" edges for the given nodes.

    An edge is produced for every node whose ``parent_id`` is truthy and
    refers to a node present in ``nodes``.

    Args:
        nodes: Iterable of node dicts with at least ``id`` and ``parent_id``.

    Returns:
        A list of ``{"from": parent_id, "to": id, "type": "hierarchy"}``
        dicts, in the order of ``nodes``.
    """
    known_ids = {n['id'] for n in nodes}
    return [
        {"from": node['parent_id'], "to": node['id'], "type": "hierarchy"}
        for node in nodes
        if node['parent_id'] and node['parent_id'] in known_ids
    ]