Compare commits

...

5 Commits

Author SHA1 Message Date
af92fcd9a6 fix: spcify host and port for doc-server
All checks were successful
MJN/finance-dbt/pipeline/head This commit looks good
2024-07-07 14:52:12 +08:00
58034b8690 fix: requirements.txt
All checks were successful
MJN/finance-dbt/pipeline/head This commit looks good
2024-07-05 18:41:21 +08:00
54ebc311a7 fix: add wheel to requirements.txt (sometimes it is not installed, causes installation issues of dbt)
All checks were successful
MJN/finance-dbt/pipeline/head This commit looks good
2024-07-05 18:15:58 +08:00
f8d9e59c30 chores: pip3 -> pip
Some checks failed
MJN/finance-dbt/pipeline/head There was a failure building this commit
2024-07-05 18:11:40 +08:00
9d9c343b71 release: 0.1.0 add ods for minutes trade data
Some checks failed
MJN/finance-dbt/pipeline/head There was a failure building this commit
2024-07-05 17:54:16 +08:00
11 changed files with 387 additions and 267 deletions

View File

@ -4,7 +4,7 @@ WORKDIR /app
### Pip
USER root
COPY ./requirements.txt /app/
RUN pip3 config set global.index-url https://nexus.mujiannan.com:5001/repository/pypiserver/simple && \
RUN pip config set global.index-url https://nexus.mujiannan.com:5001/repository/pypiserver/simple && \
pip install -r requirements.txt
USER 1001
## Copy files

View File

@ -1,2 +1,2 @@
dbt docs generate
dbt docs serve
dbt docs serve --host=0.0.0.0 --port=8080

View File

@ -2,14 +2,16 @@
config(
materialized='table',
engine='MergeTree',
order_by='time_id'
order_by='(date_id_str, full_time_str, time_id)'
)
}}
Select cast(formatDateTime(`full_time`, '%Y%m%d%H%i') As Int64) As time_id
Select toYYYYMMDDhhmmss(`full_time`) As time_id
, `full_time`
, `date_id`
, hour(`full_time`) As `hour`
, minute(`full_time`) As `minute`
, formatDateTimeInJodaSyntax(`full_time`, 'yyyy-MM-dd HH:mm:ss') As full_time_str
, Cast(`date_id` As String) As date_id_str
From(
SELECT
arrayJoin(

View File

@ -5,6 +5,10 @@
order_by='ts_code, date_id',
unique_key=['ts_code', 'date_id'],
incremental_strategy='delete+insert',
query_settings={
"join_algorithm": "'full_sorting_merge'",
"max_bytes_before_external_sort": "'1000M'"
}
)
}}
Select

View File

@ -1,38 +1,42 @@
{{
config(
materialized='materialized_view',
materialized='incremental',
engine='MergeTree',
order_by='ts_code, time_id',
unique_key=['ts_code', 'time_id'],
incremental_strategy='delete+insert',
partition_by=['cast(floor(time_id / 1000000) As Int32)'],
query_settings={
"join_algorithm": "'full_sorting_merge'",
"max_rows_in_set_to_optimize_join": "0",
"max_bytes_before_external_sort": "'1000M'"
"max_bytes_before_external_sort": "'1000M'",
"max_bytes_before_external_group_by":"'1000M'",
}
)
}}
Select
StgMinutes.ts_code As ts_code,
DimTime.time_id As time_id,
StgMinutes.close As close,
StgMinutes.open As open,
StgMinutes.high As high,
StgMinutes.low As low,
StgMinutes.vol As vol,
StgMinutes.amount As amount,
StgAdjFactor.adj_factor As adj_factor,
OdsMinutes.ts_code As ts_code,
OdsMinutes.time_id As time_id,
OdsMinutes.close As close,
OdsMinutes.open As open,
OdsMinutes.high As high,
OdsMinutes.low As low,
OdsMinutes.vol As vol,
OdsMinutes.amount As amount,
OdsAdjFactor.adj_factor As adj_factor,
now() As dt
From {{ source('finance', 'stg.tushare_minutes') }} As StgMinutes
Inner Join {{ ref('dw.dim_time') }} As DimTime
On StgMinutes.trade_time = formatDateTimeInJodaSyntax(full_time, 'yyyy-MM-dd HH:mm:ss')
Any Left Join {{ source('finance', 'stg.tushare_adj_factor') }} As StgAdjFactor
On StgMinutes.ts_code = StgAdjFactor.ts_code
And StgAdjFactor.trade_date = Cast(DimTime.date_id As String)
From {{ ref('ods.tushare_minutes') }} As OdsMinutes
Any Left Join {{ ref('dw.dim_time') }} As DimTime
On OdsMinutes.time_id = DimTime.time_id
Any Left Join {{ ref('ods.tushare_adj_factor') }} As OdsAdjFactor
On OdsMinutes.ts_code = OdsAdjFactor.ts_code
And OdsAdjFactor.date_id = DimTime.date_id
where 1 = 1
{% if 'dev' in target.name %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(today(), 'yyyy-01-01 00:00:00')
{% elif target.name == 'test' %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Year,-1,today()), 'yyyy-01-01 00:00:00')
{% if is_incremental() %}
And OdsMinutes.time_id >= (Select max(`time_id`) From {{ this }})
{% endif %}
{% if 'dev' in target.name %}
And OdsMinutes.time_id >= toYYYYMMDDhhmmss(dateAdd(Month,-1,today()))
{% elif target.name == 'test' %}
And OdsMinutes.time_id >= toYYYYMMDDhhmmss(dateAdd(Year,-1,today()))
{% endif %}

View File

@ -94,6 +94,15 @@ models:
description: "分钟"
tests:
- not_null
- name: full_time_str
description: "时间字符串yyyy-MM-dd HH:mm:ss"
tests:
- not_null
- unique
- name: date_id_str
description: "日期字符串yyyy-MM-dd"
tests:
- not_null
- name: dw.dim_hs_calendar
description: "沪深交易日历"
tests:

View File

@ -205,7 +205,7 @@ sources:
description: "复权因子"
data_type: Nullable(Decimal(16, 6))
- name: dt
description: "更新日期"
description: "更新时间"
data_type: DateTime
- name: stg.tushare_minutes
description: "沪深分钟级交易数据"

View File

@ -0,0 +1,19 @@
{{
config(
materialized='incremental',
engine="MergeTree",
order_by="ts_code, date_id",
unique_key=['ts_code', 'date_id'],
incremental_strategy='delete+insert',
)
}}
Select
ts_code As ts_code,
toYYYYMMDD(toDate(trade_date)) As date_id,
adj_factor As adj_factor,
dt As dt
From {{ source('finance', 'stg.tushare_adj_factor') }} As StgAdjFactor
{% if is_incremental() %}
Where `trade_date` >= (Select formatDateTimeInJodaSyntax(YYYYMMDDToDate(max(`date_id`)), 'yyyy-MM-dd') From {{ this }})
{% endif %}

View File

@ -0,0 +1,29 @@
{{
config(
materialized='incremental',
engine="MergeTree",
order_by='ts_code, time_id',
unique_key=['ts_code', 'time_id'],
incremental_strategy='delete+insert',
)
}}
Select
StgMinutes.ts_code As ts_code,
toYYYYMMDDhhmmss(toDateTime(`StgMinutes`.`trade_time`)) As time_id,
StgMinutes.close As close,
StgMinutes.open As open,
StgMinutes.high As high,
StgMinutes.low As low,
StgMinutes.vol As vol,
StgMinutes.amount As amount
From {{ source('finance', 'stg.tushare_minutes') }} As StgMinutes
where 1 = 1
{% if is_incremental() %}
And StgMinutes.trade_time >= (Select formatDateTimeInJodaSyntax(YYYYMMDDhhmmssToDateTime(max(time_id)), 'yyyy-MM-dd hh:mm:ss') From {{ this }})
{% endif %}
{% if 'dev' in target.name %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Month,-1,today()), 'yyyy-01-01 00:00:00')
{% elif target.name == 'test' %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Year,-1,today()), 'yyyy-01-01 00:00:00')
{% endif %}

54
models/ods/schema.yml Normal file
View File

@ -0,0 +1,54 @@
version: 2
models:
- name: ods.tushare_minutes
description: "沪深股票交易分钟线数据"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- time_id
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: time_id
description: 交易日期
tests:
- not_null
- name: close
description: 收盘价
- name: open
description: 开盘价
- name: high
description: 最高价
- name: low
description: 最低价
- name: vol
description: 成交量
- name: amount
description: 成交额
- name: dt
description: 更新时间
- name: ods.tushare_adj_factor
description: "沪深股票复权因子"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- date_id
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: date_id
description: 交易日期
tests:
- not_null
- name: adj_factor
description: 复权因子
- name: dt
description: 更新时间

View File

@ -1,2 +1 @@
dbt==1.0.0.37.0
dbt-clickhouse==1.7.3
dbt-clickhouse==1.8.0