KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

2022年版 AWS Glue の Spark Job で Aurora MySQL にデータを書き込む方法まとめ

KAKEHASHI でバックエンドエンジニアをしている横田です。

私が運用している Web サービスでは、AWS Glue で ETL 処理をしたデータを Aurora MySQL に投入することでユーザーが利用できるようにしています。 その中でも「データを Aurora MySQL に投入する」方法に関して、今まで色々なパターンを試してきました。

AWS Glue の Job で作成したデータを Aurora に投入するいくつかのパターンとそのメリット・デメリットについて紹介できればと思います。

Aurora MySQL にロードする 3 つのパターン

データを MySQL に insert することを以降「ロード」と呼びます。

以下の三つのパターンについて紹介します。

  1. Aurora から SQL でロードする
  2. Spark でロードする
  3. pandas でロードする

結論としては下表のようになると考えています。

手法 データ量のスケーラビリティ 安定性 Upsert 設定の容易さ データ型の柔軟性
Aurora から SQL でロードする o o o x
Spark でロード o? ? x o
pandas でロード x o x o o

それぞれ、詳しくみていきましょう。

1. Aurora から SQL でロードする

Aurora MySQL には S3 上の csv データを取り込むクエリ を備えています。

大まかな準備の手順としては、下記のようになります

  1. Aurora が S3 にアクセスする際に利用する IAM Role を作成し、Aurora にアタッチする
  2. クエリを実行する

クエリは多機能で、下記のような文法になっています。今我々がメインで利用しているのはこちらになります。

LOAD DATA FROM S3 [FILE | PREFIX | MANIFEST] 'S3-URI'
    [REPLACE | IGNORE]
    INTO TABLE tbl_name
    [PARTITION (partition_name,...)]
    [CHARACTER SET charset_name]
    [{FIELDS | COLUMNS}
        [TERMINATED BY 'string']
        [[OPTIONALLY] ENCLOSED BY 'char']
        [ESCAPED BY 'char']
    ]
    [LINES
        [STARTING BY 'string']
        [TERMINATED BY 'string']
    ]
    [IGNORE number {LINES | ROWS}]
    [(col_name_or_user_var,...)]
    [SET col_name = expr,...]

メインで利用している理由は以下。

  • ロード処理する時に余計な課金を増やさなくても良い
    • Glue でロードする場合、ロード中の時間も課金対象になる。ロード中は Glue の CPU はほとんど使わないので勿体無い
  • Upsert をすることができる
    • load data from s3 prefix '[s3_path]' replace into table [table] などで、プライマリキーが重複した時に上書きさせることができる

デメリットは、

  • csv データが入力なので型情報が失われてしまう
    • 文字列の中に,を含むケースなど、特別な処理が必要

運用時は VPC の中で Aurora にクエリを投げる必要があるので Airflow から Python Shell を(1/16 DPU の設定で)起動し、その中で Aurora に対してload data from s3クエリを実行しています。

2. Spark でロードする

下記のように、Spark から直接 Aurora MySQL にデータを投入することもできます。

conf = {
    "url": "jdbc:mysql://some_endpoint.ap-northeast-1.rds.amazonaws.com:3306/db_name",
    "user": "some_user",
    "password": "some_password",
}
df.write.option("truncate", "true").jdbc(
    url=conf["url"],
    table=f"{db_name}.{table_name}",
    mode="overwrite",
    properties={
        "user": conf["user"],
        "password": conf["password"],
    },
)

AWS Glue で Spark Job を動かす場合、通常の起動方法だと VPC の外で動くことになるので private subnet 内に居る Aurora にアクセスすることはできません。 なので、接続(AWS Glue Connector)を作る必要があります。Connector を Aurora にアクセス可能な VPC に紐づけてやれば、Glue Job はその VPC の中で起動するため、ネットワークの問題は解決できます。

もう一点上記の conf の中に user, password などをベタがきしているところも気になると思うのですが、接続がその情報も保持してくれるためセキュアに接続情報を管理することもできます。接続を使った実装方法は下記のようになります。glueContext から接続情報を取り出し conf を作る部分だけ変更します。

from awsglue.context import GlueContext

conf = glueContext.extract_jdbc_conf("<接続名>")
df.write.option("truncate", "true").jdbc(
    url=conf["url"],
    table=f"{db_name}.{table_name}",
    mode="overwrite",
    properties={
        "user": conf["user"],
        "password": conf["password"],
    },
)

glue の DynamicFrameWriter の機能を使えば、さらにシンプルに書くことができます。事前に spark の DataFrame を glue の DynamicFrame に変換する必要があります。

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

dyf = DynamicFrame.fromDF(df, glueContext, "some_processing_name")
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=dyf,
    catalog_connection="<接続名>",
    connection_options={"database": "some_db_name", "dbtable": "some_table_name"},
)

書き込み先の mysql のテーブルが AWS Glue の Data Catalog 上に存在すれば、from_catalogを使うこともできます

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

dyf = DynamicFrame.fromDF(df, glueContext, "some_processing_name")
glueContext.write_dynamic_frame.from_catalog(
    frame=dyf,
    database="some_db_name",
    table_name="some_table_name",
)

glueContext の API を使う場合は、上書きができない(2023/01/05 時点)ことに注意が必要です。

これらを使うメリットは、ETL が 1 スクリプトで完結するので管理しやすいというところが大きいかなと思います。 いまこちらを採用していないのですが、その理由は Spark は複数サーバで動くので書き込みが複数走ることになり Aurora に同時に書き込みに行った場合に何が起こるのかが予想しづらいからです。そもそも同時書き込みになるのか?Aurora の負荷がどうなるのか?同時書き込みによるロックがどうなるのか?実行時間はどうなるのか?など、未検証な点が多くまだ採用しきれていない状況です。また、Upsert が難しいのもデメリットですね・・。

また、仕様がわかったら記事化したいと思います。

3. pandas でロードする

最終的に出力すべきデータ量が小さい場合は、pandas でロードしても良いかもしれません。

some_connection = <pymysqlなど何かのMySQLのConnectionオブジェクトを生成するもの>
pandas_df = spark_df.toPandas()
pandas_df.to_sql(name="some_table_name", con=some_connection)

当初データ量が小さい場合はこちらのアプローチを利用していたのですが、データ量が増え Spark に置き換えるタイミングで撤廃しました。 AWS Glue の PythonShell で利用する時には良い方法になると思います。Spark で使うとすると、集計するデータ量は多いが出力するデータ量は小さいケースになるかと思うので、ユースケースがかなり限られそうかなという印象です。

awswranglerを使うと pandas の DataFrame 周りの I/O が楽になるのですが、ライブラリ自体のサイズが大きく起動のオーバーヘッドの時間が長くなってしまうこともあり、結局使わなくなってしまいました。

まとめ

今回は、Spark Job で作ったデータを Aurora MySQL にロードするアプローチを紹介しました。 2022 年は AWS も ETL 周りのアップデートが多かったので、2023 年もアップデートが入ればもっと便利な方法が出てくるかもしれません。 引き続き知見をためて定番アプローチを作っていきたいと思います。

参考になれば幸いです。