From f1efaf2c2efd81869b6a1505f51ec7d2fdc10e2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B2=88=E6=A5=A0?= Date: Wed, 10 Jul 2024 16:14:09 +0800 Subject: [PATCH] feat: use ReplacingMergeTree for ods/dw fact tables ! Have to optimize parts after dbt run incrementally --- models/dw/dw.dim_time.sql | 2 +- models/dw/dw.fact_stock_daily.sql | 13 ++++++------- models/dw/dw.fact_stock_minute.sql | 14 ++++++-------- models/ods/ods.tushare_adj_factor.sql | 8 ++++---- models/ods/ods.tushare_minutes.sql | 16 +++++++++++----- profiles.yml | 2 +- 6 files changed, 29 insertions(+), 26 deletions(-) diff --git a/models/dw/dw.dim_time.sql b/models/dw/dw.dim_time.sql index 198908d..edd2736 100644 --- a/models/dw/dw.dim_time.sql +++ b/models/dw/dw.dim_time.sql @@ -2,7 +2,7 @@ config( materialized='table', engine='MergeTree', - order_by='(date_id_str, full_time_str, time_id)' + order_by='time_id', ) }} Select toYYYYMMDDhhmmss(`full_time`) As time_id diff --git a/models/dw/dw.fact_stock_daily.sql b/models/dw/dw.fact_stock_daily.sql index 3eba141..ff6f4c7 100644 --- a/models/dw/dw.fact_stock_daily.sql +++ b/models/dw/dw.fact_stock_daily.sql @@ -1,13 +1,12 @@ {{ config( materialized='incremental', - engine='MergeTree', - order_by='ts_code, date_id', - unique_key=['ts_code', 'date_id'], - incremental_strategy='delete+insert', + engine='ReplacingMergeTree', + order_by='(date_id, ts_code)', + unique_key=['date_id', 'ts_code'], + incremental_strategy='append', query_settings={ - "join_algorithm": "'full_sorting_merge'", - "max_bytes_before_external_sort": "'1000M'" + "join_algorithm": "'full_sorting_merge'", } ) }} @@ -39,7 +38,7 @@ Select StgDailyBasic.free_share As free_share, StgDailyBasic.total_mv As total_mv, StgDailyBasic.circ_mv As circ_mv -From {{ source('finance', 'stg.tushare_daily') }} As StgDaily Final +From {{ source('finance', 'stg.tushare_daily') }} As StgDaily Inner Join {{ ref('dw.dim_date') }} As DimDate On StgDaily.trade_date = Cast(DimDate.date_id As String) Any Left 
Join {{ source('finance', 'stg.tushare_daily_basic') }} As StgDailyBasic diff --git a/models/dw/dw.fact_stock_minute.sql b/models/dw/dw.fact_stock_minute.sql index 62f0ff4..c98f805 100644 --- a/models/dw/dw.fact_stock_minute.sql +++ b/models/dw/dw.fact_stock_minute.sql @@ -1,15 +1,13 @@ {{ config( materialized='incremental', - engine='MergeTree', - order_by='ts_code, time_id', - unique_key=['ts_code', 'time_id'], - incremental_strategy='delete+insert', - partition_by=['cast(floor(time_id / 1000000) As Int32)'], + engine='ReplacingMergeTree', + order_by='(time_id, ts_code)', + unique_key=['time_id', 'ts_code'], + incremental_strategy='append', + partition_by=['intDiv(time_id, 100000000)'], query_settings={ "join_algorithm": "'full_sorting_merge'", - "max_bytes_before_external_sort": "'1000M'", - "max_bytes_before_external_group_by":"'1000M'", } ) }} @@ -33,7 +31,7 @@ Any Left Join {{ ref('ods.tushare_adj_factor') }} As OdsAdjFactor And OdsAdjFactor.date_id = DimTime.date_id where 1 = 1 {% if is_incremental() %} - And OdsMinutes.time_id >= (Select max(`time_id`) From {{ this }}) + And OdsMinutes.time_id >= (Select toYYYYMMDDhhmmss(toStartOfDay(YYYYMMDDhhmmssToDateTime(max(`time_id`)))) From {{ this }}) {% endif %} {% if 'dev' in target.name %} And OdsMinutes.time_id >= toYYYYMMDDhhmmss(dateAdd(Month,-1,today())) diff --git a/models/ods/ods.tushare_adj_factor.sql b/models/ods/ods.tushare_adj_factor.sql index 051add1..c8df741 100644 --- a/models/ods/ods.tushare_adj_factor.sql +++ b/models/ods/ods.tushare_adj_factor.sql @@ -1,10 +1,10 @@ {{ config( materialized='incremental', - engine="MergeTree", - order_by="ts_code, date_id", - unique_key=['ts_code', 'date_id'], - incremental_strategy='delete+insert', + engine="ReplacingMergeTree", + order_by="(date_id, ts_code)", + unique_key=['date_id', 'ts_code'], + incremental_strategy='append', ) }} diff --git a/models/ods/ods.tushare_minutes.sql b/models/ods/ods.tushare_minutes.sql index 3d721a6..bb3c547 100644 --- 
a/models/ods/ods.tushare_minutes.sql +++ b/models/ods/ods.tushare_minutes.sql @@ -1,10 +1,16 @@ {{ config( materialized='incremental', - engine="MergeTree", - order_by='ts_code, time_id', - unique_key=['ts_code', 'time_id'], - incremental_strategy='delete+insert', + engine="ReplacingMergeTree", + order_by='(time_id, ts_code)', + unique_key=['time_id', 'ts_code'], + partition_by=['intDiv(time_id, 100000000)'], + incremental_strategy='append', + query_settings={ + "join_algorithm": "'full_sorting_merge'", + "max_bytes_before_external_sort": "'1000M'", + "max_bytes_before_external_group_by":"'1000M'", + } ) }} @@ -20,7 +26,7 @@ Select From {{ source('finance', 'stg.tushare_minutes') }} As StgMinutes where 1 = 1 {% if is_incremental() %} - And StgMinutes.trade_time >= (Select formatDateTimeInJodaSyntax(YYYYMMDDhhmmssToDateTime(max(time_id)), 'yyyy-MM-dd hh:mm:ss') From {{ this }}) + And StgMinutes.trade_time >= (Select formatDateTimeInJodaSyntax(toStartOfDay(YYYYMMDDhhmmssToDateTime(max(time_id))), 'yyyy-MM-dd HH:mm:ss') From {{ this }}) {% endif %} {% if 'dev' in target.name %} And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Month,-1,today()), 'yyyy-01-01 00:00:00') diff --git a/profiles.yml b/profiles.yml index c189897..5506a4d 100644 --- a/profiles.yml +++ b/profiles.yml @@ -15,7 +15,7 @@ finance_dbt: retries: 1 compression: gzip connect_timeout: 10 - send_receive_timeout: 300 + send_receive_timeout: 24000 cluster_mode: False use_lw_deletes: True check_exchange: True