suvradeepp's picture
Upload 63 files
03e7fda verified
"""
engine/dag_builder.py
----------------------
Builds a directed acyclic graph (DAG) of activity dependencies using networkx.
"""
import pandas as pd
import networkx as nx
from typing import Optional
from data_loader import DataLoader
def build_dag(project_id: str, loader: Optional[DataLoader] = None) -> nx.DiGraph:
    """
    Build a dependency DAG for the given project.

    Nodes : activity IDs, taken verbatim from the ``id`` column
    Edges : (predecessor_id → successor_id)  [depends_on direction]
    Node attributes: all activity fields

    Args:
        project_id: Identifier passed to ``loader.get_project_activities``.
        loader: Source of activity rows; a fresh ``DataLoader()`` is created
            when omitted.

    Returns:
        The dependency graph. It is empty when the project has no activities.
        If the dependencies form a cycle, a warning is printed to stdout and
        the (cyclic) graph is still returned.
    """
    if loader is None:
        loader = DataLoader()
    acts = loader.get_project_activities(project_id)

    G = nx.DiGraph()
    if acts.empty:
        return G

    # Single pass over the rows: register every activity as a node and remember
    # its candidate edge. Edges are only added afterwards, once all nodes exist,
    # so a predecessor listed later in the frame is still recognised.
    candidate_edges = []
    for _, row in acts.iterrows():
        activity_id = row["id"]
        G.add_node(activity_id, **row.to_dict())

        dep = row.get("depends_on")
        # Missing values show up as None, NaN (a float) or pd.NA depending on
        # the column dtype. pd.NA must be tested by identity *before* any
        # truthiness check, because bool(pd.NA) raises TypeError.
        if dep is None or dep is pd.NA or isinstance(dep, float) or not dep:
            continue
        dep = str(dep).strip()
        if dep:
            candidate_edges.append((dep, activity_id))

    # NOTE(review): the predecessor is compared as a stripped string against the
    # raw ``id`` values, so this presumably expects string IDs — confirm.
    for dep, activity_id in candidate_edges:
        # Dependencies pointing outside this project's activities are ignored.
        if dep in G:
            # Edge: dep (predecessor) → activity_id (successor)
            G.add_edge(dep, activity_id)

    # Validate: warn if cycle found (shouldn't happen in real data)
    if not nx.is_directed_acyclic_graph(G):
        print(f"⚠️ Cycle detected in DAG for {project_id}!")
    return G
def get_topological_order(G: nx.DiGraph) -> list:
    """
    List the graph's activities so that every predecessor precedes its successors.

    When networkx reports that no such ordering is feasible, the plain node
    listing of ``G`` is returned instead of raising.
    """
    try:
        ordering = list(nx.topological_sort(G))
    except nx.NetworkXUnfeasible:
        # No valid topological order: fall back to the unsorted nodes.
        ordering = list(G.nodes)
    return ordering
def get_descendants(G: nx.DiGraph, activity_id: str) -> list:
    """
    Return all downstream activities reachable from ``activity_id``.

    Args:
        G: Dependency graph.
        activity_id: Activity whose descendants are wanted.

    Returns:
        The descendant node IDs in no particular order (they come from a set),
        or an empty list when ``activity_id`` is not a node of ``G``.
    """
    # nx.descendants signals a missing node with NetworkXError, which is not a
    # NodeNotFound, so check membership explicitly instead of relying on an
    # exception handler.
    if activity_id not in G:
        return []
    return list(nx.descendants(G, activity_id))
def get_ancestors(G: nx.DiGraph, activity_id: str) -> list:
    """
    Return all upstream activities from which ``activity_id`` is reachable.

    Args:
        G: Dependency graph.
        activity_id: Activity whose ancestors are wanted.

    Returns:
        The ancestor node IDs in no particular order (they come from a set),
        or an empty list when ``activity_id`` is not a node of ``G``.
    """
    # nx.ancestors signals a missing node with NetworkXError, which is not a
    # NodeNotFound, so check membership explicitly instead of relying on an
    # exception handler.
    if activity_id not in G:
        return []
    return list(nx.ancestors(G, activity_id))
def get_activity_depth(G: nx.DiGraph, activity_id: str) -> int:
    """
    Depth of an activity in the DAG, measured from the source nodes.

    The depth is the largest value, over all source nodes (in-degree 0) that
    can reach ``activity_id``, of the shortest path length from that source to
    the activity.

    Args:
        G: Dependency graph.
        activity_id: Activity whose depth is wanted.

    Returns:
        The depth as a number of edges. 0 when the activity is itself a source,
        is not a node of ``G``, or is not reachable from any source.
    """
    if activity_id not in G:
        return 0
    try:
        # One traversal rooted at the target gives the distance from every node
        # that can reach it, instead of one search per source node.
        dist_to_activity = dict(nx.shortest_path_length(G, target=activity_id))
    except nx.NetworkXException:
        # Keep the best-effort contract for library-level failures only.
        return 0
    return max(
        (
            length
            for node, length in dist_to_activity.items()
            if G.in_degree(node) == 0
        ),
        default=0,
    )