Airflow の Dynamic Task Mapping を使ってみた # doda Developer Group Advent Calendar 2024

この記事は、doda Developer Group Advent Calendar 2024の19日目の記事としてお届けします。

 

はじめに

こんにちは。パーソルキャリア株式会社の佐藤政美です。

私は主に「dodaサイトのアーキテクチャ改善」を担当しており、これまでAWSのさまざまなサービスを活用し、改善活動を行ってきました。

今回の記事では、「Amazon Managed Workflows for Apache Airflow」を実際に使用した際に得られた知見や学びを共有します。

この内容が、皆さまの参考になれば幸いです。

この記事を読んで得られる知見

  • Airflow の Dynamic Task Mapping(動的タスク)の使い方
  • Airflow から ECSタスクを実行する方法

Dynamic Task Mappingとは

Apache Airflow 2.3.0から導入された Dynamic Task Mapping は、同じタスクを動的に複数回実行したり、引数ごとにタスクを並列実行したりできます。

この機能を使うと、タスクの定義を1回書くだけで対象データの数に応じたタスクを自動的に生成できます。

今回のユースケース

dodaの求人検索で「ベクトル検索」が有効かどうかを検証する実験的なプロジェクトにおいて、Dynamic Task Mappingを使用しました。

このプロジェクトでは、求人データ(複数のParquetファイル)をベクトル変換する処理をECSタスクで実行します。
具体的には、S3バケット内から最新のフォルダを特定し、そのフォルダ内に存在するファイルごとにECSタスクを動的に生成・並列実行する仕組みを作りました。

Dynamic Task Mappingを採用することで、ファイル数に応じたタスクのスケーラブルな実行が可能になり、効率的な処理が実現しています。

処理の流れ

  1. S3 バケットから最新フォルダとファイルを探索

    • boto3を使用して、指定されたS3バケット内のフォルダとファイルの一覧を取得

    • 最新フォルダを特定し、そのフォルダ内に存在するすべてのファイルをリストアップ

  2. XCom にファイルリストを保存

    • 取得したファイルリストをXComを通じて保存

    • これにより、後続のタスクでファイルリストを動的に利用可能に!
  3. Dynamic Task Mapping を使用した ECSタスクの動的生成

    • Dynamic Task Mapping機能を使用して、取得したファイルリストをもとにECSタスクを動的に生成し、実行

処理のイメージ

作成したDAGファイル

ここでは、作成したDAGファイルの全体コードを紹介します。
このコードは Airflow 2.7.2 を使用して動作確認を行っています。

※マスキング済み のコメントがある箇所(バケット名やARNなど)は、環境に応じて実際の値に変更してください。

from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
from airflow.decorators import task
from datetime import datetime
import boto3
import re
import logging

# 所定のバケットから最新フォルダを探し、そのフォルダ内のファイルをリストアップ
@task
def find_input_files():
    session = boto3.Session()
    sts_client = session.client(
        'sts',
        region_name='ap-northeast-1',
        endpoint_url='https://sts.ap-northeast-1.amazonaws.com'
    )
    assumed_role = sts_client.assume_role(
        RoleArn='arn:aws:iam::123456789012:role/masked-role',  # マスキング済み: IAM Role ARN
        RoleSessionName='assume_role_session'
    )
    credentials = assumed_role['Credentials']
    s3_client = boto3.client(
        's3',
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken']
    )
    
    bucket_name = 'masked-bucket-name'  # マスキング済み: S3バケット名
    prefix = 'doda/text_vector_batch/input/'
    latest_folder = None
    latest_timestamp = None

    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter='/')
    folder_pattern = re.compile(r"^doda/text_vector_batch/input/(\d{8}_\d{4})/$")  # フォルダのパターン

    # 最新フォルダを探す
    if 'CommonPrefixes' in response:
        for common_prefix in response['CommonPrefixes']:
            folder = common_prefix['Prefix']
            match = folder_pattern.match(folder)
            if match:
                folder_timestamp = datetime.strptime(match.group(1), '%Y%m%d_%H%M')
                if latest_timestamp is None or folder_timestamp > latest_timestamp:
                    latest_timestamp = folder_timestamp
                    latest_folder = folder

    # 最新フォルダ内のファイルだけを取得
    input_files = []
    if latest_folder:
        folder_response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=latest_folder)
        if 'Contents' in folder_response:
            # ファイルのみをリストに追加
            input_files = [obj['Key'] for obj in folder_response['Contents'] if not obj['Key'].endswith('/')]
                
    logging.info("取得したinputファイル一覧: %s", input_files)

    # input_files をXComに保存
    return input_files

@task
def create_ecs_task(file_key, **kwargs):
    logging.info(file_key)

    ecs_task = EcsRunTaskOperator(
        task_id=f'run_ecs_task_{file_key.split("/")[-1]}',
        aws_conn_id='masked-aws-connection',  # マスキング済み: Airflow Connectionsで設定したID(Connectionsは、AirflowからAirflow外部に接続するための設定)
        cluster='arn:aws:ecs:ap-northeast-1:123456789012:cluster/masked-cluster',  # マスキング済み: ECSクラスターARN
        task_definition='arn:aws:ecs:ap-northeast-1:123456789012:task-definition/masked-task-definition',  # マスキング済み: タスク定義ARN
        launch_type='FARGATE',
        overrides={
            'containerOverrides': [
                {
                    'name': 'masked-container-name',  # マスキング済み: コンテナ名
                    'environment': [
                        {
                            'name': 'INPUT',
                            'value': f's3://masked-bucket-name/{file_key}'  # マスキング済み: S3バケットとパス
                        },
                        {
                            'name': 'OUTPUT',
                            'value': f's3://masked-bucket-name/doda/text_vector_batch/output/{file_key.split("/")[-2]}/output_{file_key.split("/")[-1].replace("input_", "")}'
                        }
                    ],
                },
            ],
        },
        network_configuration={
            'awsvpcConfiguration': {
                'subnets': ['subnet-xxxxxxxx', 'subnet-yyyyyyyy'],  # マスキング済み: サブネットID
                'securityGroups': ['sg-xxxxxxxxxxxx'],             # マスキング済み: セキュリティグループID
                'assignPublicIp': 'DISABLED',
            }
        }
    )

    # ECSタスクを実行
    ecs_task.execute(context=kwargs)
    return f"ECSタスク {file_key} は正常に実行されました。"

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
}

# DAG定義
with DAG(
    dag_id="test-dynamic-ecs-tasks",
    default_args=default_args,
    description='Dynamic Task Mapping テスト',
    schedule_interval=None,
    start_date=datetime(2024, 12, 16)
) as dag:

    # 入力ファイルを特定
    input_files = find_input_files()

    # ECSタスクを動的に生成・実行
    run_ecs_tasks = create_ecs_task.expand(file_key=input_files)

    input_files >> run_ecs_tasks

コードの解説

1. S3から最新フォルダとファイルリストを取得

S3バケット内の最新日時フォルダを見つけ、そのフォルダ内のファイルリストを取得します。

解説

find_input_files関数

  • boto3ライブラリを使用して、指定したS3バケット内の対象パス (doda/text_vector_batch/input/) 配下を調べます。
  • フォルダ名は YYYYMMDD_HHMM 形式で構成されており、正規表現で一致するフォルダを検出します。
  • フォルダ内のタイムスタンプを比較して、最新フォルダを決定します。
  • 最新フォルダ内のファイルをリストアップし、フォルダやサブフォルダを除外します。(ファイルのみを対象にしています。)
  • 取得したファイルリスト(input_files)をXComを通じて次のタスクに渡します。

 

input_filesには、以下のような値が格納されます。
['doda/text_vector_batch/input/20241213_1850/input_data_5_part_01.parquet',
 'doda/text_vector_batch/input/20241213_1850/input_data_5_part_02.parquet',
 'doda/text_vector_batch/input/20241213_1850/input_data_5_part_03.parquet',
 'doda/text_vector_batch/input/20241213_1850/input_data_5_part_04.parquet',
 'doda/text_vector_batch/input/20241213_1850/input_data_5_part_05.parquet']
XComについて

Airflowでは、タスク間でデータをやり取りするために XCom (Cross-Communication) 機能を使用します。
XComは、タスク実行中に生成されたデータや結果を保存し、それを後続タスクで参照する仕組みです。

今回のDAGでは、find_input_filesタスクで取得した入力ファイルのリストをXComに保存し、そのデータを動的タスクマッピングの対象として利用しています。

具体的には、@taskデコレーターを使うことで、find_input_filesの戻り値が自動的にXComへ格納され、後続のタスクでその値を利用できます。

この仕組みにより、タスク間での柔軟なデータ受け渡しが可能になり、動的に変化する処理対象にも対応できます。

2. Dynamic Task MappingでECSタスクを生成

Dynamic Task Mappingを使用して、S3から取得した各ファイルに対して個別にECSタスクを生成・実行します。

解説

create_ecs_task関数

  • S3から取得した file_key(ファイルのキー)を入力として受け取ります。
  • EcsRunTaskOperator を用いて、Fargateで動作するECSタスクを生成・実行します。
  • 実行されるコンテナには、file_key から生成した INPUTOUTPUT の環境変数を渡します。
    • INPUT: 処理対象のS3ファイルのパス。
    • OUTPUT: 処理結果を格納するS3のパス。
  • ネットワーク設定 (subnetsecurityGroup) を指定して、ECSタスクが適切に動作するよう構成しています。

Dynamic Task Mapping

  • create_ecs_task.expand(file_key=input_files) のところで、取得した複数のファイルに対して個別のタスクを動的に生成します。
  • これにより、S3のファイル数に応じてタスク数を動的に決定できます。

3. DAGの定義

ワークフロー(DAG)を定義し、スケジュールや基本設定を構成します。

解説

DAGの基本設定:

  • dag_id="test-dynamic-ecs-tasks"
    DAGの名前(一意)を指定しています。
  • schedule_interval=None
    スケジュールを無効化し、手動実行のみに設定しています。
  • start_date=datetime(2024, 12, 16)
    ワークフローの開始日時を指定しています。

default_args:

  • DAG全体に適用されるデフォルトの設定を定義しています。
  • ここでは、ownerdepends_on_past(過去タスクの依存を無効化)を定義しています。

4. Airflowタスクの依存関係の定義

各タスクの実行順序や依存関係を定義します。

解説

input_files >> run_ecs_tasks

  • find_input_files タスクの出力(S3ファイルリスト)を次のタスク create_ecs_task に渡します。
  • Dynamic Task Mappingにより、取得したファイルリストの各ファイルに対して、create_ecs_task が並列実行されます。
  • >> 演算子を使用してタスクの依存関係を指定することで、find_input_files タスクが完了してから create_ecs_task が実行されます。

動作確認の結果

まず、DAGファイルをアップした直後は、AirflowUI上で以下のように表示されます。

「create_ecs_task」のところが、Dynamic Task Mapping であり、まだ並列数が決まっていないので、[ ] という表現になっています。

このDAGをAirflow上で実行すると、対象フォルダのファイル数に応じてECSタスクが動的に生成され、並列で実行されます。

結果として、入力ファイルごとに独立した処理を効率的に実行することができました。

同時実行するタスク数を制限する

Airflowでは、ワークフロー内のタスクが同時に実行される数を制御することが可能です。これは、システムリソースの過負荷を防ぐだけでなく、他のワークフローとリソースを共有する環境で有効です。

DAGの定義で max_active_tasks を設定することで、DAG全体で同時実行されるタスク数を制限できます。たとえば、以下のように設定することで、同時に実行されるタスクの上限を3つに制限できます。

with DAG(
    dag_id="test-dynamic-ecs-tasks",
    default_args=default_args,
    description='Dynamic Task Mapping テスト',
    schedule_interval=None,
    start_date=datetime(2024, 12, 16),
    max_active_tasks=3  # 同時実行タスク数を3に制限
) as dag:

これにより、たとえDAG内に10個以上のタスクが定義されていたとしても、同時に実行されるタスクは最大で3つまでとなります。Dynamic Task Mappingを使用している場合でも、この制限は適用され、リソースの消費を管理しやすくなります。

必要に応じて、タスクの並列度を調整することで、リソース効率を最大化しつつ、安全で安定したタスク実行ができます。

まとめ

Dynamic Task Mappingを活用することで、処理対象が動的に変化するユースケースにも柔軟に対応できるAirflowワークフローを構築できます。

今回は、ECSタスクを動的に並列実行するやり方を紹介しましたが、同様のアプローチでEKSクラスターのPodを並列実行することも検証済みです。

こうした仕組みは、さまざまなバッチジョブの運用において非常に役立つと思います。ぜひこの記事を参考にして、実際の業務に取り入れていただきたいです。

プロダクト開発統括部 エンジニアリング部 dodaエンジニアリンググループ シニアエンジニア 佐藤 政美の写真

佐藤 政美 Masami Sato

プロダクト開発統括部 dodaシステムアーキテクト部 dodaアーキテクト第2グループ シニアエンジニア

SIer、会計パッケージベンダーを経て、2020年7月にパーソルキャリアに入社。入社後は、dodaサイト開発に携わりつつ、AWSを活用した新たな開発に取り組んでいる。

※2024年12月現在の情報です。