Compare commits

..

5 Commits

Author SHA1 Message Date
af92fcd9a6 fix: spcify host and port for doc-server
All checks were successful
MJN/finance-dbt/pipeline/head This commit looks good
2024-07-07 14:52:12 +08:00
58034b8690 fix: requirements.txt
All checks were successful
MJN/finance-dbt/pipeline/head This commit looks good
2024-07-05 18:41:21 +08:00
54ebc311a7 fix: add wheel to requirements.txt (sometimes it is not installed, causes installation issues of dbt)
All checks were successful
MJN/finance-dbt/pipeline/head This commit looks good
2024-07-05 18:15:58 +08:00
f8d9e59c30 chores: pip3 -> pip
Some checks failed
MJN/finance-dbt/pipeline/head There was a failure building this commit
2024-07-05 18:11:40 +08:00
9d9c343b71 release: 0.1.0 add ods for minutes trade data
Some checks failed
MJN/finance-dbt/pipeline/head There was a failure building this commit
2024-07-05 17:54:16 +08:00
11 changed files with 387 additions and 267 deletions

View File

@ -4,7 +4,7 @@ WORKDIR /app
### Pip ### Pip
USER root USER root
COPY ./requirements.txt /app/ COPY ./requirements.txt /app/
RUN pip3 config set global.index-url https://nexus.mujiannan.com:5001/repository/pypiserver/simple && \ RUN pip config set global.index-url https://nexus.mujiannan.com:5001/repository/pypiserver/simple && \
pip install -r requirements.txt pip install -r requirements.txt
USER 1001 USER 1001
## Copy files ## Copy files

View File

@ -1,2 +1,2 @@
dbt docs generate dbt docs generate
dbt docs serve dbt docs serve --host=0.0.0.0 --port=8080

View File

@ -2,14 +2,16 @@
config( config(
materialized='table', materialized='table',
engine='MergeTree', engine='MergeTree',
order_by='time_id' order_by='(date_id_str, full_time_str, time_id)'
) )
}} }}
Select cast(formatDateTime(`full_time`, '%Y%m%d%H%i') As Int64) As time_id Select toYYYYMMDDhhmmss(`full_time`) As time_id
, `full_time` , `full_time`
, `date_id` , `date_id`
, hour(`full_time`) As `hour` , hour(`full_time`) As `hour`
, minute(`full_time`) As `minute` , minute(`full_time`) As `minute`
, formatDateTimeInJodaSyntax(`full_time`, 'yyyy-MM-dd HH:mm:ss') As full_time_str
, Cast(`date_id` As String) As date_id_str
From( From(
SELECT SELECT
arrayJoin( arrayJoin(

View File

@ -5,6 +5,10 @@
order_by='ts_code, date_id', order_by='ts_code, date_id',
unique_key=['ts_code', 'date_id'], unique_key=['ts_code', 'date_id'],
incremental_strategy='delete+insert', incremental_strategy='delete+insert',
query_settings={
"join_algorithm": "'full_sorting_merge'",
"max_bytes_before_external_sort": "'1000M'"
}
) )
}} }}
Select Select

View File

@ -1,38 +1,42 @@
{{ {{
config( config(
materialized='materialized_view', materialized='incremental',
engine='MergeTree', engine='MergeTree',
order_by='ts_code, time_id', order_by='ts_code, time_id',
unique_key=['ts_code', 'time_id'], unique_key=['ts_code', 'time_id'],
incremental_strategy='delete+insert',
partition_by=['cast(floor(time_id / 1000000) As Int32)'], partition_by=['cast(floor(time_id / 1000000) As Int32)'],
query_settings={ query_settings={
"join_algorithm": "'full_sorting_merge'", "join_algorithm": "'full_sorting_merge'",
"max_rows_in_set_to_optimize_join": "0", "max_bytes_before_external_sort": "'1000M'",
"max_bytes_before_external_sort": "'1000M'" "max_bytes_before_external_group_by":"'1000M'",
} }
) )
}} }}
Select Select
StgMinutes.ts_code As ts_code, OdsMinutes.ts_code As ts_code,
DimTime.time_id As time_id, OdsMinutes.time_id As time_id,
StgMinutes.close As close, OdsMinutes.close As close,
StgMinutes.open As open, OdsMinutes.open As open,
StgMinutes.high As high, OdsMinutes.high As high,
StgMinutes.low As low, OdsMinutes.low As low,
StgMinutes.vol As vol, OdsMinutes.vol As vol,
StgMinutes.amount As amount, OdsMinutes.amount As amount,
StgAdjFactor.adj_factor As adj_factor, OdsAdjFactor.adj_factor As adj_factor,
now() As dt now() As dt
From {{ source('finance', 'stg.tushare_minutes') }} As StgMinutes From {{ ref('ods.tushare_minutes') }} As OdsMinutes
Inner Join {{ ref('dw.dim_time') }} As DimTime Any Left Join {{ ref('dw.dim_time') }} As DimTime
On StgMinutes.trade_time = formatDateTimeInJodaSyntax(full_time, 'yyyy-MM-dd HH:mm:ss') On OdsMinutes.time_id = DimTime.time_id
Any Left Join {{ source('finance', 'stg.tushare_adj_factor') }} As StgAdjFactor Any Left Join {{ ref('ods.tushare_adj_factor') }} As OdsAdjFactor
On StgMinutes.ts_code = StgAdjFactor.ts_code On OdsMinutes.ts_code = OdsAdjFactor.ts_code
And StgAdjFactor.trade_date = Cast(DimTime.date_id As String) And OdsAdjFactor.date_id = DimTime.date_id
where 1 = 1 where 1 = 1
{% if 'dev' in target.name %} {% if is_incremental() %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(today(), 'yyyy-01-01 00:00:00') And OdsMinutes.time_id >= (Select max(`time_id`) From {{ this }})
{% elif target.name == 'test' %} {% endif %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Year,-1,today()), 'yyyy-01-01 00:00:00') {% if 'dev' in target.name %}
And OdsMinutes.time_id >= toYYYYMMDDhhmmss(dateAdd(Month,-1,today()))
{% elif target.name == 'test' %}
And OdsMinutes.time_id >= toYYYYMMDDhhmmss(dateAdd(Year,-1,today()))
{% endif %} {% endif %}

View File

@ -94,6 +94,15 @@ models:
description: "分钟" description: "分钟"
tests: tests:
- not_null - not_null
- name: full_time_str
description: "时间字符串yyyy-MM-dd HH:mm:ss"
tests:
- not_null
- unique
- name: date_id_str
description: "日期字符串yyyy-MM-dd"
tests:
- not_null
- name: dw.dim_hs_calendar - name: dw.dim_hs_calendar
description: "沪深交易日历" description: "沪深交易日历"
tests: tests:

View File

@ -205,7 +205,7 @@ sources:
description: "复权因子" description: "复权因子"
data_type: Nullable(Decimal(16, 6)) data_type: Nullable(Decimal(16, 6))
- name: dt - name: dt
description: "更新日期" description: "更新时间"
data_type: DateTime data_type: DateTime
- name: stg.tushare_minutes - name: stg.tushare_minutes
description: "沪深分钟级交易数据" description: "沪深分钟级交易数据"

View File

@ -0,0 +1,19 @@
{{
config(
materialized='incremental',
engine="MergeTree",
order_by="ts_code, date_id",
unique_key=['ts_code', 'date_id'],
incremental_strategy='delete+insert',
)
}}
Select
ts_code As ts_code,
toYYYYMMDD(toDate(trade_date)) As date_id,
adj_factor As adj_factor,
dt As dt
From {{ source('finance', 'stg.tushare_adj_factor') }} As StgAdjFactor
{% if is_incremental() %}
Where `trade_date` >= (Select formatDateTimeInJodaSyntax(YYYYMMDDToDate(max(`date_id`)), 'yyyy-MM-dd') From {{ this }})
{% endif %}

View File

@ -0,0 +1,29 @@
{{
config(
materialized='incremental',
engine="MergeTree",
order_by='ts_code, time_id',
unique_key=['ts_code', 'time_id'],
incremental_strategy='delete+insert',
)
}}
Select
StgMinutes.ts_code As ts_code,
toYYYYMMDDhhmmss(toDateTime(`StgMinutes`.`trade_time`)) As time_id,
StgMinutes.close As close,
StgMinutes.open As open,
StgMinutes.high As high,
StgMinutes.low As low,
StgMinutes.vol As vol,
StgMinutes.amount As amount
From {{ source('finance', 'stg.tushare_minutes') }} As StgMinutes
where 1 = 1
{% if is_incremental() %}
And StgMinutes.trade_time >= (Select formatDateTimeInJodaSyntax(YYYYMMDDhhmmssToDateTime(max(time_id)), 'yyyy-MM-dd hh:mm:ss') From {{ this }})
{% endif %}
{% if 'dev' in target.name %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Month,-1,today()), 'yyyy-01-01 00:00:00')
{% elif target.name == 'test' %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Year,-1,today()), 'yyyy-01-01 00:00:00')
{% endif %}

54
models/ods/schema.yml Normal file
View File

@ -0,0 +1,54 @@
version: 2
models:
- name: ods.tushare_minutes
description: "沪深股票交易分钟线数据"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- time_id
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: time_id
description: 交易日期
tests:
- not_null
- name: close
description: 收盘价
- name: open
description: 开盘价
- name: high
description: 最高价
- name: low
description: 最低价
- name: vol
description: 成交量
- name: amount
description: 成交额
- name: dt
description: 更新时间
- name: ods.tushare_adj_factor
description: "沪深股票复权因子"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- date_id
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: date_id
description: 交易日期
tests:
- not_null
- name: adj_factor
description: 复权因子
- name: dt
description: 更新时间

View File

@ -1,2 +1 @@
dbt==1.0.0.37.0 dbt-clickhouse==1.8.0
dbt-clickhouse==1.7.3