数据流程介绍
该 SQL 从 es_source 表读取近 7 天中属于当前统计周(截至昨天)的数据,按周维度聚合统计指标(成功率/失败率/取消率),再与 es_sink 中已写入的上一周(最近一个已统计的周)结果进行环比计算,最终将结果写入 es_sink 表。核心逻辑:
- 通过日期函数动态计算周维度(week_dim)
- 使用窗口函数(ROW_NUMBER)从 es_sink 中获取上一周的数据
- 计算周环比增长率
- 周统计口径:每周从周五开始、到下周四结束,即 [周五, 下周四]
Flink SQL 逻辑
-- Elasticsearch source holding the daily per-send_type counters that are aggregated
-- into weekly figures downstream (day_dim is parsed there as 'yyyy-MM-dd').
-- NOTE(review): the official Flink elasticsearch-6 SQL connector is documented as a
-- sink; reading from it presumably relies on a custom connector build — confirm.
CREATE TABLE es_source
(
    send_type   STRING,
    task_id     STRING,
    month_dim   STRING,
    day_dim     STRING,
    grouping_id INTEGER,
    init        INTEGER,
    cancel      INTEGER,
    succ        INTEGER,
    fail        INTEGER,
    cancel_rate FLOAT,
    succ_rate   FLOAT,
    fail_rate   FLOAT,
    -- NOTE(review): declared FLOAT, while es_sink writes update_date as a timestamp
    -- STRING; this column is not read by the job — confirm the source index's type.
    update_date FLOAT
)
WITH (
    'connector' = 'elasticsearch-6',
    'index' = 'xx',
    'document-type' = '_doc',
    -- Must be 'scheme://host:port' (the previous value 'http:xx:9200' lacked '//').
    'hosts' = 'http://xx:9200',
    'format' = 'json'
);
-- Elasticsearch sink with one weekly document per (send_type, year_dim, week_dim).
-- Because a PRIMARY KEY is declared, the Flink Elasticsearch connector writes in
-- upsert mode (document id derived from the key), so re-running the job during
-- the same week overwrites that week's document.
CREATE TABLE es_sink
(
    send_type         STRING,
    year_dim          STRING,   -- 'yyyy'
    month_dim         STRING,   -- 'yyyy-MM'
    day_dim           STRING,   -- 'yyyy-MM-dd', last day included in the week so far
    week_dim          INTEGER,  -- week number of the Fri..Thu business week
    init              INTEGER,
    cancel            INTEGER,
    succ              INTEGER,
    fail              INTEGER,
    succ_rate         FLOAT,    -- percent
    succ_week_ring    FLOAT,    -- week-over-week change of succ_rate, percent
    fail_rate         FLOAT,    -- percent
    fail_week_ring    FLOAT,    -- week-over-week change of fail_rate, percent
    cancel_rate       FLOAT,    -- percent
    cancel_week_ring  FLOAT,    -- week-over-week change of cancel_rate, percent
    update_date       STRING,   -- write time, CAST(LOCALTIMESTAMP AS STRING)
    PRIMARY KEY (send_type, year_dim, week_dim) NOT ENFORCED
)
WITH (
    'connector' = 'elasticsearch-6',
    'index' = 'xx',
    'document-type' = '_doc',
    'hosts' = 'http://xx:9200',
    'format' = 'json',
    -- NOTE(review): not an option of the official connector; presumably handled by a
    -- custom connector build — confirm.
    'filter.null-value' = 'true',
    -- Flush a bulk request after 1000 actions or 10 MB, whichever comes first.
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);
-- tmp1: per-send_type totals and rates for the business week (Friday..Thursday) that
-- contains yesterday, counting only the days from that week's Friday up to yesterday.
-- WEEK() is the ISO-8601 week number (Monday-start); a business week is labelled with
-- the ISO week of its closing Thursday.
-- DAYOFWEEK(): 1 = Sunday, 2 = Monday, ..., 6 = Friday, 7 = Saturday.
create view tmp1 as
select
    send_type
    -- Year of the Thursday that closes the business week. An ISO week belongs to the
    -- year of its Thursday, so this pairs correctly with week_dim across New Year
    -- (e.g. yesterday = 2024-12-30 lies in ISO week 1 of 2025). Taking the calendar
    -- year of yesterday would produce (2024, 1) and overwrite January 2024's document.
    -- MOD(12 - DAYOFWEEK(d), 7) = number of days from d to the next Thursday (0 on Thursday).
    , CAST(YEAR(TIMESTAMPADD(DAY,
                             CAST(MOD(12 - DAYOFWEEK(CURRENT_DATE - INTERVAL '1' DAY), 7) AS INTEGER),
                             CURRENT_DATE - INTERVAL '1' DAY)) AS STRING) as year_dim
    , SUBSTRING(CAST(CURRENT_DATE - INTERVAL '1' DAY AS STRING), 1, 7) as month_dim
    , CAST(CURRENT_DATE - INTERVAL '1' DAY AS STRING) as day_dim
    -- Fri/Sat/Sun open a business week whose Thursday lies in the next ISO week, so use
    -- the ISO week of yesterday + 7 days; Mon..Thu share the Thursday's ISO week.
    , case
        when dayofweek(CURRENT_DATE - INTERVAL '1' DAY) in (1, 6, 7)
            then CAST(week(CURRENT_DATE + INTERVAL '6' DAY) AS INTEGER)
        else CAST(week(CURRENT_DATE - INTERVAL '1' DAY) AS INTEGER)
      end AS week_dim
    , sum(init) as init
    , sum(cancel) as cancel
    , sum(succ) as succ
    , sum(fail) as fail
    -- Rates in percent. NULLIF yields a NULL rate instead of a division-by-zero error
    -- when the denominator is 0 (everything cancelled, or nothing initiated).
    , CAST(ROUND(sum(succ) * 100.0 / NULLIF(sum(init) - sum(cancel), 0), 2) AS FLOAT) AS succ_rate
    , CAST(ROUND(sum(fail) * 100.0 / NULLIF(sum(init) - sum(cancel), 0), 2) AS FLOAT) AS fail_rate
    , CAST(ROUND(sum(cancel) * 100.0 / NULLIF(sum(init), 0), 2) AS FLOAT) AS cancel_rate
from es_source
where grouping_id = 4
    -- Candidate days: the last 7 calendar days, [today - 7, yesterday].
    and TO_DATE(day_dim, 'yyyy-MM-dd') >= CURRENT_DATE - INTERVAL '7' DAY
    and TO_DATE(day_dim, 'yyyy-MM-dd') < CURRENT_DATE
    -- Keep only the days of yesterday's business week (its Friday .. yesterday).
    and
    (
        (
            -- Today is Monday..Friday: the business week's Fri/Sat/Sun fall in the
            -- previous ISO week, its Mon..Thu in today's ISO week.
            dayofweek(CURRENT_DATE) not in (1, 7)
            and
            (
                ( -- Friday / Saturday of the previous ISO week
                    week(TO_DATE(day_dim, 'yyyy-MM-dd')) <> week(CURRENT_DATE)
                    and dayofweek(TO_DATE(day_dim, 'yyyy-MM-dd')) >= 6
                )
                or
                ( -- days of today's ISO week before today (Monday .. at most Thursday)
                    week(TO_DATE(day_dim, 'yyyy-MM-dd')) = week(CURRENT_DATE)
                    and dayofweek(TO_DATE(day_dim, 'yyyy-MM-dd')) < 6
                )
                or
                ( -- Sunday of the previous ISO week
                    week(TO_DATE(day_dim, 'yyyy-MM-dd')) <> week(CURRENT_DATE)
                    and dayofweek(TO_DATE(day_dim, 'yyyy-MM-dd')) = 1
                )
            )
        )
        or
        (
            -- Today is Saturday or Sunday: the business week started on the Friday of
            -- today's own ISO week, so keep that ISO week's Friday/Saturday only.
            -- Saturday must be handled here: in the branch above, yesterday's Friday
            -- (same ISO week as today) would be dropped and last week's Sat..Thu kept.
            dayofweek(CURRENT_DATE) in (1, 7)
            and week(TO_DATE(day_dim, 'yyyy-MM-dd')) = week(CURRENT_DATE)
            and dayofweek(TO_DATE(day_dim, 'yyyy-MM-dd')) >= 6
        )
    )
group by send_type;
-- tmp2: for each send_type, the most recent document (by day_dim) already stored in
-- es_sink whose week_dim differs from the week currently being computed. The week
-- expression is the same as tmp1's week_dim. Its rates serve as the baseline for the
-- week-over-week ratios.
-- NOTE(review): "current week" is decided by week_dim alone (year_dim is ignored), and
-- the latest other week is not necessarily the immediately preceding week if a
-- send_type had no data last week — confirm this is acceptable.
create view tmp2 as
select
    send_type,
    year_dim,
    month_dim,
    day_dim,
    week_dim,
    cancel_rate,
    succ_rate,
    fail_rate,
    rn
from
(
    select
        s.send_type,
        s.year_dim,
        s.month_dim,
        s.day_dim,
        s.week_dim,
        s.cancel_rate,
        s.succ_rate,
        s.fail_rate,
        row_number() over (partition by s.send_type order by s.day_dim desc) as rn
    from es_sink as s
    where s.week_dim <> case
            when dayofweek(CURRENT_DATE - INTERVAL '1' DAY) in (1, 6, 7)
                then CAST(week(CURRENT_DATE + INTERVAL '6' DAY) AS INTEGER)
            else CAST(week(CURRENT_DATE - INTERVAL '1' DAY) AS INTEGER)
        end
) as latest
where latest.rn = 1;
-- Upsert this week's totals and rates into es_sink, together with the week-over-week
-- change (in percent) of each rate against the baseline row from tmp2:
--   ring = (this_week_rate - baseline_rate) * 100 / baseline_rate, rounded to 2 places.
-- The ring is 0 when there is no baseline row, or when the baseline rate is 0. A zero
-- baseline would otherwise give +/-Infinity or NaN, which ROUND cannot handle sensibly
-- (Flink's DOUBLE ROUND presumably goes through BigDecimal and may throw) and which is
-- not a meaningful percentage anyway.
-- Columns are listed in es_sink's declared order.
insert into es_sink
select
    t1.send_type
    , t1.year_dim
    , t1.month_dim
    , t1.day_dim
    , t1.week_dim
    , t1.init
    , t1.cancel
    , t1.succ
    , t1.fail
    , CAST(t1.succ_rate AS FLOAT) AS succ_rate
    , case
        when t2.succ_rate is null or t2.succ_rate = 0 then 0
        else CAST(ROUND(CAST((t1.succ_rate - t2.succ_rate) * 100 / t2.succ_rate AS DOUBLE), 2) AS FLOAT)
      end AS succ_week_ring
    , CAST(t1.fail_rate AS FLOAT) AS fail_rate
    , case
        when t2.fail_rate is null or t2.fail_rate = 0 then 0
        else CAST(ROUND(CAST((t1.fail_rate - t2.fail_rate) * 100 / t2.fail_rate AS DOUBLE), 2) AS FLOAT)
      end AS fail_week_ring
    , CAST(t1.cancel_rate AS FLOAT) AS cancel_rate
    , case
        when t2.cancel_rate is null or t2.cancel_rate = 0 then 0
        else CAST(ROUND(CAST((t1.cancel_rate - t2.cancel_rate) * 100 / t2.cancel_rate AS DOUBLE), 2) AS FLOAT)
      end AS cancel_week_ring
    , CAST(LOCALTIMESTAMP AS STRING) as update_date
from tmp1 t1
-- LEFT JOIN: a send_type with no earlier week in es_sink still gets a row (ring = 0).
left join tmp2 t2
    on t1.send_type = t2.send_type;
ES mapping(es_sink 对应索引)
{
"_doc": {
"properties": {
"send_type": {
"type": "keyword",
"ignore_above": 256
},
"year_dim": {
"type": "keyword",
"fields": {
"text": {
"type": "keyword"
},
"date": {
"type": "date",
"format": "yyyy",
"ignore_malformed":"true"
}
}
},
"month_dim": {
"type": "keyword",
"fields": {
"text": {
"type": "keyword"
},
"date": {
"type": "date",
"format": "yyyy-MM",
"ignore_malformed":"true"
}
}
},
"day_dim": {
"type": "keyword",
"fields": {
"text": {
"type": "keyword"
},
"date": {
"type": "date",
"format": "yyyy-MM-dd",
"ignore_malformed":"true"
}
}
},
"week_dim": {
"type": "keyword"
},
"init": {
"type": "integer"
},
"cancel": {
"type": "integer"
},
"succ": {
"type": "integer"
},
"fail": {
"type": "integer"
},
"cancel_rate": {
"type": "float"
},
"cancel_week_ring": {
"type": "float"
},
"succ_rate": {
"type": "float"
},
"succ_week_ring": {
"type": "float"
},
"fail_rate": {
"type": "float"
},
"fail_week_ring": {
"type": "float"
},
"update_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}
}
}
}