Spaces:
Configuration error
Configuration error
| """ | |
| engine/dag_builder.py | |
| ---------------------- | |
| Builds a directed acyclic graph (DAG) of activity dependencies using networkx. | |
| """ | |
| import pandas as pd | |
| import networkx as nx | |
| from typing import Optional | |
| from data_loader import DataLoader | |
| def build_dag(project_id: str, loader: Optional[DataLoader] = None) -> nx.DiGraph: | |
| """ | |
| Build a dependency DAG for the given project. | |
| Nodes : activity IDs (strings) | |
| Edges : (predecessor_id → successor_id) [depends_on direction] | |
| Node attributes: all activity fields | |
| """ | |
| if loader is None: | |
| loader = DataLoader() | |
| acts = loader.get_project_activities(project_id) | |
| if acts.empty: | |
| return nx.DiGraph() | |
| G = nx.DiGraph() | |
| # Add all activities as nodes | |
| for _, row in acts.iterrows(): | |
| G.add_node(row["id"], **row.to_dict()) | |
| # Add dependency edges | |
| for _, row in acts.iterrows(): | |
| dep = row.get("depends_on") | |
| if dep and not (isinstance(dep, float)) and str(dep).strip(): | |
| dep = str(dep).strip() | |
| if dep in G.nodes: | |
| # Edge: dep (predecessor) → row["id"] (successor) | |
| G.add_edge(dep, row["id"]) | |
| # Validate: warn if cycle found (shouldn't happen in real data) | |
| if not nx.is_directed_acyclic_graph(G): | |
| print(f"⚠️ Cycle detected in DAG for {project_id}!") | |
| return G | |
| def get_topological_order(G: nx.DiGraph) -> list: | |
| """Return activities in topological order (starts before ends).""" | |
| try: | |
| return list(nx.topological_sort(G)) | |
| except nx.NetworkXUnfeasible: | |
| return list(G.nodes) | |
| def get_descendants(G: nx.DiGraph, activity_id: str) -> list: | |
| """Return all downstream activities (BFS).""" | |
| try: | |
| return list(nx.descendants(G, activity_id)) | |
| except nx.NodeNotFound: | |
| return [] | |
| def get_ancestors(G: nx.DiGraph, activity_id: str) -> list: | |
| """Return all upstream activities.""" | |
| try: | |
| return list(nx.ancestors(G, activity_id)) | |
| except nx.NodeNotFound: | |
| return [] | |
| def get_activity_depth(G: nx.DiGraph, activity_id: str) -> int: | |
| """Depth of activity in the DAG from source nodes.""" | |
| try: | |
| sources = [n for n in G.nodes if G.in_degree(n) == 0] | |
| max_depth = 0 | |
| for src in sources: | |
| try: | |
| path_length = nx.shortest_path_length(G, src, activity_id) | |
| max_depth = max(max_depth, path_length) | |
| except nx.NetworkXNoPath: | |
| pass | |
| return max_depth | |
| except Exception: | |
| return 0 | |