Airflowの基礎とワークフローとして利用するユースケースについて記事にしてきましたが、今回は運用しようとした時にどの様なアーキテクチャでAirflowを構築することになるのか検討てみようと思います。
過去2回の記事についてはこちら。👇
第1回:Apache Airflowについて調べてみたこと。
第2回:Apache Airflow:Dagが終了したら別のDagを実行する方法
前提
シチュエーションとしてはAzureのVM(Linux)にJava + SpringBootのバッチをjar化したモジュールをcronでスケジュールさせて実行している前提として、そこから移植するとしたらどのような構成が考えられるかという内容になります。また、比較対象としてAWSのアーキテクチャについても確認してみようと思います。
設計の軸について考える
基本的にはクラウドサービス上にAirflowを載せると言ってもどの程度クラウドに任せるかを考える必要があります。
極力運用に手をかけたくないのであれば、マネージドなサービスが選択対象に入ってきますが、自由度を求めるのであれば自前で構築する必要があります。この辺りは、チームや組織の方針に則って検討をすることが求められるでしょう。
また、スケジューラーやワーカーや管理画面用のサービスなどAirflowにはいくつか構成するコンポーネントの要素がありますが、これらを同じ環境に同居させるか、スケールや分離させてより柔軟な構成をとれるようにするかでも構成を考えなければならないですね。
もう一つ考える要素として「実行させるモジュールをどこで実行するか」ということを整理、検討しアーキテクチャを設計していくようにします。
マネージドな構成パターン案
【Azure】Azure Data Factoryの Managed Airflowを使う
ADF(Azure Data Factory)にAirflow環境を統合してDAGを構築させ、Airflowの運用をADFに寄せることで監視 / 統制もADFで管理がしやすくなります。
【AWS】Amazon MWAA + 実行基盤は外出し
AWS MWAA(Managed Workflows for Apache Airflow)は、AWSが提供するApache Airflowのマネージドサービスになります。MWAA上に構築することで、デプロイやスケーリング、CloudWatchでの監視連携などAWSサービスをフルに利用した構成とすることができます。
クラウドサービスと自由度を持たせたパターン
【Azure】AKS + Airflow + Azureマネージドサービス
AKS上にAirflowをデプロイし、DBにはAzure Database for PostgreSQL(Flexible Server)、キューには、Azure Cache for Redis(利用する場合)という構成にします。
【AWS】EKS + Airflow + マネージドDB/Redis
Azureと同じ様に、EKS上にAirflowをデプロイしながらもDBはマネージドなRDS for PostgreSQLとキューの管理には、ElasticCache Redis(利用する場合)という構成にします。
自前パターン
【Azure】VMに構築
VMにAireflowを構築してしまうパターンになります。現在VMを中心に利用している場合やチーム内など限定した範囲で構築したいようなパターンには向く構成になります。
【AWS】ECS/Fargateに構築
Airflow 自体を ECS 上で動かす(自前運用寄り、K8s ほど重くしない)。実行基盤も同じ ECS/Fargate に寄せやすい場合などの構成に適しているでしょう。Azure同様に場合によっては、EC2でもよいと思います。
Java jar をどこで実行するか?について考える。
Airflowはオーケストレートさせるものとして、実際の実行はJavaやPythonなどのプログラムでバッチを実行していると思いますが、それらをどこの実行基盤で回すかということについて検討する必要もあります。
一般的な実行基盤としてはクラウドにあると思いますが、VM+cron+jarで実行していたものをAirflowから起動するとしたらBatchOperatorというライブラリを利用するのが王道っぽいですね。
AWSであれば、以下の様に BatchOperator を利用します。
AWS Batchの実行基盤にはEC2やFargate上に構築したバッチをAirflowから実行するイメージです。
from airflow.providers.amazon.aws.operators.batch import BatchOperator
JOB_OVERRIDES = {
"command": ["java", "-jar", "/app/fuga.jar", "JOBA"],
# 必要なら vCPU / memory / env などもここで上書き(BatchのcontainerOverridesに相当)
# "environment": [{"name": "ENV", "value": "prod"}],
}
submit_batch_job = BatchOperator(
task_id="submit_batch_job",
job_name="joba",
job_queue="my-batch-queue",
job_definition="my-java-jobdef:1",
container_overrides=JOB_OVERRIDES,
)
AWS Batch以外の選択肢としてはFargateのECSにjarをおいて実行させるイメージもありますが、その場合のDagは以下の様にEcsRunTaskOperatorを利用した設定になる様です。
from airflow.sdk import dag
import pendulum
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
@dag(start_date=pendulum.datetime(2025, 1, 1), schedule=None, catchup=False)
def run_java_on_fargate():
run = EcsRunTaskOperator(
task_id="run_joba",
cluster="my-ecs-cluster",
task_definition="my-java-taskdef", # 事前に作成済みのTaskDefinition
launch_type="FARGATE",
overrides={
"containerOverrides": [
{
"name": "app",
"command": ["java", "-jar", "/app/fuga.jar", "JOBA"],
}
]
},
network_configuration={
"awsvpcConfiguration": {
"subnets": ["subnet-xxx", "subnet-yyy"],
"securityGroups": ["sg-zzz"],
"assignPublicIp": "ENABLED",
}
},
)
run_java_on_fargate()
Azureはどの様な構成が考えられるのか確認してみると、まずはWebappsをWebアプリとして利用しているのでコンテナサービスを検討してみるとAzure Container Apps のジョブというものがありました(Learn/Azure/Azure Container Apps のジョブ)。
ただ、AzureのOperatorにContainer Appsを操作するライブラリはないっぽいので、BashOperatorからazコマンドを実行する感じになるかなと思います。
以下のairflow.providers.microsoft.azure.operators.container_instances¶でAirflowから実行できそうです(要検証)。
Azure Container Instance(ACI)というシンプルにコンテナを単発で実行できる仕組みがあるようなので、こちらを活用するのもありかもしれないです。
コメント