こんにちは、カケハシの横田です。今回は我々の会社で実践しているテクノロジーについてお話しします。我々の開発チームは、日々 Apache Spark を活用し 100 個以上のバッチ処理を運用する中で、 AWS Glue という Apache Spark のマネージドサービスに大いにお世話になっています。
しかし、バッチ処理のテストを行う際にはいくつかの課題に直面します。特に想定外のデータへの網羅性を確認することが難しく、それを検証するために今までは本番データを使って毎回確認を行っていました。そうすると、1 回の確認のために数 10 分待つ必要があり、バッチ処理開発の開発に時間がかかってしまっていました。そこで、我々は開発段階からエラーを捉えるための特別なテスト環境を構築しました。今回の記事ではその詳細について紹介します。
この記事で紹介するコード類はGitHub で公開していますので気軽にご参照ください。
全体像
テストを GitHub Actions 上で CI/CD を行うときのイメージ図になります。Glue と互換性のある環境を Docker を使って構築し、そこでバッチ処理の PySpark のスクリプトをテストし、テストが通ったら S3 にデプロイします
ディレクトリ構成
. ├── docker-compose.yml ├── run_tests.sh ├── glue_jobs │ ├── __init__.py │ └── some_batch_pyspark_script.py ├── modules │ └── some_module.py ├── settings │ └── spark.conf └── tests ├── __init__.py ├── conftest.py └── test_some_batch_pyspark_script.py
ローカル環境では Docker を利用してテストします。Github Actions 上でも同様にテストを実行することができます。
テストの実行は pytest を想定しています。tests ディレクトリの中にテストファイルや fixture などを入れる conftest.py を配置しています。docker コンテナ上に対して pytest を実行するため、run_tests.sh というスクリプトを使って実行するようにしています。
run_tests.sh の内容は、下記のようになっています。(実際は、共通 module をコピーする処理が入っていますが今回は省略)
docker compose exec -T -u glue_user \ -w /home/glue_user/workspace/jupyter_workspace \ glue.dev /home/glue_user/.local/bin/pytest $1
tty がないとエラーになってしまうので、-T
オプションがポイントです。
Docker Compose ファイルの構成
次はテスト用のコンテナを作る docker-compose.yml ファイルの内容を紹介します
version: "3.5" services: glue.dev: container_name: glue.dev image: amazon/aws-glue-libs:glue_libs_4.0.0_image_01 volumes: - ./settings/spark.conf:/home/glue_user/spark/conf/spark-defaults.conf environment: - DISABLE_SSL=true # dummy configure - AWS_REGION=ap-northeast-1 - AWS_OUTPUT=json - AWS_ACCESS_KEY_ID=test - AWS_SECRET_ACCESS_KEY=test ports: # spark uiのポート - 4040:4040 command: /home/glue_user/jupyter/jupyter_start.sh
コンテナのイメージにはAWS が提供している Glue 4.0 のイメージを利用します。CPU のアーキテクチャはlinux/arm64
もlinux/amd64
もあるので、M2 Mac の方でも Intel Mac の方でも利用できますね!
本番環境で G.1X を調べてみたところx86_64(linux/amd64)
でしたので、細かい動作確認はアーキテクチャを合わせたほうが良いかもしれません。
Github Actions の Workflow
次は CI/CD を行う Github Actions を紹介します。
name: Test & Deploy GlueJobs jobs: test: runs-on: ubuntu-latest timeout-minutes: 15 steps: - uses: actions/checkout@v3 - name: Cache Docker Registry uses: actions/cache@v3 with: path: /tmp/docker-registry key: docker-registry-${{ github.ref }}-${{ github.sha }} restore-keys: | docker-registry-${{ github.ref }} docker-registry- - name: prepare container run: | docker-compose up -d - name: test run: | bash run_tests.sh deploy: runs-on: ubuntu-latest timeout-minutes: 5 steps: - uses: actions/checkout@v3 - name: Upload gluejob Files env: AWS_EC2_METADATA_DISABLED: true AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} run: | aws s3 cp glue-jobs/ s3://some_bucket/some_path/ --recursive --exclude="*" --include="*" --acl bucket-owner-full-control
GitHub Actions 上でテストするときもコンテナを立ち上げてから、テストのコマンドを実行すればテストができます。
テストが完了した後に S3 へコピーして本番環境に反映しています。
テスト専用の Spark の設定
上記の設定で実際に動かしてみると、単体テストに時間がかかっていることに気づきます。初め、上記の設定でテストを動かしたときにはテスト 1 つあたりの実行時間が 1 分程度かかっていました。本番環境で動かすよりは早く実行できるとはいえ、それだけテストに時間がかかるとローカルで実行するときにもストレスがかかりますし、CI/CD 環境で実行するときにもタイムアウトエラー問題にも悩まされますし、コスト面での影響もあります。 そこで、いくつかテスト用に設定を追加しています。それらの設定は conftest.py にまとめているので紹介します。
@pytest.fixture(scope="session") def glue_context(): """ テスト用の高速起動の設定を使ってglue_contextを作成する 参考: https://medium.com/constructor-engineering/faster-pyspark-unit-tests-1cb7dfa6bdf6 """ spark = ( SparkSession.builder.master("local[1]") .config("spark.sql.shuffle.partitions", "1") .config("spark.ui.showConsoleProgress", "false") .config("spark.ui.enabled", "false") .config("spark.ui.dagGraph.retainedRootRDD", "1") .config("spark.ui.retainedJobs", "1") .config("spark.ui.retainedStages", "1") .config("spark.ui.retainedTasks", "1") .config("spark.sql. ui.retainedExecutions", "1") .config("spark.worker.ui.retainedExecutors", "1") .config("spark.worker.ui.retainedDrivers", "1") .getOrCreate() ) yield GlueContext(spark.sparkContext) spark.stop()
主にこちらの記事を参考に設定しています。 ポイントは、
- fixture の scope を session に設定し、テストに対して 1 度だけ GlueContext が作成されるようにする
- テスト用のデータは少ないので、複数コアを使うオーバーヘッドを下げるために
local[1]
としてコアを 1 つだけ使うようにする - シャッフル後のパーティションの処理のオーバーヘッドを減らすために
shuffle.partitions=1
にする - SparkUI は不要なので OFF にする(SparkUI の設定の意味はこちら)
あたりかと思います。これらの設定を入れることでかなりテスト実行が早くなりました。
テストの書き方
最後にテストの書き方について紹介します。
テスト用データは色々な集計における分岐を網羅するパターンを作りたいので Python でテストデータファクトリを作って生成しています。
集計で検査する必要がある分岐は、主にNULL/非NULL
の違いや Enum の網羅、F.when
・F.filter
を使っている箇所の境界値であることが多いです。
テスト結果についてはスナップショットテストを行っています。
まずは、バッチ処理は下記のように書いています。
from awsglue.context import GlueContext from some_module import add_now_column from pyspark.sql import functions as F from pyspark.sql.session import SparkSession class DataStore: ''' データのI/Oを担う ''' def __init__(self, glue_context: GlueContext): self.gc = glue_context def get_some_df(self): self.gc.create_dynamic_frame.from_catalog( database="some_database", table_name="some_table", ) def to_store(self, spark_df): ( spark_df.write.mode("overwrite") .format("parquet") .option("compression", "snappy") .save("s3://some_bucket/some_key") ) def calculation(data_store): ''' 集計処理を担う ''' spark_df = data_store.get_some_df() # 何かしらの処理 spark_df = add_now_column(spark_df, '2020-01-01') return spark_df def main(data_store: DataStore): ''' 出力部分を担う ''' spark_df = calculation(data_store) data_store.to_store(spark_df) if __name__ == "__main__": spark = ( SparkSession.builder .enableHiveSupport() .getOrCreate() ) glue_context = GlueContext(spark.sparkContext) data_store = DataStore(glue_context) main(data_store)
集計処理は calculation 関数に任せています。テストの対象にするのはこの calculation 関数です。集計処理がデータへのアクセスに直接依存しないようにするため、データへの I/O 部分は DataStore クラスに任せます。テスト実行時にこのファイルは import されることになるので、その時に SparkSession を作らないようにするためセッション作成部分はif __name__ == "__main__":
部分に書いています。
次にテストのファイルは下記のようになります。
from sample_batch_script import ( DataStore, calculation, ) class DataStoreStub(DataStore): """ DataStoreになりすます、テスト用のDataStoreクラス """ def get_some_df(self): return self.gc.spark_session.createDataFrame( [ [1, "test1"], [2, "test2"], ], [ "a", "b", ], ) def test_calculation(glue_context): """ calculation関数のテスト """ ds = DataStoreStub(glue_context) result_df = calculation(ds) csv = result_df.toPandas().to_csv(index=False) print(csv) assert ( csv.strip() == """ a,b,now 1,test1,2020-01-01 2,test2,2020-01-01 """.strip() )
DataStore と同じ I/O のメソッドを持つ DataStoreStub で、ダミーデータを生成させます。上記ではオーバーライドしていますが、ダックタイピングでも良いかと思います。 テスト関数ではその DataStoreStub を calculation に渡して集計させ、結果をスナップショットと比較し検証します。
バッチ処理一つ一つに対して専用のデータを作るのは大変ですが、最小の構成でテストを作ることができます。冒頭で述べた想定外のデータに対しては、あらかじめここのテストケースをしっかり作ることで事前に対処します。もし想定できていないケースがあれば、適宜こちらのファイルに追加し二度と同じエラーを踏まないように対応します。
テストデータのファクトリ関数を共通化したい場合はモジュールに切り出しても良いかと思います。
まとめ
以上、AWS Glue のバッチ処理のテスト環境の構成を作ってみたので紹介させていただきました。 AWS Glue はサーバレスな Apache Spark 環境なので、スケーラビリティや耐障害性に優れているものの、動作確認のために毎回 AWS コンソール上で試すというのがなかなか手間だったので、この環境があることでかなり開発体験が良くなりました。 ご参考になれば幸いです。