SKILL.md
Airflow 2 to 3 Migration
This skill helps migrate Airflow 2.x DAG code to Airflow 3.x, focusing on code changes (imports, operators, hooks, context, API usage).
Important: Before migrating to Airflow 3, strongly recommend that the user upgrade to Airflow 2.11 first, then to at least Airflow 3.0.11 (ideally directly to 3.1). Other upgrade paths make rollbacks impossible. See: https://www.astronomer.io/docs/astro/airflow3/upgrade-af3#upgrade-your-airflow-2-deployment-to-airflow-3. Early 3.0 versions also have many bugs; 3.1 provides a much better experience.
Migration at a Glance
- Run Ruff's Airflow migration rules to auto-fix detectable issues (AIR30/AIR301/AIR302/AIR31/AIR311/AIR312).
ruff check --preview --select AIR --fix --unsafe-fixes .
- Scan for remaining issues using the manual search checklist in reference/migration-checklist.md.
- Focus on: direct metadata DB access, legacy imports, scheduling/context keys, XCom pickling, datasets-to-assets, REST API/auth, plugins, and file paths.
- Hard behavior/config gotchas to explicitly review:
  - Cron scheduling semantics: consider AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True if you need Airflow 2-style cron data intervals.
  - .airflowignore syntax changed from regexp to glob; set AIRFLOW__CORE__DAG_IGNORE_FILE_SYNTAX=regexp if you must keep regexp behavior.
  - OAuth callback URLs add an /auth/ prefix (e.g. /auth/oauth-authorized/google).
  - Shared utility imports: Bare imports like import common from dags/common/ no longer work on Astro. Use fully qualified imports: import dags.common.
- Plan changes per file and issue type:
  - Fix imports.
  - Update operators/hooks/providers.
  - Refactor metadata access to use the Airflow client instead of direct access.
  - Fix use of outdated context variables.
  - Fix scheduling logic.
- Implement changes incrementally, re-running Ruff and code searches after each major change.
- Explain changes to the user and caution them to test any updated logic such as refactored metadata, scheduling logic and use of the Airflow context.
Architecture & Metadata DB Access
Airflow 3 changes how components talk to the metadata database:
- Workers no longer connect directly to the metadata DB.
- Task code runs via the Task Execution API exposed by the API server.
- The DAG processor runs as an independent process separate from the scheduler.
- The Triggerer uses the task execution mechanism via an in-process API server.
Trigger implementation gotcha: If a trigger calls hooks synchronously inside the asyncio event loop, it may fail or block. Prefer calling hooks via sync_to_async(...) (or otherwise ensure hook calls are async-safe).
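As a minimal sketch of the async-safe pattern described above, the example below moves a blocking call off the event loop. It uses the standard library's asyncio.to_thread as a stand-in for sync_to_async (which comes from the asgiref package), so it runs without extra dependencies. The names blocking_hook_call and poll_job are hypothetical and are not Airflow APIs.

```python
import asyncio
import time


def blocking_hook_call(job_id: str) -> str:
    """Hypothetical stand-in for a synchronous hook method (HTTP or DB call)."""
    time.sleep(0.05)  # simulates blocking I/O
    return f"{job_id}:done"


async def poll_job(job_id: str) -> str:
    """Shape of a call that a trigger's async run() loop might await."""
    # Run the blocking function in a worker thread so the event loop stays free.
    # With asgiref installed, `await sync_to_async(blocking_hook_call)(job_id)`
    # would play the same role.
    return await asyncio.to_thread(blocking_hook_call, job_id)


async def poll_many(job_ids: list[str]) -> list[str]:
    # The polls overlap instead of blocking one another.
    return list(await asyncio.gather(*(poll_job(j) for j in job_ids)))


if __name__ == "__main__":
    print(asyncio.run(poll_many(["a", "b"])))
```

Calling blocking_hook_call directly inside poll_job would also return the right value, but it would stall every other coroutine sharing the loop for the duration of the call.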
Key code impact: Task code can still import ORM sessions/models, but any attempt to use them to talk to the metadata DB will fail with:
RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.x
Patterns to search for
When scanning DAGs, custom operators, and @task functions, look for:
- Session helpers: provide_session, create_session, @provide_session
- Sessions from settings: from airflow.settings import Session
- Engine access: from airflow.settings import engine
- ORM usage with models: session.query(DagModel)..., session.query(DagRun)...
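The search for these patterns can be scripted. The following is a rough sketch rather than an exhaustive detector: the regular expressions are approximations of the patterns listed above, and the names ORM_PATTERNS, scan_source, and scan_tree are made up for this example.

```python
import re
from pathlib import Path

# Approximate regexes for the patterns listed above; extend them as needed.
ORM_PATTERNS = {
    "session helper": re.compile(r"\b(provide_session|create_session)\b"),
    "settings Session": re.compile(r"from\s+airflow\.settings\s+import\s+.*\bSession\b"),
    "settings engine": re.compile(r"from\s+airflow\.settings\s+import\s+.*\bengine\b"),
    "ORM query": re.compile(r"\bsession\.query\("),
}


def scan_source(text: str) -> list[tuple[int, str, str]]:
    """Return (line_number, label, stripped_line) for each matching line."""
    hits = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        for label, pattern in ORM_PATTERNS.items():
            if pattern.search(line):
                hits.append((lineno, label, line.strip()))
    return hits


def scan_tree(root: str) -> dict[str, list[tuple[int, str, str]]]:
    """Scan every .py file under root and return hits keyed by file path."""
    report = {}
    for path in sorted(Path(root).rglob("*.py")):
        hits = scan_source(path.read_text(encoding="utf-8", errors="ignore"))
        if hits:
            report[str(path)] = hits
    return report


if __name__ == "__main__":
    for file_path, hits in scan_tree("dags").items():
        for lineno, label, line in hits:
            print(f"{file_path}:{lineno}: [{label}] {line}")
```

A match only marks a candidate for review. Whether the code actually reaches the metadata DB from a task still has to be checked by hand.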
Replacement: Airflow Python client
Preferred for rich metadata access patterns. Add to requirements.txt:
apache-airflow-client==<your-airflow-runtime-version>
Example usage:
import os

import airflow_client.client
from airflow_client.client.api.dag_api import DAGApi
from airflow.sdk import BaseOperator

# Host and token are read from the environment; the fallback URL is a placeholder.
_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

class ListDagsOperator(BaseOperator):
    def execute(self, context):
        config = airflow_client.client.Configuration(host=_HOST, access_token=_TOKEN)
        # The context manager closes the underlying HTTP client on exit.
        with airflow_client.client.ApiClient(config) as api_client:
            dag_api = DAGApi(api_client)
            dags = dag_api.get_dags(limit=10)
            self.log.info("Found %d DAGs", len(dags.dags))
Replacement: Direct REST API calls
For simple cases, call the REST API directly using requests:
import os

import requests
from airflow.sdk import task

# Strip any trailing slash so the joined URL below has no double slash.
_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/").rstrip("/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

@task
def list_dags_via_api() -> None:
    response = requests.get(
        f"{_HOST}/api/v2/dags",
        headers={"Accept": "application/json", "Authorization": f"Bearer {_TOKEN}"},
        params={"limit": 10},
        timeout=30,  # avoid hanging the task if the API server is unreachable
    )
    response.raise_for_status()
    print(response.json())
Ruff Airflow Migration Rules
Use Ruff's Airflow rules to detect and fix many breaking changes automatically.
- AIR30 / AIR301 / AIR302: Removed code and imports in Airflow 3 - must be fixed.
- AIR31 / AIR311 / AIR312: Deprecated code and imports - still work but will be removed in future versions; should be fixed.
Commands to run against the project root (prefix with uvx if Ruff is run via uv):
# Auto-fix all detectable Airflow issues (safe + unsafe)
ruff check --preview --select AIR --fix --unsafe-fixes .
# Check remaining Airflow issues without fixing
ruff check --preview --select AIR .
Reference Files
For detailed code examples and migration patterns, see:
- reference/config-changes.md - airflow.cfg section moves, renames, and removals
- reference/migration-patterns.md - Code examples for imports, scheduling, XCom, Assets, DAG bundles, runtime behavior changes
- reference/removed-methods.md - Removed model methods with SDK/API migration paths
- reference/migration-checklist.md - Search patterns and fixes for issues Ruff doesn't catch
Quick Reference Tables
Key Import Changes
| Airflow 2.x | Airflow 3 |
| --- | --- |
| airflow.operators.dummy_operator.DummyOperator | airflow.providers.standard.operators.empty.EmptyOperator |
| airflow.operators.bash.BashOperator | airflow.providers.standard.operators.bash.BashOperator |
| airflow.operators.python.PythonOperator | airflow.providers.standard.operators.python.PythonOperator |
| airflow.decorators.dag | airflow.sdk.dag |
| airflow.decorators.task | airflow.sdk.task |
| airflow.datasets.Dataset | airflow.sdk.Asset |
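Ruff's AIR rules handle most of these moves automatically. Purely as an illustration of how the Key Import Changes mapping applies to a line of code, here is a small sketch that rewrites single-name from-imports. IMPORT_MOVES and rewrite_import are hypothetical names, and aliasing a renamed class back to its old name is a stopgap chosen for this example, not a recommended end state.

```python
import re

# (old module, old name) -> (new module, new name), copied from the table above.
IMPORT_MOVES = {
    ("airflow.operators.dummy_operator", "DummyOperator"): ("airflow.providers.standard.operators.empty", "EmptyOperator"),
    ("airflow.operators.bash", "BashOperator"): ("airflow.providers.standard.operators.bash", "BashOperator"),
    ("airflow.operators.python", "PythonOperator"): ("airflow.providers.standard.operators.python", "PythonOperator"),
    ("airflow.decorators", "dag"): ("airflow.sdk", "dag"),
    ("airflow.decorators", "task"): ("airflow.sdk", "task"),
    ("airflow.datasets", "Dataset"): ("airflow.sdk", "Asset"),
}

# Matches only the simple form `from X import Y` (one name, no alias).
_FROM_IMPORT = re.compile(r"^(\s*)from\s+([\w.]+)\s+import\s+(\w+)\s*$")


def rewrite_import(line: str) -> str:
    """Rewrite a single-name from-import; return any other line unchanged."""
    match = _FROM_IMPORT.match(line)
    if not match:
        return line
    indent, module, name = match.groups()
    target = IMPORT_MOVES.get((module, name))
    if target is None:
        return line
    new_module, new_name = target
    if new_name == name:
        return f"{indent}from {new_module} import {new_name}"
    # Stopgap: keep the old local name as an alias so existing usages resolve.
    return f"{indent}from {new_module} import {new_name} as {name}"


if __name__ == "__main__":
    print(rewrite_import("from airflow.datasets import Dataset"))
```

Multi-name imports, aliased imports, and plain `import x.y` statements pass through untouched, which is one reason to prefer Ruff for the real migration.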
Context Key Changes
| Removed Key | Replacement |
| --- | --- |
| execution_date | context["dag_run"].logical_date |
| tomorrow_ds / yesterday_ds | Use ds with date math: macros.ds_add(ds, 1) / macros.ds_add(ds, -1) |
| prev_ds / next_ds | prev_start_date_success or timetable API |
| triggering_dataset_events | triggering_asset_events |
| templates_dict | context["params"] |
Asset-triggered runs: logical_date may be None; use context["dag_run"].logical_date defensively.
Cannot trigger with future logical_date: Use logical_date=None and rely on run_id instead.
Cron note: for scheduled runs using cron, logical_date semantics differ under CronTriggerTimetable (aligning logical_date with run_after). If you need Airflow 2-style cron data intervals, consider AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True.
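The context-key replacements above can be sketched in plain Python. In the example below, ds_add is a local stand-in that mirrors the date math of macros.ds_add so the snippet runs without Airflow, and effective_date is a hypothetical helper. Its fallback to dag_run.run_after is an assumption about what a sensible default looks like for a run with no logical_date; adjust it to your DAG's needs.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


def ds_add(ds: str, days: int) -> str:
    """Local stand-in mirroring macros.ds_add: shift a YYYY-MM-DD string by days."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def effective_date(context: dict) -> datetime:
    """Return logical_date when it is set, otherwise fall back to run_after."""
    dag_run = context["dag_run"]
    if dag_run.logical_date is not None:
        return dag_run.logical_date
    # Assumption: the run object exposes run_after (asset-triggered runs may
    # have logical_date=None, as noted above).
    return dag_run.run_after


if __name__ == "__main__":
    # SimpleNamespace stands in for the dag_run object of an asset-triggered run.
    fake_run = SimpleNamespace(
        logical_date=None,
        run_after=datetime(2025, 1, 2, tzinfo=timezone.utc),
    )
    print(effective_date({"dag_run": fake_run}))
    print(ds_add("2025-01-02", -1))  # replacement for yesterday_ds
```

Inside a real task, the same helper would receive the Airflow context dictionary instead of the hand-built one used here.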
Default Behavior Changes
| Setting | Airflow 2 Default | Airflow 3 Default |
| --- | --- | --- |
| schedule | timedelta(days=1) | None |
| catchup | True | False |
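A DAG that omitted schedule or catchup in Airflow 2 silently changes behavior under the new defaults. One way to review this, sketched below with hypothetical helper names, is to list which defaults a DAG's keyword arguments rely on and, where the old behavior is wanted, pin them explicitly. The sketch covers only the schedule and catchup names from the table above; legacy argument names are out of scope.

```python
from datetime import timedelta

# Airflow 2 defaults taken from the table above.
AIRFLOW2_DEFAULTS = {"schedule": timedelta(days=1), "catchup": True}


def implicit_defaults(dag_kwargs: dict) -> list[str]:
    """List the settings this DAG leaves unset and therefore inherits."""
    return [key for key in AIRFLOW2_DEFAULTS if key not in dag_kwargs]


def pin_airflow2_defaults(dag_kwargs: dict) -> dict:
    """Return a copy with Airflow 2 defaults filled in where omitted."""
    # Explicit values in dag_kwargs always win over the pinned defaults.
    return {**AIRFLOW2_DEFAULTS, **dag_kwargs}


if __name__ == "__main__":
    kwargs = {"dag_id": "example", "catchup": False}
    print(implicit_defaults(kwargs))
    print(pin_airflow2_defaults(kwargs))
```

The pinned dictionary could then be passed as keyword arguments to the DAG constructor or the dag decorator. Whether to keep the Airflow 2 behavior or adopt the new defaults is a per-DAG decision to confirm with the user.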
Callback Behavior Changes
- on_success_callback no longer runs on skip; use on_skipped_callback if needed.
- @teardown with TriggerRule.ALWAYS is not allowed; teardowns now execute even if the DAG run terminated early.
Resources
Related Skills
- testing-dags: For testing DAGs after migration
- debugging-dags: For troubleshooting migration issues
- deploying-airflow: For deploying migrated DAGs to production