データ分析基盤グループでデータエンジニアをしている平川です。
DataVaultに関する記事の第2回目となります。(第1回の記事はこちらです)
第2回の記事は、DataVaultモデリングの中心となるHub/Link/Satelliteをdbtのパッケージを利用して作っていくという内容です。
2,3回目の内容が当初と少し変わっていますので、再掲いたします。
第1回: DataVaultってなに?どんな特徴があるの?
第2回: automate_dvを使ってDataVaultモデリングの中心となるテーブルを作ってみてわかったこと ← 今回はここ
第3回: BusinessVault、発展的なSatelliteテーブルやキーがNullだった場合の対処方法についてなど
- 前回のおさらい
- はじめに
- automate_dvについて
- automate_dvの使い方
- automate_dvを使用していてハマったポイント
- RawVault層までの実装にautomate_dvを使った感想など
- まとめと次回予告
- 参考資料
前回のおさらい
前回の記事では、データウェアハウス設計における1つのアプローチであるDataVaultの特徴やメリットについて説明しました。DataVaultは、柔軟性や拡張性に優れ、大量のデータを効果的に管理することができます。
また、DataVaultを実装するためには、Hub/Link/Satelliteという構造を持ったテーブルを生成する必要があります。これらのテーブルの生成には手間がかかることがありますが、automate_dvというパッケージを使うことでテーブル生成を簡易にすることができます。
今回の記事では、このテーブルの生成をサポートするautomate_dvの紹介と実際のテーブル生成までの手順を解説します。
はじめに
第2弾の記事では、データウェアハウスのテーブル構築方法について、automate_dv1を利用した手法について紹介します。
まずは、前回の内容を振り返りつつ、データウェアハウスの設計に関する用語を簡単に説明します。
用語 | 意味 |
---|---|
ELT | データウェアハウスにおけるデータの取り込み方法の1つで、データを抽出してから変換し、最後にロードする手法です |
dbt | データウェアハウスの構築や管理をするためのオープンソースのツールです。SQL(一部Jinja)でデータパイプラインを記述できます。 |
DataVault | データウェアハウスの設計パターンの1つで、拡張性や柔軟性の高さなどが特徴です。 |
Hub | DataVaultで構築する上での中心となるテーブルで、ビジネスキーとそのハッシュキーで構成されます。 |
Link | DataVaultでのHub同士の関係性を表すテーブルで、関連するHubのハッシュキーと関連する2つのHubのビジネスキーをconcatしてハッシュ化したキーで構成されます。 |
Satellite | HubやLinkのキーに対して付随する属性情報を保持するテーブルです。属性情報をまとめてハッシュ化したHashdiffカラムによって、変化を検知することができます。 |
次のセクションでは、automate_dvを使用してDataVaultのテーブル構築を行う方法について、手順を解説します。
automate_dvについて
automate_dvとは?
dbtのパッケージの1つで、DataVault2.0モデルに基づいたデータウェアハウスの構築をサポートしてくれます。このパッケージのマクロを利用することで、DataVaultモデリングに関するテーブルのSQL実装を簡単に行うことができます。
便利な点は何?
automate_dvを利用することで、Hub/Link/Satelliteテーブルやハッシュ化したキーを含むテーブルの作成を容易にすることができます。 これにより、SQLの記述量を大幅に削減し、テーブルの実装にかかる時間を短縮することができます。
注意点
automate_dvは便利なツールですが、利用しているプラットフォームによっては使用できないマクロがあるため注意が必要です。
例えば、一部のマクロはRedshiftやPostgreSQLでは利用できません。一方で、Snowflake/BigQuery/MS SQL Serverでは、現在一般提供されているマクロを使用することができます。
プラットフォームによって利用できる機能に違いがあるため、使用する前にドキュメントを参照することをおすすめします。
automate_dvの使い方
インストール方法
automate_dvを使うにはdbtを実行する環境にインストールする必要があります。dbtのプロジェクト直下にある。package.yml
に以下のようにautomate_dvパッケージの依存関係を追加し、dpt deps
を実行してください。
packages: - package: Datavault-UK/automate_dv version: 0.9.5
Hub/Link/Satelliteの実装
dbt run
or dbt build
を実行することで作成したSQLから各種ビューとテーブルを作成してくれます。
Hub/Link/Satelliteを作成するための各SQLの書き方を以下で説明していきます。
ハッシュキーの生成
Hub/Link/Satelliteを実装するにはそれぞれハッシュ化されたキーとhashdiffが必要になります。
そのため、実装する3テーブルが参照するビューも必要になるため、まずはハッシュキーを含むビューをautomate_dvを使って実装していきます。
実装例として、orders
, customers
, products
, customers_orders
, orders_products
という5つのソーステーブルからハッシュキーを含むビューを作成していきます。
hashed_ordersのSQLは下記のようになります。(hashed_orders以外のSQLについては折り畳んでありますので詳細を知りたい方は展開していただければと思います。)
{%- set yaml_metadata -%} -- ① source_model: -- ② orders derived_columns: RECORD_SOURCE: "!ORDER_MANAGEMENT" hashed_columns: ORDER_HASH_DIFF: is_hashdiff: true columns: - ORDER_NAME - ORDER_AMOUNT - CREATED_AT ORDER_HK: - ORDER_ID {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.stage( include_source_columns=true, source_model=metadata_dict["source_model"], derived_columns=metadata_dict["derived_columns"], hashed_columns=metadata_dict["hashed_columns"], null_columns=none, ranked_columns=none, ) }} -- ③
① : jinjaの記述方法でyaml形式で各種パラメータを設定する
② : ハッシュキーの設定や参照するモデルの設定などをする
③ : automate_dvのstageマクロを使い各種パラメータを引数に設定する
その他のモデルのクエリ
hashed_products
{%- set yaml_metadata -%}
source_model:
products
derived_columns:
RECORD_SOURCE: "!ORDER_MANAGEMENT"
hashed_columns:
HASH_DIFF:
is_hashdiff: true
columns:
- PRODUCT_NAME
- PRICE
- CREATED_AT
PRODUCT_HK:
- PRODUCT_ID
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{{ automate_dv.stage(
include_source_columns=true,
source_model=metadata_dict["source_model"],
derived_columns=metadata_dict["derived_columns"],
hashed_columns=metadata_dict["hashed_columns"],
null_columns=none,
ranked_columns=none,
)
}}
hashed_customers
{%- set yaml_metadata -%}
source_model:
customers
derived_columns:
RECORD_SOURCE: "!ORDER_MANAGEMENT"
hashed_columns:
HASH_DIFF:
is_hashdiff: true
columns:
- EMAIL
- PREFECTURE
- CREATED_AT
CUSTOMER_HK:
- CUSTOMER_ID
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{{ automate_dv.stage(
include_source_columns=true,
source_model=metadata_dict["source_model"],
derived_columns=metadata_dict["derived_columns"],
hashed_columns=metadata_dict["hashed_columns"],
null_columns=none,
ranked_columns=none,
)
}}
hashed_customers_orders
{%- set yaml_metadata -%}
source_model:
customers_orders
derived_columns:
RECORD_SOURCE: "!ORDER_MANAGEMENT"
hashed_columns:
HASH_DIFF:
is_hashdiff: true
columns:
- CREATED_AT
CUSTOMER_HK:
- CUSTOMER_ID
ORDER_HK:
- ORDER_ID
CUSTOMER_ORDER_HK:
- CUSTOMER_ID
- ORDER_ID
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{{ automate_dv.stage(
include_source_columns=true,
source_model=metadata_dict["source_model"],
derived_columns=metadata_dict["derived_columns"],
hashed_columns=metadata_dict["hashed_columns"],
null_columns=none,
ranked_columns=none,
)
}}
hashed_orders_products
{%- set yaml_metadata -%}
source_model:
orders_products
derived_columns:
RECORD_SOURCE: "!ORDER_MANAGEMENT"
hashed_columns:
HASH_DIFF:
is_hashdiff: true
columns:
- CREATED_AT
ORDER_HK:
- ORDER_ID
PRODUCT_HK:
- PRODUCT_ID
ORDER_PRODUCT_HK:
- ORDER_ID
- PRODUCT_ID
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{{ automate_dv.stage(
include_source_columns=true,
source_model=metadata_dict["source_model"],
derived_columns=metadata_dict["derived_columns"],
hashed_columns=metadata_dict["hashed_columns"],
null_columns=none,
ranked_columns=none,
)
}}
続いて、今作成したハッシュキー含むモデルから、Hub/Link/Satelliteを作成していきます。
Hubの生成
ordersのHubテーブルの作成は以下のSQLのようになります。(customersとproductsのHubテーブルは折りたたみ内にSQLを記載しています)
{{ config(materialized="incremental") }} {%- set yaml_metadata -%} source_model: hashed_orders src_pk: ORDER_HK src_nk: ORDER_ID src_ldts: LOADED_AT src_source: RECORD_SOURCE {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.hub( src_pk=metadata_dict["src_pk"], src_nk=metadata_dict["src_nk"], src_ldts=metadata_dict["src_ldts"], src_source=metadata_dict["src_source"], source_model=metadata_dict["source_model"], ) }}
その他のモデルのクエリ
hub_products
{{ config(materialized="incremental") }}
{%- set yaml_metadata -%}
source_model:
hashed_products
src_pk: PRODUCT_HK
src_nk: PRODUCT_ID
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{{
automate_dv.hub(
src_pk=metadata_dict["src_pk"],
src_nk=metadata_dict["src_nk"],
src_ldts=metadata_dict["src_ldts"],
src_source=metadata_dict["src_source"],
source_model=metadata_dict["source_model"],
)
}}
hub_customers
{{ config(materialized="incremental") }}
{%- set yaml_metadata -%}
source_model:
hashed_customers
src_pk: CUSTOMER_HK
src_nk: CUSTOMER_ID
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{{
automate_dv.hub(
src_pk=metadata_dict["src_pk"],
src_nk=metadata_dict["src_nk"],
src_ldts=metadata_dict["src_ldts"],
src_source=metadata_dict["src_source"],
source_model=metadata_dict["source_model"],
)
}}
Linkの生成
orderとproductの関係性を記述するLinkテーブルの作成は以下のSQLのようになります。(link_customers_ordersは折りたたみ内にSQLを記載しています。)
{{ config(materialized="incremental") }} {%- set yaml_metadata -%} source_model: hashed_orders_products src_pk: ORDER_PRODUCT_HK src_fk: - ORDER_HK - PRODUCT_HK src_ldts: LOADED_AT src_source: RECORD_SOURCE {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.link( src_pk=metadata_dict["src_pk"], src_fk=metadata_dict["src_fk"], src_ldts=metadata_dict["src_ldts"], src_source=metadata_dict["src_source"], source_model=metadata_dict["source_model"], ) }}
その他のモデルのクエリ
link_customers_orders
{{ config(materialized="incremental") }}
{%- set yaml_metadata -%}
source_model:
hashed_customers_orders
src_pk: CUSTOMER_ORDER_HK
src_fk:
- CUSTOMER_HK
- ORDER_HK
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{{
automate_dv.link(
src_pk=metadata_dict["src_pk"],
src_fk=metadata_dict["src_fk"],
src_ldts=metadata_dict["src_ldts"],
src_source=metadata_dict["src_source"],
source_model=metadata_dict["source_model"],
)
}}
Satelliteの生成
orderの属性情報を記述するSatelliteテーブルの作成は以下のSQLのようになります。(sat_customersとsat_productsは折りたたみ内にSQLを記載しています。)
{{ config(materialized='incremental') }} {%- set yaml_metadata -%} source_model: hashed_orders src_pk: ORDER_HK src_hashdiff: HASH_DIFF src_payload: - ORDER_NAME - ORDER_AMOUNT - CREATED_AT src_ldts: LOADED_AT src_source: RECORD_SOURCE {%- endset -%} {% set metadata_dict = fromyaml(yaml_metadata) %} {{ automate_dv.sat( src_pk=metadata_dict["src_pk"], src_hashdiff=metadata_dict["src_hashdiff"], src_payload=metadata_dict["src_payload"], src_ldts=metadata_dict["src_ldts"], src_source=metadata_dict["src_source"], source_model=metadata_dict["source_model"], ) }}
その他のモデルのクエリ
sat_customers
{{ config(materialized='incremental') }}
{%- set yaml_metadata -%}
source_model: hashed_customers
src_pk: CUSTOMER_HK
src_hashdiff:
HASH_DIFF
src_payload:
- EMAIL
- PREFECTURE
- CREATED_AT
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{{
automate_dv.sat(
src_pk=metadata_dict["src_pk"],
src_hashdiff=metadata_dict["src_hashdiff"],
src_payload=metadata_dict["src_payload"],
src_ldts=metadata_dict["src_ldts"],
src_source=metadata_dict["src_source"],
source_model=metadata_dict["source_model"],
)
}}
sat_products
{{ config(materialized='incremental') }}
{%- set yaml_metadata -%}
source_model: hashed_products
src_pk: PRODUCT_HK
src_hashdiff:
HASH_DIFFF
src_payload:
- PRODUCT_NAME
- PRICE
- CREATED_AT
src_ldts: LOADED_AT
src_source: RECORD_SOURCE
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{{
automate_dv.sat(
src_pk=metadata_dict["src_pk"],
src_hashdiff=metadata_dict["src_hashdiff"],
src_payload=metadata_dict["src_payload"],
src_ldts=metadata_dict["src_ldts"],
src_source=metadata_dict["src_source"],
source_model=metadata_dict["source_model"],
)
}}
automate_dvを使う際の小技
automate_dvのドキュメントでは、stageマクロを実行する際のsource_modelには最新分だけを含んだモデルを対象にするのがベストプラクティスとされています。
ですが、止むを得ず参照するモデルに複数日付が含まれていることがあるかもしれません。この場合、最新分だけが存在する層を作るということもできますが、stageマクロを使いつつ、最新分だけのビューを作成することも可能です。
具体的な手順としては、automate_dv.stage
部分をwith句に入れて一時テーブルとして定義し、日付を絞る方法です。以下のように記述することで実現できます
-- パラメータの設定は省略 with stage as ( {{ automate_dv.stage( include_source_columns=true, source_model=metadata_dict["source_model"], derived_columns=metadata_dict["derived_columns"], hashed_columns=metadata_dict["hashed_columns"], null_columns=none, ranked_columns=none, ) }} ) select * from stage where loaded_at = 'YYYY-MM-DD'
automate_dvを使用していてハマったポイント
automate_dvは、データウェアハウスの構築において高いパフォーマンスを発揮するツールですが、日本語の情報も少なくハマってしまった時に原因を理解するのに時間がかかると思います。ここでは、実際にDataVault層のSatelliteテーブルを構築する際にハマったポイントについて説明していきます。
履歴化されているデータソースを取り込む場合
Satelliteテーブルに取り込む対象が、前日からの増分のみであれば、automate_dvのSatelliteマクロを使用しても問題ありません。しかし、Satelliteが参照するモデルが過去から現在までの全期間のレコードを持っている場合、予期しない結果になる可能性があります。具体的なサンプルと動作を見ていきましょう。
取り込みたいモデルの構造
カラム名 |
---|
history_id |
customer_id |
amount |
created_at |
loaded_at |
対象のモデルのデータの変化
2023-04-02
history_id | customer_id | amount | created_at | loaded_at |
---|---|---|---|---|
1 | 1 | 1000 | 2023-04-01 12:00:00 | 2023-04-02 00:00:00 |
2 | 2 | 1500 | 2023-04-01 13:00:00 | 2023-04-02 00:00:00 |
2023-04-03
history_id | customer_id | amount | created_at | loaded_at |
---|---|---|---|---|
1 | 1 | 1000 | 2023-04-01 12:00:00 | 2023-04-03 00:00:00 |
2 | 2 | 1500 | 2023-04-01 13:00:00 | 2023-04-03 00:00:00 |
3 | 1 | 1000 | 2023-04-02 12:00:00 | 2023-04-03 00:00:00 |
4 | 1 | 3000 | 2023-04-02 12:30:00 | 2023-04-03 00:00:00 |
2023-04-04
history_id | customer_id | amount | created_at | loaded_at |
---|---|---|---|---|
1 | 1 | 1000 | 2023-04-01 12:00:00 | 2023-04-04 00:00:00 |
2 | 2 | 1500 | 2023-04-01 13:00:00 | 2023-04-04 00:00:00 |
3 | 1 | 1000 | 2023-04-02 12:00:00 | 2023-04-04 00:00:00 |
4 | 1 | 3000 | 2023-04-02 12:30:00 | 2023-04-04 00:00:00 |
5 | 3 | 5000 | 2023-04-03 15:00:00 | 2023-04-04 00:00:00 |
6 | 1 | 1000 | 2023-04-03 16:00:00 | 2023-04-04 00:00:00 |
automate_dvを使ってSatelliteテーブルを作った際の結果
上記のSatelliteテーブルの赤い枠で囲まれた1行目と5行目は同一のレコードになっています。全期間の履歴を保持するようなモデルに対してSatelliteマクロを使うとこのような現象が起きてしまいます。
このような状態になってしまう原因は、automate_dvのマクロを使用することで生成されるSQLを見ることでわかります。
以下生成されたSQLを抜粋
WITH source_data AS ( SELECT a.CUSTOMER_HK, a.HASH_DIFF, a.AMOUNT, a.CREATED_AT, a.LOADED_AT, a.RECORD_SOURCE FROM hs_order_histories AS a WHERE a.CUSTOMER_HK IS NOT NULL ), latest_records AS ( SELECT a.CUSTOMER_HK, a.HASH_DIFF, a.LOADED_AT FROM ( SELECT current_records.CUSTOMER_HK, current_records.HASH_DIFF, current_records.LOADED_AT, RANK() OVER ( PARTITION BY current_records.CUSTOMER_HK ORDER BY current_records.LOADED_AT DESC ) AS rank FROM sat_order_histories AS current_records JOIN ( SELECT DISTINCT source_data.CUSTOMER_HK FROM source_data ) AS source_records ON current_records.CUSTOMER_HK = source_records.CUSTOMER_HK ) AS a WHERE a.rank = 1 ), records_to_insert AS ( SELECT DISTINCT stage.CUSTOMER_HK, stage.HASH_DIFF, stage.AMOUNT, stage.CREATED_AT, stage.LOADED_AT, stage.RECORD_SOURCE FROM source_data AS stage LEFT JOIN latest_records ON latest_records.CUSTOMER_HK = stage.CUSTOMER_HK AND latest_records.HASH_DIFF = stage.HASH_DIFF WHERE latest_records.HASH_DIFF IS NULL ) SELECT * FROM records_to_insert
このSQLでは、ソースモデル(ハッシュキーを含むモデル)とSatelliteテーブルから条件を付与して抽出したテーブルを結合して、新たに追加するレコードを作成しています。
with句の2番目に定義されている最新レコード(latest_records)を見るとRankを使用して最新のレコードを抽出しています。
Satelliteテーブルに追加されるレコードは、ソースデータと最新レコードを結合し、ソースデータにだけ存在するレコードを抽出しています。
(where句でのlatest_records.HASH_DIFF IS NULL
でソースデータにだけ存在するレコードを取ろうとしています。)
今回例として取り上げたような、同じハッシュキーに対して、異なるHASH_DIFFが追加されるようなテーブルの場合、Satelliteマクロは予期した動作をしないことがあります。
対応策としては、最初に述べたように、Satelliteテーブルが参照するモデルには最新分しか含まないようにすることです。中間層に手を加えたくない場合は、SQLでSatelliteテーブルに存在しないレコードのみを抽出するようなSQLを書きます。
select distinct customer_hk, hash_diff, amount, created_at, loaded_at, record_source from hs_order_history as stg where customer_hk is not null {% if is_incremental() %} and not exists ( select 1 from ( select customer_hk, hash_diff from {{ this }} as cur where stg.customer_hk = cur.customer_hk and stg.hash_diff = cur.hash_diff ) ) {% endif %}
RawVault層までの実装にautomate_dvを使った感想など
- 1つのビジネスキーに対して、ログを溜め続けるようなデータソースがある場合、Satelliteマクロを使用する際に生成されるSQLを見ると、余計なレコードが生成されてしまうことがあります。そのため、データソースによってはマクロの使用の有無や参照モデルの修正など、使い分けや手を加える必要があり、少し面倒に感じました。
- マクロを使用することで、キーのハッシュ化など、SQLで記述する場合には多くのコード量が必要な部分をマクロで置き換えられるのはすごく便利に感じました。
- 予期しない動作が発生した場合、コンパイルされたSQLを見ることで原因を理解することはできますが、automate_dvの実装を見るとマクロ内でマクロを呼び出しているためautomate_dvでの仕組みを理解するのには時間がかかる印象でした。
- データソース側で物理削除されている場合、Satelliteではレコードが削除されたかどうかが判断できないので、StatusTrackingSatelliteが欲しくなる場面がありましたが、現在(2023年6月時点)はautomate_dvにマクロはまだ実装されていないのでSQLを直接書く必要がありました。(EffectiveSatelliteのマクロはあるのでもしかしたらそちらで対応できるかもしれません)
まとめと次回予告
今回の記事ではDataVaultモデリングの中心となるHub/Link/Satelliteを生成するためのdbtのパッケージの1つであるautomate_dvについて紹介させていただきました。
automate_dvはSQLだけで、Hub/Link/Satelliteの生成クエリを実装するのに比べて、非常に少ない記述量で各種テーブルの実装を行うことができます。
当たり前ですが、マクロによって生成されるSQLがどんなことをしているのかを理解するのが、使いこなす上で重要になってくると思います。
次回は、BusinessVaultや今回の記事で名前だけ出てきているEffectiveSatelliteやStatusTrackingSatelliteの紹介や、ビジネスキーがNullだった場合の対応方法などについて紹介させていただければと思います。
参考資料
- 記事を書き始めた時はdbtvaultでしたが、気づいたらautomate_dvに変わってました...↩