File size: 2,640 Bytes
03e7fda
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""

engine/dag_builder.py

----------------------

Builds a directed acyclic graph (DAG) of activity dependencies using networkx.

"""

import pandas as pd
import networkx as nx
from typing import Optional
from data_loader import DataLoader


def build_dag(project_id: str, loader: Optional[DataLoader] = None) -> nx.DiGraph:
    """
    Build a dependency DAG for the given project.

    Nodes : activity IDs (the raw ``id`` value of each activity row)
    Edges : (predecessor_id → successor_id)  [depends_on direction]
    Node attributes: all activity fields

    Args:
        project_id: Identifier forwarded to ``loader.get_project_activities``.
        loader: Source of activity rows. A new ``DataLoader()`` is created
            when omitted.

    Returns:
        A ``networkx.DiGraph``. It is empty when the project has no
        activities. A ``depends_on`` value that is missing, blank, a float,
        or that does not name an existing node is skipped without error.
        If the resulting graph contains a cycle a warning is printed and the
        graph is returned anyway.
    """
    if loader is None:
        loader = DataLoader()

    # presumably a pandas DataFrame (uses .empty / .iterrows) — confirm
    # against DataLoader.get_project_activities.
    acts = loader.get_project_activities(project_id)
    if acts.empty:
        return nx.DiGraph()

    G = nx.DiGraph()

    # Pass 1: register every activity first, so that pass 2 can check that a
    # predecessor actually exists regardless of row order.
    for _, row in acts.iterrows():
        G.add_node(row["id"], **row.to_dict())

    # Pass 2: add dependency edges.
    for _, row in acts.iterrows():
        dep = row.get("depends_on")
        # pd.NA has to be tested by identity before any truthiness check:
        # bool(pd.NA) raises TypeError. Floats are skipped because a missing
        # cell normally arrives as float NaN.
        if dep is pd.NA or not dep or isinstance(dep, float):
            continue

        dep_id = str(dep).strip()
        # NOTE(review): dep_id is a stripped string while node keys are the
        # raw row["id"] values; if ids are not already stripped strings the
        # edge is dropped here — confirm against the upstream data.
        if dep_id and dep_id in G.nodes:
            # Edge: dep_id (predecessor) → row["id"] (successor)
            G.add_edge(dep_id, row["id"])

    # Validate: warn if cycle found (shouldn't happen in real data)
    if not nx.is_directed_acyclic_graph(G):
        print(f"⚠️  Cycle detected in DAG for {project_id}!")

    return G


def get_topological_order(G: nx.DiGraph) -> list:
    """Return the graph's nodes ordered so predecessors come before successors.

    When the graph has no valid topological ordering (networkx raises
    ``NetworkXUnfeasible``), the nodes are returned in the graph's own
    iteration order instead.
    """
    try:
        # topological_sort is lazy; the error surfaces while list() drains it,
        # so the materialisation has to stay inside the try.
        ordering = list(nx.topological_sort(G))
    except nx.NetworkXUnfeasible:
        ordering = list(G.nodes)
    return ordering


def get_descendants(G: nx.DiGraph, activity_id: str) -> list:
    """Return all downstream activities of ``activity_id``.

    Args:
        G: Dependency graph whose edges point predecessor → successor.
        activity_id: Node whose descendants are wanted.

    Returns:
        A list of every node reachable from ``activity_id`` (the node itself
        excluded), in no particular order. An empty list is returned when
        ``activity_id`` is not a node of ``G``.
    """
    # networkx signals a missing node from descendants() with NetworkXError,
    # which is not a subclass of NodeNotFound, so catching NodeNotFound never
    # produced the intended empty result. Test membership explicitly instead.
    if activity_id not in G:
        return []
    return list(nx.descendants(G, activity_id))


def get_ancestors(G: nx.DiGraph, activity_id: str) -> list:
    """Return all upstream activities of ``activity_id``.

    Args:
        G: Dependency graph whose edges point predecessor → successor.
        activity_id: Node whose ancestors are wanted.

    Returns:
        A list of every node from which ``activity_id`` is reachable (the
        node itself excluded), in no particular order. An empty list is
        returned when ``activity_id`` is not a node of ``G``.
    """
    # networkx signals a missing node from ancestors() with NetworkXError,
    # which is not a subclass of NodeNotFound, so catching NodeNotFound never
    # produced the intended empty result. Test membership explicitly instead.
    if activity_id not in G:
        return []
    return list(nx.ancestors(G, activity_id))


def get_activity_depth(G: nx.DiGraph, activity_id: str) -> int:
    """Depth of activity in the DAG from source nodes.

    A source is any node with in-degree 0. For each source that can reach
    ``activity_id`` the shortest path length (number of edges) is taken; the
    depth is the largest of those values.

    Args:
        G: Dependency graph whose edges point predecessor → successor.
        activity_id: Node whose depth is wanted.

    Returns:
        The depth as described above, or 0 when ``activity_id`` is not in
        ``G``, is itself a source, or is not reachable from any source.
    """
    # Explicit guard instead of a blanket ``except Exception``: a missing node
    # is the expected failure, anything else should surface as a real error.
    if activity_id not in G:
        return 0

    # A single search towards the target gives the shortest distance from
    # every node that can reach it, so no per-source search is needed.
    hops_to_target = nx.shortest_path_length(G, target=activity_id)

    return max(
        (hops for node, hops in hops_to_target.items() if G.in_degree(node) == 0),
        default=0,
    )