piliguori's picture
update examples
cdcab39
raw
history blame
710 Bytes
def _task_instances_for_dag_run(self, dag_run, session=None):\\n\\t\\ttasks_to_run = {}\\n\\t\\tif dag_run is None:\\n\\t\\t\\treturn tasks_to_run\\n\\t\\tself.reset_state_for_orphaned_tasks(filter_by_dag_run=dag_run, session=session)\\n\\t\\tdag_run.refresh_from_db()\\n\\t\\tmake_transient(dag_run)\\n\\t\\ttry:\\n\\t\\t\\tfor ti in dag_run.get_task_instances():\\n\\t\\t\\t\\tif ti.state == State.NONE:\\n\\t\\t\\t\\t\\tti.set_state(TaskInstanceState.SCHEDULED, session=session)\\n\\t\\t\\t\\tif ti.state != TaskInstanceState.REMOVED:\\n\\t\\t\\t\\t\\ttasks_to_run[ti.key] = ti\\n\\t\\t\\tsession.commit()\\n\\t\\texcept Exception:\\n\\t\\t\\tsession.rollback()\\n\\t\\t\\traise\\n\\t\\treturn tasks_to_run