KAKEHASHI でバックエンドエンジニアをしている横田です。
今回は、OLTP から OLAP へのデータ同期処理で、気になった技術を使ってみました。
背景・目的
- DynamoDB のデータを S3 に日次で同期する必要がありました
- プロダクトで作っているデータをどうやって データレイク(S3) に持って来るのか?は難しい問題です
- データ量がそこまで多くない場合は、DynamoDB Exportを使って、毎日全量のデータを同期することができていたのですが、毎日全量の同期は時間が間に合わない量になってきたため差分更新の仕組みを作る必要が出てきました
- AWS Glue Elastic Viewsでも解決できそうですが、まだプレビュー中でした
- 今回は、Apache Hudi を使って DynamoDB のデータを S3 に同期する仕組みを作りましたので、苦労も含め紹介します
Apache Hudi とは
Apache Hudiは、Uber が作ったオープンソースのストリーミング データレイク プラットフォームです。 一般的には、S3 などのストレージ上に行列データを格納する時は、数行の更新処理を行う場合でも、全量の置き換えが必要になってしまいます。しかし、Hudi は S3 上で全量ではなく、部分的な更新を実現してくれます。単純にいえば、データレイク上のデータの行の更新・削除処理ができます。
具体的には、Hudi はストレージ上に下図のようにファイルを配置します。
(引用: 公式ドキュメント Merge On Read Table について)
こちらは Merge On Read テーブルと呼ばれる種類のデータ構造ですが、差分データは行指向の Avro フォーマットなどで持ち、ある時点での状態は列指向の Parquet フォーマットで持ちます。それらのファイルの配置を把握するためのメタデータもセットで格納されており、MySQL のテーブルのように型付きのテーブルとして扱えるようになっています。差分データが、全量データとは別に管理されているので、差分の書き込みコストが比較的少なくて済みます。
更新処理の実現方法として、Apache Hudi には COW(Copy On Write: 書き込み時に更新する)と MOR(Merge On Read: 読み込み時に更新する)の二種類の方法があり、テーブルを作成する際にテーブルタイプとして選択します。 イベントソーシングのように CURD イベントが細かく流れてくるようなケースでは、差分データを Avro で格納して読み込み時に最新の状態を作る MOR テーブルタイプを採用したり 書き込み頻度が少ない場合は Avro を使わず、Parquet だけで書き込み時に更新を行う COW テーブルタイプを採用したりなど、 用途に応じて使い分けることができます。
今回は、更新頻度が夜間のバッチ処理 1 日 1 度だけなので、COW を使って構築してみました。
DynamoDB のデータを S3 に同期する仕組み
今回作成した仕組みは、下図の通り DynamoDB Streams を使うものです。
- プロダクトが利用しているテーブルに DynamoDB Streams を設定します
- kinesis で Streams のデータを受け取り、N 分間バッファリングした後、S3 に吐き出します
- 日次で Glue の PySparkJob を実行し、Hudi 形式で S3 に出力します
後で詳しく紹介しますが、Glue の SparkJob では、DataFrame を Hudi フォーマットで出力することができます。(追加の設定が必要です)
ここでのポイントは、N 分間・日次という部分を変更すれば更新頻度を柔軟に変えられるところで、頻度を増やせば、ニアリアルタイムな(10 分間隔で更新など)同期処理にも変更できます。
Hudi は Hive Metastore も出力するオプションがあるので、それを利用して Glue のデータカタログ(Hive Metastore 互換のサービス)を作ることにより、後続の処理で別の PySparkJob や Athena でのデータ利用をしやすくしています。
Glue の PySparkJob
Amazon Kinesis の出力を読み、Apache Hudi 形式で出力する処理を下記のように作りました
import boto3 from awsglue.context import GlueContext from awsglue.transforms import DropNullFields from pyspark.sql.session import SparkSession from datetime import date from decimal import Decimal INPUT_PATH = "s3://BUCKET_NAME/KINESIS_OUTPUT" OUTPUT_PATH = "s3://BUCKET_NAME/HUDI_OUTPUT" boto3.resource("dynamodb") # boto3.dynamodbを呼び出すために必要 deserializer = boto3.dynamodb.types.TypeDeserializer() def decimal_to_float(record): """ Decimal型をFloat型に変換, Nullリストの排除 Decimal型は桁数が一致していないとParquetとして出力する際に型エラーになるため、 Floatに変換しておく Listの場合は、Null型のListにならないようにNullを除いておく """ if isinstance(record, dict): return {k: decimal_to_float(v) for k, v in record.items()} if isinstance(record, list): return [decimal_to_float(v) for v in record if v is not None] if isinstance(record, Decimal): return float(record) return record def deserialize(record): """ DynamoDB Streamの出力は、下表のように型を表すキーが付与されている キーはデータカタログを扱いづらくするので、取り除いておく Python DynamoDB ------ -------- None {'NULL': True} True/False {'BOOL': True/False} int/Decimal {'N': str(value)} string {'S': string} Binary/bytearray/bytes (py3 only) {'B': bytes} set([int/Decimal]) {'NS': [str(value)]} set([string]) {'SS': [string]) set([Binary/bytearray/bytes]) {'BS': [bytes]} list {'L': list} dict {'M': dict} """ return decimal_to_float({k: deserializer.deserialize(v) for k, v in record.items()}) def convert(record): """ DynamoDB Streamのデータを整形 1. 更新後のデータ(NewImage)を抜き出す 2. レコードの削除イベントをhudiの削除フラグに変換 3. キー重複が発生したときにDynamoDBのイベント生成時刻が新しいもので更新できるように列追加 """ new_image = record["dynamodb"].get("NewImage") old_image = record["dynamodb"].get("OldImage") if old_image: # 古いデータの場合はスキーマを壊す可能性があるため、idに関わる部分だけに絞る old_image = { "id": old_image["id"], } new_record = new_image or old_image or {} new_record = deserialize(new_record) new_record["_hoodie_is_deleted"] = record["eventName"] == "REMOVE" new_record["event_created_at"] = record["dynamodb"]["ApproximateCreationDateTime"] return new_record def save_as_hudi(df, s3_path, key, partition, comb_key, database, table): # Hudiのオプション hudi_options = { "hoodie.table.name": table, # テーブル名 # 書き込みオプション "hoodie.datasource.write.storage.type": "COPY_ON_WRITE", "hoodie.datasource.write.recordkey.field": key, # レコードキーのカラム名 "hoodie.datasource.write.partitionpath.field": partition, # パーティション対象のカラム名 "hoodie.datasource.write.table.name": table, # テーブル名 "hoodie.datasource.write.operation": "upsert", # 書き込み操作種別 "hoodie.datasource.write.precombine.field": comb_key, # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される) "hoodie.upsert.shuffle.parallelism": 1, # upsert時の並列数 "hoodie.insert.shuffle.parallelism": 1, # insert時の並列数 # データカタログ連携オプション(hive_sync) "hoodie.datasource.hive_sync.enable": "true", # 連携を有効にする "hoodie.datasource.hive_sync.database": database, # 連携先のデータベース名 "hoodie.datasource.hive_sync.table": table, # 連携先のテーブル名 "hoodie.datasource.hive_sync.partition_fields": partition, # パーティションのフィールド名 "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", # パーティションの方式を指定 "hoodie.datasource.hive_sync.use_jdbc": "false", # jdbcを利用すると接続エラーになったのでfalseにする。 } df.write.format("hudi").options(**hudi_options).mode("append").save(s3_path) def read_kinesis_output_on_s3(glue_context): return glue_context.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": [INPUT_PATH], "recurse": True}, format="json", ) def main(target_date): # Hudiを利用するときに使うシリアライザを設定 spark = SparkSession.builder.config( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" ).getOrCreate() glue_context = GlueContext(spark.sparkContext) # dyf: GlueのDynamicFrame, df: SparkのDataFrame dyf = read_kinesis_output_on_s3(glue_context) dyf = dyf.map(convert) # Null型を持つフィールドの削除(ネストされたフィールドも含む) df = DropNullFields.apply(dyf).toDF() save_as_hudi( df, s3_path=OUTPUT_PATH, key="id", partition="partition_key", comb_key="event_created_at", database="glue_database_name", table="glue_table_name", ) main(date(2022, 1, 1))
Job 設定
- Glue version: 3.0
- Python のバージョン: 3
- Spark version: 3.1
- ジョブのブックマーク: 無効化
- ジョブパラメータ
- --enable-glue-datacatalog : (空文字列)
- JAR ライブラリパス
s3://<MODULE_BUCKET>/hudi-spark3.1.2-bundle_2.12-0.10.1.jar
s3://<MODULE_BUCKET>/spark-avro_2.12-3.1.1.jar
s3://<MODULE_BUCKET>/libfb303-0.9.3.jar
s3://<MODULE_BUCKET>/calcite-core-1.16.0.jar
- IAM Role
- S3 への読み書き
- Glue の Job 実行
ポイント
- DynamoDB Stream の出力は、Hudi 形式(Parquet, Avro)への変換時に型の問題を起こしやすいので前処理を行なっている
- Decimal 型を float 型に変換
- 前処理は行列の構造を大きく変えるので Spark の DataFrame ではなく、Glue の DynamicFrame の map で変換している
- Null 値を持つ列の排除
- データカタログ連携オプションを有効にし、Job でデータカタログを更新できるようにしている
- DynamoDB Stream の出力は、Hudi 形式(Parquet, Avro)への変換時に型の問題を起こしやすいので前処理を行なっている
スキーマ進化への対応
当初、上記の Job で問題なかったのですが、運用していると DynamoDB に新たな列が追加されるという話になり、問題が発生しました。
具体的には、追加された列は新機能に利用されるものでしたので、日次バッチで処理する場合、日によって列があったりなかったりするという状況が生まれてしまいました。
Hudi では、Schema Evolution の機能によって列の増加には対応できるのですが、列が足りない場合にエラーになってしまうため Job の修正を行いました。
具体的には、
例えば、下記のようなスキーマのデータがあり
root |-- field1: string |-- nested_field1: struct | |-- field2: string
機能追加により field3 が追加され下記のスキーマに変更になったが
root |-- field1: string |-- nested_field1: struct | |-- field2: string | |-- field3: string
たまたま、ある日に全く新機能が使われなかった場合は、差分データの型が下記のようになってしまいました。 (DynamoDB Streams は JSON 形式で出力を行うため、Spark の JSON 読み込みでは存在しないデータ列を検知できないため)
root |-- field1: string |-- nested_field1: struct | |-- field2: string
この時点で、nested_field1 の列の型が構造型(struct)として、field3 が必要であると扱われるため、列が足りないという型エラーになってしまいます。
今回は、この問題を回避するため、下記のように対応を行いました。
from pyspark.sql.types import StructType, StructField, ArrayType def merge_schemas(s1, s2): """ スキーマをマージする 仕様: 1. 同じフィールド名で、Null型とそれ以外の型が競合した場合、それ以外の型の方を採用する 2. 片方にフィールド名がない場合は、フィールド名を増やす 3. 全てNullable=Trueになる 使い方: df1とdf2のスキーマをマージする場合 new_schema = merge_schema(df1.schema, df2.schema) ※ データはマージされないので注意 データのマージは、unionByNameなどで行うこと https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.unionByName.html """ s1_keys = {f.name for f in s1} s2_keys = {f.name for f in s2} errors = [] new_fields = {} for key in s1_keys & s2_keys: s1_type = s1[key].dataType.typeName() s2_type = s2[key].dataType.typeName() if s1_type == s2_type: if s1_type == "struct": new_fields[key] = merge_schemas(s1[key].dataType, s2[key].dataType) elif s1_type == 'array': new_fields[key] = ArrayType( merge_schemas(s1[key].dataType.elementType, s2[key].dataType.elementType) ) else: new_fields[key] = s1[key].dataType elif s1_type == "null" or s2_type != "null": new_fields[key] = s2[key].dataType elif s1_type != "null" or s2_type == "null": new_fields[key] = s1[key].dataType else: errors.append(f"型のマージに失敗 {s1_type} != {s2_type}") for key in s1_keys - s2_keys: new_fields[key] = s1[key].dataType for key in s2_keys - s1_keys: new_fields[key] = s2[key].dataType if errors: raise Exception("\n".join(errors)) return StructType( [StructField(key, f_type, True) for key, f_type in new_fields.items()] ) def fill_null_columns(glue_context, hudi_data_path, df): """ 既存のテーブルとの型の互換性を担保する 1. ある日にデータが全く存在しないような列を挿入 2. 今回集計対象分のデータが全てNullのためにNull型になってしまっている列をキャスト 背景: 新しい機能が追加された時など、列にデータが全く存在しない日がありうる そのような場合に、列が足りずにHudiのUpsertでエラーになる エラーを防ぐために、Nullの値を埋めて想定される型にキャストする 備考: 既存のHudiデータの列には、_hoodie_commit_timeなどHudiのメタデータの列が存在するので除いている """ try: old_schema = glue_context.read.format("hudi").load(hudi_data_path).schema except Exception as e: print('table not found', e) return df new_schema = merge_schemas(df.schema, old_schema) hoodie_cols = [ "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name", ] current_cols = set([f.name for f in df.schema.fields]) new_cols = [f.name for f in new_schema.fields if f.name not in hoodie_cols] for col in new_cols: if col not in current_cols: # 1. ある日にデータが全く存在しないような列を挿入 df = df.withColumn(col, F.lit(None).cast(new_schema[col].dataType)) else: # 2. 今回集計対象分のデータが全てNullのためにNull型になってしまっている列をキャスト df = df.withColumn(col, F.col(col).cast(new_schema[col].dataType)) return df def main(target_date): # Hudiを利用するときに使うシリアライザを設定 spark = SparkSession.builder.config( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" ).getOrCreate() glue_context = GlueContext(spark.sparkContext) dyf = glue_context.create_dynamic_frame.from_options( connection_type="s3", connection_options={"paths": [INPUT_PATH], "recurse": True}, format="json", ) df = dyf.map(convert).toDF() df = fill_null_columns(glue_context, OUTPUT_PATH, df) dyf = DynamicFrame.fromDF(df, glue_context, "to_dynamic_frame") df = DropNullFields.apply(dyf).toDF() save_as_hudi( df, s3_path=OUTPUT_PATH, key="id", partition="partition_key", comb_key="event_created_at", database="glue_database_name", table="glue_table_name", )
- ポイント
- 一度 Hudi テーブルを出力している場合、既存のデータのスキーマを使って、列と型を合わせる処理を追加しエラーが発生しないようにしている
df = df.withColumn(<足りない列>, F.lit(None).cast(<既存のデータ型>))
の処理によって、型を合わせた上で Null 値が入る列を追加している
マージ処理は今後も使うと考えられたので、少し改良を行い Python ライブラリとして公開しました。https://github.com/n-yokota/pysparkschema
Hudi を使わない場合
ここまででは、Hudi を使った実装について紹介してきました。
やりたいことに対してコード量も意外と多く、本当に有用なのか?という点が気になったので、 Hudi を使わない場合、Copy On Write と同様の処理を行うためにどのようなことが必要か?を知るために、処理を書いてみました。
先程の save_as_hudi の中身を書き換えて、Parquet で出力するようにしてみます。スキーマ進化やデータカタログへの同期は難しいので省略します
def save_as_hudi(df, s3_path, key, partition, comb_key, database, table): old_df = glue_context.read.format("parquet").load(hudi_data_path) column_names = [f.name for f in old_df.schema.fields] new_df = (old_df # 差分データを足し合わせる .union(df) # comb_keyが一番大きい列で更新 .groupBy(key) .agg( *[ F.expr(f"max_by({col}, {comb_key})").alias(col) for col in column_names ] ) # 削除フラグが立っている場合は行を除去 .filter(F.col("_hoodie_is_deleted")) ) (new_df.write .mode("overwrite") .format("parquet") .save(s3_path) )
前後でスキーマが一致している前提での Copy On Write 処理だとざっくりこんな感じでしょうか。
スキーマ進化への対応をするとなると、もう少し複雑になってきそうかなと思われます。 この処理だけ見ると、記述量も多くないので、そこまで Hudi 形式をわざわざ使うほどのことはないかも?という気もしますね・・。
Hudi を採用するとしたら、スキーマ進化の想定・Merge On Read が必要なぐらい頻繁な更新 みたいな要件がポイントになってくるかもしれません。
まとめ
今回は DynamoDB のデータを S3 に同期する仕組みについて紹介しました。
Apache Hudi は便利な機能ではあるものの、列・型がしっかりしている分、前処理が必要であるということがわかりました。
COW の場合、Hudi を使わなくても同様の処理はできそうかなということも確認しました。
今後 Apache Hudi の採用を検討されている方への参考になれば幸いです。
最後に
社内のエンジニアがDevOpsDays Tokyo 2022に登壇します。イベントに参加される方はぜひ。 https://confengine.com/conferences/devopsdays-tokyo-2022/proposal/16477/remote-devops-learning-from-other-fields
一緒に働く仲間も募集してます。