KAKEHASHI Tech Blog

カケハシのEngineer Teamによるブログです。

データプロダクトのテストにdbtを検討してみた

こちらの記事はカケハシ Advent Calendar 2024の15日目の記事になります。

こんにちは、株式会社カケハシのソフトウェアエンジニアの坂本です。
現在データプロダクトの開発に関わっており、データの品質チェックに課題を感じています。
この課題に対して、dbtというツールを活用できないか検討してみたのでその検討内容を紹介させていただきたいと思います。

課題背景

現在携わっているデータプロダクトでは、データ基盤に存在しているテーブルのデータを抽出、変換してCSVとして出力し外部提供する処理が存在します。
なお、このプロダクトで参照しているテーブルは各プロダクトで保存しているものをデータ基盤(Databricks)に連携しているものになります。
その中で、CSVを出力する直前に品質の問題がないことをチェックしています。

しかし、この品質チェックでは課題点があります。
品質チェックに失敗し出力データに何らかの問題があると判明した場合、

  • 取得元のテーブルに異常があるのか
  • データ作成ロジックに異常があるのか

といった原因の切り分けをするために時間を割く必要があります。

また、カケハシでも採用しているメダリオンアーキテクチャのようにデータを複数レイヤーで作成するようになった場合にもより原因調査が難しくなると考えられます。

このような課題感の中、テーブル自体の品質を簡単にチェックできるツールがないかと考えました。
そんな中、カケハシのデータ基盤として採用しているDatabricks内のDatabricks Jobでdbtを使えることを知ったので、今回品質チェックに活用できるか検討することにしました。(参考)

dbt とは

dbt is a transformation workflow that helps you get more work done while producing higher quality results.

dbtはETL/ELTワークフローのTransform(変換)処理をサポートするツールで、OLAPでは見落としがちな品質面に配慮しながら変換処理を実施できます。
なお、dbtにはたくさんの機能があるのですが本記事ではデータの品質に関わるテスト処理(dbt testコマンド)に話題を絞ります。

dbt test とは

dbt test コマンドを実行することでテーブルの品質チェックができるのですが、テストケースの作成方法にいくつか種類があります。

  • Generic data tests
  • Singular data tests
  • Custom Generic data tests

Generic data tests

Generic data testsではテーブルの基本的な制約をチェックするようなテストが用意されています。
ユニーク制約や外部キー制約などのデータ品質をチェックすることが可能で、各アプリケーションのRDBからデータレイクに連携されてきたようなデータについてアプリケーション側と同様の制約が守れているか、を確認するのにとても有用だと思っています。
というのも、Databricksのような分析用テーブルにはRDBのようなユニーク制約や外部キー制約を強制する機能を持っていないことが多く、データ品質を別途確認する必要があります。
(厳密にはDatabricksではNot Null制約は強制できるのですが、主キーや外部キーの制約は強制できないみたいです。(参考))

【実装例】

...
sources:
  - name: orders # 対象テーブル名
    columns:
      - name: id
        tests:
          - unique # ユニーク制約のテスト
          - not_null # null が存在しないことのテスト
      - name: user_id
        tests:
          - not_null
          - relationships: # 外部キー制約のテスト
              to: ref('users')
              field: id
      - name: category
        tests:
          - accepted_values: # 指定した値のみ存在することのテスト
              values: ["A", "B"]
...

Singular data tests

Singular data testsではSELECT文を実行し、その条件に当てはまるデータが1件でも存在した場合にテストを失敗とするテストです。

【実装例】

今回は対象のカラムの要素の文字数が5文字以下の場合にエラーとするテストケースを書いてみます。

select
  *
from users
where 
  length(name) <= 5

Custom Generic data tests

Custom Generic data testsはSingular data testsで実行するようなテストをより汎用的にするイメージです。 たとえば、Singular data testsの実装例で最低文字数を指定できるようにしてみます。

【実装例】

  • テスト関数
{% test min_length(model, column_name, length) %}

select
  *
from {{ model }}
where 
  length({{ column_name }}) <= {{ length }}

{% endtest %}
  • 呼び出し側のスキーマ
...
- name: name
  data_tests:
    - min_length:
        length: 5
...

Databricksで実装してみた

今回はメダリオンアーキテクチャに基づいて、以下のような流れでデータを作成することを想定しそれぞれのデータテストを実行してみます。 サンプルで用意したデータは注文情報とそれに紐づくユーザーマスタと商品マスタとしています。
このデータを結合し集計した情報が要求されている、というシナリオを想定します。

ワークフローの作成

Databricks Jobではこちらの手順に沿って簡単にdbtのTaskを作成できます。
作成したワークフローは以下です。

それぞれのTaskでは以下コマンドを実行しています。

  • test_bronze_table
    • dbt test
      • 取得元のテーブルの品質チェック
  • create_and_test(silver|gold)table
    • dbt run
      • テーブルの作成
      • dbt runを実行することでテーブルを作成することができるのですが、本記事では省略します
    • dbt test
      • 作成したテーブルの品質チェック

実装例

全体のコード量が多くなってしまったため、test_bronze_table Taskで参照している設定のみ記述しています。

version: 2

sources:
  - name: sample_catalog
    schema: sample_schema
    tables:
      - name: bronze_users
        description: ユーザー
        columns:
          - name: id
            data_tests:
              - unique
              - not_null
          - name: name
            description: ユーザー名
            data_tests:
              - not_null
              - length: # Custom Generic data tests
                  min_length: 4
                  max_length: 6
          - name: area_id
            description: エリアID
            data_tests:
              - not_null
          - name: is_active
            description: 契約状況(0:解約,1:契約)
            data_tests:
              - not_null
              - accepted_values:
                  values: [0, 1]
      - name: bronze_products
        description: 商品
        columns:
          - name: id
            data_tests:
              - unique
              - not_null
          - name: price
            description: 価格
            data_tests:
              - not_null
          - name: category_id
            description: カテゴリID
            data_tests:
              - not_null
      - name: bronze_orders
        description: 注文
        columns:
          - name: id
            data_tests:
              - unique
              - not_null
          - name: user_id
            description: ユーザーID
            data_tests:
              - not_null
              - relationships:
                  to: ref('bronze_users')
                  field: id
          - name: product_id
            description: 商品ID
            data_tests:
              - not_null
              - relationships:
                  to: ref('bronze_products')
                  field: id
          - name: quantity
            description: 注文数
            data_tests:
              - not_null

実行結果

dbt test を実行すると、成功時は以下のようなログが出力されます。

+ dbt test -models 'bronze.*'
11:31:16  Running with dbt=1.8.7
...
11:31:31  1 of 19 START test source_accepted_values_xxx_bronze_users_is_active__0__1  [RUN]
11:31:39  1 of 19 PASS source_accepted_values_xxx_bronze_users_is_active__0__1 ..... [PASS in 8.12s]
...
11:31:46  Finished running 19 data tests in 0 hours 0 minutes and 24.97 seconds (24.97s).
11:31:46  Completed successfully
11:31:46  Done. PASS=19 WARN=0 ERROR=0 SKIP=0 TOTAL=19

テストデータを不正な値でパッチして再度テストを実行してみます。
すると、テストは異常終了し以下のようなログが出力されます。
また、 --store-failures オプションを付与して実行することでエラーとなったレコードを別テーブルに保存しておくことができ、エラー調査がしやすくなります。

+ dbt test -models 'bronze.*' --store-failures
11:39:47  Running with dbt=1.8.7
...
11:39:52  1 of 19 START test source_accepted_values_workspace_bronze_users_is_active__0__1  [RUN]
11:39:57  1 of 19 FAIL 1 source_accepted_values_workspace_bronze_users_is_active__0__1 ... [FAIL 1 in 4.44s]
...
11:40:01  Finished running 19 data tests in 0 hours 0 minutes and 9.51 seconds (9.51s).
11:40:01  
11:40:01  Completed with 1 error and 0 warnings:
11:40:01  
11:40:01  Failure in test source_accepted_values_workspace_bronze_users_is_active__0__1 (models/bronze/schema.yml)
...
11:40:01    See test failures:
  ------------------------------------------------------------------------------------------------------
  select * from `xxx`.`xxx`.`source_accepted_values_workspace_bronze_users_is_active__0__1`
  ------------------------------------------------------------------------------------------------------
11:40:01  
11:40:01  Done. PASS=18 WARN=0 ERROR=1 SKIP=0 TOTAL=19

使ってみた感想、まとめ

dbtでデータの品質チェックを検討してみました。
個人的にはテストを実行するという機能のみでもdbtをプロダクトに導入する価値があるように思いました。
以下はdbtを導入しやすい、導入する価値があるなと思ったポイントです。

  • テスト項目の柔軟性が高い
    • Custom Generic data testsを活用することでテストの作成方法の汎用性や柔軟性が高いものにでき、後続のタスクを実行する前にデータの異常を検知することができる
  • 原因調査をしやすい
    • テストでエラーとなったレコードをテーブルに保存しておく機能があるため、原因調査が容易になる
  • チーム開発しやすい
    • テーブルの制約事項をyamlで宣言的に管理でき理解容易性が高い
  • 環境構築のしやすさ
    • Databricksで導入する場合、環境構築コストが軽い
    • Databricks Jobではdbtコマンドなど、最低限の設定をするだけでジョブを作成できるため、簡潔

また、以下のような課題や未調査な部分がありますが、実際に導入する上で検討していきたいと思っているポイントです。

  • 課題
    • dbtの機能が豊富であるため、一定の学習コストが発生する
  • 未調査
    • Databricks Jobを活用する場合、ローカル環境はどのように作成するか
    • dbt test自体のパフォーマンスは問題なさそうか

スケジュールの都合でまだプロダクトには導入できていないのですが、実際に導入した時の苦労などもブログにできればと思います。
データ品質の課題に困っている、またはdbtを検討しているがなかなか触る機会がない方の一助になれば幸いです。

参考情報