この記事は秋の技術特集2024の1記事目です。
背景・目的
- 我々のチームではDBをホスティングせず、S3上のDatabricksのdeltaテーブルを使って社内プラットフォームのシステムを提供しています
- データの登録はDatabricksのnotebookを使って行うのですが、それをチーム外のメンバーに利用してもらうにあたり入力バリデーションを行う必要がありました
- そこで、notebookのセルの中身を検証する方法を調査しました
graph LR user((チーム外の\n社内メンバー)) gh[GitHub] memo>ここでデータ読み込みクエリの\nバリデーションをしたい] subgraph Databricks notebook[データ操作Notebook] end subgraph AWS環境 subgraph S3 deltaテーブル[チーム内のテーブル] end end user <-- データ操作 --> notebook gh -- 同期 --> notebook notebook <-- データ読み書き --> deltaテーブル notebook -.- memo
要件
- ユーザーは、GitHubのレポジトリをDatabricksに同期したファイルを使ってデータ操作を行う
ユーザーは、notebookのセルに
%sql
マジックコマンドを使ってSQLを記述するか、spark
を使ってデータフレームを操作するコードを記述する例: SQL
%sql SELECT id, name FROM some_catalog.some_schema.some_table LEFT ANTI JOIN some_catalog.some_schema.test_patient_view USING(id)
例: コード
df = spark.sql("SELECT id, name FROM some_catalog.some_schema.some_table") df = df.join(test_patient_view, on="id", how="left_anti")
ユーザーが記述したSQLやコードで、指定したViewを使っているかどうかを検証したい
- 例: テストデータのViewを使ってテストデータを除外できているか?を確認するなど
結論
databricks.sdkのWorkspaceClientのworkspace.export
を使ってnotebookのセルの中身を取得することで、その内容に対して検証処理を適用することができました
def get_notebook_contents() -> list[str]: """ 実行中のnotebookの中身を取得する Returns: list[str]: notebookの各セルの入力内容 """ from base64 import b64decode from databricks.sdk import WorkspaceClient from databricks.sdk.runtime import dbutils from databricks.sdk.service.workspace import ExportFormat w = WorkspaceClient() notebook_path = ( dbutils.notebook.entry_point.getDbutils() .notebook() .getContext() .notebookPath() .get() ) return ( b64decode( w.workspace.export(notebook_path, format=ExportFormat("SOURCE")).content ) .decode("utf-8") .split("\n") )
こちらのアプローチは、実行しているnotebookを別口で取得するというアプローチになります。 実行環境にファイル自体はあるのに、ネットワーク経由で取得する冗長に感じる手法ではあるのですが、このアプローチで実現することが良いと考えました。
以下、上手くいかなかったアプローチを記載します
他の選択肢
_ih
変数を使う
Jupyter notebookの実行された入力内容は入力キャッシュの仕組みによって_ih
変数に格納されているため、これを使ってセルの中身を取得することができます。
これを使ってセルの中身を取得できないか?と考えて試してみたのですが%sql
マジックコマンドを使ったセルの中身を取得できないという問題がありました。
これでは、ユーザーが記述したSQLを検証することができません。
調べてみたところ、クラスタの設定・Runtimeのバージョンによって_ih
の中身についての挙動が変わっていました
ある設定では、下記のようにセルの中身が格納されていました
try: def ____databricks_percent_sql(): import base64 df = spark.sql(base64.standard_b64decode("XXXXX").decode()) display(df) return df _sqldf = ____databricks_percent_sql() finally: del ____databricks_percent_sql
また、別の設定では、下記のようにセルの中身が格納されていました
_sqldf = __import__('pyspark').sql.dataframe.DataFrame(dbutils.entry_point.getImplicitDataFrame(), sqlContext)
これでは、SQLの中身を文字列で取得することができません。(1番目の場合はbase64をデコードすれば中身を見ることはできました)
Runtimeやクラスタの設定によって挙動が変わるのであれば、バージョンアップなどで挙動が変わる可能性があるため、この方法は避けるべきだと考えこの案は却下しました。
ファイルを直接読む
Databricksではnotebookは実行環境にファイルとして保存されているため、これを直接読むことでセルの中身を取得することができるのでは?と考えました。
しかし、下記のコードを実行すると
with open('./notebook_name') as f: print(f.read())
OSError: [Errno 95] Operation not supported: './notebook_name'
というエラーが出てしまい、ファイルを直接読むことができませんでした。 Databricks側にファイルを取得することがブロックされている可能性が高く、この方法も避けるべきだと考えました。
原因を調査してみたところ、/Workspace
がfuse(Filesystem in Userspace)でマウントされていることがわかりました。
これはファイルシステムの挙動をアプリケーションに委譲する仕組みでファイルオープンのリクエストに対してアプリケーション側で挙動を制御することができます。そのため、Databricksのアプリケーションがファイルの読み込みをブロックしている可能性が高いと考えました。 notebook上で生成したファイルに対しては問題なくアクセスできたのですが、DatabricksのnotebookではGitHubレポジトリから同期されているファイルは読めないようになっているのかもしれません。
IPythonの拡張機能を使う
IPythonの拡張機能を使ってセルの中身を取得することができるのでは?と考えました。 下記のように、IPythonのイベントフックを使ってセルの実行前後のイベントを取得することができます。
from IPython import get_ipython ip = get_ipython() # セル実行前のイベントフックを設定 def pre_run_cell(info): print(f"About to run cell: {info.raw_cell}") # セル実行後のイベントフックを設定 def post_run_cell(result): print(f"Finished running cell, result: {result.result}") # イベントフックをIPythonに登録 ip.events.register('pre_run_cell', pre_run_cell) ip.events.register('post_run_cell', post_run_cell)
しかし、こちらのアプローチでも%sql
マジックコマンドを使ったセルの中身を取得することができませんでした。
まとめ
- Databricks上のセルの内容を取得する方法としては、
WorkspaceClientのworkspace.export
を使う方法が最適だと考えました - 他の方法も試してみましたが、
%sql
マジックコマンドを使ったセルの中身を取得することができなかったため、Databricksのセキュリティ設定によってファイルの読み込みがブロックされている可能性が高いと考えました - 同様の課題を感じている方の解決策になれば幸いです