初めまして、カケハシのデータ基盤チームでデータエンジニアをしている伊藤と申します。
最近の悩みは、二郎ラーメンを食べていないのに「二郎ラーメンの匂い(臭い?)がする」と同居人に言われることです。私のニュースは置いといて、カケハシでは全社的なデータ活用基盤のプラットフォームとしてDatabricksを採用してから半年以上経過しました。
Databricksを導入してから今まではバッチ処理しかしてませんでしたが、最近になってAutoLoaderを利用してストリーム処理をするようになりました。その対象として、弊社が提供している薬歴システムのMusubiの監査ログを扱ったので紹介させて頂きます。
AutoLoaderとは
Databricks上で特別な設定無しで、クラウドストレージに到着した新しいデータを段階的かつ効率的にDelta Lakeに取り込む仕組みです。AutoLoaderにはディレクトリごとに数百万のファイルにまで拡張できるスケーラビリティの対応や導入が容易で使いやすいといったメリットがあります。
以前、spark streamingでストリーム処理を行い、Delta Lakeに取り込んた際の課題として、読み込み対象のスキーマが想定外に更新されてデータ取得によく失敗していました。AutoLoaderにはスキーマの変更が起きた際に通知を行い、無視するか失われるデータを救助できる仕組みがあるため、その課題を解決できます。
AutoLoaderの動作原理
AutoLoaderでは新規ファイルを検知するためにディレクトリ一覧モードとファイル通知モードがあります。
本記事で紹介する事例では、Kinesis Firehoseを使用してファイルを日付順にアップロードしており、ディレクトリ一覧モードでAutoLoaderを起動し、API呼び出し数を削減しております。
アーキテクチャ
Musubiの監査ログはCloudWatchに出力されています。AWSのKinesisを利用してCloudWatchからJSONデータをS3に格納しています。AutoLoaderで監査ログを処理する前は、日次でバッチ処理を行っていたので以下のようなアーキテクチャとなっていました。
最近になって監査ログデータのリアルタイム性が求められてきました。今までのアーキテクチャだとデータ抽出まではリアルタイムになっていたのですが、Databricksに読み込む際にバッチ処理をしていたため、利用者にとってはリアルタイム性に欠けていました。そのためアーキテクチャの見直しを行い、日次バッチからAutoLoaderで処理を行うよう変更しました。
Databricksでは、増分データの取り込みにはDelta Live Tables(DLT)でAutoLoaderを使用することが推奨されていますが、Unity Catalog/DLTの統合は本記事執筆時点の2023年1月31日時点ではサポートされていないため、現時点ではDelta Live Tablesを使用していません。
AutoLoaderでJSONファイルの読み込み
以下を実行することでS3に到着するJSONが順次処理されるストリームが起動します。
df = spark.readStream.format("cloudFiles") \ .option("cloudFiles.format", "json") \ .option("cloudFiles.schemaLocation", "checkpointに使用するpath") \ .option("cloudFiles.schemaHints","type string, target string, action string, year string, month string, day string, hour string) \ .option("cloudFiles.partitionColumns", "year,month,day,hour") \ .option("cloudFiles.useIncrementalListing", True) \ .option("lineSep", "\n") \ .load("取り込みデータのpath")
.format("cloudFiles")
と指定することでAutoLoaderを利用しています。
.option("cloudFiles.schemaLocation")
にチェックポイントディレクトリを指定することで、AutoLoaderがどこまでファイルを処理したか記録しています。
予めスキーマ情報がわかっていたので.option("cloudFiles.schemaHints"
にカラム名と型を指定することでスキーマ情報を強制させています。
Kinesisで順次パーティション区切りでストリーミングデータが送られてくるので、.option("cloudFiles.partitionColumns")
に年/月/日/時を指定しています。
AutoLoaderの動作原理で触れましたがディレクトリ一覧のモードを使用しており、日付順でS3にファイルがアップロードされるため、.option("cloudFiles.useIncrementalListing", True)
を指定して、ディレクトリ一を完全に一覧するのではなく、インクリメンタルな一覧を適用することにより新規ファイルを検知するのに必要なAPI呼び出し数を削減しています。
.option("lineSep)
には二つの連続するJSONレコードの区切り文字として\nを指定しています。
Databricks Unity Catalogに書き込み
以下を実行することで読み込んだストリームをDatabricksのUnity Catalogに書き込みます。
df = spark.writeStream.format("delta") \ .option("checkpointLocation", "checkpointに使用するpath") \ .outputMode("append") \ .option("mergeSchema", "true") \ .partitionBy("year", "month", "day", "hour") \ .trigger(processingTime="1 minutes") \ .toTable("unity catalogに保存するテーブル名")
.option("mergeSchema", "true")
で新規カラムが追加された場合に自動でDeltaに取り込むようオプション指定しています。
.trigger(processingTime="1 minutes")
に1
minutesを指定することで、1分間隔でJSONを順次処理しています。
JSONファイル処理部分で躓いた
ストリーミングでデータを処理すると、スキーマの予期しない変更や取り込み対象ファイルのデータが不正といった様々な要因でエラーに遭遇するかと思います。今回AutoLoaderでJSONファイルを処理する際に以下のエラーに遭遇しました。
org.apache.spark.sql.catalyst.util.UnknownFieldException: Encountered unknown field(s) during parsing: {}
上記のエラー原因は、データスキーマ変更が要因でない場合、データの中に空のJSONが存在すること起因の不具合によるものです。対処方法としては、以下2パターンのどちらかになるかと思います。
- 取り込み対象データの空のJSONを削除する
- エラー後、AutoLoaderを再実行
今回はAutoLoaderを再実行することで問題なくデータを取り込むことができて、データ利用者に最新の情報を提供することができました。
まとめ
以上、DatabricksのAutoLoaderを紹介させて頂きました。簡単にセットアップができて最新の情報をいち早くデータ利用者に届けられる仕組みは大変素敵なものだと実感しました。
本記事ではDelta Live Tablesについて触れませんでしたが、Unity Catalog/Delta Live Tablesの統合が正式版となった暁には社内でDelta Live Tables(DLT)でAutoLoaderを使用する動きが加速するかと思います。
一緒に手を動かしてデータ基盤構築しませんか。少しでも興味を持ってくださった方がいらっしゃれば以下からご応募お待ちしております。