| """ |
| ## Astronaut ETL example DAG |
| |
This DAG queries the list of astronauts currently in space from the
Open Notify API and prints each astronaut's name and the craft they are
flying on.
| |
| There are two tasks, one to get the data from the API and save the results, |
| and another to print the results. Both tasks are written in Python using |
| Airflow's TaskFlow API, which allows you to easily turn Python functions into |
| Airflow tasks, and automatically infer dependencies and pass data. |
| |
The second task uses dynamic task mapping to create a copy of the task for
each astronaut in the list retrieved from the API. Because this list
changes depending on how many astronauts are in space, the number of
mapped task instances adjusts automatically each time the DAG runs.
| |
| For more explanation and getting started instructions, see our Write your |
| first DAG tutorial: https://www.astronomer.io/docs/learn/get-started-with-airflow |
| |
|  |
| """ |
|
|
| from airflow.sdk.definitions.asset import Asset |
| from airflow.decorators import dag, task |
| from pendulum import datetime |
| import requests |
|
|
|
|
| |
@dag(
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    doc_md=__doc__,
    default_args={"owner": "Astro", "retries": 3},
    tags=["example"],
)
def example_astronauts():
    # DAG body: fetch the people currently in space, then print one line per
    # person using one dynamically mapped task instance each.

    @task(outlets=[Asset("current_astronauts")])
    def get_astronauts(**context) -> list[dict]:
        """
        Retrieve the list of people currently in space from the Open Notify API.

        The head count is pushed to XCom under the explicit key
        "number_of_people_in_space" so other pipelines can read it. The list
        itself is returned (and therefore stored as this task's return-value
        XCom) so the downstream task can map over it.

        If the request fails, times out, returns an HTTP error, or yields a
        payload without the expected keys, a hardcoded snapshot is used
        instead so the example DAG still completes.

        Returns:
            A list of dicts, each with "craft" and "name" keys.
        """
        try:
            # requests has no default timeout; without one a stalled
            # connection could block this worker slot indefinitely.
            response = requests.get(
                "http://api.open-notify.org/astros.json", timeout=10
            )
            response.raise_for_status()
            payload = response.json()  # parse the body once, not per field
            number_of_people_in_space = payload["number"]
            list_of_people_in_space = payload["people"]
        except (requests.RequestException, ValueError, KeyError, TypeError) as err:
            # RequestException: connection/timeout/HTTP errors.
            # ValueError: body is not valid JSON.
            # KeyError/TypeError: JSON does not have the expected shape.
            print("API currently not available, using hardcoded data instead.")
            print(f"Request failed with: {err!r}")
            list_of_people_in_space = [
                {"craft": "ISS", "name": "Oleg Kononenko"},
                {"craft": "ISS", "name": "Nikolai Chub"},
                {"craft": "ISS", "name": "Tracy Caldwell Dyson"},
                {"craft": "ISS", "name": "Matthew Dominick"},
                {"craft": "ISS", "name": "Michael Barratt"},
                {"craft": "ISS", "name": "Jeanette Epps"},
                {"craft": "ISS", "name": "Alexander Grebenkin"},
                {"craft": "ISS", "name": "Butch Wilmore"},
                {"craft": "ISS", "name": "Sunita Williams"},
                {"craft": "Tiangong", "name": "Li Guangsu"},
                {"craft": "Tiangong", "name": "Li Cong"},
                {"craft": "Tiangong", "name": "Ye Guangfu"},
            ]
            # Derive the count from the fallback list so the two can never
            # disagree if the snapshot is edited.
            number_of_people_in_space = len(list_of_people_in_space)

        context["ti"].xcom_push(
            key="number_of_people_in_space", value=number_of_people_in_space
        )
        return list_of_people_in_space

    @task
    def print_astronaut_craft(greeting: str, person_in_space: dict) -> None:
        """
        Print one astronaut's name and craft, followed by a greeting.

        Args:
            greeting: Text appended to the message (hard-coded via partial()
                in this example).
            person_in_space: One entry from get_astronauts' result, with
                "craft" and "name" keys.
        """
        craft = person_in_space["craft"]
        name = person_in_space["name"]

        print(f"{name} is currently in space flying on the {craft}! {greeting}")

    # partial() fixes the greeting shared by every mapped instance; expand()
    # creates one task instance per element of get_astronauts()'s returned list.
    print_astronaut_craft.partial(greeting="Hello! :)").expand(
        person_in_space=get_astronauts()
    )


# Call the @dag-decorated function at module level to build the DAG object so
# Airflow can discover it when parsing this file.
example_astronauts()
|
|