KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

Airflow Zombie Task への対処法

こんにちは。AI 在庫管理チーム ソフトウェアエンジニアの坂本です。
今回は AI 在庫管理のバッチ処理で利用しているワークフローツール Airflow (MWAA) の Zombie Task に苦労したケースとその対処法について紹介しようと思います。

概要

  • Airflow で動かしているバッチ処理が Zombie Task という事象によってエラーになってしまうことが高頻度で発生しました。
  • Airflow のタスクを適切に分離することで Zombie Task が発生してもバッチ処理が失敗することがないように変更を加えることができました。

Airflow の Zombie Task とは

  • Airflow の公式ドキュメントには以下のように記述されています。

    Zombie tasks are tasks that are supposed to be running but suddenly died (e.g. their process was killed, or the machine died). Airflow will find these periodically, clean them up, and either fail or retry the task depending on its settings.

  • Airflow が分散システムであるため、タスクのプロセス応答がない場合やマシンの停止によって、タスクが失敗した判定となって Airflow に反映される事象です。

Zombie Task で実際に起こった障害ケース

AI 在庫管理では Zombie Task を起因とした以下のようなケースに苦しめられました。

  • DAG(ワークフロー) を親子構成にしており、親 DAG から子 DAG を実行している最中にZombie Taskが発生。親 DAG が異常終了した。

  • 時間がかかるタスクを実行中に Zombie Task が発生。 リトライをするとさらに時間がかかってしまい後続処理の実行が遅れてしまった。

  • Airflow タスクから実行している Glue Job のポーリング中に Zombie Task が発生。 冪等性を保つために Glue Job の同時実行数を 1 にしていたため、タスクのリトライ時に同時実行数の制限を超過してしまい DAG が異常終了した。

どうやって対処しようとしたか

以下のような方法で Zombie Task に対処しようとしました。

① Zombie Task の発生を極力防ぐ方法

  • Airflow (MWAA) の設定値を変更することで Zombie Task の発生をなくす/減らすことができないかと調査を行いました。
  • しかし、Zombie Task が明確に減少する設定を見つけることはできませんでした。

② ワークフロー構成を作成する方法

  • ワークフローの構成を見直すことで Zombie Task が発生しても問題が発生しないようにできないか検討しました。
  • DAG の中でリソースの実行/監視のタスクを分離することで Zombie Task に強いワークフローにすることが可能であると判断できました。

③ 別のワークフローツールに乗り換える

  • Airflow に代わるワークフローを検討しました。
  • サーバーレスで実行できる Step Functions が有力ですが、ワークフロー全体を移換するコストが大きいため、今後取り組んでいきたい課題です。

実際に②の方法が最も効果的であると判断し、Airflow DAG に反映しました。

実際の対処法

基本的な対処法

Airflow の PythonSensorExternalTaskSensor を利用することで実行/監視のタスクを分離しました。

Python コード例

def execute_job(**kwargs: Any):
    job_id = execute_something()
    return job_id # job の実行 ID を XCOM に push しておく


def polling_job(target_task_id: str, **kwargs: Any):
    job_id = kwargs["task_instance"].xcom_pull(task_ids=target_task_id)
    job_status = describe_job_status(job_id)

    # ステータスによって True/False を切り替える
    if status == "SUCCESS":
        return True
    elif status == "IN_PROGESS":
        return False
    else
        raise Exception()

with DAG(...) as dag:
    # 実行タスク
    task_execute = PythonOperator(
        task_id="task_execute",
        python_callable=execute_job,
    )

    # 監視タスク
    task_polling = PythonSensor(
        task_id="task_polling",
        op_kwargs={
            "target_task_id": task_execute.z_id, # 実行したタスクのジョブ ID を取得できるようにしておく
        }
        python_callable=polling_job,
        poke_interval=60 * 2,
        timeout=60 * 120,
        mode="poke",
    )

    task_execute >> task_polling

コードを反映すると、以下のような流れになります。
実行/監視タスクを分離することで、Zombie Task が発生したとしても Lambda や Glue Job 等の リソースの再実行や DAG の異常終了を防ぐことができます。

まとめ

AI 在庫管理で苦しんだ Zombie Task 発生ケースとその対処例について紹介しました。
Zombie Task で苦しんでいる方の参考になりましたら幸いです。

参考