KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

DynamoDBからS3へApache Hudiを使って同期してみた

KAKEHASHI でバックエンドエンジニアをしている横田です。

今回は、OLTP から OLAP へのデータ同期処理で、気になった技術を使ってみました。

背景・目的

  • DynamoDB のデータを S3 に日次で同期する必要がありました
  • プロダクトで作っているデータをどうやって データレイク(S3) に持って来るのか?は難しい問題です
  • データ量がそこまで多くない場合は、DynamoDB Exportを使って、毎日全量のデータを同期することができていたのですが、毎日全量の同期は時間が間に合わない量になってきたため差分更新の仕組みを作る必要が出てきました
  • AWS Glue Elastic Viewsでも解決できそうですが、まだプレビュー中でした
  • 今回は、Apache Hudi を使って DynamoDB のデータを S3 に同期する仕組みを作りましたので、苦労も含め紹介します

Apache Hudi とは

Apache Hudiは、Uber が作ったオープンソースのストリーミング データレイク プラットフォームです。 一般的には、S3 などのストレージ上に行列データを格納する時は、数行の更新処理を行う場合でも、全量の置き換えが必要になってしまいます。しかし、Hudi は S3 上で全量ではなく、部分的な更新を実現してくれます。単純にいえば、データレイク上のデータの行の更新・削除処理ができます。

具体的には、Hudi はストレージ上に下図のようにファイルを配置します。

図1

(引用: 公式ドキュメント Merge On Read Table について)

こちらは Merge On Read テーブルと呼ばれる種類のデータ構造ですが、差分データは行指向の Avro フォーマットなどで持ち、ある時点での状態は列指向の Parquet フォーマットで持ちます。それらのファイルの配置を把握するためのメタデータもセットで格納されており、MySQL のテーブルのように型付きのテーブルとして扱えるようになっています。差分データが、全量データとは別に管理されているので、差分の書き込みコストが比較的少なくて済みます。

更新処理の実現方法として、Apache Hudi には COW(Copy On Write: 書き込み時に更新する)と MOR(Merge On Read: 読み込み時に更新する)の二種類の方法があり、テーブルを作成する際にテーブルタイプとして選択します。 イベントソーシングのように CURD イベントが細かく流れてくるようなケースでは、差分データを Avro で格納して読み込み時に最新の状態を作る MOR テーブルタイプを採用したり 書き込み頻度が少ない場合は Avro を使わず、Parquet だけで書き込み時に更新を行う COW テーブルタイプを採用したりなど、 用途に応じて使い分けることができます。

今回は、更新頻度が夜間のバッチ処理 1 日 1 度だけなので、COW を使って構築してみました。

DynamoDB のデータを S3 に同期する仕組み

今回作成した仕組みは、下図の通り DynamoDB Streams を使うものです。

図2

  1. プロダクトが利用しているテーブルに DynamoDB Streams を設定します
  2. kinesis で Streams のデータを受け取り、N 分間バッファリングした後、S3 に吐き出します
  3. 日次で Glue の PySparkJob を実行し、Hudi 形式で S3 に出力します

後で詳しく紹介しますが、Glue の SparkJob では、DataFrame を Hudi フォーマットで出力することができます。(追加の設定が必要です)

ここでのポイントは、N 分間・日次という部分を変更すれば更新頻度を柔軟に変えられるところで、頻度を増やせば、ニアリアルタイムな(10 分間隔で更新など)同期処理にも変更できます。

Hudi は Hive Metastore も出力するオプションがあるので、それを利用して Glue のデータカタログ(Hive Metastore 互換のサービス)を作ることにより、後続の処理で別の PySparkJob や Athena でのデータ利用をしやすくしています。

Glue の PySparkJob

Amazon Kinesis の出力を読み、Apache Hudi 形式で出力する処理を下記のように作りました

import boto3
from awsglue.context import GlueContext
from awsglue.transforms import DropNullFields
from pyspark.sql.session import SparkSession
from datetime import date
from decimal import Decimal

INPUT_PATH = "s3://BUCKET_NAME/KINESIS_OUTPUT"
OUTPUT_PATH = "s3://BUCKET_NAME/HUDI_OUTPUT"


boto3.resource("dynamodb")  # boto3.dynamodbを呼び出すために必要
deserializer = boto3.dynamodb.types.TypeDeserializer()


def decimal_to_float(record):
    """
    Decimal型をFloat型に変換, Nullリストの排除

    Decimal型は桁数が一致していないとParquetとして出力する際に型エラーになるため、
    Floatに変換しておく

    Listの場合は、Null型のListにならないようにNullを除いておく
    """
    if isinstance(record, dict):
        return {k: decimal_to_float(v) for k, v in record.items()}
    if isinstance(record, list):
        return [decimal_to_float(v) for v in record if v is not None]
    if isinstance(record, Decimal):
        return float(record)
    return record


def deserialize(record):
    """
    DynamoDB Streamの出力は、下表のように型を表すキーが付与されている

    キーはデータカタログを扱いづらくするので、取り除いておく

    Python                                  DynamoDB
    ------                                  --------
    None                                    {'NULL': True}
    True/False                              {'BOOL': True/False}
    int/Decimal                             {'N': str(value)}
    string                                  {'S': string}
    Binary/bytearray/bytes (py3 only)       {'B': bytes}
    set([int/Decimal])                      {'NS': [str(value)]}
    set([string])                           {'SS': [string])
    set([Binary/bytearray/bytes])           {'BS': [bytes]}
    list                                    {'L': list}
    dict                                    {'M': dict}
    """
    return decimal_to_float({k: deserializer.deserialize(v) for k, v in record.items()})

def convert(record):
    """
    DynamoDB Streamのデータを整形

    1. 更新後のデータ(NewImage)を抜き出す
    2. レコードの削除イベントをhudiの削除フラグに変換
    3. キー重複が発生したときにDynamoDBのイベント生成時刻が新しいもので更新できるように列追加
    """
    new_image = record["dynamodb"].get("NewImage")
    old_image = record["dynamodb"].get("OldImage")
    if old_image:
        # 古いデータの場合はスキーマを壊す可能性があるため、idに関わる部分だけに絞る
        old_image = {
            "id": old_image["id"],
        }
    new_record = new_image or old_image or {}
    new_record = deserialize(new_record)
    new_record["_hoodie_is_deleted"] = record["eventName"] == "REMOVE"
    new_record["event_created_at"] = record["dynamodb"]["ApproximateCreationDateTime"]
    return new_record


def save_as_hudi(df, s3_path, key, partition, comb_key, database, table):

    # Hudiのオプション
    hudi_options = {
        "hoodie.table.name": table,  # テーブル名
        # 書き込みオプション
        "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.recordkey.field": key,            # レコードキーのカラム名
        "hoodie.datasource.write.partitionpath.field": partition,  # パーティション対象のカラム名
        "hoodie.datasource.write.table.name": table,               # テーブル名
        "hoodie.datasource.write.operation": "upsert",             # 書き込み操作種別
        "hoodie.datasource.write.precombine.field": comb_key,  # レコードの重複制御用カラム名(同じ値のレコードが存在する場合、この値が大きい方が使用される)
        "hoodie.upsert.shuffle.parallelism": 1,  # upsert時の並列数
        "hoodie.insert.shuffle.parallelism": 1,  # insert時の並列数
        # データカタログ連携オプション(hive_sync)
        "hoodie.datasource.hive_sync.enable": "true",      # 連携を有効にする
        "hoodie.datasource.hive_sync.database": database,  # 連携先のデータベース名
        "hoodie.datasource.hive_sync.table": table,        # 連携先のテーブル名
        "hoodie.datasource.hive_sync.partition_fields": partition,  # パーティションのフィールド名
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",  # パーティションの方式を指定
        "hoodie.datasource.hive_sync.use_jdbc": "false",  # jdbcを利用すると接続エラーになったのでfalseにする。
    }
    df.write.format("hudi").options(**hudi_options).mode("append").save(s3_path)


def read_kinesis_output_on_s3(glue_context):
    return glue_context.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": [INPUT_PATH], "recurse": True},
        format="json",
    )


def main(target_date):
    # Hudiを利用するときに使うシリアライザを設定
    spark = SparkSession.builder.config(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).getOrCreate()
    glue_context = GlueContext(spark.sparkContext)

    # dyf: GlueのDynamicFrame, df: SparkのDataFrame

    dyf = read_kinesis_output_on_s3(glue_context)
    dyf = dyf.map(convert)

    # Null型を持つフィールドの削除(ネストされたフィールドも含む)
    df = DropNullFields.apply(dyf).toDF()

    save_as_hudi(
        df,
        s3_path=OUTPUT_PATH,
        key="id",
        partition="partition_key",
        comb_key="event_created_at",
        database="glue_database_name",
        table="glue_table_name",
    )

main(date(2022, 1, 1))
  • Job 設定

    • Glue version: 3.0
    • Python のバージョン: 3
    • Spark version: 3.1
    • ジョブのブックマーク: 無効化
    • ジョブパラメータ
      • --enable-glue-datacatalog : (空文字列)
    • JAR ライブラリパス
      • s3://<MODULE_BUCKET>/hudi-spark3.1.2-bundle_2.12-0.10.1.jar
      • s3://<MODULE_BUCKET>/spark-avro_2.12-3.1.1.jar
      • s3://<MODULE_BUCKET>/libfb303-0.9.3.jar
      • s3://<MODULE_BUCKET>/calcite-core-1.16.0.jar
    • IAM Role
      • S3 への読み書き
      • Glue の Job 実行
  • ポイント

    • DynamoDB Stream の出力は、Hudi 形式(Parquet, Avro)への変換時に型の問題を起こしやすいので前処理を行なっている
      • Decimal 型を float 型に変換
      • 前処理は行列の構造を大きく変えるので Spark の DataFrame ではなく、Glue の DynamicFrame の map で変換している
      • Null 値を持つ列の排除
    • データカタログ連携オプションを有効にし、Job でデータカタログを更新できるようにしている

スキーマ進化への対応

当初、上記の Job で問題なかったのですが、運用していると DynamoDB に新たな列が追加されるという話になり、問題が発生しました。

具体的には、追加された列は新機能に利用されるものでしたので、日次バッチで処理する場合、日によって列があったりなかったりするという状況が生まれてしまいました。

Hudi では、Schema Evolution の機能によって列の増加には対応できるのですが、列が足りない場合にエラーになってしまうため Job の修正を行いました。

具体的には、

例えば、下記のようなスキーマのデータがあり

root
 |-- field1: string
 |-- nested_field1: struct
 |    |-- field2: string

機能追加により field3 が追加され下記のスキーマに変更になったが

root
 |-- field1: string
 |-- nested_field1: struct
 |    |-- field2: string
 |    |-- field3: string

たまたま、ある日に全く新機能が使われなかった場合は、差分データの型が下記のようになってしまいました。 (DynamoDB Streams は JSON 形式で出力を行うため、Spark の JSON 読み込みでは存在しないデータ列を検知できないため)

root
 |-- field1: string
 |-- nested_field1: struct
 |    |-- field2: string

この時点で、nested_field1 の列の型が構造型(struct)として、field3 が必要であると扱われるため、列が足りないという型エラーになってしまいます。

今回は、この問題を回避するため、下記のように対応を行いました。

from pyspark.sql.types import StructType, StructField, ArrayType

def merge_schemas(s1, s2):
    """
    スキーマをマージする

    仕様:
        1. 同じフィールド名で、Null型とそれ以外の型が競合した場合、それ以外の型の方を採用する
        2. 片方にフィールド名がない場合は、フィールド名を増やす
        3. 全てNullable=Trueになる

    使い方:
        df1とdf2のスキーマをマージする場合
        new_schema = merge_schema(df1.schema, df2.schema)

        ※ データはマージされないので注意
        データのマージは、unionByNameなどで行うこと
        https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.unionByName.html
    """

    s1_keys = {f.name for f in s1}
    s2_keys = {f.name for f in s2}

    errors = []
    new_fields = {}
    for key in s1_keys & s2_keys:
        s1_type = s1[key].dataType.typeName()
        s2_type = s2[key].dataType.typeName()
        if s1_type == s2_type:
            if s1_type == "struct":
                new_fields[key] = merge_schemas(s1[key].dataType, s2[key].dataType)
            elif s1_type == 'array':
                new_fields[key] = ArrayType(
                    merge_schemas(s1[key].dataType.elementType, s2[key].dataType.elementType)
                )
            else:
                new_fields[key] = s1[key].dataType
        elif s1_type == "null" or s2_type != "null":
            new_fields[key] = s2[key].dataType
        elif s1_type != "null" or s2_type == "null":
            new_fields[key] = s1[key].dataType
        else:
            errors.append(f"型のマージに失敗 {s1_type} != {s2_type}")

    for key in s1_keys - s2_keys:
        new_fields[key] = s1[key].dataType
    for key in s2_keys - s1_keys:
        new_fields[key] = s2[key].dataType

    if errors:
        raise Exception("\n".join(errors))
    return StructType(
        [StructField(key, f_type, True) for key, f_type in new_fields.items()]
    )


def fill_null_columns(glue_context, hudi_data_path, df):
    """
    既存のテーブルとの型の互換性を担保する
    1. ある日にデータが全く存在しないような列を挿入
    2. 今回集計対象分のデータが全てNullのためにNull型になってしまっている列をキャスト

    背景:
        新しい機能が追加された時など、列にデータが全く存在しない日がありうる
        そのような場合に、列が足りずにHudiのUpsertでエラーになる
        エラーを防ぐために、Nullの値を埋めて想定される型にキャストする
    備考:
        既存のHudiデータの列には、_hoodie_commit_timeなどHudiのメタデータの列が存在するので除いている

    """
    try:
        old_schema = glue_context.read.format("hudi").load(hudi_data_path).schema
    except Exception as e:
        print('table not found', e)
        return df
    new_schema = merge_schemas(df.schema, old_schema)

    hoodie_cols = [
        "_hoodie_commit_time",
        "_hoodie_commit_seqno",
        "_hoodie_record_key",
        "_hoodie_partition_path",
        "_hoodie_file_name",
    ]

    current_cols = set([f.name for f in df.schema.fields])
    new_cols = [f.name for f in new_schema.fields if f.name not in hoodie_cols]

    for col in new_cols:
        if col not in current_cols:
            # 1. ある日にデータが全く存在しないような列を挿入
            df = df.withColumn(col, F.lit(None).cast(new_schema[col].dataType))
        else:
            # 2. 今回集計対象分のデータが全てNullのためにNull型になってしまっている列をキャスト
            df = df.withColumn(col, F.col(col).cast(new_schema[col].dataType))

    return df


def main(target_date):
    # Hudiを利用するときに使うシリアライザを設定
    spark = SparkSession.builder.config(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).getOrCreate()
    glue_context = GlueContext(spark.sparkContext)

    dyf = glue_context.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": [INPUT_PATH], "recurse": True},
        format="json",
    )

    df = dyf.map(convert).toDF()
    df = fill_null_columns(glue_context, OUTPUT_PATH, df)

    dyf = DynamicFrame.fromDF(df, glue_context, "to_dynamic_frame")
    df = DropNullFields.apply(dyf).toDF()

    save_as_hudi(
        df,
        s3_path=OUTPUT_PATH,
        key="id",
        partition="partition_key",
        comb_key="event_created_at",
        database="glue_database_name",
        table="glue_table_name",
    )
  • ポイント
    • 一度 Hudi テーブルを出力している場合、既存のデータのスキーマを使って、列と型を合わせる処理を追加しエラーが発生しないようにしている
    • df = df.withColumn(<足りない列>, F.lit(None).cast(<既存のデータ型>)) の処理によって、型を合わせた上で Null 値が入る列を追加している

マージ処理は今後も使うと考えられたので、少し改良を行い Python ライブラリとして公開しました。https://github.com/n-yokota/pysparkschema

Hudi を使わない場合

ここまででは、Hudi を使った実装について紹介してきました。

やりたいことに対してコード量も意外と多く、本当に有用なのか?という点が気になったので、 Hudi を使わない場合、Copy On Write と同様の処理を行うためにどのようなことが必要か?を知るために、処理を書いてみました。

先程の save_as_hudi の中身を書き換えて、Parquet で出力するようにしてみます。スキーマ進化やデータカタログへの同期は難しいので省略します

def save_as_hudi(df, s3_path, key, partition, comb_key, database, table):

    old_df = glue_context.read.format("parquet").load(hudi_data_path)

    column_names = [f.name for f in old_df.schema.fields]
    new_df = (old_df
        # 差分データを足し合わせる
        .union(df)
        # comb_keyが一番大きい列で更新
        .groupBy(key)
        .agg(
            *[
                F.expr(f"max_by({col}, {comb_key})").alias(col)
                for col in column_names
            ]
        )
        # 削除フラグが立っている場合は行を除去
        .filter(F.col("_hoodie_is_deleted"))
    )

    (new_df.write
        .mode("overwrite")
        .format("parquet")
        .save(s3_path)
    )

前後でスキーマが一致している前提での Copy On Write 処理だとざっくりこんな感じでしょうか。

スキーマ進化への対応をするとなると、もう少し複雑になってきそうかなと思われます。 この処理だけ見ると、記述量も多くないので、そこまで Hudi 形式をわざわざ使うほどのことはないかも?という気もしますね・・。

Hudi を採用するとしたら、スキーマ進化の想定・Merge On Read が必要なぐらい頻繁な更新 みたいな要件がポイントになってくるかもしれません。

まとめ

今回は DynamoDB のデータを S3 に同期する仕組みについて紹介しました。

Apache Hudi は便利な機能ではあるものの、列・型がしっかりしている分、前処理が必要であるということがわかりました。

COW の場合、Hudi を使わなくても同様の処理はできそうかなということも確認しました。

今後 Apache Hudi の採用を検討されている方への参考になれば幸いです。

最後に

社内のエンジニアがDevOpsDays Tokyo 2022に登壇します。イベントに参加される方はぜひ。

一緒に働く仲間も募集してます。