KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

DynamoDBのCDCデータを利用して、ニアリアルタイムにDatabricksへ取り込む

こちらの記事はDatabricks Advent Calendar 2023の16日目の記事になります。

こんにちは。カケハシのデータ基盤チームでデータエンジニアをしている松田です。カケハシでは、2022年7月ごろから全社的なデータ活用基盤のプラットフォームとしてDatabricksを採用し、1年以上経過しました。

今回の記事では、DynamoDBの変更データキャプチャ(CDC)データを利用して、ニアリアルタイムにDatabricksへデータを取り込んだことについての投稿になります。

DynamoDBのCDCデータ利用でリアルタイム化とコスト削減

DynamoDBはカケハシのメインプロダクトであるMusubiで利用しており、そのデータをETL処理して社内でデータ利活用したり、BIツールのMusubi Insightで利用しています。今までは、DynamoDB Exportを利用して全件更新するバッチ処理を行っていました。

しかし、このアプローチではリアルタイム性が不足しており、障害発生時の迅速な調査が難しく、他サービスとのデータ連携時には利用者の要件を満たさないなどの課題が生じていました。また、全件更新であるためインフラコストも大幅にかかっていました。

DynamoDBのCDCデータを利用することにより、リアルタイムなデータストリームが実現でき、これらの課題を解決することができました。

CDCデータをDatabricksへ取り込むアーキテクチャ

CDCデータを利用してDatabricksに取り込む大まかな構成とアーキテクチャは以下になります。

  • MusubiのDynamoDBからKinesis Data StreamとFirehoseを利用してCDCデータをS3に連携
  • 連携したS3データをDatabricksのAuto Loaderで読み込む
  • 読み込んだCDCデータからブロンズテーブルを生成

DynamoDBからKinesis Data Stream経由でCDCデータをS3に取り込み

はじめに、DynamoDBが構築されているAWS環境で各テーブルごとに、Kinesis Data StreamとFirehoseをTerraformでリソース追加します。以下、IAM周りのところを除いたTerraformのサンプルコードになります。

# locals.tf
locals {
  kinesis_data_stream_dynamodb_table_list = tolist([
    "<対象テーブル名>",
  ])
}
# main.tf
## Kinesis Data Stream
module "kinesis_data_stream" {
  for_each            = toset(local.kinesis_data_stream_dynamodb_table_list)
  source              = "./modules/kinesis_data_stream"
  dynamodb_table_name = each.key
}

## Kinesis Firehose Delivery-stream
resource "aws_kinesis_firehose_delivery_stream" "delivery-stream" {
  for_each    = toset(local.kinesis_data_stream_dynamodb_table_list)
  name        = "${each.key}-delivery-stream"
  destination = "extended_s3"

  kinesis_source_configuration {
    kinesis_stream_arn = module.kinesis_data_stream[each.key].kinesis_data_stream_arn
    role_arn           = "<kinesis firehoseのIAMロールARN>"
  }

  extended_s3_configuration {
    role_arn           = "<kinesis firehoseのIAMロールARN>"
    bucket_arn         = "<出力先のbucket ARN>"
    prefix             = "${each.key}/"
    compression_format = "GZIP"

    processing_configuration {
      enabled = "true"

      processors {
        type = "Lambda"

        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = "<kinesis firehoseで利用するLambdaのARN>"
        }
        parameters {
          parameter_name  = "BufferSizeInMBs"
          parameter_value = "1"
        }
        # デフォルトの60sだとterraformのdiffが出続けるため、61sを指定(see: https://github.com/hashicorp/terraform-provider-aws/issues/9827)
        parameters {
          parameter_name  = "BufferIntervalInSeconds"
          parameter_value = "61"
        }
      }
    }

    cloudwatch_logging_options {
      enabled         = "true"
      log_group_name  = aws_cloudwatch_log_group.firehose-log-group.name
      log_stream_name = "S3Delivery"
    }
  }
}

## Kinesis Data Firehose output logs
resource "aws_cloudwatch_log_group" "firehose-log-group" {
  name = "/aws/kinesisfirehose/sample-dynamodb-firehose"
}

resource "aws_cloudwatch_log_stream" "firehose-log-stream-s3" {
  name           = "S3Delivery"
  log_group_name = aws_cloudwatch_log_group.firehose-log-group.name
}

## Kinesis Data Firehose Transform Function
resource "aws_lambda_function" "kinesis-firehose-ddb-cdc-function" {
  filename      = data.archive_file.kinesis-firehose-transform-ddb-cdc-function-zip.output_path
  function_name = "kinesis-firehose-ddb-cdc-transformer"

  role             = "<kinesis firehoseで利用するLambdaのARN>"
  handler          = "lambda_function.lambda_handler"
  source_code_hash = data.archive_file.kinesis-firehose-transform-ddb-cdc-function-zip.output_base64sha256
  runtime          = "python3.9"

  memory_size = 128
  timeout     = 60
}

data "archive_file" "kinesis-firehose-transform-ddb-cdc-function-zip" {
  type        = "zip"
  source_dir  = "lambda-functions/firehose-transform-ddb-cdc"
  output_path = "src/firehose-transform-ddb-cdc.zip"
}
# ./modules/kinesis_data_stream.tf
## Kinesis Data Stream
resource "aws_dynamodb_kinesis_streaming_destination" "stream-destination" {
  for_each =
  stream_arn = aws_kinesis_stream.data-stream.arn
  table_name = var.dynamodb_table_name
}

##########
## Kinesis Data Streams
resource "aws_kinesis_stream" "data-stream" {
  name             = "${var.dynamodb_table_name}-data-stream"
  retention_period = 24

  shard_level_metrics = [
    "IncomingBytes",
    "OutgoingBytes",
  ]
  encryption_type = "KMS"
  kms_key_id      = "<aliasの設定>"

  shard_count = 1
  stream_mode_details {
    stream_mode = "PROVISIONED"
  }
}

CDCデータをそのまま連携するとワンラインで出力されるため、後続で処理しやすいようにFirehoseでは改行コードを入れるLambdaを連携しています。

# lambda-functions/firehose-transform-ddb-cdc.py
import json
import boto3
import base64

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        row_w_newline = payload + "\n"
        row_w_newline = base64.b64encode(row_w_newline.encode('utf-8'))

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': row_w_newline
        }
        output.append(output_record)

    return {'records': output}

Terraform Applyした結果、以下のようにリソースが作成されます

DynamoDB Streams:

Kinesis Data Stream:

Kinesis Data Firehose:

Lambda:

Auto LoaderでDatabricksに読み込み

次に、Auto LoaderでS3からCDCデータをDatabricksに取り込みます。 Auto Loaderはクラウドストレージに到着する新規データファイルをインクリメンタルかつ効率的に処理をします。

取り込みに必要なパラメーターの設定を行います。

# load.py
import time
from pyspark.sql.functions import lit, expr

is_backfill = false # 過去データの取り込みするかどうか
input_s3_path = "<入力のS3パス>"
checkpoint_s3_path = "<チェックポイントのS3パス>"

catalog = "<保存先のcatalog名>"
schema = "<保存先のschema名>"
table = "<保存先のtable名>"

bronze_table_partition_key = "<パーティションキー>"
bronze_table_sort_key = "<ソートキー>"

bronze_table = f"{catalog}.{schema}.bronze_{table}"
bronze_cdc_table_name = f"{catalog}.{schema}.bronze_{table}_cdc"
temp_table_name = f"bronze_{table}_tmp"
session_time = int(time.time()) # 取り込んだ時間の記録に使用。後続のCDCデータの整合性チェックの時に利用する。

PySparkを使用して、DynamoDBから取得したデータをデシリアイズするためのUDF関数を定義します。DynamoDBの数値型はDecimal型であり、ParquetやDeltaとして出力する際に型エラーが発生する可能性があります。これを回避するために、UDF内でDecimal型をFloat型に変換します。

# load.py
def decimal_to_float(record):
    from decimal import Decimal, Inexact, Rounded
    from boto3.dynamodb.types import DYNAMODB_CONTEXT

    DYNAMODB_CONTEXT.traps[Inexact] = 0
    DYNAMODB_CONTEXT.traps[Rounded] = 0

    if isinstance(record, dict):
        return {k: decimal_to_float(v) for k, v in record.items()}
    if isinstance(record, list):
        return [decimal_to_float(v) for v in record if v is not None]
    if isinstance(record, Decimal):
        return float(record)
    return record


def deserialize(jstr):
    from boto3.dynamodb.types import TypeDeserializer
    import json

    if not jstr:
        return jstr
    deserializer = TypeDeserializer()
    jdict = json.loads(jstr)
    deserialized_jsdict = decimal_to_float(
        {k: deserializer.deserialize(v) for k, v in jdict.items()}
    )
    return json.dumps(deserialized_jsdict, ensure_ascii=False)

deserialize_udf = udf(deserialize, StringType())

def deserialize_single_column(jstr):
    from boto3.dynamodb.types import TypeDeserializer
    import json

    if not jstr:
        return jstr
    deserializer = TypeDeserializer()
    jdict = json.loads(jstr)
    deserialized_jsdict = decimal_to_float(deserializer.deserialize(jdict))
    return json.dumps(deserialized_jsdict, ensure_ascii=False).strip('"')

Auto Loaderを起動するために以下のコードを実行します。 Auto Loaderの設定項目については、以前のテックブログ「DatabricksのAuto Loaderを利用してプロダクトの監査ログを取得した」にまとめていますのでご参考にしてください。

さきほど、設定したパラメーターや作成したUDF関数を利用してS3のCDCデータを読み込み、DatabricksのブロンズCDCテーブルに書き込みをします。

# load.py
(
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_s3_path)
    .load(input_s3_path)
    .withColumn("session_time", lit(session_time))
    .withColumn(
        "approximate_creation_datetime", expr("dynamodb:ApproximateCreationDateTime")
    )
    .withColumn("dynamodb_new_image", deserialize_udf(expr("dynamodb:NewImage")))
    .withColumn("dynamodb_old_image", deserialize_udf(expr("dynamodb:OldImage")))
    .writeStream.format("delta")
    .option("checkpointLocation", checkpoint_s3_path)
    .option("mergeSchema", "true")
    .trigger(once=True)
    .toTable(bronze_cdc_table_name)
    .awaitTermination()
)

CDCデータからブロンズテーブルを生成

最後に、CDCデータを元にして差分更新でテーブルを作成します。

以下、サンプルコードになります。 Databricksに取り込んだCDCデータからイベントデータを抽出し、セッションとバックフィルの条件に基づいてフィルタリングします。その後、抽出したデータを最新のイベントに集約し、変更結果後のテーブルを一時生成します。

# load.py
def build_query_to_agg_cdc(cdc_table_name, primary_key, session_time, is_backfill, sort_key):
    sql = f"""
    WITH raw AS (
        SELECT
        eventID,
        eventName,
        dynamodb:ApproximateCreationDateTime,
        CASE
            WHEN eventName != 'REMOVE' THEN dynamodb:NewImage
            ELSE null
        END image,
        CASE
            WHEN eventName != 'REMOVE' THEN dynamodb:NewImage:{primary_key}
            ELSE dynamodb:OldImage:{primary_key}
        END {primary_key},
        CASE
            WHEN eventName != 'REMOVE' THEN dynamodb:NewImage:{sort_key}
            ELSE dynamodb:OldImage:{sort_key}
        END {sort_key}
        FROM {cdc_table_name}
        WHERE session_time = {session_time} OR {is_backfill}
    ), latest_events AS (
        SELECT
            {primary_key},
            {sort_key},
            MAX_BY(eventName, ApproximateCreationDateTime) eventName,
            MAX_BY(image, ApproximateCreationDateTime) newImage
        FROM raw
        GROUP BY {primary_key}, {sort_key}
    )
    SELECT
        {primary_key},
        {sort_key},
        newImage dynamodb_image,
        eventName latest_event_name
    FROM latest_events;
    """
    print(f"query_to_agg_cdc_query: {sql}")
    return sql

cdc_agg_sql = build_query_to_agg_cdc(
    bronze_cdc_table_name,
    bronze_table_primary_key,
    session_time,
    is_backfill,
    bronze_table_sort_key,
)
rdd = spark.sql(cdc_agg_sql).rdd

def create_bronze_table_schema(primary_key, sort_key=None):
    return types.StructType(
        [
            types.StructField(primary_key, types.StringType()),
            types.StructField(sort_key, types.StringType()),
            types.StructField("dynamodb_image", types.StringType()),
            types.StructField("latest_event_name", types.StringType()),
        ]
    )

spark.createDataFrame(
    rdd.map(
        lambda x: (
            deserialize_single_column(x[0]),
            deserialize_single_column(x[1]),
            deserialize(x[2]),
            x[3],
        )
    ),
    create_bronze_table_schema(bronze_table_primary_key, bronze_table_sort_key),
).createOrReplaceTempView(temp_table_name)

以下のサンプルコードでは、一時生成したテーブルから既存のテーブルが存在すればマージし、存在しなければテーブルを作成します。

# load.py
def build_query_to_merge_into_bronze_table(
    bronze_table, temp_table_name, primary_key, sort_key=None
):
    sql = f"""
    MERGE INTO {bronze_table} br USING {temp_table_name} mt
        ON br.{primary_key} = mt.{primary_key}
        AND br.{sort_key} = mt.{sort_key}
        WHEN MATCHED AND mt.latest_event_name = 'MODIFY' THEN UPDATE SET *
        WHEN MATCHED AND mt.latest_event_name = 'REMOVE' THEN DELETE
        WHEN MATCHED AND mt.latest_event_name = 'INSERT' THEN UPDATE SET *
        WHEN NOT MATCHED AND mt.latest_event_name = 'MODIFY' THEN INSERT *
        WHEN NOT MATCHED AND mt.latest_event_name = 'INSERT' THEN INSERT *
    """
    print(f"query_to_merge_into_bronze_table: {sql}")
    return sql

if spark.catalog.tableExists(bronze_table):
    spark.sql(
        build_query_to_merge_into_bronze_table(
            bronze_table,
            temp_table_name,
            bronze_table_primary_key,
            bronze_table_sort_key,
        )
    )
else:
    spark.sql(
        f"CREATE TABLE {bronze_table} AS SELECT * FROM {temp_table_name} WHERE latest_event_name != 'REMOVE'"
    )

上記の一連の処理をノートブック化して、ワークフローを構築すれば完成となります。 カケハシでそこまでのリアルタイム性が現在は求められていないため、毎時でワークフローを実行しています。

まとめ

今回、DynamoDBのCDCデータを利用して、Databricksにデータを取り組む方法についてご紹介させていただきました。それにより、プロダクト間のリアルタイムのデータ連携が可能になり、コストが大幅に削減されました。

カケハシのデータ基盤チームでは、引き続きDatabricksをフル活用していく想定です。 一緒にDatabricksを使ってデータ基盤構築しませんか。少しでも興味を持ってくださった方がいらっしゃれば以下からご応募お待ちしております。