| Command | Description |
| --- | --- |
| airflow db init | Initialize the metadata database |
| airflow webserver -p 8080 | Start the web server on port 8080 |
| airflow scheduler | Start the scheduler |
| airflow users create --username admin --role Admin | Create an admin user |
| airflow dags list | List DAGs |
| airflow dags trigger dag_id | Trigger a DAG run |
| airflow tasks test dag_id task_id 2024-01-01 | Test a single task for a date |
| airflow dags backfill -s 2024-01-01 -e 2024-01-31 dag_id | Backfill runs between two dates |

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="my_dag",
    default_args=default_args,
    description="My first DAG",
    schedule="0 0 * * *",  # Daily at midnight
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    pass
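
Operators instantiated inside the with DAG block attach to that DAG automatically, so the pass placeholder above would normally be replaced by task definitions. A minimal sketch (the BashOperator import and the hello/goodbye task ids are illustrative):

from airflow.operators.bash import BashOperator

with DAG(dag_id="my_dag", schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    # Both tasks are attached to the DAG via the active context manager
    hello = BashOperator(task_id="hello", bash_command="echo hello")
    goodbye = BashOperator(task_id="goodbye", bash_command="echo goodbye")

    hello >> goodbye  # run hello first, then goodbye
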
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def my_dag():
    @task()
    def extract():
        return {"data": [1, 2, 3]}

    @task()
    def transform(data: dict):
        return [x * 2 for x in data["data"]]

    @task()
    def load(data: list):
        print(f"Loading: {data}")

    # Define dependencies
    data = extract()
    transformed = transform(data)
    load(transformed)

my_dag()
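
The my_dag() call above instantiates and registers the DAG. In Airflow 2.5 and later, the returned DAG object can also be run in-process for quick debugging; a sketch (capture the return value instead of discarding it):

dag_object = my_dag()

if __name__ == "__main__":
    # Executes every task locally, without a scheduler or webserver (Airflow 2.5+)
    dag_object.test()
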
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def my_function(**context):
    print(context["ds"])  # Execution date
    return "result"

python_task = PythonOperator(
    task_id="python_task",
    python_callable=my_function,
)

bash_task = BashOperator(
    task_id="bash_task",
    bash_command="echo Hello {{ ds }}",
)
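
Static arguments can be passed to the callable alongside the context via op_kwargs (or op_args). A small sketch, assuming a callable that takes a name keyword; the greet names are illustrative:

def greet(name, **context):
    print(f"Hello {name} on {context['ds']}")

greet_task = PythonOperator(
    task_id="greet_task",
    python_callable=greet,
    op_kwargs={"name": "Airflow"},  # forwarded to greet() as keyword arguments
)
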
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def choose_branch(**context):
    if context["ds_nodash"] > "20240101":
        return "branch_a"
    return "branch_b"

branch = BranchPythonOperator(
    task_id="branch",
    python_callable=choose_branch,
)

branch_a = EmptyOperator(task_id="branch_a")
branch_b = EmptyOperator(task_id="branch_b")
join = EmptyOperator(task_id="join", trigger_rule="none_failed_min_one_success")
branch >> [branch_a, branch_b] >> join

from airflow.sensors.filesystem import FileSensor
from airflow.providers.http.sensors.http import HttpSensor

file_sensor = FileSensor(
    task_id="wait_for_file",
    filepath="/path/to/file.csv",
    poke_interval=60,  # Check every 60 seconds
    timeout=3600,  # Time out after 1 hour
    mode="poke",  # or "reschedule" to free the worker slot between pokes
)

http_sensor = HttpSensor(
    task_id="wait_for_api",
    http_conn_id="api_conn",
    endpoint="/health",
    response_check=lambda response: response.status_code == 200,
)

# Using >> and <<
task1 >> task2 >> task3
task1 >> [task2, task3] >> task4
# Using set_upstream/downstream
task2.set_upstream(task1)
task2.set_downstream(task3)
# Cross-dependencies
from airflow.models.baseoperator import cross_downstream
cross_downstream([task1, task2], [task3, task4])
# Chain
from airflow.models.baseoperator import chain
chain(task1, [task2, task3], task4)

def push_function(**context):
    # Explicit push under a custom key
    context["ti"].xcom_push(key="my_key", value="my_value")
    # The return value is also pushed automatically, under the key "return_value"
    return {"key": "value"}

def pull_function(**context):
    # Pull by task_id
    value = context["ti"].xcom_pull(task_ids="push_task")
    # Pull by key
    value = context["ti"].xcom_pull(task_ids="push_task", key="my_key")

# In templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo {{ ti.xcom_pull(task_ids="push_task") }}',
)
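
With the TaskFlow API shown earlier, these pushes and pulls happen implicitly: a @task function's return value is pushed under "return_value", and passing it into another @task pulls it back. A minimal sketch (task names are illustrative; this sits inside a @dag-decorated function):

@task()
def push_task():
    return {"key": "value"}  # pushed as an XCom under "return_value"

@task()
def pull_task(data: dict):
    print(data["key"])  # the XCom is resolved and passed in automatically

pull_task(push_task())
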
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def query_postgres(**context):
    hook = PostgresHook(postgres_conn_id="my_postgres")
    records = hook.get_records("SELECT * FROM table")
    return records

def upload_to_s3(**context):
    hook = S3Hook(aws_conn_id="my_aws")
    hook.load_string(
        string_data="data",
        key="my-key",
        bucket_name="my-bucket",
    )
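
These functions only execute when wrapped in tasks, and the my_postgres / my_aws connection ids must already exist (created in the Airflow UI or with airflow connections add). A minimal sketch wiring them into a DAG (task ids are illustrative):

query_task = PythonOperator(
    task_id="query_postgres",
    python_callable=query_postgres,
)
upload_task = PythonOperator(
    task_id="upload_to_s3",
    python_callable=upload_to_s3,
)

query_task >> upload_task  # query first, then upload
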