KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

ETL処理がシンプルになる!AWS Glue 3.0で使えるようになったPySparkの関数紹介

KAKEHASHI の、Musubi Insight チームのエンジニアの横田です。

KAKEHASHI では BI ツールの Musubi Insight という Web アプリケーションを提供しています。 BI ツールでは薬剤師さんの業務データを可視化しておりますが、そのデータの集計処理には AWS Glue を使っています。 今年 AWS Glue 3.0が使えるようになり、できることが増えました。 チームのデータ基盤の概要と、AWS Glue 3.0 になって新たに使えるようになった PySpark の関数をいくつか紹介していきます。

Musubi Insight チームでの AWS Glue の利用について

まず、簡単にデータ基盤の概要について紹介します。

弊社では AWS を利用しサービスを提供しているのですが、各サービスで作られたデータは S3 上に集まってくるようになっています。 集まったデータは日次のバッチ処理によって集計され、BI ツールが参照しているデータベースに格納されます。日次のバッチ処理を作るにあたって、稼働時間のみ課金され、さらに、計算リソースも柔軟に増やせるため AWS Glue を使っています。主に使っている機能は、Glue job (PySpark) や クローラー、データカタログで、バッチ処理のワークフロー管理は MWAA(Amazon Managed Workflows for Apache Airflow)で行っています。

データの流れは下記のようになっています

  1. 各種弊社プロダクトで生成されるデータが S3 に同期される
  2. S3 のデータを ETL 処理(Glue job) で集計し、その結果を S3 に保存する
  3. BI ツールで利用できるデータまで集計できたら、そのデータを S3 から Aurora MySQL に同期する
  4. BI ツールは Aurora MySQL のデータを可視化する

S3 への同期処理には、Aurora の export 機能や DynamoDB の export 機能を使っており、S3 から Aurora MySQL への同期にはload data from s3クエリを使っています。上記の図からは省略していますが、弊社ではサービスごとに AWS アカウントが分かれているのでクロスアカウントでのデータのやり取りが必要になっています。S3 はクロスアカウントでのアクセス制御が比較的容易なので、基本的に S3 経由でデータのやり取りを行なっています。

ワークフローについては、当初は Glue のワークフロー機能を使っていたのですが、複数チームでのワークフローの管理が難しいということもあり、それらをコードで管理できる Airflow を採用しています。

S3 のデータを集計するにあたって、あたかも MySQL のテーブルのようにデータにアクセスできるように、メタデータストアとして Glue のデータカタログを使っています。データカタログではデータベース・テーブルなどの粒度でスキーマを管理していて、テーブルは S3 にあるデータの実体への参照を保持しています。PySpark からデータを読み込むときは Glue のライブラリがテーブル名を指定するだけでデータを読み書きできる関数を提供してくれているので、データの入出力のコードも書きやすく読みやすくなります。(データの検証や社内利用のために Amazon Athena 経由でデータを取得する際にもデータカタログを利用できます)

データカタログを作るためには、Glue クローラーを使っています。クローラーにはデータの実体がある S3 のパスを指定しておくだけで、自動的にスキーマを読み取りデータカタログ化してくれます。クローラーも Airflow 上の一つの job として起動していて、ETL 処理の直後にクローラーを動かして後続の ETL 処理が更新後のデータカタログにアクセスできるようにする、というような使い方が多いです。

このように AWS の Glue では サーバレスの ETL 処理の基盤が充実していて、それを最大限利用する形で基盤を構築しています。現状、バッチ処理は日次で朝しか動いていないので、稼働しない時間帯には課金されないサーバレスの基盤をメインにしています。

この記事では、上記の PySpark Job で利用している Glue の新しいバージョン(3.0)が使えるようになったので、その魅力を少し紹介できればと思います。

新たに使えるようになった関数

AWS Glue 3.0 は Spark 3.1.1 に対応しているので、PySpark の集計処理の便利な関数が増えました!いくつか紹介します。

読み方

以下、説明では下記の宣言を前提に使い方を説明しています。関数の挙動を確認する際は、docker の pyspark 環境などを利用すると確認しやすいです

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.appName('app1').getOrCreate()

max_by

百聞は一見にしかず。イベントソーシングを使っているときに、最終更新のレコードの値を取ってきたいときに便利。今まで同じことをするために Window 関数や self join で書く必要があったところをかなりシンプルに書けるのはありがたいですね!

from datetime import date
df1 = spark.createDataFrame([
    ['1', '3', date(2021,1,1)],
    ['1', '4', date(2021,1,2)],
    ['2', '5', date(2021,1,1)],
    ['2', '6', date(2021,1,2)],
], ['a', 'b', 'updated_at'])

df1 = df1.groupBy('a').agg(F.expr('max_by(b, updated_at)'))

print(df1.toPandas().to_csv(index=False))

結果

a,"max_by(b, updated_at)"
1,4
2,6

make_date

列の値を使って日付データを作ることができる。year=YYYY/month=MMのようにパーティションを切っているときに便利。文字列型を経由して日付データに変換する必要がなくなりますね!

df1 = spark.createDataFrame([
    ['2021', '4'],
    ['2021', '5'],
], ['year', 'month'])

df1 = df1.withColumn('date', F.expr('make_date(year, month, "01")'))

df1.printSchema()
print(df1.toPandas().to_csv(index=False))

結果

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- date: date (nullable = true)

year,month,date
2021,4,2021-04-01
2021,5,2021-05-01

extract, date_part

make_date の逆で、Date 型などから年や月などを抜き出せる。こちらも、文字列型を経由してデータを取り出す必要がなくなります!

from datetime import date
df1 = spark.createDataFrame([
    [date(2021,1,1)],
    [date(2021,1,2)],
    [date(2021,2,1)],
    [date(2021,2,2)],
], ['date'])

df1 = df1.select(
    F.expr("date_part('year', date)"),
    F.expr("date_part('month', date)"),
    F.expr("extract(year from date)"),
    F.expr("extract(month from date)"),
)

print(df1.toPandas().to_csv(index=False))

結果

"date_part(year, date)","date_part(month, date)",extract(year FROM date),extract(month FROM date)
2021,1,2021,1
2021,1,2021,1
2021,2,2021,2
2021,2,2021,2

count_if

列のうち、条件に合うものの数を返してくれる。when 関数などを使う必要がなくなりますね!

df1 = spark.createDataFrame([
    ['1', '1'],
    ['1', '2'],
    ['1', '3'],
    ['2', '4'],
    ['2', '5'],
    ['2', '6'],
    ['2', '7'],
    ['2', '8'],
], ['a', 'b'])

df1 = df1.groupBy('a').agg(F.expr('count_if(b % 2 == 0)').alias('result'))

print(df1.toPandas().to_csv(index=False))

結果

a,result
1,1
2,3

from_csv

CSV 形式の文字列の列をパースして ROW にすることができる。ストレージに保存するまでもない小さいデータを作る時とかに使えそうですね!

型を指定して読み込み

df1 = spark.createDataFrame([
    ['1,2021-01-01,test'],
    ['1,2021-01-01,test'],
    ['1,2021-01-01,test'],
], ['csv_row'])

df1 = df1.withColumn('parsed', F.from_csv("csv_row", "a INT, b DATE, c STRING"))

df1.printSchema()
print(df1.toPandas().to_csv(index=False))

結果

root
 |-- csv_row: string (nullable = true)
 |-- parsed: struct (nullable = true)
 |    |-- a: integer (nullable = true)
 |    |-- b: date (nullable = true)
 |    |-- c: string (nullable = true)

csv_row,parsed
"1,2021-01-01,test","Row(a=1, b=datetime.date(2021, 1, 1), c='test')"
"1,2021-01-01,test","Row(a=1, b=datetime.date(2021, 1, 1), c='test')"
"1,2021-01-01,test","Row(a=1, b=datetime.date(2021, 1, 1), c='test')"

schema_of_csv を使って型を自動で判定して読み込み

df1 = spark.createDataFrame([
    ['1,2021-01-01,test'],
    ['1,2021-01-01,test'],
    ['1,2021-01-01,test'],
], ['csv_row'])

df1 = (df1
       .withColumn('type', F.schema_of_csv("1,2021-01-01,test", {'sep':','}))
       .withColumn('parsed',  F.from_csv("csv_row", F.schema_of_csv("1,2021-01-01,test", {'sep':','})))
)

df1.printSchema()
print(df1.toPandas().to_csv(index=False))

結果

root
 |-- csv_row: string (nullable = true)
 |-- type: string (nullable = false)
 |-- parsed: struct (nullable = true)
 |    |-- _c0: integer (nullable = true)
 |    |-- _c1: string (nullable = true)
 |    |-- _c2: string (nullable = true)

csv_row,type,parsed
"1,2021-01-01,test","STRUCT<`_c0`: INT, `_c1`: STRING, `_c2`: STRING>","Row(_c0=1, _c1='2021-01-01', _c2='test')"
"1,2021-01-01,test","STRUCT<`_c0`: INT, `_c1`: STRING, `_c2`: STRING>","Row(_c0=1, _c1='2021-01-01', _c2='test')"
"1,2021-01-01,test","STRUCT<`_c0`: INT, `_c1`: STRING, `_c2`: STRING>","Row(_c0=1, _c1='2021-01-01', _c2='test')"
  • 備考
    • 列名は自動で決まる
    • schema_of_csv に列を渡すことはできず、型判定をしたい文字列を渡す
    • YYYY-MM-DD を date 型の判定はできていない
    • YYYY-MM-DDTHH:mm:ssの形式は TIMESTAMP 型になる

その他

  • sinh, cosh, tanh, asinh, acosh, atanh などの関数が使えるようになった
  • bit 演算が強化された。bit_and, bit_or, bit_count, bit_xor
  • 日付型演算が強化された。make_interval, make_timestamp
  • Array 型に対して、any, every, some, forall が使えるようになった
  • Map 型演算が強化された。map_entries, map_filter, map_zip_with, transform_keys, transform_values
  • その他
    • typeof, version, xxhash64

まとめ

いかがでしたでしょうか。今回は KAKEHASHI のデータ基盤の仕組みと Spark 3.1.1 の便利な関数について紹介させていただきました。 今回紹介しきれませんでしたが、Glue 3.0・Spark 3.1.1 は色々な関数が使えるようになっただけではなく、さまざまなパフォーマンス改善も行われているのでバージョンを上げるメリットがたくさんあります。 是非 Glue 3.0 系にトライいただければと思います!

注記

Spark 2 系 から Spark 3 系のアップデートで、2 系で出力した Parquet データの読み込みができなくなるケースがありますので、ご注意ください

参考資料