File size: 708 Bytes
cdcab39 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
def _task_instances_for_dag_runself, dag, dag_run, session=None):
tasks_to_run = {}
if dag_run is None:
return tasks_to_run
self.reset_state_for_orphaned_tasks(filter_by_dag_run=dag_run, session=session)
dag_run.refresh_from_db()
make_transient(dag_run)
dag_run.dag = dag
info = dag_run.task_instance_scheduling_decisions(session=session)
schedulable_tis = info.schedulable_tis
try:
for ti in dag_run.get_task_instances(session=session):
if ti in schedulable_tis:
ti.set_state(TaskInstanceState.SCHEDULED)
if ti.state != TaskInstanceState.REMOVED:
tasks_to_run[ti.key] = ti
session.commit()
except Exception:
session.rollback()
raise
return tasks_to_run |