Steven's Knowledge

Getting Started

Spin up Airflow locally, write a DAG, trigger a dbt run, explore Dagster's asset model, compare execution


This page runs two orchestrators side by side, Airflow (the incumbent) and Dagster (the modern asset-centric option), and builds a similar small pipeline in each. It closes with a short look at Argo Workflows for Kubernetes-native pipelines.

Path A: Airflow with Docker Compose

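The official Docker Compose quick start (pinned here to Airflow 2.10.0) is the fastest path to a working local Airflow. It is meant for local development, not production.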

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.10.0/docker-compose.yaml'
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

docker compose up airflow-init  # one-time
docker compose up -d            # bring up everything

Open http://localhost:8080 and log in as airflow / airflow.

You're now running PostgreSQL (metadata database), Redis (Celery broker), the webserver, scheduler, triggerer, and a Celery worker: the Compose file configures the CeleryExecutor.

Write Your First DAG

dags/hello_world.py:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def say_hello(**context):
    print(f"Hello from Airflow! Run date: {context['ds']}")
    return "ok"

def fail_sometimes():
    import random
    if random.random() < 0.3:
        raise Exception("Random failure to demonstrate retries")
    print("Succeeded this time")

with DAG(
    dag_id="hello_world",
    start_date=datetime(2026, 1, 1),
    schedule="@hourly",
    catchup=False,
    default_args={
        "owner": "platform-team",
        "retries": 2,
        "retry_delay": timedelta(minutes=1),
    },
    tags=["demo"],
) as dag:
    hello = PythonOperator(task_id="say_hello", python_callable=say_hello)
    flaky = PythonOperator(task_id="might_fail", python_callable=fail_sometimes)
    bash  = BashOperator(task_id="run_command", bash_command="echo 'date={{ ds }}'")

    hello >> flaky >> bash

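Save it. The scheduler scans dags/ for new files every dag_dir_list_interval (300 s by default), so a new DAG can take up to about 5 minutes to appear; files it already knows are re-parsed roughly every 30 s. Refresh the UI to see hello_world. New DAGs start paused, so toggle it on; it then runs hourly. Because catchup=False, it does not backfill every hour since start_date.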
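Airflow starts a run at the end of its data interval, and {{ ds }} is the date of the interval's start (the logical date). A minimal sketch, assuming UTC and the plain @hourly schedule (real timetables also handle timezones and DST), of which interval the first run covers when you switch the DAG on with catchup=False: the most recent completed hour.

```python
from datetime import datetime, timedelta, timezone

def latest_hourly_interval(now: datetime) -> tuple[datetime, datetime]:
    """Simplified model: the data interval an @hourly DAG with catchup=False
    gets when it is switched on is the most recent *completed* hour."""
    end = now.replace(minute=0, second=0, microsecond=0)  # last hour boundary
    start = end - timedelta(hours=1)
    return start, end

now = datetime(2026, 5, 21, 10, 25, tzinfo=timezone.utc)
start, end = latest_hourly_interval(now)
print(start.isoformat(), "->", end.isoformat())  # 2026-05-21T09:00:00+00:00 -> 2026-05-21T10:00:00+00:00
print("ds =", start.date().isoformat())           # ds = 2026-05-21
```

So a DAG switched on at 10:25 immediately gets one run for 09:00 to 10:00, and the next run fires at 11:00 for 10:00 to 11:00.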

Trigger and Observe

In the UI:

  • Trigger DAG ▶ runs it now
  • Click into a run → see task status
  • Click a task → Log to see what happened
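  • A failing attempt shows as up_for_retry while it waits out the 1-minute retry_delay; with retries=2 the task gets three attempts, so it nearly always succeeds, and it turns red (failed) only if all three attempts fail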
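The retry behaviour is simple arithmetic: retries=2 means up to three attempts, and each call to fail_sometimes fails independently with probability 0.3, so the task ends up failed only with probability 0.3 × 0.3 × 0.3 = 0.027. A quick check in plain Python, with a seeded simulation (seed and trial count are arbitrary) to confirm it:

```python
import random

P_FAIL = 0.3        # failure chance per attempt in fail_sometimes()
ATTEMPTS = 1 + 2    # first try plus retries=2

# The task only ends up failed if every attempt fails.
p_task_fails = P_FAIL ** ATTEMPTS
print(f"P(task fails after all retries) = {p_task_fails:.3f}")  # 0.027

def simulate(trials: int, seed: int = 42) -> float:
    """Estimate the same probability by simulating independent attempts."""
    rng = random.Random(seed)
    failures = sum(
        all(rng.random() < P_FAIL for _ in range(ATTEMPTS)) for _ in range(trials)
    )
    return failures / trials

print(f"simulated over 100k runs = {simulate(100_000):.3f}")
```

Roughly one hourly run in 37 will still end red, which is useful to know before reading a red square as a real incident.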

The Grid, Graph, and Gantt views show different aspects of a run (Grid replaced the old Tree view in Airflow 2.3). The Graph is the one you check most.

Real Pipeline: Extract → Transform → Notify

dags/daily_report.py:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract_data(**context):
    """Pretend to extract from a SaaS API"""
    data = [{"region": "US", "revenue": 1234}, {"region": "EU", "revenue": 567}]
    context['ti'].xcom_push(key='rows', value=data)
    return len(data)

def transform_data(**context):
    rows = context['ti'].xcom_pull(task_ids='extract', key='rows')
    total = sum(r['revenue'] for r in rows)
    context['ti'].xcom_push(key='total', value=total)
    return total

def report(**context):
    total = context['ti'].xcom_pull(task_ids='transform', key='total')
    print(f"Daily total revenue: ${total}")

with DAG(
    "daily_report",
    start_date=datetime(2026, 1, 1),
    schedule="0 3 * * *",   # 03:00 UTC
    catchup=False,
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
) as dag:
    extract = PythonOperator(task_id="extract", python_callable=extract_data)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    notify = PythonOperator(task_id="report", python_callable=report)

    extract >> transform >> notify

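xcom_push / xcom_pull pass small values between tasks (a callable's return value is also pushed automatically, under the key return_value). XComs are stored in the Airflow metadata database by default, so pushing large datasets through them bloats that database and slows everything that depends on it. Write the data to S3 or a warehouse table and pass its location instead.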
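A minimal sketch of the "pass a location, not the data" pattern, in plain Python so it runs without Airflow: each function stands in for one task's callable, writes its output to storage, and returns only a path, which is all that would travel through XCom. A local temp directory stands in for S3, and the path layout is an assumption.

```python
import json
import tempfile
from pathlib import Path

# Stand-in for an S3 bucket or warehouse stage (assumption: local disk for the demo).
STORAGE = Path(tempfile.mkdtemp())

def extract(ds: str) -> str:
    rows = [{"region": "US", "revenue": 1234}, {"region": "EU", "revenue": 567}]
    path = STORAGE / f"raw/{ds}.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(rows))
    return str(path)  # only this short string would go through XCom

def transform(raw_path: str, ds: str) -> str:
    rows = json.loads(Path(raw_path).read_text())
    total = sum(r["revenue"] for r in rows)
    path = STORAGE / f"totals/{ds}.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps({"date": ds, "total": total}))
    return str(path)

def report(totals_path: str) -> None:
    summary = json.loads(Path(totals_path).read_text())
    print(f"Daily total revenue for {summary['date']}: ${summary['total']}")

# Chain the "tasks" the way XCom would: each receives the previous task's path.
raw = extract("2026-05-21")
totals = transform(raw, "2026-05-21")
report(totals)  # Daily total revenue for 2026-05-21: $1801
```

In a real DAG these would be the python_callables, with each path fetched via xcom_pull; the XCom row stays a few dozen bytes no matter how large the data grows.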

Run dbt from Airflow

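The most common real-world pattern is shelling out to dbt with a BashOperator. This assumes dbt is installed in the worker image and the project is available at /opt/dbt_project; the stock Compose setup provides neither.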

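from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

DBT_DIR = "/opt/dbt_project"
# env= replaces the worker's environment (PATH included) unless append_env=True
DBT_ENV = {"DBT_PROFILES_DIR": DBT_DIR}

# dag_id and schedule are illustrative
with DAG("dbt_daily", start_date=datetime(2026, 1, 1), schedule="@daily", catchup=False):
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command=f"cd {DBT_DIR} && dbt run --target prod",
        env=DBT_ENV,
        append_env=True,
    )

    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command=f"cd {DBT_DIR} && dbt test --target prod",
        env=DBT_ENV,
        append_env=True,
    )

    dbt_run >> dbt_test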

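For finer-grained control, the astronomer-cosmos package renders each dbt model as its own Airflow task, with dependencies taken from the dbt project, so you can see and retry individual models in the UI. airflow-dbt-python offers dedicated dbt operators as an alternative to shelling out.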
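The idea behind per-model rendering can be sketched in plain Python. dbt writes target/manifest.json, whose nodes entries record each node's upstream nodes under depends_on.nodes. The sketch below uses a tiny hand-written manifest excerpt (the model names are hypothetical) and the standard library's graphlib to derive one `dbt run --select <model>` command per model in dependency order. It illustrates the concept only; it is not how cosmos is implemented.

```python
import json
from graphlib import TopologicalSorter

# Hand-written excerpt shaped like dbt's target/manifest.json (hypothetical models).
MANIFEST = json.loads("""
{
  "nodes": {
    "model.shop.stg_orders":   {"resource_type": "model", "name": "stg_orders",
                                "depends_on": {"nodes": []}},
    "model.shop.stg_payments": {"resource_type": "model", "name": "stg_payments",
                                "depends_on": {"nodes": []}},
    "model.shop.fct_revenue":  {"resource_type": "model", "name": "fct_revenue",
                                "depends_on": {"nodes": ["model.shop.stg_orders",
                                                         "model.shop.stg_payments"]}},
    "test.shop.not_null_fct":  {"resource_type": "test", "name": "not_null_fct",
                                "depends_on": {"nodes": ["model.shop.fct_revenue"]}}
  }
}
""")

def model_graph(manifest: dict) -> dict[str, set[str]]:
    """Map each model name to the names of the models it depends on (tests ignored)."""
    models = {uid: n for uid, n in manifest["nodes"].items() if n["resource_type"] == "model"}
    return {
        n["name"]: {models[dep]["name"] for dep in n["depends_on"]["nodes"] if dep in models}
        for n in models.values()
    }

graph = model_graph(MANIFEST)
# One command per model; an orchestrator would turn each into its own task.
for model in TopologicalSorter(graph).static_order():
    print(f"dbt run --select {model}")
```

Both staging models come out before fct_revenue, and because they are independent they could run as parallel tasks.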

Path B: Dagster

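Dagster takes a more modern approach: you define software-defined assets (the tables and files a pipeline produces) as typed Python functions, and you get a fast local development loop.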

pip install dagster dagster-webserver pandas   # pandas is used by the example assets

mkdir my_dagster && cd my_dagster
dagster project scaffold --name my_pipeline
cd my_pipeline

Write Some Assets

my_pipeline/assets.py:

from dagster import asset
import pandas as pd

@asset
def raw_sales() -> pd.DataFrame:
    """Pretend extract from a source."""
    return pd.DataFrame({
        "day": ["2026-05-19", "2026-05-20", "2026-05-21"],
        "region": ["US", "US", "EU"],
        "revenue": [1200, 1500, 800],
    })

@asset
def daily_totals(raw_sales: pd.DataFrame) -> pd.DataFrame:
    return raw_sales.groupby("day", as_index=False)["revenue"].sum()

@asset
def regional_totals(raw_sales: pd.DataFrame) -> pd.DataFrame:
    return raw_sales.groupby("region", as_index=False)["revenue"].sum()

The scaffold also generates my_pipeline/__init__.py, which registers these assets with Dagster through a Definitions object.
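Dagster connects daily_totals and regional_totals to raw_sales by matching each function parameter name to an upstream asset name. A toy version of that idea in plain Python, without pandas or Dagster (the `asset` decorator and REGISTRY here are hypothetical stand-ins, not Dagster's implementation), shows how a dependency graph falls out of function signatures:

```python
import inspect
from graphlib import TopologicalSorter

REGISTRY = {}  # asset name -> function (toy stand-in for Dagster's Definitions)

def asset(fn):
    """Toy decorator: register fn under its own name."""
    REGISTRY[fn.__name__] = fn
    return fn

@asset
def raw_sales():
    return [
        {"day": "2026-05-19", "region": "US", "revenue": 1200},
        {"day": "2026-05-20", "region": "US", "revenue": 1500},
        {"day": "2026-05-21", "region": "EU", "revenue": 800},
    ]

def _sum_by(rows, key):
    totals = {}
    for r in rows:
        totals[r[key]] = totals.get(r[key], 0) + r["revenue"]
    return totals

@asset
def daily_totals(raw_sales):
    return _sum_by(raw_sales, "day")

@asset
def regional_totals(raw_sales):
    return _sum_by(raw_sales, "region")

def materialize_all():
    """Infer deps from parameter names, then compute assets in dependency order."""
    deps = {name: set(inspect.signature(fn).parameters) for name, fn in REGISTRY.items()}
    results = {}
    for name in TopologicalSorter(deps).static_order():
        results[name] = REGISTRY[name](**{p: results[p] for p in deps[name]})
    return results

results = materialize_all()
print(results["regional_totals"])  # {'US': 2700, 'EU': 800}
```

The two downstream assets share only raw_sales as input, which is why the UI draws them as parallel branches.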

Run It

dagster dev

Open http://localhost:3000 to see the asset graph: raw_sales feeds both daily_totals and regional_totals, which are independent of each other and can materialize in parallel.

  • Materialize all materializes every asset in dependency order.
  • Each asset keeps a history of its materializations and has a lineage view.
  • Click an asset to see its details: code, type, and partitions or freshness policy where defined.

This is conceptually different from Airflow: you orchestrate the data (assets), not the tasks. You declare what each asset depends on and Dagster derives the execution order, which maps closely to how data engineers already think about tables and models.

Schedules and Sensors

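# my_pipeline/schedules.py
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

# A job that materializes every asset; schedules and sensors target jobs.
all_assets_job = define_asset_job("all_assets_job", selection=AssetSelection.all())

daily_schedule = ScheduleDefinition(job=all_assets_job, cron_schedule="0 3 * * *")

# Register in __init__.py: Definitions(assets=..., jobs=[all_assets_job],
#     schedules=[daily_schedule], sensors=[file_arrival_sensor])

Or sensors that trigger on external conditions:

# my_pipeline/sensors.py
from dagster import RunRequest, sensor
from my_pipeline.schedules import all_assets_job

def latest_s3_key():
    """Hypothetical helper: return the newest object key in the landing bucket, or None."""
    return None  # replace with a real S3 listing

@sensor(job=all_assets_job)
def file_arrival_sensor(context):
    key = latest_s3_key()
    if key:
        # run_key must be unique per file: Dagster skips run_keys it has already launched
        yield RunRequest(run_key=key)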

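A sensor is evaluated every 30 s by default (minimum_interval_seconds). Like schedules, sensors start out stopped: turn them on in the UI, and dagster dev runs the daemon that evaluates them. No more "wait 3 hours just in case the file shows up" cron jobs.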
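Why the run_key matters: for a given sensor, Dagster launches at most one run per unique run_key. A toy simulation of sensor ticks in plain Python (the file names are hypothetical and the dedup logic is simplified, not Dagster's code) shows the difference between a constant key and a per-file key:

```python
def simulate_ticks(ticks, make_key):
    """Toy model of sensor evaluation: each tick sees the newest file (or None);
    a run launches only for a run_key that has not been seen before."""
    seen, launched = set(), []
    for newest_file in ticks:
        if newest_file is None:
            continue                    # nothing arrived: no RunRequest
        key = make_key(newest_file)
        if key in seen:
            continue                    # duplicate run_key: skipped
        seen.add(key)
        launched.append(newest_file)
    return launched

# Hypothetical files seen on successive 30 s ticks.
ticks = [None, "sales_0519.csv", "sales_0519.csv", "sales_0520.csv", None, "sales_0521.csv"]

print(simulate_ticks(ticks, lambda f: "latest_file"))  # ['sales_0519.csv']
print(simulate_ticks(ticks, lambda f: f))              # ['sales_0519.csv', 'sales_0520.csv', 'sales_0521.csv']
```

A constant key launches once and then silently skips every later file; keying on the file itself launches one run per new file and still ignores the repeat sighting on the third tick.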

Compare

After running both:

| Aspect | Airflow | Dagster |
|---|---|---|
| Unit of work | Task (operator) | Asset (output) |
| Schedule | DAG-wide cron | Per asset or job |
| Type safety | Weak (XCom values are untyped) | Stronger (function signatures and type annotations) |
| Local dev | Docker Compose stack | dagster dev (one command) |
| UI | Functional, dense | Modern, asset-graph-first |
| dbt integration | Via the cosmos package | First-class, via dagster-dbt |
| Maturity | Highest | High and growing |
| Best for | Teams already invested; a central ETL orchestration hub | New projects; teams with a software-engineering mindset |

There's no wrong answer. Airflow is the safe inherit-an-existing-stack choice. Dagster is the modern green-field choice.

Argo Workflows (Kubernetes)

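For Kubernetes-native, ML, or CI-style pipelines. The commands below assume Argo Workflows is installed in the argo namespace and the argo CLI is on your PATH: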

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata: { generateName: hello-world- }
spec:
  entrypoint: main
  templates:
    - name: main
      dag:
        tasks:
          - { name: extract, template: run, arguments: { parameters: [{name: step, value: extract}] } }
          - { name: transform, dependencies: [extract], template: run, arguments: { parameters: [{name: step, value: transform}] } }
          - { name: notify, dependencies: [transform], template: run, arguments: { parameters: [{name: step, value: notify}] } }
    - name: run
      inputs:
        parameters: [{ name: step }]
      container:
        image: python:3.12
        command: ['python', '-c']
        args: ["print('running step {{inputs.parameters.step}}')"]

Save it as workflow.yaml, then submit it and inspect the result:

argo submit -n argo workflow.yaml
argo list -n argo
argo logs <wf-name> -n argo

Each task runs as its own pod, so steps scale independently, run in isolated containers with their own images, and use native Kubernetes scheduling. The price is YAML and learning Argo's conventions.
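Because a Workflow is just a Kubernetes object, one way to keep the YAML in check is to generate it. A minimal sketch in plain Python (standard library only; `linear_workflow` is a hypothetical helper, and the step names and image mirror the manifest above) that builds the same extract, transform, notify chain as a dict and prints it as JSON, which is also valid YAML:

```python
import json

def linear_workflow(steps: list[str], image: str = "python:3.12") -> dict:
    """Build an Argo Workflow manifest whose DAG runs `steps` one after another,
    each as its own pod using a shared 'run' template."""
    tasks = []
    for i, step in enumerate(steps):
        task = {
            "name": step,
            "template": "run",
            "arguments": {"parameters": [{"name": "step", "value": step}]},
        }
        if i > 0:
            task["dependencies"] = [steps[i - 1]]  # chain on the previous step
        tasks.append(task)
    return {
        "apiVersion": "argoproj.io/v1alpha1",
        "kind": "Workflow",
        "metadata": {"generateName": "hello-world-"},
        "spec": {
            "entrypoint": "main",
            "templates": [
                {"name": "main", "dag": {"tasks": tasks}},
                {
                    "name": "run",
                    "inputs": {"parameters": [{"name": "step"}]},
                    "container": {
                        "image": image,
                        "command": ["python", "-c"],
                        "args": ["print('running step {{inputs.parameters.step}}')"],
                    },
                },
            ],
        },
    }

manifest = linear_workflow(["extract", "transform", "notify"])
print(json.dumps(manifest, indent=2))  # save as workflow.yaml; JSON is valid YAML
```

Adding a step becomes a one-word change to the list instead of another hand-edited YAML line.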

Cleanup

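docker compose down -v    # Airflow (-v also deletes the metadata database volume)
argo delete --all -n argo # Argo: delete the submitted workflows
# Dagster: Ctrl-C, that's it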

What's Next

  • Patterns — DAG design, sensors, backfills, dynamic tasks, retries, secrets, ML pipelines
  • Best Practices — code organization, testing, observability, capacity, scaling, pitfalls
