KAKEHASHI でバックエンドエンジニアをしている横田です。
今回は、一般的にクローラーを使って作成するであろう、データカタログのテーブルを Spark だけで実現する方法について紹介できればと思います。
背景・目的
- MusubiInsight のプロダクトでは、薬剤師さんや薬局のマネージャーさん向けに業務実績データの可視化を行っています
- 可視化するデータは、夜間にバッチ処理で計算しています
- データ量としては数十 ~ 数百 GB あるので、集計処理基盤として AWS Glue の Spark Job を使っています
- Spark Job の結果を AWS Glue の Hive メタストア互換のマネージドのデータカタログと連携すると、ワークフローがシンプルになったので、その方法について共有します
データカタログとは
概要
AWS Glue には、Hive メタストア互換のマネージドのデータカタログサービスがあります。 ETL 処理を書く時には、一般的には入出力のデータの場所やデータのファイルフォーマットなどを意識して、下記のようにデータを読み書きする必要があるかと思うのですが、
from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = spark.read.format("parquet").load("s3://some_bucket/some_data_file_or_dir_path")
データカタログでは、RDB を使っている人には馴染みやすいデータベース・テーブルとして、それらのデータのメタデータ(データの場所・ファイルフォーマット・スキーマなど)を管理してくれます。 AWS Glue では、データカタログ上のデータベース名・テーブル名を指定して、下記のようにデータを読み込むことができます
import pyspark from pyspark.context import SparkContext from awsglue.context import GlueContext glueContext = GlueContext(SparkContext()) df = glueContext.create_dynamic_frame.from_catalog( database="some_db_name", table_name="some_table_name", )
作成したテーブルは下画像のようになります。
データカタログのメリット
コード量を増やしてまで、データカタログ経由でデータにアクセスするメリットとしては、下記のようなものがあると思います。
- ファイルの中身をみなくてもデータのスキーマを確認できるようになる
- データの場所とコードが疎結合になり、コードを修正しなくてもデータの場所を変えられるようになる
- Amazon Athena など、他のクエリエンジンからも参照しやすくなる
- メタデータ情報によってデータ入力量削減などパフォーマンス上の恩恵を受けられる
- Parquet のパーティションフィルタなど
弊社では、データの中身の確認やデータマートとして Redash から Athena 経由でデータにアクセスする際にデータカタログを使っているので、とても恩恵を受けています。
データカタログの作り方
AWS Glue のデータカタログの作成方法として、一般的なのは AWS Glue のクローラーを使う方法かと思います。 クローラーといっても Web クローラーとは少し異なり、S3 などのデータの場所を指定し実行するとデータカタログのテーブルを作ってくれるというものになります。 便利なのは、データのスキーマを自動判定してくれる点で、csv や json 形式であってもデータを解析してスキーマ付きのテーブルとして作成してくれます。
課題感
改めて、この記事を書くきっかけとなった背景について説明します。
MusubiInsight チームの AWS Glue と MWAA(Airflow)を使った、バッチ処理基盤は下記の図のようになっていました
Spark の Job ではデータを作成するだけで、データカタログ化についてはクローラーを使っていたのですが、Airflow 上で集計処理とクローラーをそれぞれ一つのタスクとして扱っていたので、集計 Job -> クローラー -> 集計 Job -> クローラーというに、処理の間にクローラーを挟む必要があり、ここをスキップできれば開発時の工数を短縮できるのになという思いがありました。 基本的にデータ集計時点では、データフォーマットは Parquet を使う想定なので、実はクローラーでスキーマを自動判定しなくてもよく、Spark Job でデータカタログのテーブルを作れたら良いなと思っていたのですが、最近ようやくその方法が分かりましたので、以下にまとめます。
クローラーなしでデータカタログにテーブルを作成する方法
大まかな手順
- Glue Job の設定で、Hive メタストア連携を有効化する
- スクリプトの中で SparkSession の設定を変更する
- saveAsTable で書き込む
1. Glue Job の設定で、Hive メタストア連携を有効化する
旧 Job のページの場合、Job の作成時に設定できる(後から変更できない)
新 Job のページ(AWS Glue Studio)の場合、Job details の画面から設定できる
terraform の場合は下記のように
--enable-glue-datacatalog=true
を設定する
resource "aws_glue_job" "some_job_resource_name" {
name = "some_job_resource_name"
role_arn = var.glue_role
command {
name = "glueetl"
script_location = "s3://some_script_path_on_s3"
}
default_arguments = {
"--job-bookmark-option" = "job-bookmark-disable"
"--job-language" = "python"
"--TempDir" = var.temporary_bucket
"--enable-glue-datacatalog" = "true"
}
glue_version = "3.0"
worker_type = "G.1X"
number_of_workers = 10
}
2. スクリプトの中で SparkSession の設定を変更する
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
- enableHiveSupport 関数を呼んでから Session を作成する
3. saveAsTable で書き込む
( spark_df .write.mode("overwrite") .format("parquet") .option("compression", "snappy") .option( "path", "s3://some_data_path_on_s3", ) .saveAsTable("db_name.table_name") )
- saveAsTable でカタログ上のデータベース名・テーブル名を指定する
備考
上記の手順で Glue のデータカタログ上にテーブルを作成することはできたのですが、いくつか懸念事項もありました。
データカタログのテーブルの分類が Unknown になる
- クローラーでテーブルを作成した場合、csv, json などのファイルフォーマットが値として入るのですが、Spark で作成した場合 Unknown となってしまいました
テーブルのプロパティを見ると、分類が入っているテーブルは、パラメータに
classification
という値が入っていたので、boto3 の update_tableなどで設定してやれば、分類を入れられそうですが、Spark の API を使ってこの値を入れる方法は見つけられませんでした{ "StorageDescriptor": { ... }, "parameters": { "classification": "parquet" } }
実行されるたびに新たなテーブルが再作成される
- クローラーでテーブルを作成する場合、既にテーブルがあった場合は新規バージョンとして、バージョン管理した上でテーブルを更新する形になります
- しかし、Spark の saveAsTable で作成した場合は、毎回全く新しいバージョンとして作成されていました
- スキーマの変更があった場合など、スキーマの差分の調査などをすることができなくなるので、バージョンを利用している人は注意が必要かもしれません
まとめ
今回は AWS Glue で クローラーを使わずに Spark Job で データカタログにテーブルを作成する方法についてまとめました。
AWS Glue を使い始めた当初から、できそうなのに、なかなか情報が見つからなかったので、参考になれば幸いです。
参考資料
- Spark SQL Job のデータカタログサポートについて
- データカタログのテーブルのファイルの分類について
- Apache Spark で DataFrame をテーブルとして保存する方法
- Apache Spark と Hive テーブルとの連携
最後に
KAKEHASHI ではデータ基盤にも力を入れていて、薬局業界をデータ駆動で DX を一緒に進めていってくださる仲間を募集してます。興味がある方は、ぜひご応募ください!