DBT(Kahan)

1.創(chuàng)建項目

1.1 創(chuàng)建多個項目

這次我們不使用命令行配置,直接更改profiles.yml

  1. profiles.yml新建一個環(huán)境dbt_kahan,項目里的yml和profiles.yml名字要對應
    image.png
  2. 填寫profiles.yml的配置信息(多項目)
    image.png

    3.測試
dbt debug

1.2 創(chuàng)建多個staging來區(qū)分不同的database

image.png
  1. 根據(jù)snowflake上面的數(shù)據(jù)庫名稱創(chuàng)建對應的staging


    image.png
  2. 創(chuàng)建需要的表的schema.yml
version: 2

sources:
  - name: snowflake_sample_data   #dbt里引用的名字
    database: SNOWFLAKE_SAMPLE_DATA    #實際數(shù)據(jù)庫的庫名
    schema: TPCDS_SF10TCL_OLD   #數(shù)據(jù)庫的schema名稱
    tables: 
      - name: STORE_SALES  #數(shù)據(jù)庫里的表名
  1. 創(chuàng)建一個dag名為ssr_store_sales.sql,用來將sources里的表,移動到demo_db的view里
WITH ssr_store_sales as(
    select * from {{ source('snowflake_sample_data', 'STORE_SALES') }}
)
select * from ssr_store_sales

2. 定制自己的schema

  • 默認情況下,我們在最開始配置profiles.yml已經(jīng)hard code了schema為public
    image.png

    但是,實際工作中,我們根據(jù)業(yè)務的處理用到很多的schema,所以需要我們根據(jù)業(yè)務的不同場景來切換staging
  1. 添加macros,用來更改默認的schema的generate_schema.sql
{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}

        {{ default_schema }}

    {%- else -%}

        {{ custom_schema_name | trim }}

    {%- endif -%}

{%- endmacro %}
  1. 修改dbt_project.yml,添加名為staging的schema,這樣在staing文件夾下的所有的表,都會進入到snowflake的staging下
models:
  dbt_kahan:
    # Config indicated by + and applies to all files under models/example/
    staging:
      +materialized: view
      +schema: staging
image.png

3. 對dbt的表進行持久化materialization

3.1 方法一:整體修改dbt_project.yml

  • 如果我們想對表進行各種持久化的操作需要用materialization,例如將表改為table,view等,可以在dbt_project.yml直接修改
  1. 將staging下example的表改為table,默認是view
models:
  dbt_kahan:
    # Config indicated by + and applies to all files under models/example/
    staging:
      +materialized: view
      +schema: staging
      example:
        +materialized: table
image.png

3.2 方法二:對單個modol進行修改

  • 將上面的first example改為view,在my_first_example_dbt.sql里添加

image.png
  1. macros 本質(zhì)是test
  2. seed 里面的數(shù)據(jù)應該是很少變動的,用來被引用的

3.3 ephemeral

  • 不是snowflake或者其他工具的內(nèi)置類型,只是dbt自定義的,所以將model設置為ephemeral不會出現(xiàn)在snowflake里,只會在dbt內(nèi)部當作CTE來使用
  • 優(yōu)點:可以當作表來使用,可以保持數(shù)據(jù)庫干凈
  • 缺點:難以調(diào)試和顯示

4. 添加變量

  1. dbt_project.yml添加變量
name: 'dbt_kahan'
version: '1.0.0'
config-version: 2

vars:
  current_champion: Lakers
  1. 使用變量
with champion_team as (
    select *,
        case when team = '{{var("current_champion")}}' then 'is_champion else NULL END as flag_champion
    from teams

6. Hooks

當我們想在model執(zhí)行的之前或者之后執(zhí)行一些重復操作的時候,不需要每個Model都添加操作,可以使用Hooks來

6.1 dbt運行前或者后執(zhí)行操作

  • dbt run之前和之后,執(zhí)行我們定義的sql statement,在dbt test這類操作也可以添加測試前的hooks
on-run-start:
  - "{% for schema in schemas %}grant usage on schema {{ schema }} to group reporter; {% endfor %}"

on-run-end:
  "{{ grant_select(schemas) }}"

models:
  dbt_kahan:
    # Config indicated by + and applies to all files under models/example/
    staging:
      +materialized: view
      +schema: staging
      example:
        +materialized: table

6.2 model級別的Hooks

  • 有兩種添加方式,
  1. 方法一:直接去dbt_project.yml指定的model下添加
  • 執(zhí)行所有Model之前,先給example下的所有Model授權(quán)給REPORTER角色,然后執(zhí)行完所有Model之后,在移除PC的使用權(quán)限
models:
  dbt_kahan:
    staging:
      +materialized: view
      +schema: staging
      example:
        +pre-hook:
          - "GRANT SELECT ON {{ this }} TO ROLE REPORTER"
        +post-hook:
          - "REMOVE SELECT ON {{ this }} TO ROLE PC"
        +materialized: table

  1. 方法二:去指定的model的.sql里添加
{{ config(
  post_hook = "unload ('select from {{ this }}') to 's3:/bucket_name/{{ this }}"
) }}

select ...

7. 用docs文件夾管理開發(fā)文檔

  1. 在model文件夾下創(chuàng)建一個docs文件夾,用來管理所有的docs


    image.png
  2. 創(chuàng)建一個關于所有銷售數(shù)據(jù)的md文檔sales_details.md
{% docs salers_type %}
This is type of salers:TOP1 GOLDEN SALER

{% enddocs %}

{% docs salers_name %}
This is name of salers: PJJ,GJJ

{% enddocs %}

{% docs salers_department %}
This is department of salers: IT, HR

{% enddocs %}

  1. 使用,分別在example和snowflake_sample_data里的schema.yml使用

version: 2

models:
  - name: my_first_dbt_model
    description: "A starter dbt model"
    columns:
      - name: id
        description: '{{doc("salers_type")}}'
        tests:
          - unique
          - not_null

  - name: my_second_dbt_model
    description: "A starter dbt model"
    columns:
      - name: id
        description: '{{doc("salers_name")}}'
        tests:
          - unique
          - not_null

8.freshness & add snowflake query tag

  1. 檢查數(shù)據(jù)和當前時間的差,并定義錯誤,不是UTC時間需要轉(zhuǎn)

    tables:
      - name: customers # this will use the freshness defined above

      - name: orders
        freshness: # make this a little more strict
          warn_after: {count: 6, period: hour}
          error_after: {count: 12, period: hour}
          # Apply a where clause in the freshness query
          filter: datediff('day', _etl_loaded_at, current_timestamp) < 2
  1. 添加snowflake query tag
{{
  config(
    query_tag = 'demo',
    )
}}

select *
from {{ ref('my_first_dbt_model') }}
where id = 1

9. 引用其他項目的文件到自己的

  • 假設我們需要引用一個其他項目dbtlearn的一些文件
  1. 創(chuàng)建一個Package將其他的項目復制的本地,在本地package.yml里添加
packages:
  - git: "https://github.com/CXTV/dbt_demo.git"
    revision: master
  1. 編譯包,此時我們當前的項目就有了導入的其他項目
dbt deps
image.png

3.使用dbt_demo里的macros

select *, {{dbt_packages.dbtlearn.new_macors('Mike')}} as test_col from teams

4.修改本地項目的.gitignore防止上傳了其他項目代碼

10. 使用dbt創(chuàng)建uuid&自動清理文件夾clean&添加tag

10.1創(chuàng)建uuid

  • dbt內(nèi)置了自動創(chuàng)建唯一標識符的功能,同一個執(zhí)行的上下文,是相同的,這樣可以用來審核一些執(zhí)行
select *, '{{ invocation_id}}' as invocation_id from teams

10.2 自動清理文件夾clean

1.配置我們需要清理的文件夾在dbt_project.yml

clean-targets:       
  - "target"  
  - "dbt_packages"
  - "logs"
  1. 執(zhí)行清理
dbt clean

10.3 添加Tag

  1. 添加一個tag,dbt_project.yml
models:
  dbt_kahan:
    staging:
      +materialized: view
      +schema: staging
      example:
        +tag: p1
        +materialized: table
  1. 方法二:在Model里用config添加
{{ config(materialized='view',tags=['special']) }}
  1. 執(zhí)行指定的tag
dbt run -m tag:p1

11. DBT一些知識

  1. 可以指定每個庫使用的snowflake的 warehourse
models:
  dbt_kahan:
    staging:
      +materialized: view
      +schema: staging 
      example:
        +tag: p1
        +materialized: ephemeral
        snowflake_warehouse: pc_small
      marts:
        +materialized: table
        snowflake_warehouse: pc_large
  1. 文件結(jié)構(gòu)需要有staging(處理數(shù)據(jù)的文件夾,一般都是view),marts(最后處理完成的表,一般是table,snowflake里的實表)
  2. 最好給Models加上這個,用來測試


    image.png

4.package-codegen

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容