前回Apach Airflowの概要について調べました。(こちら)
ジョブの実行やエラーだった時にメール送信やリトライする機能を実現しようとした時に、ジョブ管理システムの導入や導入しているシステムを利用することが一般的だと思います。ジョブ管理システムの有名なところとしては、JP1やSystemwalkerなどは聞いたことがある方もいるのではないでしょうか。
JP1やSystemwalkerなどの大規模基幹システムでは運用監視などを前提としているため、スモールスタートしたい場合にはどうしてもオーバースペックになってしまいます。
Airflowはそれらジョブ管理システムの代替になるわけではないのですが、データ基盤(ETL/ELT)やAI/MLパイプラインとして活用する機能を活用することで、スモールにシステム構成を構築できることができます。
今回は実際にジョブ管理システムとして利用することを想定した場合の利用方法について調査した内容の備忘録となります。
Dag間連携を必要とするケース
Dagには複数のタスクを定義して実行の順番を制御することができますが、Dag自体を分けてDagの実行順序を制御するDag間連携の方法もあります。
どのような設計思想でDag間連携を採用するか、
システム責務(担当領域)が異なるケース
部門が違う or 処理担当チームが違う場合、1つのDagにすると運用負荷や権限設計が複雑になるためDagを分割 → 依存連携するようなケースになります。
DAG_A:データ収集
DAG_B:データ加工・集約
DAG_C:分析結果をBIへ連携
長い処理を段階別に独立管理したいケース
大規模バッチ・ETLでは、途中で失敗したときの再実行範囲を細かく制御したいため、処理を分割する。
→ Dag失敗時の再実行単位として分けるのが合理的と判断するケースになります。
異なるスケジュールベースの処理を依存制御したい場合
常に時間ベースではなく、イベントベースで動きたい場合に有効。
| DAG | スケジュール | 条件 |
|---|---|---|
| DAG_A(入力データ収集) | 毎日 0:00 | 自律スケジュール |
| DAG_B(集計) | 実行条件:DAG_Aが成功したら動く | スケジュール不要 |
処理内容がモジュール化され、複数Dagから再利用したい場合
例えば、
DAG_X:日次処理
DAG_Y:月次処理
のように、再利用性・保守性の観点で DAG を分割し、トリガーで繋ぐのが合理的なケース。
検証/リリース境界が異なる場合
DAG_A の変更=DAG_B には関係ない場合、
ワークロード分割することで:
• CI/CD容易化
• ロールバック容易化
• 可観測性(ログ・監査)の明確化
につながるようなケース。
外部システム連携が挟まる場合
ステップ間に手動承認 or 外部依頼フェーズがある場合、DAGを分けると運用制御が柔軟になるようなケース。
DAG1 → DBロード
↓ 完了通知
DAG2 → 外部API呼び出し or BI更新
Dagが終わったら次のDagを実行する方法
今回は2つのDagを作成します。
親のDagを作成
from airflow.sdk import dag, task
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
import pendulum
@dag(
dag_id="parent_dag",
start_date=pendulum.now(),
schedule=None,
catchup=False, # 過去分(スケジュール分)の追いかけ実行有無。デフォルト:True
tags=["example", "dag_to_dag"]
)
def dag_parent():
@task
def prepare():
return "start child"
trigger = TriggerDagRunOperator(
task_id="trigger_child",
trigger_dag_id="child_dag", # ← 実行対象の別のDAG
wait_for_completion=True, # ← 子DAGの実行が終わるまで親DAGの次タスクへ進まない(同期実行)。デフォルト: False
poke_interval=10, # デフォルトは60秒
)
prepare() >> trigger
dag_parent()
ポイントはTriggerDagRunOperatorというOperatorを使いあるDagから別のDagを実行することができます。
| パラメータ | 意味 |
|---|---|
| trigger_dag_id | 実行したいDAGの dag_id |
| wait_for_completion=True | 子DAGが終わるまで待つ(同期実行) |
| wait_for_completion=False | 起動だけして次に進む(非同期) |
子のDagを作成
from airflow.sdk import dag, task
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
import pendulum
@dag(
dag_id="child_dag",
start_date=pendulum.now(),
schedule=None,
catchup=False, # 過去分(スケジュール分)の追いかけ実行有無
tags=["example", "dag_to_dag"]
)
def dag_child():
@task
def run_child():
return "Child DAG executed!"
run_child()
dag_child()
airflowの管理画面で起動してみる
親のDagparent_dagを実行すると子のDagchild_dagが自動で実行されます。

まとめ
今回はDagからDagを実行するケースを調査してみました。
1つのDagでどこまでまとめて定義するか、Dagを分割するかは基準を設け設計することでシステムの統一感を持たせるようにする方が望ましいでしょう。
TriggerDagRunOperatorに次に実行したいDagを指定することでDagからDagを実行することを確認しました。

コメント