Steven's Knowledge

Patterns

DAG design, sensors, backfills, dynamic task generation, retries and idempotency, secrets, ML pipelines

The patterns that turn a working orchestrator into one teams can rely on.

DAG Design

Idempotency is the foundation. Every task should be safe to re-run any number of times:

  • Inserts that detect duplicates (UPSERT, MERGE, dbt incremental models with unique_key)
  • Files written with deterministic paths and overwrite semantics
  • API calls with idempotency keys
  • "Process all events between start_ts and end_ts" instead of "process the next batch"

The orchestrator will retry; non-idempotent tasks corrupt state.
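A minimal sketch of the first bullet, using Python's built-in sqlite3 as a stand-in for a warehouse (the table and function names are hypothetical). The load keys each row on its natural key, so re-running it for the same day updates rows in place instead of duplicating them.

```python
import sqlite3

def load_daily_totals(conn: sqlite3.Connection, day: str, totals: dict[str, float]) -> None:
    """Idempotent load: one row per (day, region); re-runs overwrite instead of duplicating."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS daily_totals ("
        " day TEXT, region TEXT, total REAL, PRIMARY KEY (day, region))"
    )
    conn.executemany(
        # SQLite UPSERT (3.24+): on a key collision, update the existing row in place
        "INSERT INTO daily_totals (day, region, total) VALUES (?, ?, ?) "
        "ON CONFLICT(day, region) DO UPDATE SET total = excluded.total",
        [(day, region, total) for region, total in totals.items()],
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
for _ in range(3):  # simulate the orchestrator retrying the same task three times
    load_daily_totals(conn, "2026-05-21", {"us-east-1": 120.0, "eu-west-1": 80.5})
print(conn.execute("SELECT COUNT(*) FROM daily_totals").fetchone()[0])  # → 2
```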

Pure functions of inputs: a task should produce the same output given the same date / partition. This makes backfills correct.
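To make "same partition, same output" concrete, here is a sketch in which the task derives a half-open [start_ts, end_ts) window purely from its partition key and filters on it, rather than reading the wall clock or a "next batch" pointer. The event shape and function names are illustrative assumptions.

```python
from datetime import date, datetime, timedelta, timezone

def partition_window(partition_key: str) -> tuple[datetime, datetime]:
    """Map a daily partition key like '2026-05-21' to a half-open UTC window [start, end)."""
    day = date.fromisoformat(partition_key)
    start = datetime(day.year, day.month, day.day, tzinfo=timezone.utc)
    return start, start + timedelta(days=1)

def events_for_partition(events: list[dict], partition_key: str) -> list[dict]:
    """Select events by the partition's window only; nothing depends on 'now'."""
    start, end = partition_window(partition_key)
    return [e for e in events if start <= e["ts"] < end]

events = [
    {"id": 1, "ts": datetime(2026, 5, 20, 23, 59, tzinfo=timezone.utc)},
    {"id": 2, "ts": datetime(2026, 5, 21, 0, 0, tzinfo=timezone.utc)},
    {"id": 3, "ts": datetime(2026, 5, 22, 0, 0, tzinfo=timezone.utc)},
]
print([e["id"] for e in events_for_partition(events, "2026-05-21")])  # → [2]
```

The half-open interval puts an event at exactly midnight into exactly one partition, so adjacent days in a backfill never double-count it.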

Small, focused tasks: one task = one concept. A 500-line script as a single task is hard to debug, retry, or parallelize. Split it.

Heavy work outside the orchestrator process: the orchestrator dispatches; the actual computation runs in a warehouse, on Spark, in a separate worker pod. The orchestrator's process stays small and fast.

Partitioned DAGs and Backfills

The killer feature of a real orchestrator is backfill: re-running a date range when something changes.

In Airflow:

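from datetime import datetime
from airflow import DAG

with DAG(
    "daily_facts",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=True,  # key setting: create a run for every interval since start_date
) as dag:
    ...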

If you turn this DAG on with catchup=True and a start_date in the past, Airflow creates a run for every missed schedule interval. To re-run a specific date range (Airflow 2.x CLI):

airflow dags backfill daily_facts --start-date 2026-03-01 --end-date 2026-03-31
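Conceptually, a backfill like the one above enumerates the schedule's intervals in the range and runs the task once per interval. A simplified sketch, assuming a daily schedule and treating both ends of the range as inclusive for illustration (real boundary handling depends on the tool and version); run_task is a hypothetical stand-in for the task body:

```python
from datetime import date, timedelta
from typing import Callable, Iterator

def daily_partitions(start: date, end: date) -> Iterator[str]:
    """Yield one partition key per day from start to end, both inclusive."""
    day = start
    while day <= end:
        yield day.isoformat()
        day += timedelta(days=1)

def backfill(start: date, end: date, run_task: Callable[[str], None]) -> list[str]:
    """Run the task once per partition; repeating this is safe only if run_task is idempotent."""
    done = []
    for key in daily_partitions(start, end):
        run_task(key)
        done.append(key)
    return done

processed = backfill(date(2026, 3, 1), date(2026, 3, 31), run_task=lambda key: None)
print(len(processed), processed[0], processed[-1])  # → 31 2026-03-01 2026-03-31
```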

In Dagster (asset-centric):

from dagster import asset, DailyPartitionsDefinition

daily = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(partitions_def=daily)
def daily_sales(context):
    day = context.partition_key   # e.g., "2026-05-21"
    fetch_data_for_date(day)      # hypothetical helper: loads exactly that day's data

Backfill from the UI: select the asset, pick a date range, and materialize those partitions. Dagster shows the partition status grid (green = complete, red = failed, gray = missing).
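The status grid boils down to one state per partition. A small sketch, with the status names and the shape of latest_status as assumptions, that computes which partitions a backfill should target (failed or never run):

```python
from datetime import date, timedelta

def partitions_to_backfill(start: date, end: date, latest_status: dict[str, str]) -> list[str]:
    """Return partition keys whose latest status is 'failed' or that were never run."""
    todo = []
    day = start
    while day <= end:
        key = day.isoformat()
        # A partition with no recorded run counts as missing
        if latest_status.get(key, "missing") in ("failed", "missing"):
            todo.append(key)
        day += timedelta(days=1)
    return todo

status = {"2026-01-01": "complete", "2026-01-02": "failed", "2026-01-04": "complete"}
print(partitions_to_backfill(date(2026, 1, 1), date(2026, 1, 4), status))
# → ['2026-01-02', '2026-01-03']
```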

Always design with partitions in mind. "Run this for the last day" is the most common job shape, so make the day an explicit input. Tasks that don't know which time range they cover cause backfill chaos.

Sensors and Event-Driven

Cron isn't always right. Sensors trigger when an external condition is met:

| Sensor | Triggers when |
|---|---|
| File sensor | A specific S3 / GCS / HDFS path appears |
| External task sensor | A task in another DAG succeeds (cross-DAG dependencies) |
| Database row sensor | A query returns rows |
| API sensor | An external API endpoint returns an expected value |
| Time delta sensor | A set delay has elapsed (in Airflow's TimeDeltaSensor, measured from the end of the run's data interval) |
| Custom | Any condition your Python can express |

Airflow:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id="wait_for_data",
    bucket_name="data-lake",
    bucket_key="exports/{{ ds }}/data.parquet",
    timeout=60 * 60 * 6,   # 6 hours max wait
    poke_interval=60,       # check every minute
    mode="reschedule",      # release the worker between checks
)

mode="reschedule" is critical: instead of holding a worker slot for up to 6 hours, the sensor releases it between polls. Use it for any sensor that may wait a long time.
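Each check is simple; the modes differ in what happens between checks. Below is a rough pure-Python model of poke_interval/timeout semantics (not Airflow's implementation), with the clock and sleep injected so it can be simulated. In poke mode the sleep happens while holding a worker slot; reschedule mode gives the slot back and the scheduler queues the next check later.

```python
import time
from typing import Callable

class SensorTimeout(Exception):
    pass

def wait_for(
    condition: Callable[[], bool],
    timeout: float,
    poke_interval: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Check condition every poke_interval seconds until true; return the number of checks."""
    deadline = clock() + timeout
    checks = 0
    while True:
        checks += 1
        if condition():
            return checks
        if clock() >= deadline:
            raise SensorTimeout(f"condition not met within {timeout}s")
        sleep(poke_interval)  # poke mode holds the worker here; reschedule mode would not

# Simulated clock: the "file" arrives at t=120s, so the third check succeeds
fake_now = [0.0]
checks = wait_for(
    condition=lambda: fake_now[0] >= 120.0,
    timeout=6 * 60 * 60,
    poke_interval=60,
    clock=lambda: fake_now[0],
    sleep=lambda s: fake_now.__setitem__(0, fake_now[0] + s),
)
print(checks)  # → 3
```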

Dagster equivalents are @sensor functions that wake regularly and decide whether to trigger.
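A typical sensor keeps a cursor so each tick reacts only to what is new. A framework-free sketch of that evaluation logic, assuming files are discovered as keys that sort chronologically and that each new key should become one run request (in Dagster these would become RunRequest objects plus a cursor update; here they are plain strings):

```python
from typing import Optional

def evaluate_sensor(available_keys: list[str], cursor: Optional[str]) -> tuple[list[str], Optional[str]]:
    """Return (keys to launch runs for, new cursor); keys at or before the cursor are skipped."""
    # ISO dates inside the keys make lexicographic order match arrival order
    new_keys = sorted(k for k in available_keys if cursor is None or k > cursor)
    new_cursor = new_keys[-1] if new_keys else cursor
    return new_keys, new_cursor

listing = ["exports/2026-05-20/data.parquet", "exports/2026-05-21/data.parquet"]
requests, cursor = evaluate_sensor(listing, cursor=None)
print(requests)  # first tick: both keys
listing.append("exports/2026-05-22/data.parquet")
requests, cursor = evaluate_sensor(listing, cursor)
print(requests)  # → ['exports/2026-05-22/data.parquet']
```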

Sensors replace "scheduled too early" cron anti-patterns: you don't need to schedule a job at 04:00 hoping the file arrives by then. Wait for the file.

Dynamic Task Generation

Sometimes the number of tasks isn't known until runtime:

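# Dynamic task mapping (Airflow 2.3+; the schedule= argument needs 2.4+)
from datetime import datetime
from airflow.decorators import dag, task

@dag(start_date=datetime(2026, 1, 1), schedule=None, catchup=False)
def regional_processing():
    @task
    def get_regions():
        return ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]

    @task
    def process_region(region: str):
        print(f"Processing {region}")

    # One mapped task instance per element returned at runtime
    process_region.expand(region=get_regions())

regional_processing()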

The DAG now has 4 mapped task instances (or N, one per item get_regions returns), running in parallel as far as pools and concurrency limits allow. Fan-out is built in.
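Outside any orchestrator, the same shape is a map over a runtime-sized list with a concurrency cap. A plain-Python analogy using concurrent.futures, where max_parallel stands in for pool and concurrency limits and process_region is a stand-in:

```python
from concurrent.futures import ThreadPoolExecutor

def get_regions() -> list[str]:
    return ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]

def process_region(region: str) -> str:
    return f"processed {region}"

def fan_out(max_parallel: int = 2) -> list[str]:
    """One unit of work per region, at most max_parallel running at once."""
    regions = get_regions()  # the fan-out width is only known at runtime
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        # map returns results in input order even though work runs concurrently
        return list(pool.map(process_region, regions))

print(fan_out())
```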

In Dagster, dynamic outputs:

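from dagster import DynamicOut, DynamicOutput, job, op

@op(out=DynamicOut())
def fan_out():
    for region in ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]:
        # mapping_key allows only letters, digits, and underscores
        yield DynamicOutput(region, mapping_key=region.replace("-", "_"))

@op
def process(region: str):
    ...

@job
def regional_job():
    fan_out().map(process)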

Use cases: per-region jobs, per-customer batches, per-file processing, per-shard.

Retries and Failure Strategy

Calibrate retries to the failure type:

| Failure type | Strategy |
|---|---|
| Transient network glitch | Retry 3-5 times with exponential backoff |
| Upstream not ready | Retry with a longer interval, or use a sensor |
| Auth failure | No retry: fail loudly and page someone |
| Data quality failure | No retry (it won't fix itself): alert and investigate manually |
| Timeout / OOM | Retry once with more resources, then fail |
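The table can be encoded as a small policy function. A sketch under stated assumptions: the exception classes are hypothetical stand-ins for your own error types, the retry counts are illustrative, and the backoff is a simple capped doubling whose details differ from Airflow's own retry_exponential_backoff calculation.

```python
from dataclasses import dataclass

# Hypothetical error types standing in for whatever your tasks raise
class TransientNetworkError(Exception): ...
class UpstreamNotReady(Exception): ...
class AuthError(Exception): ...
class DataQualityError(Exception): ...

@dataclass(frozen=True)
class RetryPolicy:
    max_retries: int
    base_delay_s: float
    max_delay_s: float

    def delay(self, attempt: int) -> float:
        """Capped exponential backoff: base * 2**(attempt - 1), with attempt starting at 1."""
        return min(self.base_delay_s * 2 ** (attempt - 1), self.max_delay_s)

def policy_for(exc: Exception) -> RetryPolicy:
    if isinstance(exc, TransientNetworkError):
        return RetryPolicy(max_retries=4, base_delay_s=120, max_delay_s=1800)
    if isinstance(exc, UpstreamNotReady):
        return RetryPolicy(max_retries=6, base_delay_s=900, max_delay_s=3600)
    # Auth and data-quality failures will not fix themselves: fail immediately
    return RetryPolicy(max_retries=0, base_delay_s=0, max_delay_s=0)

p = policy_for(TransientNetworkError())
print([p.delay(a) for a in range(1, p.max_retries + 1)])  # → [120, 240, 480, 960]
print(policy_for(AuthError()).max_retries)                # → 0
```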

In Airflow:

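from datetime import timedelta
from airflow.operators.python import PythonOperator

PythonOperator(
    task_id="...",
    python_callable=load_data,            # hypothetical task function (required argument)
    retries=3,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
    on_failure_callback=alert_pagerduty,  # hypothetical: page once retries are exhausted
    on_retry_callback=log_retry,          # hypothetical: record each retry attempt
)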

For some failure types, you want a clean dead-letter: failed records get archived for manual triage rather than blocking the whole DAG.
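A minimal dead-letter sketch, assuming record-level processing where one bad record should not fail the batch. The transform function and record shape are hypothetical, and in production the dead letters would go somewhere durable (a table or an object-store prefix) rather than an in-memory list.

```python
from typing import Callable

def process_with_dead_letter(
    records: list[dict], transform: Callable[[dict], dict]
) -> tuple[list[dict], list[dict]]:
    """Return (good outputs, dead letters); each dead letter keeps the record and the error."""
    good, dead = [], []
    for record in records:
        try:
            good.append(transform(record))
        except (KeyError, ValueError) as exc:  # quarantine only expected data errors
            dead.append({"record": record, "error": repr(exc)})
    return good, dead

def transform(record: dict) -> dict:
    return {"id": record["id"], "amount_cents": round(float(record["amount"]) * 100)}

good, dead = process_with_dead_letter(
    [{"id": 1, "amount": "9.99"}, {"id": 2, "amount": "oops"}, {"amount": "1.00"}],
    transform,
)
print(len(good), len(dead))  # → 1 2
```

Catching only expected data errors matters: an auth or network failure should still fail the task so the retry policy applies, instead of silently dead-lettering every record.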

Secrets Management

Pipelines need credentials: warehouse passwords, API keys, OAuth tokens.

| Method | Pros / cons |
|---|---|
| Environment variables | Easy; risk of leaking into logs |
| Airflow Connections / Variables | Built-in encryption; managed in the UI |
| External Secrets Operator + Vault | Best option; supports rotation |
| Cloud-native (AWS Secrets Manager, GCP Secret Manager) | Good; native to your cloud |

In Airflow:

import boto3
from airflow.models import Variable

# Variable (encrypted at rest in Airflow's metastore when a Fernet key is configured)
api_key = Variable.get("stripe_api_key")

# Or fetch directly from AWS Secrets Manager
response = boto3.client("secretsmanager").get_secret_value(SecretId="prod/stripe")
api_key = response["SecretString"]

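Better still, configure an Airflow secrets backend so that Variable.get and connection lookups resolve from the external store. Never commit secrets to DAG source, and never print them in task logs.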
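For the environment-variable route, two habits cover most of the risk: fail fast when a secret is missing, and redact secret values before anything is logged. A framework-agnostic sketch with hypothetical helper names (not Airflow's own log-masking mechanism); the key value is a demo placeholder:

```python
import os

def get_secret(name: str) -> str:
    """Read a secret from the environment, failing loudly (without echoing a value) if absent."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"missing required secret: {name}")
    return value

def redact(message: str, secrets: list[str]) -> str:
    """Replace every occurrence of each secret value with a fixed mask before logging."""
    for secret in secrets:
        if secret:
            message = message.replace(secret, "***")
    return message

os.environ["STRIPE_API_KEY"] = "sk_test_example"  # demo value only
key = get_secret("STRIPE_API_KEY")
print(redact(f"calling Stripe with key={key}", [key]))  # → calling Stripe with key=***
```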

ML Pipelines

ML adds its own shape on top of orchestration:

ingest_features → featurize → train → evaluate → register → deploy_if_better

Each step has unique needs:

  • featurize: heavy compute, often Spark, idempotent per date range
  • train: GPU; takes hours; produces a model artifact
  • evaluate: compares to existing production model
  • register: writes the model to a model registry (MLflow, SageMaker, or similar)
  • deploy_if_better: a conditional task; deploys only if metrics improve
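The deploy_if_better step ultimately reduces to a comparison. A sketch assuming higher-is-better metrics and a minimum improvement margin (the metric names and the 0.005 margin are illustrative), which also refuses a candidate that regresses on a guardrail metric:

```python
from typing import Optional

def should_deploy(
    candidate: dict[str, float],
    production: Optional[dict[str, float]],
    primary: str = "auc",
    min_gain: float = 0.005,
    guardrails: tuple[str, ...] = ("recall",),
) -> bool:
    """Deploy only if the primary metric improves by min_gain and no guardrail regresses."""
    if production is None:  # nothing in production yet: ship the first model
        return True
    if candidate[primary] < production[primary] + min_gain:
        return False
    return all(candidate[m] >= production[m] for m in guardrails)

prod = {"auc": 0.870, "recall": 0.61}
print(should_deploy({"auc": 0.881, "recall": 0.62}, prod))  # → True
print(should_deploy({"auc": 0.872, "recall": 0.65}, prod))  # → False (gain below margin)
print(should_deploy({"auc": 0.890, "recall": 0.55}, prod))  # → False (recall regressed)
```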

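In Airflow, these heavy steps usually run through operators such as KubernetesPodOperator (for example, GPU training in its own pod) or EmrAddStepsOperator (Spark steps on EMR). Dagster is type-aware: op and asset inputs and outputs carry types, so a "model A expected but B given" mismatch fails at the step boundary instead of propagating downstream.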

Dedicated ML pipeline tools (Kubeflow Pipelines, Flyte, Metaflow) handle this domain better than generic orchestrators. Many teams use both: data orchestrator (Airflow/Dagster) calls into ML pipeline (Kubeflow/Flyte).

Cross-DAG / Cross-Team

When pipelines span teams:

  • Dataset triggers (Airflow 2.4+): a DAG can be scheduled to run when a dataset (identified by URI) is updated, regardless of which DAG produced it
  • External task sensor: explicit wait on another DAG's task
  • Asset-centric (Dagster): downstream assets automatically depend on upstream regardless of "job"

Across team boundaries, the way each orchestrator models dependencies matters most. Dagster's "you depend on the asset, not the team's DAG" composes better than Airflow's task-level wiring.
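Dataset-driven scheduling can be modeled in a few lines. A toy sketch of the rule Airflow documents for a DAG scheduled on several datasets (it runs once every listed dataset has been updated since it was last triggered); the class, URIs, and DAG ids are hypothetical:

```python
from collections import defaultdict

class DatasetScheduler:
    """Toy model: a consumer runs once all of its datasets have updated since its last run."""

    def __init__(self) -> None:
        self.schedules: dict[str, set[str]] = {}
        self.pending: dict[str, set[str]] = defaultdict(set)

    def register(self, dag_id: str, datasets: list[str]) -> None:
        self.schedules[dag_id] = set(datasets)

    def dataset_updated(self, uri: str) -> list[str]:
        """Record an update from any producer; return the consumers that should run now."""
        triggered = []
        for dag_id, needed in self.schedules.items():
            if uri in needed:
                self.pending[dag_id].add(uri)
                if self.pending[dag_id] == needed:
                    triggered.append(dag_id)
                    self.pending[dag_id].clear()  # wait for a fresh round of updates
        return triggered

s = DatasetScheduler()
s.register("revenue_report", ["s3://lake/orders", "s3://lake/fx_rates"])
print(s.dataset_updated("s3://lake/orders"))    # → []
print(s.dataset_updated("s3://lake/fx_rates"))  # → ['revenue_report']
```

The producer never names the consumer: any DAG, owned by any team, that updates s3://lake/orders counts.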

Observability

Each run is a story. Capture:

  • Status: success / failed / running / skipped — for each task
  • Duration: detect creep; if a job that takes 10m suddenly takes 2h, find out
  • Inputs / outputs: what date / partition was processed
  • Logs: stdout/stderr, ideally streamed live for long-running tasks
  • Lineage: which assets did this affect downstream

Wire it into monitoring:

Airflow → StatsD / Prometheus exporter → Grafana
Dagster → built-in metrics + OpenTelemetry export

Alerts on: failed runs (especially repeated), duration anomalies, missed schedules (DAG didn't start when expected), backlog (tasks queued beyond threshold).
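Duration alerts need a baseline. One simple approach (an assumption, not a built-in of either tool) compares the latest run with the median of recent successful runs and flags anything beyond a multiplier:

```python
from statistics import median

def duration_anomaly(recent_s: list[float], latest_s: float,
                     factor: float = 3.0, min_history: int = 5) -> bool:
    """Flag the latest run if it took more than `factor` times the recent median duration."""
    if len(recent_s) < min_history:  # too little history for a stable baseline
        return False
    return latest_s > factor * median(recent_s)

history = [600, 620, 580, 610, 640, 590]  # roughly 10-minute runs, in seconds
print(duration_anomaly(history, latest_s=660))   # → False
print(duration_anomaly(history, latest_s=7200))  # → True (2h against a ~10m baseline)
```

The median keeps one earlier slow run from inflating the baseline the way a mean would.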

SLAs and Freshness

For business-critical pipelines:

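# Airflow 2.x
from datetime import timedelta
from airflow.operators.python import PythonOperator

PythonOperator(
    task_id="critical_report",
    sla=timedelta(hours=2),  # alert if not done within 2h of the scheduled time
    # ... other arguments
)

# Dagster
from dagster import FreshnessPolicy, asset

@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=120))
def daily_revenue(): ...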

The orchestrator alerts the on-call if the deadline is missed. The dashboard isn't fresh? You know before the executive does.
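At its core, a freshness rule compares the asset's last successful update with an allowed lag. A sketch with a hypothetical is_stale helper, mirroring the 120-minute policy in the Dagster example:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def is_stale(last_updated: Optional[datetime], max_lag: timedelta, now: datetime) -> bool:
    """True if the data was never updated or its last update is older than max_lag."""
    return last_updated is None or now - last_updated > max_lag

now = datetime(2026, 5, 21, 9, 0, tzinfo=timezone.utc)
max_lag = timedelta(minutes=120)
print(is_stale(datetime(2026, 5, 21, 7, 30, tzinfo=timezone.utc), max_lag, now))  # → False
print(is_stale(datetime(2026, 5, 21, 6, 45, tzinfo=timezone.utc), max_lag, now))  # → True
```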

Data Quality Gates

A task can fail the run when data quality is off:

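from airflow.decorators import task
from airflow.exceptions import AirflowFailException

expected_min = 10_000  # hypothetical threshold

@task
def quality_check(**context):
    # check_row_count_today / has_nulls_in_required_columns are hypothetical helpers
    rows = check_row_count_today()
    if rows < expected_min:
        # AirflowFailException fails the task immediately, without further retries
        raise AirflowFailException(f"Only {rows} rows; expected >= {expected_min}")
    if has_nulls_in_required_columns():
        raise AirflowFailException("Required columns have NULLs")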

Because the downstream publish_to_bi task depends on quality_check, bad data never reaches dashboards.

Wire these checks to dbt tests, Great Expectations, Soda, or custom checks.

Anti-Patterns

Cron-style thinking in a DAG world. Scheduling tasks to "start late enough that everything's done." Use sensors / explicit dependencies.

XCom as a data pipeline. Pushing dataframes through XCom kills the metadata DB. Pass references (S3 paths, warehouse tables), not data.
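The fix is for the producer to write the data somewhere durable and hand over only its location. A sketch using a local temporary directory as a stand-in for S3 or a warehouse table (paths, function names, and data are illustrative); what would travel through XCom is the short string that extract returns:

```python
import json
import tempfile
from pathlib import Path

def extract(day: str, root: Path) -> str:
    """Write the day's rows to a deterministic path and return only that path."""
    path = root / f"orders/date={day}/part-0.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    rows = [{"order_id": i, "amount": 10 * i} for i in range(1, 1001)]  # stand-in data
    path.write_text(json.dumps(rows))  # overwrite semantics: a re-run replaces the file
    return str(path)

def transform(path: str) -> int:
    """The downstream task receives the reference and loads the data itself."""
    rows = json.loads(Path(path).read_text())
    return sum(r["amount"] for r in rows)

with tempfile.TemporaryDirectory() as tmp:
    ref = extract("2026-05-21", Path(tmp))
    print(transform(ref))  # → 5005000
```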

Long-running tasks holding workers. A task that waits 6 hours holds a worker the whole time. Use sensors in reschedule mode or break into smaller tasks.

The God DAG. One 500-task DAG that no one understands. Decompose by concern (extract DAG, transform DAG, publish DAG) and connect the pieces with datasets or sensors.

No backfill thinking. Tasks that assume "today" but can't be re-run for an arbitrary date. Pass the partition explicitly.

Orchestrator does the work. Loading 50 GB with pandas.read_sql inside the orchestrator worker ends in an OOM. Push compute to the warehouse, Spark, or dedicated workers.

Cron-as-code orphans. People sneak cron jobs onto servers because the orchestrator is annoying. The orchestrator must be easier than crontab -e.

What's Next

  • Best Practices — code organization, testing, observability, capacity, scaling, pitfalls