Getting Started
Spin up Airflow locally, write a DAG, trigger a dbt run, explore Dagster's asset model, compare execution
This page runs two orchestrators side by side — Airflow (the incumbent) and Dagster (the modern asset-centric option) — building the same pipeline in each.
Path A: Airflow with Docker Compose
The official Quick Start is the fastest path to a working Airflow.
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.10.0/docker-compose.yaml'
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
docker compose up airflow-init # one-time
docker compose up -d # bring up everything

Open http://localhost:8080 and log in with airflow / airflow.
You're now running: PostgreSQL (metadata), Redis (broker), webserver, scheduler, worker (CeleryExecutor by default).
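Once the containers are up, one way to confirm the stack is healthy is the webserver's /health endpoint, which reports component status as JSON. The sketch below is a minimal check, assuming the default port 8080 from the compose file. The helper names `unhealthy_components` and `check_airflow` and the sample payload are made up for illustration.

```python
import json
from urllib.request import urlopen


def unhealthy_components(payload: dict) -> list:
    """Return names of components that report a status other than 'healthy'.

    Components that report no status at all (None) are treated as
    "not running in this deployment" and are skipped.
    """
    return sorted(
        name
        for name, info in payload.items()
        if isinstance(info, dict) and info.get("status") not in ("healthy", None)
    )


def check_airflow(base_url: str = "http://localhost:8080") -> list:
    """Fetch /health from a running webserver and list unhealthy components."""
    with urlopen(f"{base_url}/health", timeout=5) as resp:
        return unhealthy_components(json.load(resp))


# Offline example with a hand-written payload (an assumption, not real output).
sample = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {"status": "unhealthy", "latest_scheduler_heartbeat": None},
}
print(unhealthy_components(sample))  # ['scheduler']
```

Call `check_airflow()` only while the stack is running; it raises a URL error otherwise.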
Write Your First DAG
dags/hello_world.py:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def say_hello(**context):
    print(f"Hello from Airflow! Run date: {context['ds']}")
    return "ok"

def fail_sometimes():
    import random
    if random.random() < 0.3:
        raise Exception("Random failure to demonstrate retries")
    print("Succeeded this time")

with DAG(
    dag_id="hello_world",
    start_date=datetime(2026, 1, 1),
    schedule="@hourly",
    catchup=False,
    default_args={
        "owner": "platform-team",
        "retries": 2,
        "retry_delay": timedelta(minutes=1),
    },
    tags=["demo"],
) as dag:
    hello = PythonOperator(task_id="say_hello", python_callable=say_hello)
    flaky = PythonOperator(task_id="might_fail", python_callable=fail_sometimes)
    bash = BashOperator(task_id="run_command", bash_command="echo 'date={{ ds }}'")
    hello >> flaky >> bash

Save it. The scheduler rescans the dags folder periodically, so a new file can take up to a few minutes to appear with default settings (edits to a file it already knows are picked up within about 30s). Refresh the UI; you'll see hello_world. Toggle it ON. It'll run hourly.
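With retries set to 2, each task gets up to three attempts, so might_fail only fails for good when all three attempts fail. A back-of-the-envelope check, assuming attempts are independent and each fails with the 0.3 probability used in fail_sometimes (`p_task_fails` is a name invented for this sketch):

```python
def p_task_fails(p_attempt_fails: float, retries: int) -> float:
    """Probability that the first try and every retry all fail."""
    attempts = 1 + retries
    return p_attempt_fails ** attempts


p = p_task_fails(0.3, retries=2)
print(f"fails after all retries: {p:.3f}")      # 0.027
print(f"eventually succeeds:     {1 - p:.3f}")  # 0.973
```

So roughly one run in 37 should still end with a failed task, which is worth seeing once in the UI.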
Trigger and Observe
In the UI:
- Trigger DAG ▶ runs it now
- Click into a run → see task status
- Click a task → Log to see what happened
- Failed tasks turn red; each retry shows up as a separate attempt, and the second or third try usually succeeds

The Grid / Graph / Gantt views show different aspects of a run. The Graph view is the one you will check most.
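The same trigger is available outside the UI through Airflow's stable REST API (POST /api/v1/dags/{dag_id}/dagRuns). The sketch below only builds the request and does not send it. It assumes the quick-start defaults (port 8080, basic auth with airflow / airflow), and `build_trigger_request` is a hypothetical helper name.

```python
import base64
import json
from urllib.request import Request


def build_trigger_request(dag_id, base_url="http://localhost:8080",
                          user="airflow", password="airflow", conf=None):
    """Build a POST request that asks Airflow to start a new run of dag_id."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return Request(
        url=f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps({"conf": conf or {}}).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


req = build_trigger_request("hello_world")
print(req.get_method(), req.full_url)

# To send it against a running stack (not executed here):
# from urllib.request import urlopen
# with urlopen(req, timeout=10) as resp:
#     print(json.load(resp))
```

The DAG still has to be toggled ON for the queued run to start.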
Real Pipeline: Extract → Transform → Notify
dags/daily_report.py:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract_data(**context):
    """Pretend to extract from a SaaS API"""
    data = [{"region": "US", "revenue": 1234}, {"region": "EU", "revenue": 567}]
    context['ti'].xcom_push(key='rows', value=data)
    return len(data)

def transform_data(**context):
    rows = context['ti'].xcom_pull(task_ids='extract', key='rows')
    total = sum(r['revenue'] for r in rows)
    context['ti'].xcom_push(key='total', value=total)
    return total

def report(**context):
    total = context['ti'].xcom_pull(task_ids='transform', key='total')
    print(f"Daily total revenue: ${total}")

with DAG(
    "daily_report",
    start_date=datetime(2026, 1, 1),
    schedule="0 3 * * *", # 03:00 UTC
    catchup=False,
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
) as dag:
    extract = PythonOperator(task_id="extract", python_callable=extract_data)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    notify = PythonOperator(task_id="report", python_callable=report)
    extract >> transform >> notify

xcom_push / xcom_pull pass small values between tasks. Don't push large datasets through XCom: by default the values are stored in the metadata database, and large payloads bloat it. Pass a reference to an S3 or warehouse location instead.
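The "pass a reference, not the data" advice can be sketched without Airflow at all. In the example below a temporary local directory stands in for S3 or a warehouse table (an assumption for the sake of a runnable example; on a multi-worker setup the location must be shared storage). The function signatures are simplified stand-ins for the task callables above.

```python
import json
import tempfile
from pathlib import Path


def extract_data(staging_dir: Path) -> str:
    """Write the rows to storage and return only their location."""
    rows = [{"region": "US", "revenue": 1234}, {"region": "EU", "revenue": 567}]
    path = staging_dir / "rows.json"
    path.write_text(json.dumps(rows))
    return str(path)  # a short string is all that would travel through XCom


def transform_data(rows_path: str) -> int:
    """Load the rows from the referenced location and aggregate them."""
    rows = json.loads(Path(rows_path).read_text())
    return sum(r["revenue"] for r in rows)


with tempfile.TemporaryDirectory() as tmp:
    location = extract_data(Path(tmp))
    total = transform_data(location)
    print(total)  # 1801
```

In a real DAG the returned path or URI would be the XCom value, and the downstream task would pull it and read the data itself.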
Run dbt from Airflow
The most common Airflow-meets-real-life pattern:
from airflow.operators.bash import BashOperator

# Shared so both tasks resolve the same profiles.yml.
DBT_ENV = {"DBT_PROFILES_DIR": "/opt/dbt_project"}

dbt_run = BashOperator(
    task_id="dbt_run",
    bash_command="cd /opt/dbt_project && dbt run --target prod",
    env=DBT_ENV,
    append_env=True,  # without this, env replaces the inherited environment (including PATH)
)
dbt_test = BashOperator(
    task_id="dbt_test",
    bash_command="cd /opt/dbt_project && dbt test --target prod",
    env=DBT_ENV,
    append_env=True,
)
dbt_run >> dbt_test

For tighter integration, packages such as airflow-dbt-python (dedicated dbt operators) or cosmos (a task per dbt model, with proper dependencies in the Airflow UI) go further.
Path B: Dagster
Dagster has a more modern model — assets, types, software-engineering ergonomics.
pip install dagster dagster-webserver pandas
mkdir my_dagster && cd my_dagster
dagster project scaffold --name my_pipeline
cd my_pipeline

Write Some Assets
my_pipeline/assets.py:
from dagster import asset
import pandas as pd

@asset
def raw_sales() -> pd.DataFrame:
    """Pretend extract from a source."""
    return pd.DataFrame({
        "day": ["2026-05-19", "2026-05-20", "2026-05-21"],
        "region": ["US", "US", "EU"],
        "revenue": [1200, 1500, 800],
    })

@asset
def daily_totals(raw_sales: pd.DataFrame) -> pd.DataFrame:
    return raw_sales.groupby("day", as_index=False)["revenue"].sum()

@asset
def regional_totals(raw_sales: pd.DataFrame) -> pd.DataFrame:
    return raw_sales.groupby("region", as_index=False)["revenue"].sum()

my_pipeline/__init__.py registers the assets (created by the scaffold).
Run It
dagster dev

Open http://localhost:3000. You see the asset graph: raw_sales feeds both daily_totals and regional_totals, which do not depend on each other and can materialize in parallel.
- Materialize all runs the pipeline.
- Each asset has a history of materializations and a lineage view.
- Click an asset → see code, type, partitions, freshness policy.
This is conceptually different from Airflow: the data is what you orchestrate, not the tasks. Equivalent power, more aligned with how data engineers think.
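The asset idea, where a function's parameter names declare which upstream data it needs, can be illustrated in a few lines of plain Python. This is a toy model for intuition only, not how Dagster is implemented; `toy_asset`, `REGISTRY`, and `materialize` are invented names, and plain lists of dicts replace the DataFrames.

```python
import inspect
from typing import Any, Callable, Dict

REGISTRY: Dict[str, Callable[..., Any]] = {}


def toy_asset(fn):
    """Register a function as an asset named after the function."""
    REGISTRY[fn.__name__] = fn
    return fn


@toy_asset
def raw_sales():
    return [
        {"day": "2026-05-19", "region": "US", "revenue": 1200},
        {"day": "2026-05-20", "region": "US", "revenue": 1500},
        {"day": "2026-05-21", "region": "EU", "revenue": 800},
    ]


@toy_asset
def regional_totals(raw_sales):
    totals = {}
    for row in raw_sales:
        totals[row["region"]] = totals.get(row["region"], 0) + row["revenue"]
    return totals


def materialize(name, cache=None):
    """Compute an asset, first computing every upstream it names as a parameter."""
    cache = {} if cache is None else cache
    if name not in cache:
        fn = REGISTRY[name]
        upstream = inspect.signature(fn).parameters  # parameter names = dependencies
        cache[name] = fn(**{dep: materialize(dep, cache) for dep in upstream})
    return cache[name]


print(materialize("regional_totals"))  # {'US': 2700, 'EU': 800}
```

You ask for the data you want (regional_totals) and the graph works out which upstream computations have to run first, which is the inversion the asset model is built on.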
Schedules and Sensors
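# my_pipeline/schedules.py
from dagster import schedule

# "__ASSET_JOB" refers to the implicit job Dagster builds over all assets;
# an explicit job created with define_asset_job(...) is the more conventional target.
@schedule(cron_schedule="0 3 * * *", job_name="__ASSET_JOB")
def daily_schedule(_context):
    return {}  # empty run config

Or use sensors that trigger on external conditions: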
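from dagster import sensor, RunRequest

@sensor(job_name="__ASSET_JOB")
def file_arrival_sensor(context):
    # s3_has_new_file() is a placeholder for your own check.
    if s3_has_new_file():
        # Dagster launches at most one run per distinct run_key, so in practice
        # derive it from the file (for example its S3 key) rather than a constant.
        yield RunRequest(run_key="latest_file")

The sensor wakes every 30s by default. No more "wait 3 hours just in case the file shows up" cron jobs.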
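The role of run_key is easiest to see in a toy model: a sensor tick may report the same file many times, but only a run_key that has not been seen before results in a run. The sketch below imitates that de-duplication in plain Python; it is not Dagster code, and `sensor_tick` and the file names are made up.

```python
from typing import Iterable, List, Set


def sensor_tick(new_files: Iterable[str], seen_run_keys: Set[str]) -> List[str]:
    """Return the run keys that would launch a run on this tick."""
    launched = []
    for key in new_files:
        if key not in seen_run_keys:  # a repeated run_key is skipped
            seen_run_keys.add(key)
            launched.append(key)
    return launched


seen: Set[str] = set()
first = sensor_tick(["sales/2026-05-19.csv"], seen)
second = sensor_tick(["sales/2026-05-19.csv", "sales/2026-05-20.csv"], seen)
print(first)   # ['sales/2026-05-19.csv']
print(second)  # ['sales/2026-05-20.csv']
```

This is why a constant run_key would trigger only once: every later tick presents a key that has already been used.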
Compare
After running both:
| | Airflow | Dagster |
|---|---|---|
| Unit of work | Task (op) | Asset (output) |
| Schedule | DAG-wide cron | Per-asset / job |
| Type safety | Weak (XCom is Any) | Strong (function signatures) |
| Local dev | Docker compose stack | dagster dev (one process) |
| UI | Functional, dense | Modern, asset-graph-first |
| dbt integration | via cosmos package | first-class dagster-dbt |
| Maturity | Highest | High and growing |
| Best for | Teams already invested; ETL orchestrator hub | New projects; software-engineering-minded teams |
There's no wrong answer. Airflow is the safe inherit-an-existing-stack choice. Dagster is the modern green-field choice.
Argo Workflows (Kubernetes)
For K8s-native, ML, or CI-style pipelines:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata: { generateName: hello-world- }
spec:
  entrypoint: main
  templates:
    - name: main
      dag:
        tasks:
          - { name: extract, template: run, arguments: { parameters: [{name: step, value: extract}] } }
          - { name: transform, dependencies: [extract], template: run, arguments: { parameters: [{name: step, value: transform}] } }
          - { name: notify, dependencies: [transform], template: run, arguments: { parameters: [{name: step, value: notify}] } }
    - name: run
      inputs:
        parameters: [{ name: step }]
      container:
        image: python:3.12
        command: ['python', '-c']
        args: ["print('running step {{inputs.parameters.step}}')"]

argo submit -n argo workflow.yaml
argo list -n argo
argo logs <wf-name> -n argo

Each task is a pod; independent scaling, complete isolation, native to Kubernetes. The price is YAML and learning Argo's conventions.
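The dependencies fields in the workflow above are what determine execution order. As a rough illustration of that idea (not of Argo's scheduler), the same graph can be ordered with Python's standard-library topological sorter; the `deps` mapping simply restates the three tasks from the YAML.

```python
from graphlib import TopologicalSorter

# task -> tasks it depends on, copied from the workflow's dag.tasks entries
deps = {
    "extract": [],
    "transform": ["extract"],
    "notify": ["transform"],
}

order = list(TopologicalSorter(deps).static_order())
print(order)  # ['extract', 'transform', 'notify']
```

Tasks with no path between them would have no fixed relative order, which is what lets an orchestrator run them in parallel.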
Cleanup
docker compose down -v # Airflow
# Dagster: Ctrl-C, that's it

What's Next
- Patterns — DAG design, sensors, backfills, dynamic tasks, retries, secrets, ML pipelines
- Best Practices — code organization, testing, observability, capacity, scaling, pitfalls