基于Flink SQL的周维度聚合指标模型

发布于:2025-03-21 ⋅ 阅读:(105) ⋅ 点赞:(0)

接上文《基于Flink SQL的实时指标多维分析模型》。

数据流程介绍

该SQL从es_source表读取当前统计周(自本周周五起至昨日,最多7天)的数据,按周维度聚合统计指标(成功率/失败率/取消率),并与上周数据进行环比计算,最终将结果写入es_sink表。核心逻辑如下:

  1. 通过日期函数动态计算周维度
  2. 使用窗口函数获取上周数据
  3. 计算周环比增长率
  4. 周计算口径:每周从周五开始、到下周四结束,即 [Fri, next Thu]
创建es_source表
创建es_sink表
生成周维度聚合视图tmp1
获取上周数据视图tmp2
关联tmp1与tmp2计算环比
插入计算结果到es_sink

Flink SQL 逻辑

-- es_source: daily metric documents read from Elasticsearch (presumably the index
-- written by the job in the previous article -- confirm). This job only reads
-- send_type, day_dim ('yyyy-MM-dd'), grouping_id, init, cancel, succ and fail.
CREATE TABLE es_source
(
    send_type   STRING,
    task_id     STRING,
    month_dim   STRING,
    day_dim     STRING,
    grouping_id INTEGER,
    init        INTEGER,
    cancel      INTEGER,
    succ        INTEGER,
    fail        INTEGER,
    cancel_rate float,
    succ_rate   float,
    fail_rate   float,
    -- NOTE(review): es_sink stores update_date as a timestamp string; if this index
    -- does the same, the JSON format cannot decode it as FLOAT and this column should
    -- be STRING -- confirm against the source index mapping.
    update_date float
)
    with (
        'connector' = 'elasticsearch-6',
        'index' = 'xx',
        'document-type' = '_doc',
        -- hosts must follow the 'http://host:port' form (same as es_sink)
        'hosts' = 'http://xx:9200',
        'format' = 'json'
        );

-- es_sink: one Elasticsearch document per (send_type, year_dim, week_dim).
-- Every run of the INSERT upserts the document of the current statistics week,
-- replacing its running totals, rates and week-over-week changes.
-- Column order matters: the INSERT below fills these columns positionally.
CREATE TABLE es_sink (
    -- document key
    send_type        STRING,
    year_dim         STRING,   -- 'yyyy'
    month_dim        STRING,   -- 'yyyy-MM' of the last day included
    day_dim          STRING,   -- 'yyyy-MM-dd' of the last day included
    week_dim         INTEGER,  -- week-of-year number of the statistics week
    -- running counts for the week
    init             INTEGER,
    cancel           INTEGER,
    succ             INTEGER,
    fail             INTEGER,
    -- rates in percent, each followed by its relative change vs. last week (percent)
    succ_rate        FLOAT,
    succ_week_ring   FLOAT,
    fail_rate        FLOAT,
    fail_week_ring   FLOAT,
    cancel_rate      FLOAT,
    cancel_week_ring FLOAT,
    update_date      STRING,   -- write time (LOCALTIMESTAMP rendered as text)
    PRIMARY KEY (send_type, year_dim, week_dim) NOT ENFORCED
) WITH (
    'connector'                   = 'elasticsearch-6',
    'index'                       = 'xx',
    'document-type'               = '_doc',
    'hosts'                       = 'http://xx:9200',
    'format'                      = 'json',
    -- NOTE(review): not an option of the Apache Flink elasticsearch-6 connector;
    -- presumably a vendor extension that drops NULL fields -- confirm.
    'filter.null-value'           = 'true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size'    = '10mb'
);

-- tmp1: per-send_type totals and rates for the current statistics week.
-- A statistics week runs Friday..Thursday; this view covers its Friday up to
-- yesterday (CURRENT_DATE itself is excluded).
--
-- Date anchors (DAYOFWEEK: 1 = Sunday ... 7 = Saturday):
--   MOD(DAYOFWEEK(CURRENT_DATE), 7) = days from yesterday back to the week's Friday
--                                     (today Sat -> 0, Sun -> 1, Mon -> 2, ..., Fri -> 6)
--   week start (Friday)   = CURRENT_DATE - 1 - MOD(DAYOFWEEK(CURRENT_DATE), 7)
--   week end   (Thursday) = CURRENT_DATE + 5 - MOD(DAYOFWEEK(CURRENT_DATE), 7)
-- week_dim is WEEK() of the closing Thursday (assumes WEEK() uses ISO-8601 numbering,
-- as Flink's EXTRACT(WEEK) does). Under ISO-8601 the Thursday's calendar year is that
-- week's year, so year_dim is taken from the same Thursday. This keeps the es_sink key
-- (send_type, year_dim, week_dim) unique across New Year: the week Fri 2024-12-27 ..
-- Thu 2025-01-02 is week 1 of 2025, not a second "week 1 of 2024".
create view tmp1 as
select
       send_type
     , SUBSTRING(CAST(TIMESTAMPADD(DAY, CAST(5 - MOD(dayofweek(CURRENT_DATE), 7) AS INTEGER), CURRENT_DATE) AS STRING), 1, 4) as year_dim
     , SUBSTRING(cast(CURRENT_DATE - INTERVAL '1' DAY AS STRING),1,7) as month_dim
     , CAST(CURRENT_DATE - INTERVAL '1' DAY AS STRING) as day_dim
     , CAST(week(TIMESTAMPADD(DAY, CAST(5 - MOD(dayofweek(CURRENT_DATE), 7) AS INTEGER), CURRENT_DATE)) AS INTEGER) AS week_dim
     , sum(init) as init
     , sum(cancel) as cancel
     , sum(succ) as succ
     , sum(fail) as fail
       -- rates in percent; NULLIF yields a NULL rate instead of dividing by zero
     , CAST(ROUND(sum(succ)*100.0/NULLIF(sum(init) - sum(cancel), 0),2) AS FLOAT) AS succ_rate
     , CAST(ROUND(sum(fail)*100.0/NULLIF(sum(init) - sum(cancel), 0),2) AS FLOAT) AS fail_rate
     , CAST(ROUND(sum(cancel)*100.0/NULLIF(sum(init), 0),2) AS FLOAT) AS cancel_rate
from es_source
-- NOTE(review): grouping_id = 4 presumably selects the per-send_type rollup rows of
-- the upstream job; confirm against that job's grouping sets.
where grouping_id = 4
  -- half-open range [week start, CURRENT_DATE): the current week's Friday through yesterday
  and TO_DATE(day_dim, 'yyyy-MM-dd') >= TIMESTAMPADD(DAY, CAST(-1 - MOD(dayofweek(CURRENT_DATE), 7) AS INTEGER), CURRENT_DATE)
  and TO_DATE(day_dim, 'yyyy-MM-dd') <  CURRENT_DATE
group by send_type;

-- tmp2: week-over-week baseline. For each send_type, this is the most recent es_sink
-- document whose day_dim falls in the previous Friday..Thursday statistics week,
-- i.e. in [week start - 7 days, week start).
-- Here week start is the Friday that opened the current week:
--   CURRENT_DATE - 1 - MOD(DAYOFWEEK(CURRENT_DATE), 7)   (DAYOFWEEK: 1 = Sunday)
-- The range is bounded on both sides rather than only excluding the current week.
-- Otherwise, when the previous week has no document, an older week would silently
-- become the baseline.
create view tmp2 as
select
       send_type
     , year_dim
     , month_dim
     , day_dim
     , week_dim
     , cancel_rate
     , succ_rate
     , fail_rate
from (
        select
             send_type
            ,year_dim
            ,month_dim
            ,day_dim
            ,week_dim
            ,cancel_rate
            ,succ_rate
            ,fail_rate
             -- day_dim is 'yyyy-MM-dd', so descending string order = latest day first
            ,row_number() over(partition by send_type order by day_dim desc) as rn
        from es_sink
        where TO_DATE(day_dim, 'yyyy-MM-dd') >= TIMESTAMPADD(DAY, CAST(-8 - MOD(dayofweek(CURRENT_DATE), 7) AS INTEGER), CURRENT_DATE)
          and TO_DATE(day_dim, 'yyyy-MM-dd') <  TIMESTAMPADD(DAY, CAST(-1 - MOD(dayofweek(CURRENT_DATE), 7) AS INTEGER), CURRENT_DATE)
     ) t
where rn = 1;

-- Upsert this week's snapshot into es_sink (key: send_type, year_dim, week_dim).
-- The INSERT has no column list, so the select items must stay in es_sink's
-- declaration order.
-- *_week_ring = relative change of a rate vs. the previous week's rate, in percent,
-- rounded to 2 decimals. If there is no usable baseline (no previous-week document, or
-- a baseline rate of 0), it is written as 0. Dividing a FLOAT by 0 would produce
-- Infinity/NaN, which Elasticsearch rejects for float fields.
insert into es_sink
select
      cw.send_type
     ,cw.year_dim
     ,cw.month_dim
     ,cw.day_dim
     ,cw.week_dim
     ,cw.init
     ,cw.cancel
     ,cw.succ
     ,cw.fail
     ,CAST(cw.succ_rate AS FLOAT) AS succ_rate
     ,case when pw.succ_rate is null or pw.succ_rate = 0 then 0 else CAST(ROUND(CAST((cw.succ_rate - pw.succ_rate)*100/pw.succ_rate AS DOUBLE),2) AS FLOAT) end AS succ_week_ring
     ,CAST(cw.fail_rate AS FLOAT) AS fail_rate
     ,case when pw.fail_rate is null or pw.fail_rate = 0 then 0 else CAST(ROUND(CAST((cw.fail_rate - pw.fail_rate)*100/pw.fail_rate AS DOUBLE),2) AS FLOAT) end AS fail_week_ring
     ,CAST(cw.cancel_rate AS FLOAT) AS cancel_rate
     ,case when pw.cancel_rate is null or pw.cancel_rate = 0 then 0 else CAST(ROUND(CAST((cw.cancel_rate - pw.cancel_rate)*100/pw.cancel_rate AS DOUBLE),2) AS FLOAT) end AS cancel_week_ring
     ,CAST(LOCALTIMESTAMP AS STRING) as update_date
from tmp1 cw                 -- current week
left join tmp2 pw            -- previous week's baseline (may be absent)
    on cw.send_type = pw.send_type;

es_sink 索引的 ES mapping

{
    "_doc": {
        "properties": {
            "send_type": {
                "type": "keyword",
                "ignore_above": 256
            },
           "year_dim": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy",
           			"ignore_malformed":"true" 
           		}
           	}
           },
           "month_dim": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy-MM",
           			"ignore_malformed":"true"
           		}
           	}
           },
            "day_dim": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy-MM-dd",
           			"ignore_malformed":"true"
           		}
           	}
           },
            "week_dim": {
                "type": "keyword"
            },
            "init": {
                "type": "integer"
            },
            "cancel": {
                "type": "integer"
            },
            "succ": {
                "type": "integer"
            },
            "fail": {
                "type": "integer"
            },
            "cancel_rate": {
                "type": "float"
            },
            "cancel_week_ring": {
                "type": "float"
            },
            "succ_rate": {
                "type": "float"
            },
            "succ_week_ring": {
                "type": "float"
            },
            "fail_rate": {
                "type": "float"
            },
            "fail_week_ring": {
                "type": "float"
            },
            "update_date": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            }
        }
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到