【代码实践】starRocks 窗口函数(udf)实践

发布于:2024-05-10 ⋅ 阅读:(29) ⋅ 点赞:(0)

背景说明

实现天粒度的同比计算

重点说明

  1. 要求数据是连续的
  2. 因为天粒度的同比,需要365天,但为了方便测试,当前的判断逻辑是计算5天的前,而不是365天前的

参考文档

https://docs.starrocks.io/zh/docs/sql-reference/sql-functions/JAVA_UDF/#%E5%89%8D%E6%8F%90%E6%9D%A1%E4%BB%B6

参考SQL

select shop_id, 
       biz_date, active_num, 
       DAY_ON_DAY(active_num) OVER (PARTITION BY shop_id
                                           ORDER BY biz_date
                                           ROWS BETWEEN 365 PRECEDING AND CURRENT ROW )
  FROM test.tb_123;
  

字段说明

SQL说明

一级位置 二级位置 说明
1 维度信息,例如:店铺
2 计算同比的时间字段和对应数据值
3 DAY_ON_DAY 对应数据值
3 PARTITION BY 维度信息,例如:店铺
3 ORDER BY 时间字段
4 表名

查询结果

需要两个数据都有,所以不计算同比,直接把两个数字返回。若没有历史的数据,则返回0。

shop_id biz_date active_num DAY_ON_DAY
123 2024/3/1 566 0
123 2024/3/2 566 0
123 2024/3/3 566 0
123 2024/3/4 568 0
123 2024/3/5 569 0
123 2024/3/6 572 566
123 2024/3/7 572 566
123 2024/3/8 576 566
123 2024/3/9 577 568
123 2024/3/10 577 569
123 2024/3/11 577 572
123 2024/3/12 580 572

窗口函数创建&删除语句

DROP GLOBAL FUNCTION DAY_ON_DAY(bigint);

CREATE GLOBAL AGGREGATE FUNCTION DAY_ON_DAY(bigint)
RETURNS bigint
PROPERTIES 
(
    "analytic" = "true",
    "symbol" = "com.starrocks.udf.udwf.DayOnDay", 
    "type" = "StarrocksJar", 
    "file" = "http://ip:port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"    
);



java代码实现

package com.starrocks.udf.udwf;

/**

 **/
public class DayOnDay {
    public static class State {
        long counter = 0;
        public int serializeLength() { return 4; }
        @Override
        public String toString() {
            return "State{" +
                    "counter=" + counter +
                    '}';
        }
    }

    public DayOnDay.State create() {
        return new DayOnDay.State();
    }

    public void destroy(DayOnDay.State state) {

    }

    public void update(DayOnDay.State state, Long val) {
        if (val != null) {
            state.counter+=val;
        }
    }

    public void serialize(DayOnDay.State state, java.nio.ByteBuffer buff) {
        buff.putLong(state.counter);
    }

    public void merge(DayOnDay.State state, java.nio.ByteBuffer buffer) {
        long val = buffer.getLong();
        state.counter += val;
    }

    public Long finalize(DayOnDay.State state) {
        return state.counter;
    }

    public void reset(DayOnDay.State state) {
        state.counter = 0;
    }

    public void windowUpdate(DayOnDay.State state,
                             int peer_group_start, int peer_group_end,
                             int frame_start, int frame_end,
                             Long[] inputs) {
        // 间隔天数 本算法是用于计算 天粒度的同比
        // 默认365,暂时不考虑闰年366天的误差
        // frame_start 是从0开始 frame_end 是需要-1 所以数组下标是ii[0, frame_end-1]
        int array_start = frame_start;
        int array_end = frame_end - 1 ;
        int INTERVAL = 5 ;

//        state.counter = array_end*10000 + frame_start;
        if (INTERVAL <= (array_end - array_start)) {
            if (0 <inputs[array_end-INTERVAL]) {
                state.counter = (inputs[array_end]-inputs[(array_end-INTERVAL)])*10000/inputs[(array_end-INTERVAL)];
//                state.counter = inputs[array_end]*10000 + inputs[(array_end-INTERVAL)];
                // 需要两个数据都有,所以不计算同比,直接把两个数字返回
                state.counter = inputs[(array_end-INTERVAL)];
            }
        }
        else {
            // 若没有的话,返回0
            state.counter = 0;
        }

    }

}

问题记录

【解决】数据过大导致int溢出

因为保留4位小数,所以GMV比较大,导致int无法满足,需要切换为bigint,对应类型long。
代码中int替换为long,窗口函数入参修改为bigint。