File size: 708 Bytes
cdcab39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def _task_instances_for_dag_run(self, dag, dag_run, session=None):
    """Return the task instances of ``dag_run`` that should be run.

    Task instances reported as schedulable by
    ``dag_run.task_instance_scheduling_decisions`` are moved to the
    ``SCHEDULED`` state, and every task instance whose state is not
    ``REMOVED`` is collected into the result.

    :param dag: the DAG object attached to ``dag_run`` (as ``dag_run.dag``)
        before scheduling decisions are computed
    :param dag_run: the dag run to collect task instances from; when it is
        ``None`` an empty dict is returned and nothing else is touched
    :param session: the database session used for the queries and for the
        final commit/rollback
        NOTE(review): the default is ``None`` but the body calls
        ``session.commit()`` unconditionally -- presumably a session-providing
        decorator outside this view fills it in; confirm against the caller.
    :return: dict mapping ``ti.key`` to the task instance
    :raises Exception: anything raised while updating task instances or
        committing is re-raised after the session has been rolled back
    """
    tasks_to_run = {}
    if dag_run is None:
        return tasks_to_run

    # Reset orphaned task instances belonging to this run before deciding
    # what is schedulable.
    self.reset_state_for_orphaned_tasks(filter_by_dag_run=dag_run, session=session)

    # Reload the run from the database, then detach it from the session
    # before attaching the in-memory DAG object to it.
    dag_run.refresh_from_db()
    make_transient(dag_run)
    dag_run.dag = dag

    info = dag_run.task_instance_scheduling_decisions(session=session)
    schedulable_tis = info.schedulable_tis
    try:
        for ti in dag_run.get_task_instances(session=session):
            if ti in schedulable_tis:
                ti.set_state(TaskInstanceState.SCHEDULED)
            # The state is checked after the possible update above, so a
            # freshly scheduled task instance is included as well.
            if ti.state != TaskInstanceState.REMOVED:
                tasks_to_run[ti.key] = ti
    except Exception:
        # Do not leave partially applied state changes in the session.
        session.rollback()
        raise
    else:
        try:
            session.commit()
        except Exception:
            session.rollback()
            raise
    return tasks_to_run