KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

Databricksでnotebookのセルの中身を検証する

この記事は秋の技術特集2024の1記事目です。

背景・目的

  • 我々のチームではDBをホスティングせず、S3上のDatabricksのdeltaテーブルを使って社内プラットフォームのシステムを提供しています
  • データの登録はDatabricksのnotebookを使って行うのですが、それをチーム外のメンバーに利用してもらうにあたり入力バリデーションを行う必要がありました
  • そこで、notebookのセルの中身を検証する方法を調査しました
graph LR
    user((チーム外の\n社内メンバー))
    gh[GitHub]
    memo>ここでデータ読み込みクエリの\nバリデーションをしたい]
    subgraph Databricks
        notebook[データ操作Notebook]
    end
    subgraph AWS環境
        subgraph S3
            deltaテーブル[チーム内のテーブル]
        end
    end

    user <-- データ操作 --> notebook
    gh -- 同期 --> notebook
    notebook <-- データ読み書き --> deltaテーブル
    notebook -.- memo

要件

  • ユーザーは、GitHubのレポジトリをDatabricksに同期したファイルを使ってデータ操作を行う
  • ユーザーは、notebookのセルに%sqlマジックコマンドを使ってSQLを記述するか、sparkを使ってデータフレームを操作するコードを記述する

    • 例: SQL

        %sql
        SELECT id, name FROM some_catalog.some_schema.some_table
        LEFT ANTI JOIN some_catalog.some_schema.test_patient_view USING(id)
      
    • 例: コード

        df = spark.sql("SELECT id, name FROM some_catalog.some_schema.some_table")
        df = df.join(test_patient_view, on="id", how="left_anti")
      
  • ユーザーが記述したSQLやコードで、指定したViewを使っているかどうかを検証したい

    • 例: テストデータのViewを使ってテストデータを除外できているか?を確認するなど

結論

databricks.sdkのWorkspaceClientのworkspace.exportを使ってnotebookのセルの中身を取得することで、その内容に対して検証処理を適用することができました

def get_notebook_contents() -> list[str]:
    """
    実行中のnotebookの中身を取得する

    Returns:
        list[str]: notebookの各セルの入力内容
    """
    from base64 import b64decode

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.runtime import dbutils
    from databricks.sdk.service.workspace import ExportFormat

    w = WorkspaceClient()
    notebook_path = (
        dbutils.notebook.entry_point.getDbutils()
        .notebook()
        .getContext()
        .notebookPath()
        .get()
    )
    return (
        b64decode(
            w.workspace.export(notebook_path, format=ExportFormat("SOURCE")).content
        )
        .decode("utf-8")
        .split("\n")
    )

こちらのアプローチは、実行しているnotebookを別口で取得するというアプローチになります。 実行環境にファイル自体はあるのに、ネットワーク経由で取得する冗長に感じる手法ではあるのですが、このアプローチで実現することが良いと考えました。

以下、上手くいかなかったアプローチを記載します

他の選択肢

_ih変数を使う

Jupyter notebookの実行された入力内容は入力キャッシュの仕組みによって_ih変数に格納されているため、これを使ってセルの中身を取得することができます。 これを使ってセルの中身を取得できないか?と考えて試してみたのですが%sqlマジックコマンドを使ったセルの中身を取得できないという問題がありました。 これでは、ユーザーが記述したSQLを検証することができません。

調べてみたところ、クラスタの設定・Runtimeのバージョンによって_ihの中身についての挙動が変わっていました

ある設定では、下記のようにセルの中身が格納されていました

try:
  def ____databricks_percent_sql():
    import base64
    df = spark.sql(base64.standard_b64decode("XXXXX").decode())
    display(df)
    return df
  _sqldf = ____databricks_percent_sql()
finally:
  del ____databricks_percent_sql

また、別の設定では、下記のようにセルの中身が格納されていました

_sqldf = __import__('pyspark').sql.dataframe.DataFrame(dbutils.entry_point.getImplicitDataFrame(), sqlContext)

これでは、SQLの中身を文字列で取得することができません。(1番目の場合はbase64をデコードすれば中身を見ることはできました)

Runtimeやクラスタの設定によって挙動が変わるのであれば、バージョンアップなどで挙動が変わる可能性があるため、この方法は避けるべきだと考えこの案は却下しました。

ファイルを直接読む

Databricksではnotebookは実行環境にファイルとして保存されているため、これを直接読むことでセルの中身を取得することができるのでは?と考えました。

しかし、下記のコードを実行すると

with open('./notebook_name') as f:
    print(f.read())

OSError: [Errno 95] Operation not supported: './notebook_name'

というエラーが出てしまい、ファイルを直接読むことができませんでした。 Databricks側にファイルを取得することがブロックされている可能性が高く、この方法も避けるべきだと考えました。

原因を調査してみたところ、/Workspacefuse(Filesystem in Userspace)でマウントされていることがわかりました。

これはファイルシステムの挙動をアプリケーションに委譲する仕組みでファイルオープンのリクエストに対してアプリケーション側で挙動を制御することができます。そのため、Databricksのアプリケーションがファイルの読み込みをブロックしている可能性が高いと考えました。 notebook上で生成したファイルに対しては問題なくアクセスできたのですが、DatabricksのnotebookではGitHubレポジトリから同期されているファイルは読めないようになっているのかもしれません。

IPythonの拡張機能を使う

IPythonの拡張機能を使ってセルの中身を取得することができるのでは?と考えました。 下記のように、IPythonのイベントフックを使ってセルの実行前後のイベントを取得することができます。

from IPython import get_ipython

ip = get_ipython()

# セル実行前のイベントフックを設定
def pre_run_cell(info):
    print(f"About to run cell: {info.raw_cell}")

# セル実行後のイベントフックを設定
def post_run_cell(result):
    print(f"Finished running cell, result: {result.result}")

# イベントフックをIPythonに登録
ip.events.register('pre_run_cell', pre_run_cell)
ip.events.register('post_run_cell', post_run_cell)

しかし、こちらのアプローチでも%sqlマジックコマンドを使ったセルの中身を取得することができませんでした。

まとめ

  • Databricks上のセルの内容を取得する方法としては、WorkspaceClientのworkspace.exportを使う方法が最適だと考えました
  • 他の方法も試してみましたが、%sqlマジックコマンドを使ったセルの中身を取得することができなかったため、Databricksのセキュリティ設定によってファイルの読み込みがブロックされている可能性が高いと考えました
  • 同様の課題を感じている方の解決策になれば幸いです