release: 0.1.0 add ods for minutes trade data
Some checks failed
MJN/finance-dbt/pipeline/head There was a failure building this commit

This commit is contained in:
沈楠 2024-07-05 17:54:16 +08:00
parent 7fcbfe07f4
commit 9d9c343b71
8 changed files with 384 additions and 263 deletions

View File

@ -2,14 +2,16 @@
config( config(
materialized='table', materialized='table',
engine='MergeTree', engine='MergeTree',
order_by='time_id' order_by='(date_id_str, full_time_str, time_id)'
) )
}} }}
Select cast(formatDateTime(`full_time`, '%Y%m%d%H%i') As Int64) As time_id Select toYYYYMMDDhhmmss(`full_time`) As time_id
, `full_time` , `full_time`
, `date_id` , `date_id`
, hour(`full_time`) As `hour` , hour(`full_time`) As `hour`
, minute(`full_time`) As `minute` , minute(`full_time`) As `minute`
, formatDateTimeInJodaSyntax(`full_time`, 'yyyy-MM-dd HH:mm:ss') As full_time_str
, Cast(`date_id` As String) As date_id_str
From( From(
SELECT SELECT
arrayJoin( arrayJoin(

View File

@ -5,6 +5,10 @@
order_by='ts_code, date_id', order_by='ts_code, date_id',
unique_key=['ts_code', 'date_id'], unique_key=['ts_code', 'date_id'],
incremental_strategy='delete+insert', incremental_strategy='delete+insert',
query_settings={
"join_algorithm": "'full_sorting_merge'",
"max_bytes_before_external_sort": "'1000M'"
}
) )
}} }}
Select Select

View File

@ -1,38 +1,42 @@
{{ {{
config( config(
materialized='materialized_view', materialized='incremental',
engine='MergeTree', engine='MergeTree',
order_by='ts_code, time_id', order_by='ts_code, time_id',
unique_key=['ts_code', 'time_id'], unique_key=['ts_code', 'time_id'],
incremental_strategy='delete+insert',
partition_by=['cast(floor(time_id / 1000000) As Int32)'], partition_by=['cast(floor(time_id / 1000000) As Int32)'],
query_settings={ query_settings={
"join_algorithm": "'full_sorting_merge'", "join_algorithm": "'full_sorting_merge'",
"max_rows_in_set_to_optimize_join": "0", "max_bytes_before_external_sort": "'1000M'",
"max_bytes_before_external_sort": "'1000M'" "max_bytes_before_external_group_by":"'1000M'",
} }
) )
}} }}
Select Select
StgMinutes.ts_code As ts_code, OdsMinutes.ts_code As ts_code,
DimTime.time_id As time_id, OdsMinutes.time_id As time_id,
StgMinutes.close As close, OdsMinutes.close As close,
StgMinutes.open As open, OdsMinutes.open As open,
StgMinutes.high As high, OdsMinutes.high As high,
StgMinutes.low As low, OdsMinutes.low As low,
StgMinutes.vol As vol, OdsMinutes.vol As vol,
StgMinutes.amount As amount, OdsMinutes.amount As amount,
StgAdjFactor.adj_factor As adj_factor, OdsAdjFactor.adj_factor As adj_factor,
now() As dt now() As dt
From {{ source('finance', 'stg.tushare_minutes') }} As StgMinutes From {{ ref('ods.tushare_minutes') }} As OdsMinutes
Inner Join {{ ref('dw.dim_time') }} As DimTime Any Left Join {{ ref('dw.dim_time') }} As DimTime
On StgMinutes.trade_time = formatDateTimeInJodaSyntax(full_time, 'yyyy-MM-dd HH:mm:ss') On OdsMinutes.time_id = DimTime.time_id
Any Left Join {{ source('finance', 'stg.tushare_adj_factor') }} As StgAdjFactor Any Left Join {{ ref('ods.tushare_adj_factor') }} As OdsAdjFactor
On StgMinutes.ts_code = StgAdjFactor.ts_code On OdsMinutes.ts_code = OdsAdjFactor.ts_code
And StgAdjFactor.trade_date = Cast(DimTime.date_id As String) And OdsAdjFactor.date_id = DimTime.date_id
where 1 = 1 where 1 = 1
{% if 'dev' in target.name %} {% if is_incremental() %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(today(), 'yyyy-01-01 00:00:00') And OdsMinutes.time_id >= (Select max(`time_id`) From {{ this }})
{% elif target.name == 'test' %} {% endif %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Year,-1,today()), 'yyyy-01-01 00:00:00') {% if 'dev' in target.name %}
And OdsMinutes.time_id >= toYYYYMMDDhhmmss(dateAdd(Month,-1,today()))
{% elif target.name == 'test' %}
And OdsMinutes.time_id >= toYYYYMMDDhhmmss(dateAdd(Year,-1,today()))
{% endif %} {% endif %}

View File

@ -94,6 +94,15 @@ models:
description: "分钟" description: "分钟"
tests: tests:
- not_null - not_null
- name: full_time_str
description: "时间字符串yyyy-MM-dd HH:mm:ss"
tests:
- not_null
- unique
- name: date_id_str
description: "日期字符串yyyy-MM-dd"
tests:
- not_null
- name: dw.dim_hs_calendar - name: dw.dim_hs_calendar
description: "沪深交易日历" description: "沪深交易日历"
tests: tests:

View File

@ -205,7 +205,7 @@ sources:
description: "复权因子" description: "复权因子"
data_type: Nullable(Decimal(16, 6)) data_type: Nullable(Decimal(16, 6))
- name: dt - name: dt
description: "更新日期" description: "更新时间"
data_type: DateTime data_type: DateTime
- name: stg.tushare_minutes - name: stg.tushare_minutes
description: "沪深分钟级交易数据" description: "沪深分钟级交易数据"

View File

@ -0,0 +1,19 @@
{{
config(
materialized='incremental',
engine="MergeTree",
order_by="ts_code, date_id",
unique_key=['ts_code', 'date_id'],
incremental_strategy='delete+insert',
)
}}
Select
ts_code As ts_code,
toYYYYMMDD(toDate(trade_date)) As date_id,
adj_factor As adj_factor,
dt As dt
From {{ source('finance', 'stg.tushare_adj_factor') }} As StgAdjFactor
{% if is_incremental() %}
Where `trade_date` >= (Select formatDateTimeInJodaSyntax(YYYYMMDDToDate(max(`date_id`)), 'yyyy-MM-dd') From {{ this }})
{% endif %}

View File

@ -0,0 +1,29 @@
{{
config(
materialized='incremental',
engine="MergeTree",
order_by='ts_code, time_id',
unique_key=['ts_code', 'time_id'],
incremental_strategy='delete+insert',
)
}}
Select
StgMinutes.ts_code As ts_code,
toYYYYMMDDhhmmss(toDateTime(`StgMinutes`.`trade_time`)) As time_id,
StgMinutes.close As close,
StgMinutes.open As open,
StgMinutes.high As high,
StgMinutes.low As low,
StgMinutes.vol As vol,
StgMinutes.amount As amount
From {{ source('finance', 'stg.tushare_minutes') }} As StgMinutes
where 1 = 1
{% if is_incremental() %}
And StgMinutes.trade_time >= (Select formatDateTimeInJodaSyntax(YYYYMMDDhhmmssToDateTime(max(time_id)), 'yyyy-MM-dd hh:mm:ss') From {{ this }})
{% endif %}
{% if 'dev' in target.name %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Month,-1,today()), 'yyyy-01-01 00:00:00')
{% elif target.name == 'test' %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Year,-1,today()), 'yyyy-01-01 00:00:00')
{% endif %}

54
models/ods/schema.yml Normal file
View File

@ -0,0 +1,54 @@
version: 2
models:
- name: ods.tushare_minutes
description: "沪深股票交易分钟线数据"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- time_id
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: time_id
description: 交易日期
tests:
- not_null
- name: close
description: 收盘价
- name: open
description: 开盘价
- name: high
description: 最高价
- name: low
description: 最低价
- name: vol
description: 成交量
- name: amount
description: 成交额
- name: dt
description: 更新时间
- name: ods.tushare_adj_factor
description: "沪深股票复权因子"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- date_id
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: date_id
description: 交易日期
tests:
- not_null
- name: adj_factor
description: 复权因子
- name: dt
description: 更新时间