Spaces:
Sleeping
Sleeping
| from shapely.geometry import box as shapely_box, Point | |
| from collections import defaultdict, deque | |
| import math | |
| MAX_FALLBACK_DIST = 150 # pixels; largest endpoint-to-node-center distance at which map_arrows still snaps an unmatched arrow endpoint to its nearest node | |
def map_arrows(nodes, arrows):
    """
    Map arrows to node boxes using geometric containment with fallback to nearest box.

    Each arrow endpoint is first tested against every node's bounding box
    padded by 10 units. If no padded box contains it, the node whose center
    is closest is used instead, provided that distance does not exceed
    MAX_FALLBACK_DIST. Arrows whose endpoints cannot both be resolved, or
    that resolve to the same node on both ends, are reported and dropped.

    Side effect: every node dict gains a 'shape' entry (shapely box built
    from 'bbox') and a 'center' entry (integer midpoint of the bbox).

    Args:
        nodes (list): List of node dicts with 'id' and 'bbox' fields.
        arrows (list): List of arrow dicts with 'tail' and 'head' coordinates
            (tuple or list) and an optional 'label'.

    Returns:
        list: List of (source, target, label) tuples.
    """
    for node in nodes:
        bbox = node["bbox"]
        node["shape"] = shapely_box(*bbox)
        node["center"] = ((bbox[0] + bbox[2]) // 2, (bbox[1] + bbox[3]) // 2)

    # The padded shape of a node never changes, so build it once per node
    # rather than once per (arrow endpoint, node) pair.
    padded_shapes = [(node, node["shape"].buffer(10)) for node in nodes]

    def find_containing_node(point):
        """Return the id of the first node whose padded box contains point."""
        for node, padded in padded_shapes:
            if padded.contains(point):
                return node["id"]
        return None

    def find_nearest_node(pt):
        """Return (id, distance) of the node whose center is closest to pt."""
        nearest_id = None
        min_dist = float("inf")
        for node in nodes:
            dist = math.dist(pt, node["center"])
            if dist < min_dist:
                min_dist = dist
                nearest_id = node["id"]
        return nearest_id, min_dist

    def resolve_endpoint(coords, point):
        """Resolve an endpoint to a node id, or None when nothing is close enough."""
        node_id = find_containing_node(point)
        # Compare against None explicitly: a node id such as 0 is falsy but valid.
        if node_id is not None:
            return node_id
        node_id, dist = find_nearest_node(coords)
        if dist > MAX_FALLBACK_DIST:
            return None
        return node_id

    edges = []
    for arrow in arrows:
        well_formed = (
            isinstance(arrow, dict)
            and isinstance(arrow.get("tail"), (tuple, list))
            and isinstance(arrow.get("head"), (tuple, list))
        )
        if not well_formed:
            print(f"⚠️ Skipping malformed arrow: {arrow}")
            continue

        tail_pt = Point(arrow["tail"])
        head_pt = Point(arrow["head"])
        # An explicit None label is treated like a missing one.
        label = str(arrow.get("label") or "").strip()

        source = resolve_endpoint(arrow["tail"], tail_pt)
        target = resolve_endpoint(arrow["head"], head_pt)

        if source is not None and target is not None and source != target:
            print(f"✅ Mapped arrow from {source} → {target} [{label}]")
            edges.append((source, target, label))
        else:
            print(f"⚠️ Could not map arrow endpoints to nodes: tail={arrow.get('tail')} head={arrow.get('head')}")
    return edges
def detect_node_type(text, default_type="process"):
    """
    Heuristically infer the node type from its text.

    The text is split into alphabetic words and checked in this order:
      1. a word beginning with "start"          -> "start"
      2. a word beginning with "end" or "full"  -> "end"
      3. a "?" anywhere, or the whole word "yes" or "no" -> "decision"
      4. otherwise                              -> default_type

    Matching is done on words rather than raw substrings so that ordinary
    words which merely contain a keyword ("send", "pending", "know",
    "notification", "restart") are not misclassified.

    Args:
        text (str): Node label. None or a non-string value yields default_type.
        default_type (str): Fallback type.

    Returns:
        str: Inferred node type.
    """
    if not isinstance(text, str):
        return default_type

    # Turn every non-letter into a separator, then split into lowercase words.
    words = "".join(ch if ch.isalpha() else " " for ch in text.lower()).split()

    if any(word.startswith("start") for word in words):
        return "start"
    if any(word.startswith(("end", "full")) for word in words):
        return "end"
    if "?" in text or "yes" in words or "no" in words:
        return "decision"
    return default_type
def build_flowchart_json(nodes, arrows):
    """
    Construct a structured flowchart JSON using basic graph traversal.

    Arrows are first resolved to directed edges with map_arrows. Nodes with
    no incoming edge are start candidates; a breadth-first walk from them
    emits one step per reachable node. All orderings (start candidates,
    parents, successors) follow the order of `nodes` and of the mapped
    edges, so the result is reproducible between runs.

    NOTE: nodes that are not reachable from any start candidate (for example
    when every node has an incoming edge) do not appear in 'steps', and
    'start' is None when there is no start candidate.

    Args:
        nodes (list): Detected node dicts ('id', 'bbox', optional 'text'/'type').
        arrows (list): Detected arrow dicts.

    Returns:
        dict: JSON with 'start' (id of the first start candidate or None)
            and 'steps'. Each step has 'id', 'text', 'type', plus, where
            applicable, 'parent' or 'parents', and either 'branches'
            (decision nodes) or 'next' (a single id or a list of ids).
    """
    edges = map_arrows(nodes, arrows)

    # Build adjacency and reverse mappings
    graph = defaultdict(list)
    reverse_links = defaultdict(list)
    edge_labels = {}
    for src, tgt, label in edges:
        graph[src].append(tgt)
        reverse_links[tgt].append(src)
        edge_labels[(src, tgt)] = label.lower()

    id_to_node = {n["id"]: n for n in nodes}

    # Iterate the dict (insertion-ordered) instead of a set so that the
    # choice of 'start' does not depend on hash ordering.
    start_candidates = [nid for nid in id_to_node if nid not in reverse_links]

    flowchart = {
        "start": start_candidates[0] if start_candidates else None,
        "steps": [],
    }

    visited = set()
    queue = deque(start_candidates)
    while queue:
        curr = queue.popleft()
        if curr in visited:
            continue
        visited.add(curr)

        node = id_to_node.get(curr, {})
        # A node whose 'text' is None is treated like one without text.
        text = str(node.get("text") or "").strip()
        node_type = node.get("type") or detect_node_type(text)
        step = {"id": curr, "text": text, "type": node_type}

        # dict.fromkeys de-duplicates while keeping first-seen order.
        parents = list(dict.fromkeys(reverse_links.get(curr, ())))
        if len(parents) == 1:
            step["parent"] = parents[0]
        elif len(parents) > 1:
            step["parents"] = parents

        next_nodes = list(dict.fromkeys(graph.get(curr, ())))
        if node_type == "decision" and next_nodes:
            branches = {}
            for tgt in next_nodes:
                label = edge_labels.get((curr, tgt), "")
                if "yes" in label:
                    branches["yes"] = tgt
                elif "no" in label:
                    branches["no"] = tgt
                else:
                    branches.setdefault("unknown", []).append(tgt)
            step["branches"] = branches
        elif len(next_nodes) == 1:
            step["next"] = next_nodes[0]
        elif len(next_nodes) > 1:
            step["next"] = next_nodes

        # Every successor is scheduled regardless of how it was recorded above.
        queue.extend(tgt for tgt in next_nodes if tgt not in visited)

        flowchart["steps"].append(step)
    return flowchart