airflow-dag-patterns

Production-ready patterns for Apache Airflow DAGs, operators, sensors, testing, and deployment. Covers DAG design principles (idempotent, atomic, incremental, observable) with task dependency patterns for linear, fan-out, fan-in, and complex workflows Includes TaskFlow API decorators for cleaner code with automatic XCom passing, dynamic DAG generation from configs, and branching with conditional logic Provides sensor patterns for S3 files, external task dependencies, and custom sensors; error handling with callbacks and trigger rules; and testing strategies with pytest fixtures Best practices section covers idempotency, timeouts, worker slot management, and anti-patterns like hardcoded dates and stateful tasks

INSTALLATION
npx skills add https://github.com/wshobson/agents --skill airflow-dag-patterns
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

Apache Airflow DAG Patterns

Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.

When to Use This Skill

  • Creating data pipeline orchestration with Airflow
  • Designing DAG structures and dependencies
  • Implementing custom operators and sensors
  • Testing Airflow DAGs locally
  • Setting up Airflow in production
  • Debugging failed DAG runs

Core Concepts

1. DAG Design Principles

Principle

Description

Idempotent

Running twice produces same result

Atomic

Tasks succeed or fail completely

Incremental

Process only new/changed data

Observable

Logs, metrics, alerts at every step

2. Task Dependencies

# Linear

task1 >> task2 >> task3

# Fan-out

task1 >> [task2, task3, task4]

# Fan-in

[task1, task2, task3] >> task4

# Complex

task1 >> task2 >> task4

task1 >> task3 >> task4

Quick Start

# dags/example_dag.py

from datetime import datetime, timedelta

from airflow import DAG

from airflow.operators.python import PythonOperator

from airflow.operators.empty import EmptyOperator

default_args = {

    'owner': 'data-team',

    'depends_on_past': False,

    'email_on_failure': True,

    'email_on_retry': False,

    'retries': 3,

    'retry_delay': timedelta(minutes=5),

    'retry_exponential_backoff': True,

    'max_retry_delay': timedelta(hours=1),

}

with DAG(

    dag_id='example_etl',

    default_args=default_args,

    description='Example ETL pipeline',

    schedule='0 6 * * *',  # Daily at 6 AM

    start_date=datetime(2024, 1, 1),

    catchup=False,

    tags=['etl', 'example'],

    max_active_runs=1,

) as dag:

    start = EmptyOperator(task_id='start')

    def extract_data(**context):

        execution_date = context['ds']

        # Extract logic here

        return {'records': 1000}

    extract = PythonOperator(

        task_id='extract',

        python_callable=extract_data,

    )

    end = EmptyOperator(task_id='end')

    start >> extract >> end

Patterns

Pattern 1: TaskFlow API (Airflow 2.0+)

# dags/taskflow_example.py

from datetime import datetime

from airflow.decorators import dag, task

from airflow.models import Variable

@dag(

    dag_id='taskflow_etl',

    schedule='@daily',

    start_date=datetime(2024, 1, 1),

    catchup=False,

    tags=['etl', 'taskflow'],

)

def taskflow_etl():

    """ETL pipeline using TaskFlow API"""

    @task()

    def extract(source: str) -> dict:

        """Extract data from source"""

        import pandas as pd

        df = pd.read_csv(f's3://bucket/{source}/{{ ds }}.csv')

        return {'data': df.to_dict(), 'rows': len(df)}

    @task()

    def transform(extracted: dict) -> dict:

        """Transform extracted data"""

        import pandas as pd

        df = pd.DataFrame(extracted['data'])

        df['processed_at'] = datetime.now()

        df = df.dropna()

        return {'data': df.to_dict(), 'rows': len(df)}

    @task()

    def load(transformed: dict, target: str):

        """Load data to target"""

        import pandas as pd

        df = pd.DataFrame(transformed['data'])

        df.to_parquet(f's3://bucket/{target}/{{ ds }}.parquet')

        return transformed['rows']

    @task()

    def notify(rows_loaded: int):

        """Send notification"""

        print(f'Loaded {rows_loaded} rows')

    # Define dependencies with XCom passing

    extracted = extract(source='raw_data')

    transformed = transform(extracted)

    loaded = load(transformed, target='processed_data')

    notify(loaded)

# Instantiate the DAG

taskflow_etl()

Pattern 2: Dynamic DAG Generation

# dags/dynamic_dag_factory.py

from datetime import datetime, timedelta

from airflow import DAG

from airflow.operators.python import PythonOperator

from airflow.models import Variable

import json

# Configuration for multiple similar pipelines

PIPELINE_CONFIGS = [

    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},

    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},

    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},

]

def create_dag(config: dict) -> DAG:

    """Factory function to create DAGs from config"""

    dag_id = f"etl_{config['name']}"

    default_args = {

        'owner': 'data-team',

        'retries': 3,

        'retry_delay': timedelta(minutes=5),

    }

    dag = DAG(

        dag_id=dag_id,

        default_args=default_args,

        schedule=config['schedule'],

        start_date=datetime(2024, 1, 1),

        catchup=False,

        tags=['etl', 'dynamic', config['name']],

    )

    with dag:

        def extract_fn(source, **context):

            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):

            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):

            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(

            task_id='extract',

            python_callable=extract_fn,

            op_kwargs={'source': config['source']},

        )

        transform = PythonOperator(

            task_id='transform',

            python_callable=transform_fn,

        )

        load = PythonOperator(

            task_id='load',

            python_callable=load_fn,

            op_kwargs={'table_name': config['name']},

        )

        extract >> transform >> load

    return dag

# Generate DAGs

for config in PIPELINE_CONFIGS:

    globals()[f"dag_{config['name']}"] = create_dag(config)

Pattern 3: Branching and Conditional Logic

# dags/branching_example.py

from airflow.decorators import dag, task

from airflow.operators.python import BranchPythonOperator

from airflow.operators.empty import EmptyOperator

from airflow.utils.trigger_rule import TriggerRule

@dag(

    dag_id='branching_pipeline',

    schedule='@daily',

    start_date=datetime(2024, 1, 1),

    catchup=False,

)

def branching_pipeline():

    @task()

    def check_data_quality() -> dict:

        """Check data quality and return metrics"""

        quality_score = 0.95  # Simulated

        return {'score': quality_score, 'rows': 10000}

    def choose_branch(**context) -> str:

        """Determine which branch to execute"""

        ti = context['ti']

        metrics = ti.xcom_pull(task_ids='check_data_quality')

        if metrics['score'] >= 0.9:

            return 'high_quality_path'

        elif metrics['score'] >= 0.7:

            return 'medium_quality_path'

        else:

            return 'low_quality_path'

    quality_check = check_data_quality()

    branch = BranchPythonOperator(

        task_id='branch',

        python_callable=choose_branch,

    )

    high_quality = EmptyOperator(task_id='high_quality_path')

    medium_quality = EmptyOperator(task_id='medium_quality_path')

    low_quality = EmptyOperator(task_id='low_quality_path')

    # Join point - runs after any branch completes

    join = EmptyOperator(

        task_id='join',

        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,

    )

    quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join

branching_pipeline()

Pattern 4: Sensors and External Dependencies

# dags/sensor_patterns.py

from datetime import datetime, timedelta

from airflow import DAG

from airflow.sensors.filesystem import FileSensor

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

from airflow.sensors.external_task import ExternalTaskSensor

from airflow.operators.python import PythonOperator

with DAG(

    dag_id='sensor_example',

    schedule='@daily',

    start_date=datetime(2024, 1, 1),

    catchup=False,

) as dag:

    # Wait for file on S3

    wait_for_file = S3KeySensor(

        task_id='wait_for_s3_file',

        bucket_name='data-lake',

        bucket_key='raw/{{ ds }}/data.parquet',

        aws_conn_id='aws_default',

        timeout=60 * 60 * 2,  # 2 hours

        poke_interval=60 * 5,  # Check every 5 minutes

        mode='reschedule',  # Free up worker slot while waiting

    )

    # Wait for another DAG to complete

    wait_for_upstream = ExternalTaskSensor(

        task_id='wait_for_upstream_dag',

        external_dag_id='upstream_etl',

        external_task_id='final_task',

        execution_date_fn=lambda dt: dt,  # Same execution date

        timeout=60 * 60 * 3,

        mode='reschedule',

    )

    # Custom sensor using @task.sensor decorator

    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')

    def wait_for_api() -> PokeReturnValue:

        """Custom sensor for API availability"""

        import requests

        response = requests.get('https://api.example.com/health')

        is_done = response.status_code == 200

        return PokeReturnValue(is_done=is_done, xcom_value=response.json())

    api_ready = wait_for_api()

    def process_data(**context):

        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')

        print(f"API returned: {api_result}")

    process = PythonOperator(

        task_id='process',

        python_callable=process_data,

    )

    [wait_for_file, wait_for_upstream, api_ready] >> process

Pattern 5: Error Handling and Alerts

# dags/error_handling.py

from datetime import datetime, timedelta

from airflow import DAG

from airflow.operators.python import PythonOperator

from airflow.utils.trigger_rule import TriggerRule

from airflow.models import Variable

def task_failure_callback(context):

    """Callback on task failure"""

    task_instance = context['task_instance']

    exception = context.get('exception')

    # Send to Slack/PagerDuty/etc

    message = f"""

    Task Failed!

    DAG: {task_instance.dag_id}

    Task: {task_instance.task_id}

    Execution Date: {context['ds']}

    Error: {exception}

    Log URL: {task_instance.log_url}

    """

    # send_slack_alert(message)

    print(message)

def dag_failure_callback(context):

    """Callback on DAG failure"""

    # Aggregate failures, send summary

    pass

with DAG(

    dag_id='error_handling_example',

    schedule='@daily',

    start_date=datetime(2024, 1, 1),

    catchup=False,

    on_failure_callback=dag_failure_callback,

    default_args={

        'on_failure_callback': task_failure_callback,

        'retries': 3,

        'retry_delay': timedelta(minutes=5),

    },

) as dag:

    def might_fail(**context):

        import random

        if random.random() < 0.3:

            raise ValueError("Random failure!")

        return "Success"

    risky_task = PythonOperator(

        task_id='risky_task',

        python_callable=might_fail,

    )

    def cleanup(**context):

        """Cleanup runs regardless of upstream failures"""

        print("Cleaning up...")

    cleanup_task = PythonOperator(

        task_id='cleanup',

        python_callable=cleanup,

        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails

    )

    def notify_success(**context):

        """Only runs if all upstream succeeded"""

        print("All tasks succeeded!")

    success_notification = PythonOperator(

        task_id='notify_success',

        python_callable=notify_success,

        trigger_rule=TriggerRule.ALL_SUCCESS,

    )

    risky_task >> [cleanup_task, success_notification]

Pattern 6: Testing DAGs

# tests/test_dags.py

import pytest

from datetime import datetime

from airflow.models import DagBag

@pytest.fixture

def dagbag():

    return DagBag(dag_folder='dags/', include_examples=False)

def test_dag_loaded(dagbag):

    """Test that all DAGs load without errors"""

    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"

def test_dag_structure(dagbag):

    """Test specific DAG structure"""

    dag = dagbag.get_dag('example_etl')

    assert dag is not None

    assert len(dag.tasks) == 3

    assert dag.schedule_interval == '0 6 * * *'

def test_task_dependencies(dagbag):

    """Test task dependencies are correct"""

    dag = dagbag.get_dag('example_etl')

    extract_task = dag.get_task('extract')

    assert 'start' in [t.task_id for t in extract_task.upstream_list]

    assert 'end' in [t.task_id for t in extract_task.downstream_list]

def test_dag_integrity(dagbag):

    """Test DAG has no cycles and is valid"""

    for dag_id, dag in dagbag.dags.items():

        assert dag.test_cycle() is None, f"Cycle detected in {dag_id}"

# Test individual task logic

def test_extract_function():

    """Unit test for extract function"""

    from dags.example_dag import extract_data

    result = extract_data(ds='2024-01-01')

    assert 'records' in result

    assert isinstance(result['records'], int)

Project Structure

airflow/

├── dags/

│   ├── __init__.py

│   ├── common/

│   │   ├── __init__.py

│   │   ├── operators.py    # Custom operators

│   │   ├── sensors.py      # Custom sensors

│   │   └── callbacks.py    # Alert callbacks

│   ├── etl/

│   │   ├── customers.py

│   │   └── orders.py

│   └── ml/

│       └── training.py

├── plugins/

│   └── custom_plugin.py

├── tests/

│   ├── __init__.py

│   ├── test_dags.py

│   └── test_operators.py

├── docker-compose.yml

└── requirements.txt

Best Practices

Do's

  • Use TaskFlow API - Cleaner code, automatic XCom
  • Set timeouts - Prevent zombie tasks
  • **Use mode='reschedule'** - For sensors, free up workers
  • Test DAGs - Unit tests and integration tests
  • Idempotent tasks - Safe to retry

Don'ts

  • **Don't use depends_on_past=True** - Creates bottlenecks
  • Don't hardcode dates - Use {{ ds }} macros
  • Don't use global state - Tasks should be stateless
  • Don't skip catchup blindly - Understand implications
  • Don't put heavy logic in DAG file - Import from modules
BrowserAct

Let your agent run on any real-world website

Bypass CAPTCHA & anti-bot for free. Start local, scale to cloud.

Explore BrowserAct Skills →

Stop writing automation&scrapers

Install the CLI. Run your first Skill in 30 seconds. Scale when you're ready.

Start free
free · no credit card