Apache Airflowについて調べてみたこと。

icon_airflow

前々から分散データ処理やデータプラットフォームについて興味がありFluentdなどの書籍は読んだりしていたのですが、今回は自身の業務でもジョブ管理システムについて調べる必要があったりしたので、今回はワークフロー管理ツールである「Apache Airflow」についての内容をまとめてみました。

目次

Apache Airflowとは?💡

Apache Aireflowは、

ワークフローをプログラムで作成、スケジュール、監視するためにコミュニティによって作成されたプラットフォーム

となります。

なので、ジョブの実行の定義をPythonで作成したり、管理用画面が用意されていたりするプラットフォームになります。

Apache Airflow誕生の背景

2014年にAirbnbの元FacebookのエンジニアMaxime Beaucheminによって開発され、その当時、

  • ジョブの依存関係が見えない
  • 運用が属人化
  • ログが散財し追跡困難
  • 変更・追跡が容易でない

という課題を抱えていたとのことです。

Airbnbは、上記問題に対して「データパイプラインをコードで定義し、視覚的・運用的に制御可能にする」という思想でAirflowを開発したようです。

Apache Airflowの特徴

1.スケーラブル

モジュール型アーキテクチャを採用しており、メッセージキューを使用して任意の数のワーカーをオーケストレーションします。Airflow は無限に拡張可能。

2.動的管理

パイプラインはPythonで定義されており、動的なパイプライン生成が可能です。これにより、パイプラインを動的にインスタンス化するコードを記述できる。

3.伸長性

独自の演算子を簡単に定義し、環境に適した抽象化レベルに合わせてライブラリを拡張できる。

4.エレガント

Apache Airflow® パイプラインは、シンプルで明示的な設計。強力な Jinja テンプレートエンジンを使用したパラメータ化は、そのコア部分に組み込まれている。

支持される理由

Aireflowは業界標準のワークフローエンジンとされ、多くのクラウド・企業・OSSプロジェクトで採用されていますが、支持される理由としては以下があげられます。

  1. 柔軟性と拡張性
    • Pythonで定義するので基本的にcronなどのツールより柔軟に条件分岐や動的な処理を管理することができる。
  2. 可視化と運用性の高さ
    • DAGの構成、各タスクのログ、失敗タイミングが全てGUIで追える。
    • メール・Slack通知、Webhooksも設定可能。
  3. 豊富なOperatorとプラグイン群
    • BashOperator, PythonOperator, EmailOperator, KubernetesPodOperator, S3Operator, MySqlOperator… など豊富。
    • 自作オペレーターも作成して追加することができる。
    • AWS、Azure、GCPとの統合プラグインも豊富
  4. 大規模運用への耐性と分散処理
    • Executer(Celery、KubernetesExecutor)を使えば大規模分散環境にも対応可能
  5. ベンダーによるマネージサポートが増加
    • GCP→Cloud Composer
    • AWS→MWAA(Managed Workflows for Apache Aireflow)
    • Azure → Data Factory 統合 Managed Airflow

Aireflowが開発され、ここまで支持されている理由は単純に「ジョブをスケジュールできる」からではなく、コードベースで再現性のあるワークフローが管理できることで、属人化を排除したり拡張性・運用性のバランスがよいためとなります。

今後はLLMを活用できるエンジニアやデータエンジニア、インフラエンジニアなどが活躍するためにも、Airflowというデータプラットフォームの理解、活用できるようにすることが求められていくと思います。

Aireflowの要素

Dag(Directed Acyclic Graph)とは

Apache Airflow の根幹を成す概念の一つがDag(Directed Acyclic Graph)になります。私は、Airflowを調べるまでDagというものが何なのかわかりませんでした。。

Airflowを理解する上で「Dag」を理解することは重要になるので、用語を分解して理解していきたいと思います。

用語意味
Directed : 有向向きが有る。
taskA→taskBというような流れ。
Acyclic : 非巡回ループしない。
taskA→taskB→taskC→taskAのような循環禁止。
Graph : グラフノード(タスク)とエッジで構成される構造体。

Dagのイメージ

Dagの属性

Airflowの動作はDagが担っており、ワークフローの実行に必要なすべてをカプセル化したモデルになり、次の様な属性を含むものです。

Schedule - スケジュール

ワークフローを実行するタイミング。

Tasks - タスク

タスクはWorker上で実行される個別の作業単位。

Task Dependencies - タスクの依存関係

タスクが実行される順番と条件。

コールバック - Callbacks

ワークフロー全体が完了したときに実行されるアクション。

追加パラメータ - Additional Parameters

その他多くの操作詳細。

Dagのサンプルコード

Dagの基本が確認できるサンプルコードが以下になります。

from datetime import datetime

from airflow.sdk import DAG, task
from airflow.providers.standard.operators.bash import BashOperator

# A Dag represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2025, 11, 1), schedule="0 0 * * *") as dag:
    # Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # Set dependencies between tasks
    hello >> airflow()
  • dag_id="demo"がDagの名前を指定する部分になります。
  • start_date=datetime(2025, 11, 1), schedule="0 0 * * *" が2025年11月1日の0時0分になります。
    • "0 0 * * *"の部分は前の値から「時」「日」「月」「年」「週」の設定となります。
  • helloはBashOperatorを使用してshell scriptを実行し、airflow@taskデコレータを使用して定義しています。BashOperator の他には以下のようなオペレータが利用可能です。
    • PythonOperator- Python関数をコール
    • EmailOperator - Eメールを送信
    • SimpleHttpOperatro - HTTPリクエストを送信
    • Sensor - 特定の時間、ファイル、データベース行、S3 キーなどを待機。
  • >>は2つのタスク間の依存関係を定義して、実行順序を制御するものになります。
    • >> は想像どおり、op1 >> op2 のときに op1が終わってからop2を実行する指定方法。op1.set_downstream(op2)という指定も同じ意味になる。
    • <<op2 << op1とop1 が完了してから op2 を実行する。op2.set_upstream(op1)も同じ意味になる。
    • op3.set_upstream([op1, op2])はop1,op2が完了したらop3を実行するという意味になります。
op1 >> op3
op2 >> op3

Airflowを動かしてみる(ローカルPC by Docker)

Airflowの公式がdocker composeを提供しているのでdocker composeでサクッと起動していきます。

Version: 3.1.3 が最新の安定版となるので、この手順を元に実施していきます。

docker composeを取得する。

Airflowをデプロイするためのdocker-compose.yamlを取得します。

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/3.1.3/docker-compose.yaml'

導入のための初期設定

ディレクトリ作成と.envの作成

mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

docker composeの起動

docker compose run airflow-cli airflow config list

# airflow-cli:docker-compose.yaml 内で定義されたサービス名。Airflow CLI 専用の軽量コンテナ
# airflow config list:Airflow の設定(airflow.cfg)をリスト表示する CLI コマンド

データベースの初期化

docker compose up airflow-init

初期化が完了すると、ファイル、フォルダー、プラグインに関連する出力が表示され、最後に次のようなメッセージが表示されます。

airflow-init-1 exited with code 0

全てのサービスを実行させる

docker compose up

docker-compose.yaml で定義されているサービスは以下になります。ダウンロードされるdocker-compose.yamlのコメント部分を除外した内容についても記載します。

airflow-scheduler

すべてのtaskとDagを監視し、依存関係が完了するとタスクインスタンスをトリガーします。

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
airflow-dag-processor

DagプロセッサはDagファイルを解析するためのサービスです。

  airflow-dag-processor:
    <<: *airflow-common
    command: dag-processor
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

docker compose up を行い出力内容に以下のような出力を定期的に行われることが確認できます。

:
:
airflow-dag-processor-1  | 2025-11-23T09:39:29.685973Z [info     ] Not time to refresh bundle example_dags [airflow.dag_processing.manager.DagFileProcessorManager] loc=manager.py:550
airflow-dag-processor-1  | 2025-11-23T09:39:30.697466Z [info     ]
airflow-dag-processor-1  | ================================================================================
airflow-dag-processor-1  | DAG File Processing Stats
airflow-dag-processor-1  |
airflow-dag-processor-1  | Bundle        File Path                                                   PID    Current Duration      # DAGs    # Errors  Last Duration    Last Run At
airflow-dag-processor-1  | ------------  ----------------------------------------------------------  -----  ------------------  --------  ----------  ---------------  -------------------
airflow-dag-processor-1  | example_dags  tutorial_objectstorage.py                                                                     1           0  1.09s            2025-11-23T09:39:08
airflow-dag-processor-1  | example_dags  example_setup_teardown_taskflow.py                                                            1           0  1.08s            2025-11-23T09:39:15
airflow-dag-processor-1  | example_dags  example_dag_decorator.py                                                                      1           0  1.07s            2025-11-23T09:39:13
:
:
airflow-api-server

api server は http://localhost:8080で利用できます。

  airflow-apiserver:
    <<: *airflow-common
    command: api-server
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/version"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
airflow-worker

スケジューラによって指定されたタスクを実行する役割を担います。

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-apiserver:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
airflow-triggerer

Airflow 2.2 以降に導入された新機能である Deferrable Operators(遅延オペレーター) に対応するための Triggererプロセス を起動するサービスです。Deferrable Operator は、非同期で待機可能なタスク(例:センサー系処理)を リソース消費を抑えたまま待機 できる仕組みで、その処理の管理役がこの triggerer です。

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
airflow-init

初期化用サービスです。

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
          export AIRFLOW_UID=$$(id -u)
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        echo
        echo "Creating missing opt dirs if missing:"
        echo
        mkdir -v -p /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Airflow version:"
        /entrypoint airflow version
        echo
        echo "Files in shared volumes:"
        echo
        ls -la /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Running airflow config list to create default config file if missing."
        echo
        /entrypoint airflow config list >/dev/null
        echo
        echo "Files in shared volumes:"
        echo
        ls -la /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Change ownership of files in /opt/airflow to ${AIRFLOW_UID}:0"
        echo
        chown -R "${AIRFLOW_UID}:0" /opt/airflow/
        echo
        echo "Change ownership of files in shared volumes to ${AIRFLOW_UID}:0"
        echo
        chown -v -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config}
        echo
        echo "Files in shared volumes:"
        echo
        ls -la /opt/airflow/{logs,dags,plugins,config}
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
postgres

airflowで利用するデータベースサービスです。

  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
redis

Airflow 自体は CeleryExecutor を使っているときに、Redis(またはRabbitMQ)をメッセージブローカーとして使用します。

CeleryExecutorにおけるBrokerとしてのRedis
• Airflow Scheduler や Web UI から送られてくるジョブ情報(TaskInstance)を、Redis 経由で各 Worker に配信する役目
• メッセージキューとして動作し、「このタスクを実行してください」という命令を Redis に一旦貯めて、Worker がそれを取りにいくイメージ

Redis を使う理由
• Celery + Redis の組み合わせにより、タスクの並列実行が可能となる。
• タスクの 分散処理(スケールアウト) に強い。
• RabbitMQ より導入が簡単で軽量、Docker との相性も良い。

  redis:
    image: redis:7.2-bookworm
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always
flower

Flowerとは?

Celery の監視UIツールであり、Airflowが CeleryExecutor を使用している場合に有効です。
以下のような情報をリアルタイムで Web UI から確認できます:
• 各 Worker の状態(起動済み/停止中など)
• 各タスクの実行状況(成功/失敗/保留中)
• タスク実行ログ
• キューの深さ
• リソース負荷状況 など

Celeryとは

Pythonで書かれた分散タスクキュー/ジョブキューシステムで、非同期処理や並列処理のために広く使われていて以下の特徴があります。

  • 複数のワーカーが同時にタスクを処理
  • 処理の依頼者(プロデューサー)と、実行者(コンシューマー)を分離
  • RedisやRabbitMQをメッセージブローカー(キュー)として利用
  • キューにタスクを入れると、Workerがそれを取り出して処理

結局のところ、AirflowにおけるCeleryの役割としては、Airflow で CeleryExecutor を使っている場合、Celeryは以下のように機能するイメージとなります。

  • Airflow Scheduler が Redis に「このDAGタスクを実行して」と依頼
  • Celery Worker が Redis からタスクを取り出して実行
  • 結果を Airflow の DB に記録

つまり、タスクを分散して並列実行したいときの実行基盤として利用します。

  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

※flowerには、profiles: キーが設定されているので、通常の docker compose up コマンドでは起動されないため、以下のコマンドを別で実行する必要があります。

docker compose --profile flower up

Airflowの管理画面にアクセスする

docker compose upでairflowの各種サービスを立ち上げたら、http://localhost:8080 をブラウザで開くと以下のようなログイン画面が表示さるので、Username, Passwordを入力してログインします。

  • Username: airflow
  • Password: airflow

ログイン後の管理画面トップページ

Airflowの管理画面の説明については以下公式サイトを参考にしてみてください。

Dagを新規作成してみる

DagのファイルはPythonで記載していきます。

今回は、シンプルにbashを起動するタスクを実行する処理を作成してDagを追加するシンプルな方法を確認していきたいと思います。

事前に押さえておきたいこととしては、Dagの作成方法には2通りのやり方があります。

1つめが、Operatorを使う方法。これは従来の方法でbashを実行する場合は、BashOperatorがあります。

もう1つ目が、Airflow2.0で導入されたTaskFlow APIを使う方法となります。

作成前の予備知識 💡

  • ファイルの配置場所
    • 今回はAirflow の docker-compose.yaml を使っているので、Dag ファイルはホスト側の./dagsディレクトリに置きます。
    • ./dagsディレクトリはdocker volumesでairflowの各コンテナで共有されるようになっています。

TaskFlow APIで Dagを作成してみる

import json
import pendulum

from airflow.sdk import dag, task

@dag(
  schedule=None,
  start_date=pendulum.datetime(2025, 11, 29, tz="UTC"),
  catchup=False,
  tags=["example"]
)

def tutorial_taskflow_api():
  @task()
  def extract():
    data_string = '{ "1001": 301.27, "1002": 433.21, "1003": 502.22 }'

    order_data_dict = json.loads(data_string)
    return order_data_dict
  
  @task(multiple_outputs=True)
  def transform(order_data_dict: dict):
    total_order_value = 0

    for value in (order_data_dict.values()):
      total_order_value += value

    return { "total_order_value": total_order_value }

  @task()
  def load(total_order_value: float):
    print(f"Total order value is: {total_order_value:.2f}")
  
  order_data = extract()
  order_summary = transform(order_data)
  load(order_summary["total_order_value"])

tutorial_taskflow_api()

Airflowの管理画面に追加したDagが表示されます。

▷ボタンで実行することができます。

実行した結果`load`タスクでprintしている結果を確認することができます。

Operator(旧方式)でDagを作成したときと比較してみる

以下が同じ処理を旧方式で定義したDagとなります。

import json
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator


def extract():
  data_string = '{ "1001": 301.27, "1002": 433.21, "1003": 502.22 }'
  return json.loads(data_string)
  

def transform(ti):
  order_data_dict = ti.xcom_pull(task_ids="extract")
  total_order_value = sum(order_data_dict.values())
  return {"total_order_value": total_order_value}


def load(ti):
  total = ti.xcom_pull(task_ids="transform")["total_order_value"]
  print(f"Total order value is: {total:.2f}")


with DAG(
  dag_id="legacy_etl_pipeline",
  schedule=None,
  start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
  catchup=False,
  tags=["example"]
) as dag:
  extract_task = PythonOperator(task_id="extract", python_callable=extract)
  transform_task = PythonOperator(task_id="transform", python_callable=transform)
  load_task = PythonOperator(task_id="load", python_callable=load)

  extract_task >> transform_task >> load_task

TaskFlow APIで定義している方がシンプルに定義できるように見えますね。

以上、今回はAirflowの紹介でした。

今後Airflowで調査したいこと

  • ユースケース
    • ある特定のディレクトリにファイルが配置されたら起動する。
    • javaで作成されたバッチをイメージしてjarファイルを起動する。
  • 実際の環境どうする?
    • クラウドに構築するとしたら?
    • クラウドにマネージサービスある?
  • データプラットフォームとしての活用方法
  • Airflowの機能紹介

コメント

コメントする

目次