1.創(chuàng)建項目
1.1 創(chuàng)建多個項目
這次我們不使用命令行配置,直接更改profiles.yml
- 在
profiles.yml新建一個環(huán)境dbt_kahan,項目里的yml和profiles.yml名字要對應
image.png - 填寫
profiles.yml的配置信息(多項目)
image.png
3.測試
dbt debug
1.2 創(chuàng)建多個staging來區(qū)分不同的database

image.png
-
根據(jù)snowflake上面的數(shù)據(jù)庫名稱創(chuàng)建對應的staging
image.png - 創(chuàng)建需要的表的
schema.yml
version: 2
sources:
- name: snowflake_sample_data #dbt里引用的名字
database: SNOWFLAKE_SAMPLE_DATA #實際數(shù)據(jù)庫的庫名
schema: TPCDS_SF10TCL_OLD #數(shù)據(jù)庫的schema名稱
tables:
- name: STORE_SALES #數(shù)據(jù)庫里的表名
- 創(chuàng)建一個dag名為
ssr_store_sales.sql,用來將sources里的表,移動到demo_db的view里
WITH ssr_store_sales as(
select * from {{ source('snowflake_sample_data', 'STORE_SALES') }}
)
select * from ssr_store_sales
2. 定制自己的schema
- 默認情況下,我們在最開始配置
profiles.yml已經(jīng)hard code了schema為public
image.png
但是,實際工作中,我們根據(jù)業(yè)務的處理用到很多的schema,所以需要我們根據(jù)業(yè)務的不同場景來切換staging
- 添加macros,用來更改默認的schema的
generate_schema.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}
- 修改
dbt_project.yml,添加名為staging的schema,這樣在staing文件夾下的所有的表,都會進入到snowflake的staging下
models:
dbt_kahan:
# Config indicated by + and applies to all files under models/example/
staging:
+materialized: view
+schema: staging

image.png
3. 對dbt的表進行持久化materialization
3.1 方法一:整體修改dbt_project.yml
- 如果我們想對表進行各種持久化的操作需要用materialization,例如將表改為table,view等,可以在dbt_project.yml直接修改
- 將staging下example的表改為table,默認是view
models:
dbt_kahan:
# Config indicated by + and applies to all files under models/example/
staging:
+materialized: view
+schema: staging
example:
+materialized: table

image.png
3.2 方法二:對單個modol進行修改
- 將上面的first example改為view,在my_first_example_dbt.sql里添加

image.png
- macros 本質(zhì)是test
- seed 里面的數(shù)據(jù)應該是很少變動的,用來被引用的
3.3 ephemeral
- 不是snowflake或者其他工具的內(nèi)置類型,只是dbt自定義的,所以將model設置為ephemeral不會出現(xiàn)在snowflake里,只會在dbt內(nèi)部當作CTE來使用
- 優(yōu)點:可以當作表來使用,可以保持數(shù)據(jù)庫干凈
- 缺點:難以調(diào)試和顯示
4. 添加變量
- 在
dbt_project.yml添加變量
name: 'dbt_kahan'
version: '1.0.0'
config-version: 2
vars:
current_champion: Lakers
- 使用變量
with champion_team as (
select *,
case when team = '{{var("current_champion")}}' then 'is_champion else NULL END as flag_champion
from teams
6. Hooks
當我們想在model執(zhí)行的之前或者之后執(zhí)行一些重復操作的時候,不需要每個Model都添加操作,可以使用Hooks來
6.1 dbt運行前或者后執(zhí)行操作
- 在
dbt run之前和之后,執(zhí)行我們定義的sql statement,在dbt test這類操作也可以添加測試前的hooks
on-run-start:
- "{% for schema in schemas %}grant usage on schema {{ schema }} to group reporter; {% endfor %}"
on-run-end:
"{{ grant_select(schemas) }}"
models:
dbt_kahan:
# Config indicated by + and applies to all files under models/example/
staging:
+materialized: view
+schema: staging
example:
+materialized: table
6.2 model級別的Hooks
- 有兩種添加方式,
- 方法一:直接去
dbt_project.yml指定的model下添加
- 執(zhí)行所有Model之前,先給example下的所有Model授權(quán)給REPORTER角色,然后執(zhí)行完所有Model之后,在移除PC的使用權(quán)限
models:
dbt_kahan:
staging:
+materialized: view
+schema: staging
example:
+pre-hook:
- "GRANT SELECT ON {{ this }} TO ROLE REPORTER"
+post-hook:
- "REMOVE SELECT ON {{ this }} TO ROLE PC"
+materialized: table
- 方法二:去指定的model的
.sql里添加
{{ config(
post_hook = "unload ('select from {{ this }}') to 's3:/bucket_name/{{ this }}"
) }}
select ...
7. 用docs文件夾管理開發(fā)文檔
-
在model文件夾下創(chuàng)建一個docs文件夾,用來管理所有的docs
image.png - 創(chuàng)建一個關于所有銷售數(shù)據(jù)的md文檔
sales_details.md
{% docs salers_type %}
This is type of salers:TOP1 GOLDEN SALER
{% enddocs %}
{% docs salers_name %}
This is name of salers: PJJ,GJJ
{% enddocs %}
{% docs salers_department %}
This is department of salers: IT, HR
{% enddocs %}
- 使用,分別在example和snowflake_sample_data里的schema.yml使用
version: 2
models:
- name: my_first_dbt_model
description: "A starter dbt model"
columns:
- name: id
description: '{{doc("salers_type")}}'
tests:
- unique
- not_null
- name: my_second_dbt_model
description: "A starter dbt model"
columns:
- name: id
description: '{{doc("salers_name")}}'
tests:
- unique
- not_null
8.freshness & add snowflake query tag
- 檢查數(shù)據(jù)和當前時間的差,并定義錯誤,不是UTC時間需要轉(zhuǎn)
tables:
- name: customers # this will use the freshness defined above
- name: orders
freshness: # make this a little more strict
warn_after: {count: 6, period: hour}
error_after: {count: 12, period: hour}
# Apply a where clause in the freshness query
filter: datediff('day', _etl_loaded_at, current_timestamp) < 2
- 添加snowflake query tag
{{
config(
query_tag = 'demo',
)
}}
select *
from {{ ref('my_first_dbt_model') }}
where id = 1
9. 引用其他項目的文件到自己的
- 假設我們需要引用一個其他項目dbtlearn的一些文件
- 創(chuàng)建一個Package將其他的項目復制的本地,在本地
package.yml里添加
packages:
- git: "https://github.com/CXTV/dbt_demo.git"
revision: master
- 編譯包,此時我們當前的項目就有了導入的其他項目
dbt deps

image.png
3.使用dbt_demo里的macros
select *, {{dbt_packages.dbtlearn.new_macors('Mike')}} as test_col from teams
4.修改本地項目的.gitignore防止上傳了其他項目代碼
10. 使用dbt創(chuàng)建uuid&自動清理文件夾clean&添加tag
10.1創(chuàng)建uuid
- dbt內(nèi)置了自動創(chuàng)建唯一標識符的功能,同一個執(zhí)行的上下文,是相同的,這樣可以用來審核一些執(zhí)行
select *, '{{ invocation_id}}' as invocation_id from teams
10.2 自動清理文件夾clean
1.配置我們需要清理的文件夾在dbt_project.yml里
clean-targets:
- "target"
- "dbt_packages"
- "logs"
- 執(zhí)行清理
dbt clean
10.3 添加Tag
- 添加一個tag,
dbt_project.yml里
models:
dbt_kahan:
staging:
+materialized: view
+schema: staging
example:
+tag: p1
+materialized: table
- 方法二:在Model里用config添加
{{ config(materialized='view',tags=['special']) }}
- 執(zhí)行指定的tag
dbt run -m tag:p1
11. DBT一些知識
- 可以指定每個庫使用的snowflake的 warehourse
models:
dbt_kahan:
staging:
+materialized: view
+schema: staging
example:
+tag: p1
+materialized: ephemeral
snowflake_warehouse: pc_small
marts:
+materialized: table
snowflake_warehouse: pc_large
- 文件結(jié)構(gòu)需要有staging(處理數(shù)據(jù)的文件夾,一般都是view),marts(最后處理完成的表,一般是table,snowflake里的實表)
-
最好給Models加上這個,用來測試
image.png
4.package-codegen





