feat: use ReplacingMergeTree for ods/dw fact tables

! Have to optimize parts after dbt run incrementially
This commit is contained in:
沈楠 2024-07-10 16:14:09 +08:00
parent af92fcd9a6
commit f1efaf2c2e
6 changed files with 29 additions and 26 deletions

View File

@ -2,7 +2,7 @@
config(
materialized='table',
engine='MergeTree',
order_by='(date_id_str, full_time_str, time_id)'
order_by='time_id',
)
}}
Select toYYYYMMDDhhmmss(`full_time`) As time_id

View File

@ -1,13 +1,12 @@
{{
config(
materialized='incremental',
engine='MergeTree',
order_by='ts_code, date_id',
unique_key=['ts_code', 'date_id'],
incremental_strategy='delete+insert',
engine='ReplacingMergeTree',
order_by='(date_id, ts_code)',
unique_key=['date_id', 'ts_code'],
incremental_strategy='append',
query_settings={
"join_algorithm": "'full_sorting_merge'",
"max_bytes_before_external_sort": "'1000M'"
"join_algorithm": "'full_sorting_merge'",
}
)
}}
@ -39,7 +38,7 @@ Select
StgDailyBasic.free_share As free_share,
StgDailyBasic.total_mv As total_mv,
StgDailyBasic.circ_mv As circ_mv
From {{ source('finance', 'stg.tushare_daily') }} As StgDaily Final
From {{ source('finance', 'stg.tushare_daily') }} As StgDaily
Inner Join {{ ref('dw.dim_date') }} As DimDate
On StgDaily.trade_date = Cast(DimDate.date_id As String)
Any Left Join {{ source('finance', 'stg.tushare_daily_basic') }} As StgDailyBasic

View File

@ -1,15 +1,13 @@
{{
config(
materialized='incremental',
engine='MergeTree',
order_by='ts_code, time_id',
unique_key=['ts_code', 'time_id'],
incremental_strategy='delete+insert',
partition_by=['cast(floor(time_id / 1000000) As Int32)'],
engine='ReplacingMergeTree',
order_by='(time_id, ts_code)',
unique_key=['time_id', 'ts_code'],
incremental_strategy='append',
partition_by=['toYYYYMM(toDateTime(time_id))'],
query_settings={
"join_algorithm": "'full_sorting_merge'",
"max_bytes_before_external_sort": "'1000M'",
"max_bytes_before_external_group_by":"'1000M'",
}
)
}}
@ -33,7 +31,7 @@ Any Left Join {{ ref('ods.tushare_adj_factor') }} As OdsAdjFactor
And OdsAdjFactor.date_id = DimTime.date_id
where 1 = 1
{% if is_incremental() %}
And OdsMinutes.time_id >= (Select max(`time_id`) From {{ this }})
And OdsMinutes.time_id >= (Select toYYYYMMDDhhmmss(toStartOfDay(YYYYMMDDhhmmssToDateTime(max(`time_id`)))) From {{ this }})
{% endif %}
{% if 'dev' in target.name %}
And OdsMinutes.time_id >= toYYYYMMDDhhmmss(dateAdd(Month,-1,today()))

View File

@ -1,10 +1,10 @@
{{
config(
materialized='incremental',
engine="MergeTree",
order_by="ts_code, date_id",
unique_key=['ts_code', 'date_id'],
incremental_strategy='delete+insert',
engine="ReplacingMergeTree",
order_by="(date_id, ts_code)",
unique_key=['date_id', 'ts_code'],
incremental_strategy='append',
)
}}

View File

@ -1,10 +1,16 @@
{{
config(
materialized='incremental',
engine="MergeTree",
order_by='ts_code, time_id',
unique_key=['ts_code', 'time_id'],
incremental_strategy='delete+insert',
engine="ReplacingMergeTree",
order_by='(time_id, ts_code)',
unique_key=['time_id', 'ts_code'],
partition_by=['toYYYYMM(toDateTime(time_id))'],
incremental_strategy='append',
query_settings={
"join_algorithm": "'full_sorting_merge'",
"max_bytes_before_external_sort": "'1000M'",
"max_bytes_before_external_group_by":"'1000M'",
}
)
}}
@ -20,7 +26,7 @@ Select
From {{ source('finance', 'stg.tushare_minutes') }} As StgMinutes
where 1 = 1
{% if is_incremental() %}
And StgMinutes.trade_time >= (Select formatDateTimeInJodaSyntax(YYYYMMDDhhmmssToDateTime(max(time_id)), 'yyyy-MM-dd hh:mm:ss') From {{ this }})
And StgMinutes.trade_time >= (Select formatDateTimeInJodaSyntax(toStartOfDay(YYYYMMDDhhmmssToDateTime(max(time_id))), 'yyyy-MM-dd HH:mm:ss') From {{ this }})
{% endif %}
{% if 'dev' in target.name %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Month,-1,today()), 'yyyy-01-01 00:00:00')

View File

@ -15,7 +15,7 @@ finance_dbt:
retries: 1
compression: gzip
connect_timeout: 10
send_receive_timeout: 300
send_receive_timeout: 24000
cluster_mode: False
use_lw_deletes: True
check_exchange: True