Compare commits

..

No commits in common. "af92fcd9a655c7455705a70bbf33a419ffc766da" and "7fcbfe07f4b11e6643631dc0ec850a2174f7190e" have entirely different histories.

11 changed files with 266 additions and 386 deletions

View File

@ -4,7 +4,7 @@ WORKDIR /app
### Pip ### Pip
USER root USER root
COPY ./requirements.txt /app/ COPY ./requirements.txt /app/
RUN pip config set global.index-url https://nexus.mujiannan.com:5001/repository/pypiserver/simple && \ RUN pip3 config set global.index-url https://nexus.mujiannan.com:5001/repository/pypiserver/simple && \
pip install -r requirements.txt pip install -r requirements.txt
USER 1001 USER 1001
## Copy files ## Copy files

View File

@ -1,2 +1,2 @@
dbt docs generate dbt docs generate
dbt docs serve --host=0.0.0.0 --port=8080 dbt docs serve

View File

@ -2,16 +2,14 @@
config( config(
materialized='table', materialized='table',
engine='MergeTree', engine='MergeTree',
order_by='(date_id_str, full_time_str, time_id)' order_by='time_id'
) )
}} }}
Select toYYYYMMDDhhmmss(`full_time`) As time_id Select cast(formatDateTime(`full_time`, '%Y%m%d%H%i') As Int64) As time_id
, `full_time` , `full_time`
, `date_id` , `date_id`
, hour(`full_time`) As `hour` , hour(`full_time`) As `hour`
, minute(`full_time`) As `minute` , minute(`full_time`) As `minute`
, formatDateTimeInJodaSyntax(`full_time`, 'yyyy-MM-dd HH:mm:ss') As full_time_str
, Cast(`date_id` As String) As date_id_str
From( From(
SELECT SELECT
arrayJoin( arrayJoin(

View File

@ -5,10 +5,6 @@
order_by='ts_code, date_id', order_by='ts_code, date_id',
unique_key=['ts_code', 'date_id'], unique_key=['ts_code', 'date_id'],
incremental_strategy='delete+insert', incremental_strategy='delete+insert',
query_settings={
"join_algorithm": "'full_sorting_merge'",
"max_bytes_before_external_sort": "'1000M'"
}
) )
}} }}
Select Select

View File

@ -1,42 +1,38 @@
{{ {{
config( config(
materialized='incremental', materialized='materialized_view',
engine='MergeTree', engine='MergeTree',
order_by='ts_code, time_id', order_by='ts_code, time_id',
unique_key=['ts_code', 'time_id'], unique_key=['ts_code', 'time_id'],
incremental_strategy='delete+insert',
partition_by=['cast(floor(time_id / 1000000) As Int32)'], partition_by=['cast(floor(time_id / 1000000) As Int32)'],
query_settings={ query_settings={
"join_algorithm": "'full_sorting_merge'", "join_algorithm": "'full_sorting_merge'",
"max_bytes_before_external_sort": "'1000M'", "max_rows_in_set_to_optimize_join": "0",
"max_bytes_before_external_group_by":"'1000M'", "max_bytes_before_external_sort": "'1000M'"
} }
) )
}} }}
Select Select
OdsMinutes.ts_code As ts_code, StgMinutes.ts_code As ts_code,
OdsMinutes.time_id As time_id, DimTime.time_id As time_id,
OdsMinutes.close As close, StgMinutes.close As close,
OdsMinutes.open As open, StgMinutes.open As open,
OdsMinutes.high As high, StgMinutes.high As high,
OdsMinutes.low As low, StgMinutes.low As low,
OdsMinutes.vol As vol, StgMinutes.vol As vol,
OdsMinutes.amount As amount, StgMinutes.amount As amount,
OdsAdjFactor.adj_factor As adj_factor, StgAdjFactor.adj_factor As adj_factor,
now() As dt now() As dt
From {{ ref('ods.tushare_minutes') }} As OdsMinutes From {{ source('finance', 'stg.tushare_minutes') }} As StgMinutes
Any Left Join {{ ref('dw.dim_time') }} As DimTime Inner Join {{ ref('dw.dim_time') }} As DimTime
On OdsMinutes.time_id = DimTime.time_id On StgMinutes.trade_time = formatDateTimeInJodaSyntax(full_time, 'yyyy-MM-dd HH:mm:ss')
Any Left Join {{ ref('ods.tushare_adj_factor') }} As OdsAdjFactor Any Left Join {{ source('finance', 'stg.tushare_adj_factor') }} As StgAdjFactor
On OdsMinutes.ts_code = OdsAdjFactor.ts_code On StgMinutes.ts_code = StgAdjFactor.ts_code
And OdsAdjFactor.date_id = DimTime.date_id And StgAdjFactor.trade_date = Cast(DimTime.date_id As String)
where 1 = 1 where 1 = 1
{% if is_incremental() %}
And OdsMinutes.time_id >= (Select max(`time_id`) From {{ this }})
{% endif %}
{% if 'dev' in target.name %} {% if 'dev' in target.name %}
And OdsMinutes.time_id >= toYYYYMMDDhhmmss(dateAdd(Month,-1,today())) And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(today(), 'yyyy-01-01 00:00:00')
{% elif target.name == 'test' %} {% elif target.name == 'test' %}
And OdsMinutes.time_id >= toYYYYMMDDhhmmss(dateAdd(Year,-1,today())) And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Year,-1,today()), 'yyyy-01-01 00:00:00')
{% endif %} {% endif %}

View File

@ -94,15 +94,6 @@ models:
description: "分钟" description: "分钟"
tests: tests:
- not_null - not_null
- name: full_time_str
description: "时间字符串yyyy-MM-dd HH:mm:ss"
tests:
- not_null
- unique
- name: date_id_str
description: "日期字符串yyyy-MM-dd"
tests:
- not_null
- name: dw.dim_hs_calendar - name: dw.dim_hs_calendar
description: "沪深交易日历" description: "沪深交易日历"
tests: tests:

View File

@ -205,7 +205,7 @@ sources:
description: "复权因子" description: "复权因子"
data_type: Nullable(Decimal(16, 6)) data_type: Nullable(Decimal(16, 6))
- name: dt - name: dt
description: "更新时间" description: "更新日期"
data_type: DateTime data_type: DateTime
- name: stg.tushare_minutes - name: stg.tushare_minutes
description: "沪深分钟级交易数据" description: "沪深分钟级交易数据"

View File

@ -1,19 +0,0 @@
{{
config(
materialized='incremental',
engine="MergeTree",
order_by="ts_code, date_id",
unique_key=['ts_code', 'date_id'],
incremental_strategy='delete+insert',
)
}}
Select
ts_code As ts_code,
toYYYYMMDD(toDate(trade_date)) As date_id,
adj_factor As adj_factor,
dt As dt
From {{ source('finance', 'stg.tushare_adj_factor') }} As StgAdjFactor
{% if is_incremental() %}
Where `trade_date` >= (Select formatDateTimeInJodaSyntax(YYYYMMDDToDate(max(`date_id`)), 'yyyy-MM-dd') From {{ this }})
{% endif %}

View File

@ -1,29 +0,0 @@
{{
config(
materialized='incremental',
engine="MergeTree",
order_by='ts_code, time_id',
unique_key=['ts_code', 'time_id'],
incremental_strategy='delete+insert',
)
}}
Select
StgMinutes.ts_code As ts_code,
toYYYYMMDDhhmmss(toDateTime(`StgMinutes`.`trade_time`)) As time_id,
StgMinutes.close As close,
StgMinutes.open As open,
StgMinutes.high As high,
StgMinutes.low As low,
StgMinutes.vol As vol,
StgMinutes.amount As amount
From {{ source('finance', 'stg.tushare_minutes') }} As StgMinutes
where 1 = 1
{% if is_incremental() %}
And StgMinutes.trade_time >= (Select formatDateTimeInJodaSyntax(YYYYMMDDhhmmssToDateTime(max(time_id)), 'yyyy-MM-dd hh:mm:ss') From {{ this }})
{% endif %}
{% if 'dev' in target.name %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Month,-1,today()), 'yyyy-01-01 00:00:00')
{% elif target.name == 'test' %}
And StgMinutes.trade_time >= formatDateTimeInJodaSyntax(dateAdd(Year,-1,today()), 'yyyy-01-01 00:00:00')
{% endif %}

View File

@ -1,54 +0,0 @@
version: 2
models:
- name: ods.tushare_minutes
description: "沪深股票交易分钟线数据"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- time_id
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: time_id
description: 交易日期
tests:
- not_null
- name: close
description: 收盘价
- name: open
description: 开盘价
- name: high
description: 最高价
- name: low
description: 最低价
- name: vol
description: 成交量
- name: amount
description: 成交额
- name: dt
description: 更新时间
- name: ods.tushare_adj_factor
description: "沪深股票复权因子"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ts_code
- date_id
columns:
- name: ts_code
description: TS代码
tests:
- not_null
- name: date_id
description: 交易日期
tests:
- not_null
- name: adj_factor
description: 复权因子
- name: dt
description: 更新时间

View File

@ -1 +1,2 @@
dbt-clickhouse==1.8.0 dbt==1.0.0.37.0
dbt-clickhouse==1.7.3