Compare commits

..

5 Commits

Author SHA1 Message Date
af92fcd9a6 fix: spcify host and port for doc-server
All checks were successful
MJN/finance-dbt/pipeline/head This commit looks good
2024-07-07 14:52:12 +08:00
58034b8690 fix: requirements.txt
All checks were successful
MJN/finance-dbt/pipeline/head This commit looks good
2024-07-05 18:41:21 +08:00
54ebc311a7 fix: add wheel to requirements.txt (sometimes it is not installed, causes installation issues of dbt)
All checks were successful
MJN/finance-dbt/pipeline/head This commit looks good
2024-07-05 18:15:58 +08:00
f8d9e59c30 chores: pip3 -> pip
Some checks failed
MJN/finance-dbt/pipeline/head There was a failure building this commit
2024-07-05 18:11:40 +08:00
9d9c343b71 release: 0.1.0 add ods for minutes trade data
Some checks failed
MJN/finance-dbt/pipeline/head There was a failure building this commit
2024-07-05 17:54:16 +08:00
11 changed files with 387 additions and 267 deletions

View File

@ -4,7 +4,7 @@ WORKDIR /app
### Pip ### Pip
USER root USER root
COPY ./requirements.txt /app/ COPY ./requirements.txt /app/
RUN pip3 config set global.index-url https://nexus.mujiannan.com:5001/repository/pypiserver/simple && \ RUN pip config set global.index-url https://nexus.mujiannan.com:5001/repository/pypiserver/simple && \
pip install -r requirements.txt pip install -r requirements.txt
USER 1001 USER 1001
## Copy files ## Copy files

View File

@ -1,2 +1,2 @@
dbt docs generate dbt docs generate
dbt docs serve dbt docs serve --host=0.0.0.0 --port=8080

View File

@ -2,14 +2,16 @@
config( config(
materialized='table', materialized='table',
engine='MergeTree', engine='MergeTree',
order_by='time_id' order_by='(date_id_str, full_time_str, time_id)'
) )
}} }}
Select cast(formatDateTime(`full_time`, '%Y%m%d%H%i') As Int64) As time_id Select toYYYYMMDDhhmmss(`full_time`) As time_id
, `full_time` , `full_time`
, `date_id` , `date_id`
, hour(`full_time`) As `hour` , hour(`full_time`) As `hour`
, minute(`full_time`) As `minute` , minute(`full_time`) As `minute`
, formatDateTimeInJodaSyntax(`full_time`, 'yyyy-MM-dd HH:mm:ss') As full_time_str
, Cast(`date_id` As String) As date_id_str
From( From(
SELECT SELECT
arrayJoin( arrayJoin(

View File

@ -5,6 +5,10 @@
order_by='ts_code, date_id', order_by='ts_code, date_id',
unique_key=['ts_code', 'date_id'], unique_key=['ts_code', 'date_id'],
incremental_strategy='delete+insert', incremental_strategy='delete+insert',
query_settings={
"join_algorithm": "'full_sorting_merge'",
"max_bytes_before_external_sort": "'1000M'"
}
) )
}} }}
Select Select

View File

@ -1,38 +1,42 @@
{{ {{
config( config(
materialized='materialized_view', materialized='incremental',
engine='MergeTree', engine='MergeTree',
order_by='ts_code, time_id', order_by='ts_code, time_id',
unique_key=['ts_code', 'time_id'], unique_key=['ts_code', 'time_id'],
incremental_strategy='delete+insert',
partition_by=['cast(floor(time_id / 1000000) As Int32)'], partition_by=['cast(floor(time_id / 1000000) As Int32)'],
query_settings={ query_settings={
"join_algorithm": "'full_sorting_merge'", "join_algorithm": "'full_sorting_merge'",
"max_rows_in_set_to_optimize_join": "0", "max_bytes_before_external_sort": "'1000M'",
"max_bytes_before_external_sort": "'1000M'" "max_bytes_before_external_group_by":"'1000M'",
} }
) )
}} }}
Select Select
StgMinutes.ts_code As ts_code, OdsMinutes.ts_code As ts_code,
DimTime.time_id As time_id, OdsMinutes.time_id As time_id,
StgMinutes.close As close, OdsMinutes.close As close,
StgMinutes.open As open, OdsMinutes.open As open,
StgMinutes.high As high, OdsMinutes.high As high,
StgMinutes.low As low, OdsMinutes.low As low,
StgMinutes.vol As vol, OdsMinutes.vol As vol,
StgMinutes.amount As amount, OdsMinutes.amount As amount,
StgAdjFactor.adj_factor As adj_factor, OdsAdjFactor.adj_factor As adj_factor,
now() As dt now() As dt
From {{ source('finance', 'stg.tushare_minutes') }} As StgMinutes From {{ ref('ods.tushare_minutes') }} As OdsMinutes
Inner Join {{ ref('dw.dim_time') }} As DimTime Any Left Join {{ ref('dw.dim_time') }} As DimTime
On StgMinutes.trade_time = formatDateTimeInJodaSyntax(full_time, 'yyyy-MM-dd HH:mm:ss') On OdsMinutes.time_id = DimTime.time_id
Any Left Join {{ source('finance', 'stg.tushare_adj_factor') }} As StgAdjFactor Any Left Join {{ ref('ods.tushare_adj_factor') }} As OdsAdjFactor
On StgMinutes.ts_code = StgAdjFactor.ts_code On OdsMinutes.ts_code = OdsAdjFactor.ts_code
And StgAdjFactor.trade_date = Cast(DimTime.date_id As String) And OdsAdjFactor.date_id = DimTime.date_id
where 1 = 1 where 1 = 1
{% if is_incremental() %}
And OdsMinutes.time_id >= (Select max(`time_id`) From {{ this }})
{% endif %}
{% if 'dev' in target.name %} {% if 'dev' in target.name %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(today(), 'yyyy-01-01 00:00:00') And OdsMinutes.time_id >= toYYYYMMDDhhmmss(dateAdd(Month,-1,today()))
{% elif target.name == 'test' %} {% elif target.name == 'test' %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Year,-1,today()), 'yyyy-01-01 00:00:00') And OdsMinutes.time_id >= toYYYYMMDDhhmmss(dateAdd(Year,-1,today()))
{% endif %} {% endif %}

View File

@ -94,6 +94,15 @@ models:
description: "分钟" description: "分钟"
tests: tests:
- not_null - not_null
- name: full_time_str
description: "时间字符串yyyy-MM-dd HH:mm:ss"
tests:
- not_null
- unique
- name: date_id_str
description: "日期字符串yyyy-MM-dd"
tests:
- not_null
- name: dw.dim_hs_calendar - name: dw.dim_hs_calendar
description: "沪深交易日历" description: "沪深交易日历"
tests: tests:

View File

@ -4,250 +4,250 @@ sources:
schema: '{{ env_var("FINANCE_CLICKHOUSE_DATABASE") }}' schema: '{{ env_var("FINANCE_CLICKHOUSE_DATABASE") }}'
description: "The source for financial data" description: "The source for financial data"
tables: tables:
- name: stg.tushare_trade_calendar - name: stg.tushare_trade_calendar
description: > description: >
The source table for the trade calendar data from Tushare The source table for the trade calendar data from Tushare
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- exchange
- cal_date
columns:
- name: exchange
description: "The exchange code"
tests: tests:
- dbt_utils.unique_combination_of_columns: - not_null
combination_of_columns: - name: cal_date
- exchange description: "The date"
- cal_date
columns:
- name: exchange
description: "The exchange code"
tests:
- not_null
- name: cal_date
description: "The date"
tests:
- not_null
- name: is_open
description: "Whether the exchange is open"
tests:
- not_null
- name: pretrade_date
description: "The previous trading date"
- name: stg.tushare_stock_basic
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: symbol
description: 股票代码
- name: name
description: 股票名称
- name: area
description: 所在地域
- name: industry
description: 所属行业
- name: fullname
description: 股票全称
- name: enname
description: 英文全称
- name: cnspell
description: 拼音缩写
- name: market
description: 市场类型 (主板/中小板/创业板)
- name: exchange
description: 交易所代码
tests:
- not_null
- name: curr_type
description: 交易货币
- name: list_status
description: 上市状态: L上市 D退市 P暂停上市
- name: list_date
description: 上市日期
- name: delist_date
description: 退市日期
- name: is_hs
description: 是否沪深港通标的N否 H沪股通 S深股通
- name: act_name
description: 实控人名称
- name: act_ent_type
description: 实控人企业性质
- name: dt
description: 最后修改时间
unique_composite:
- name: unique_exchange_ts_code
columns:
- exchange
- ts_code
- name: stg.tushare_daily
description: "沪深股票交易日线数据"
tests: tests:
- dbt_utils.unique_combination_of_columns: - not_null
combination_of_columns: - name: is_open
- ts_code description: "Whether the exchange is open"
- trade_date
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: trade_date
description: 交易日期
tests:
- not_null
- name: open
description: 开盘价
tests:
- not_null
- name: high
description: 最高价
tests:
- not_null
- name: low
description: 最低价
tests:
- not_null
- name: close
description: 收盘价
tests:
- not_null
- name: pre_close
description: 昨收价
tests:
- not_null
- name: change
description: 涨跌额
tests:
- not_null
- name: pct_chg
description: 涨跌幅
tests:
- not_null
- name: vol
description: 成交量
tests:
- not_null
- name: amount
description: 成交额
- name: stg.tushare_daily_basic
description: "沪深股票每日指标数据"
tests: tests:
- dbt_utils.unique_combination_of_columns: - not_null
combination_of_columns: - name: pretrade_date
- ts_code description: "The previous trading date"
- trade_date - name: stg.tushare_stock_basic
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: symbol
description: 股票代码
- name: name
description: 股票名称
- name: area
description: 所在地域
- name: industry
description: 所属行业
- name: fullname
description: 股票全称
- name: enname
description: 英文全称
- name: cnspell
description: 拼音缩写
- name: market
description: 市场类型 (主板/中小板/创业板)
- name: exchange
description: 交易所代码
tests:
- not_null
- name: curr_type
description: 交易货币
- name: list_status
description: 上市状态: L上市 D退市 P暂停上市
- name: list_date
description: 上市日期
- name: delist_date
description: 退市日期
- name: is_hs
description: 是否沪深港通标的N否 H沪股通 S深股通
- name: act_name
description: 实控人名称
- name: act_ent_type
description: 实控人企业性质
- name: dt
description: 最后修改时间
unique_composite:
- name: unique_exchange_ts_code
columns:
- exchange
- ts_code
- name: stg.tushare_daily
description: "沪深股票交易日线数据"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- trade_date
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: trade_date
description: 交易日期
tests:
- not_null
- name: open
description: 开盘价
tests:
- not_null
- name: high
description: 最高价
tests:
- not_null
- name: low
description: 最低价
tests:
- not_null
- name: close
description: 收盘价
tests:
- not_null
- name: pre_close
description: 昨收价
tests:
- not_null
- name: change
description: 涨跌额
tests:
- not_null
- name: pct_chg
description: 涨跌幅
tests:
- not_null
- name: vol
description: 成交量
tests:
- not_null
- name: amount
description: 成交额
- name: stg.tushare_daily_basic
description: "沪深股票每日指标数据"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- trade_date
columns: columns:
- name: ts_code - name: ts_code
description: "股票代码" description: "股票代码"
data_type: String data_type: String
- name: trade_date - name: trade_date
description: "交易日期" description: "交易日期"
data_type: String data_type: String
- name: close - name: close
description: "当日收盘价" description: "当日收盘价"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: turnover_rate - name: turnover_rate
description: "换手率(%" description: "换手率(%"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: turnover_rate_f - name: turnover_rate_f
description: "换手率(自由流通股)" description: "换手率(自由流通股)"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: volume_ratio - name: volume_ratio
description: "量比" description: "量比"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: pe - name: pe
description: "市盈率(总市值/净利润, 亏损的PE为空" description: "市盈率(总市值/净利润, 亏损的PE为空"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: pe_ttm - name: pe_ttm
description: "市盈率TTM亏损的PE为空" description: "市盈率TTM亏损的PE为空"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: pb - name: pb
description: "市净率(总市值/净资产)" description: "市净率(总市值/净资产)"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: ps - name: ps
description: "市销率" description: "市销率"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: ps_ttm - name: ps_ttm
description: "市销率TTM" description: "市销率TTM"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: dv_ratio - name: dv_ratio
description: "股息率 %" description: "股息率 %"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: dv_ttm - name: dv_ttm
description: "股息率TTM%" description: "股息率TTM%"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: total_share - name: total_share
description: "总股本 (万股)" description: "总股本 (万股)"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: float_share - name: float_share
description: "流通股本 (万股)" description: "流通股本 (万股)"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: free_share - name: free_share
description: "自由流通股本 (万)" description: "自由流通股本 (万)"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: total_mv - name: total_mv
description: "总市值 (万元)" description: "总市值 (万元)"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: circ_mv - name: circ_mv
description: "流通市值(万元)" description: "流通市值(万元)"
data_type: Nullable(Float32) data_type: Nullable(Float32)
- name: stg.tushare_adj_factor - name: stg.tushare_adj_factor
description: "沪深股票复权因子" description: "沪深股票复权因子"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- trade_date
columns:
- name: ts_code
description: "股票代码"
data_type: String
- name: trade_date
description: "交易日期"
data_type: String
- name: adj_factor
description: "复权因子"
data_type: Nullable(Decimal(16, 6))
- name: dt
description: "更新时间"
data_type: DateTime
- name: stg.tushare_minutes
description: "沪深分钟级交易数据"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- trade_time
config:
where: "left(trade_time,7)>=concat(cast(year(today())-1 As String), '-01')"
columns:
- name: ts_code
description: "The stock code."
data_type: String
tests: tests:
- dbt_utils.unique_combination_of_columns: - not_null
combination_of_columns: - name: trade_time
- ts_code description: "The trading time."
- trade_date data_type: String
columns:
- name: ts_code
description: "股票代码"
data_type: String
- name: trade_date
description: "交易日期"
data_type: String
- name: adj_factor
description: "复权因子"
data_type: Nullable(Decimal(16, 6))
- name: dt
description: "更新日期"
data_type: DateTime
- name: stg.tushare_minutes
description: "沪深分钟级交易数据"
tests: tests:
- dbt_utils.unique_combination_of_columns: - not_null
combination_of_columns: - name: close
- ts_code description: "The closing price."
- trade_time data_type: Nullable(Float32)
config: - name: open
where: "left(trade_time,7)>=concat(cast(year(today())-1 As String), '-01')" description: "The opening price."
columns: data_type: Nullable(Float32)
- name: ts_code - name: high
description: "The stock code." description: "The highest price."
data_type: String data_type: Nullable(Float32)
tests: - name: low
- not_null description: "The lowest price."
- name: trade_time data_type: Nullable(Float32)
description: "The trading time." - name: vol
data_type: String description: "The volume of trades."
tests: data_type: Nullable(Float32)
- not_null - name: amount
- name: close description: "The amount of trades."
description: "The closing price." data_type: Nullable(Float32)
data_type: Nullable(Float32) meta:
- name: open engine: ReplacingMergeTree
description: "The opening price." partition_by: "left(trade_time, 7)"
data_type: Nullable(Float32) order_by: "(ts_code, trade_time)"
- name: high settings:
description: "The highest price." index_granularity: 8192
data_type: Nullable(Float32)
- name: low
description: "The lowest price."
data_type: Nullable(Float32)
- name: vol
description: "The volume of trades."
data_type: Nullable(Float32)
- name: amount
description: "The amount of trades."
data_type: Nullable(Float32)
meta:
engine: ReplacingMergeTree
partition_by: "left(trade_time, 7)"
order_by: "(ts_code, trade_time)"
settings:
index_granularity: 8192

View File

@ -0,0 +1,19 @@
{{
config(
materialized='incremental',
engine="MergeTree",
order_by="ts_code, date_id",
unique_key=['ts_code', 'date_id'],
incremental_strategy='delete+insert',
)
}}
Select
ts_code As ts_code,
toYYYYMMDD(toDate(trade_date)) As date_id,
adj_factor As adj_factor,
dt As dt
From {{ source('finance', 'stg.tushare_adj_factor') }} As StgAdjFactor
{% if is_incremental() %}
Where `trade_date` >= (Select formatDateTimeInJodaSyntax(YYYYMMDDToDate(max(`date_id`)), 'yyyy-MM-dd') From {{ this }})
{% endif %}

View File

@ -0,0 +1,29 @@
{{
config(
materialized='incremental',
engine="MergeTree",
order_by='ts_code, time_id',
unique_key=['ts_code', 'time_id'],
incremental_strategy='delete+insert',
)
}}
Select
StgMinutes.ts_code As ts_code,
toYYYYMMDDhhmmss(toDateTime(`StgMinutes`.`trade_time`)) As time_id,
StgMinutes.close As close,
StgMinutes.open As open,
StgMinutes.high As high,
StgMinutes.low As low,
StgMinutes.vol As vol,
StgMinutes.amount As amount
From {{ source('finance', 'stg.tushare_minutes') }} As StgMinutes
where 1 = 1
{% if is_incremental() %}
And StgMinutes.trade_time >= (Select formatDateTimeInJodaSyntax(YYYYMMDDhhmmssToDateTime(max(time_id)), 'yyyy-MM-dd hh:mm:ss') From {{ this }})
{% endif %}
{% if 'dev' in target.name %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Month,-1,today()), 'yyyy-01-01 00:00:00')
{% elif target.name == 'test' %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Year,-1,today()), 'yyyy-01-01 00:00:00')
{% endif %}

54
models/ods/schema.yml Normal file
View File

@ -0,0 +1,54 @@
version: 2
models:
- name: ods.tushare_minutes
description: "沪深股票交易分钟线数据"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- time_id
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: time_id
description: 交易日期
tests:
- not_null
- name: close
description: 收盘价
- name: open
description: 开盘价
- name: high
description: 最高价
- name: low
description: 最低价
- name: vol
description: 成交量
- name: amount
description: 成交额
- name: dt
description: 更新时间
- name: ods.tushare_adj_factor
description: "沪深股票复权因子"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- date_id
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: date_id
description: 交易日期
tests:
- not_null
- name: adj_factor
description: 复权因子
- name: dt
description: 更新时间

View File

@ -1,2 +1 @@
dbt==1.0.0.37.0 dbt-clickhouse==1.8.0
dbt-clickhouse==1.7.3