Flink SQL: Counting Data Volume by Minute or by Date


1. Environment Versions

Component   Version
Flink       1.17.0
Kafka       2.12
MySQL       5.7.33

[Note] Flink 1.13 introduced the cumulate window. Before that, Flink SQL had no trigger mechanism, so a long window could not fire mid-window to emit intermediate results, such as a PV/UV count refreshed every 10 seconds up to the current moment. The only option was the DataStream API with a Trigger plus State; see:
Flink DataStream: Counting Data Volume by Minute or by Date
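
For context, CUMULATE takes a window step and a maximum window size and emits an expanding series of windows that share a common start, which is what lets a long window publish intermediate results. A minimal sketch of the semantics, using the table and columns defined later in this article:

-- CUMULATE(TABLE t, DESCRIPTOR(time_col), step, size)
-- With step = 10 seconds and size = 1 minute, each minute yields the windows
--   [00:00, 00:10), [00:00, 00:20), ..., [00:00, 01:00)
-- i.e. a running total that is re-emitted every 10 seconds.
select window_start, window_end, count(uid) pv
FROM TABLE(
    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' MINUTE))
  GROUP BY window_start, window_end;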

2. MySQL Table DDL

create table user_log
(
    id      int auto_increment comment 'primary key'
        primary key,
    uid     int    not null comment 'user id',
    event   int    not null comment 'user event type',
    logtime bigint null comment 'log time (epoch milliseconds)'
)
    comment 'user log table, used as the verification data source';
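
Since the verification queries in section 7 filter user_log on logtime ranges, an index on that column keeps them fast. This is an optional addition, not part of the original setup:

-- Optional: speeds up the range scans used for verification in section 7
create index idx_user_log_logtime on user_log (logtime);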

3. The User Log Class

Create a new Maven project.

The class below defines the schema used in both Kafka and MySQL. Note that the Kafka sink in the next section serializes it with Hutool's JSONUtil, which in its default configuration writes java.util.Date as epoch milliseconds; this matches the BIGINT logtime column in MySQL and the BIGINT field in the Flink source table.

import java.util.Date;

import cn.hutool.core.date.DateUtil;
import lombok.Data;

/**
 * User log POJO.
 */
@Data
public class UserLog {
    // user id
    private int uid;
    // user event type
    private int event;
    // log time
    private Date logtime;

    // date string, used for statistics by date
    public String getFormatDate() {
        return DateUtil.format(logtime, "yyyyMMdd");
    }

    // time string, truncated to the minute
    public String getFormatTime() {
        return DateUtil.format(logtime, "yyyy-MM-dd HH:mm") + ":00";
    }
}

4. The User Data Generator

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import cn.hutool.core.date.DateField;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;

/**
 * User data generator.
 */
public class UserLogGenerator {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Custom data-generator source
        DataGeneratorSource<UserLog> dataGeneratorSource = new DataGeneratorSource<>(
                // the GeneratorFunction implementation
                new GeneratorFunction<Long, UserLog>() {

                    // random data generator
                    public RandomDataGenerator generator;

                    @Override
                    public void open(SourceReaderContext readerContext) throws Exception {
                        generator = new RandomDataGenerator();
                    }

                    @Override
                    public UserLog map(Long aLong) throws Exception {
                        UserLog userLog = new UserLog();
                        // random user id in [1, 50]
                        userLog.setUid(generator.nextInt(1, 50));
                        // random user event
                        userLog.setEvent(generator.nextInt(1, 2));
                        // random log time, jittered up to ±2 seconds around now
                        userLog.setLogtime(DateUtil.offset(new DateTime(), DateField.MILLISECOND, generator.nextInt(-2000, 2000)));
                        return userLog;
                    }
                },
                // total number of records to emit
//                60 * 60 * 10,
                1200,
                // records emitted per second
                RateLimiterStrategy.perSecond(10),
                // return type: wrap UserLog in a TypeInformation
                TypeInformation.of(UserLog.class)
        );

        DataStreamSource<UserLog> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");
        // print the generated data
//        dataGeneratorSourceStream.print();

        // Kafka sink
        KafkaSink<UserLog> kafkaSink = KafkaSink.<UserLog>builder()
                .setBootstrapServers("hadoop01:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<UserLog>builder()
                                .setTopic("userLog")
                                .setValueSerializationSchema((SerializationSchema<UserLog>) userLog -> JSONUtil.toJsonStr(userLog).getBytes())
                                .build()
                ).build();
        dataGeneratorSourceStream.sinkTo(kafkaSink);

        // MySQL sink, used for data verification
        SinkFunction<UserLog> jdbcSink = JdbcSink.sink(
                "insert into user_log (uid, event, logtime) values (?, ?, ?)",
                new JdbcStatementBuilder<UserLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UserLog userLog) throws SQLException {
                        preparedStatement.setInt(1, userLog.getUid());
                        preparedStatement.setInt(2, userLog.getEvent());
                        preparedStatement.setLong(3, userLog.getLogtime().getTime());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.31.116:3306/demo")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("root")
                        .build()
        );
        dataGeneratorSourceStream.addSink(jdbcSink);

        env.execute();
    }
}
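
With the generator running, a quick check on the MySQL side confirms that rows are arriving (a sketch; FROM_UNIXTIME expects seconds, hence the division by 1000):

-- row count plus earliest/latest log time, converted from epoch milliseconds
select count(*)                           as rows_written,
       from_unixtime(min(logtime) / 1000) as earliest,
       from_unixtime(max(logtime) / 1000) as latest
from user_log;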

5. Computing PV and UV by Minute or by Date in SQL

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class UserLogSql {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // Create the Kafka-backed source table
        String sourceDDL = "create table user_log\n" +
                "(\n" +
                "    uid  INT\n" +
                "    , event INT\n" +
                "    , logtime BIGINT\n" +
                "    , rowtime AS TO_TIMESTAMP_LTZ(logtime, 3)\n" +
                "    , WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND\n" +
                ") with (\n" +
                "      'connector' = 'kafka'\n" +
                "      ,'topic' = 'userLog'\n" +
                "      ,'properties.bootstrap.servers' = 'hadoop01:9092'\n" +
                "      ,'scan.startup.mode' = 'latest-offset'\n" +
                "      ,'format' = 'json'\n" +
                ");";

        tableEnv.executeSql(sourceDDL);
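
        // The 5-second watermark delay comfortably covers the generator's
        // ±2-second event-time jitter, so events are rarely dropped as late.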

        // PV and UV per window
        String result = "select\n" +
                " date_format(window_start, 'yyyy-MM-dd') cal_day\n" +
                " , date_format(window_start, 'HH:mm:ss') start_time\n" +
                " , date_format(window_end, 'HH:mm:ss') end_time\n" +
                " , count(uid) pv\n" +
                " , count(distinct uid) uv\n" +
                "FROM TABLE(\n" +
                // fire every 10 seconds, window size of 1 day
//                "    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' DAY))\n" +
                // fire every 10 seconds, window size of 10 seconds
                "    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '10' SECOND))\n" +
                "  GROUP BY window_start, window_end\n" +
                ";";

        // print the query result
        tableEnv.executeSql(result).print();
    }
}
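
A side note on the two variants above: when the step equals the size, as in the 10-second/10-second call, a cumulate window behaves exactly like a tumbling window, so the same result could be written with the TUMBLE function (a sketch over the same table):

select
 date_format(window_start, 'yyyy-MM-dd') cal_day,
 date_format(window_start, 'HH:mm:ss') start_time,
 date_format(window_end, 'HH:mm:ss') end_time,
 count(uid) pv,
 count(distinct uid) uv
FROM TABLE(
    TUMBLE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND))
  GROUP BY window_start, window_end;

CUMULATE only pays off with a larger size, such as the commented-out 1-day variant, where every 10-second firing re-emits the running total for the whole day.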

6. Running the SQL in sql-client

# Table DDL
create table user_log
(
    uid  INT,
    event INT,
    logtime BIGINT,
    rowtime AS TO_TIMESTAMP_LTZ(logtime, 3),
    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) with (
      'connector' = 'kafka',
      'topic' = 'userLog',
      'properties.bootstrap.servers' = 'hadoop01:9092',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
);

# PV/UV query: fires every 10 seconds with a window size of 1 day. window_start stays
# pinned to the start of the day while window_end advances in 10-second steps, so pv
# and uv accumulate over the day.
select
 date_format(window_start, 'yyyy-MM-dd') cal_day,
 date_format(window_start, 'HH:mm:ss') start_time,
 date_format(window_end, 'HH:mm:ss') end_time,
 count(uid) pv,
 count(distinct uid) uv
FROM TABLE(
    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' DAY))
  GROUP BY window_start, window_end;

7. Data Verification

  1. Start UserLogGenerator
  2. Start UserLogSql, or run the SQL in sql-client
  3. Cross-check the results with queries in MySQL

Convert the window bounds to epoch milliseconds:

Timestamp   Datetime               Epoch milliseconds
w_start     2025-08-16 14:45:40    1755326740000
w_end       2025-08-16 14:45:50    1755326750000
select count(distinct uid) from user_log where logtime< 1755326750000 and logtime>=1755326740000;
# The count returned by MySQL matches the Flink output below
                                                               SQL Query Result (Table)                                                               
 Refresh: 1 s                                                      Page: Last of 1                                              Updated: 23:50:09.972 

                        cal_day                     start_time                       end_time                   pv                   uv
                     2025-08-15                       23:45:30                       23:45:40                   15                   15
                     2025-08-15                       23:45:40                       23:45:50                  101                   45
                     2025-08-15                       23:45:50                       23:46:00                  104                   42
                     2025-08-15                       23:46:00                       23:46:10                  100                   42
                     2025-08-15                       23:46:10                       23:46:20                   97                   45
                     2025-08-15                       23:46:20                       23:46:30                  104                   40
                     2025-08-15                       23:46:30                       23:46:40                   97                   42
                     2025-08-15                       23:46:40                       23:46:50                   99                   44
                     2025-08-15                       23:46:50                       23:47:00                  103                   44
                     2025-08-15                       23:47:00                       23:47:10                   97                   44
                     2025-08-15                       23:47:10                       23:47:20                  100                   43
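
The epoch-millisecond bounds can also be computed directly in MySQL rather than by hand (a sketch; UNIX_TIMESTAMP uses the session time zone, which must match the zone the logs were produced in):

select count(distinct uid)
from user_log
where logtime >= unix_timestamp('2025-08-16 14:45:40') * 1000
  and logtime <  unix_timestamp('2025-08-16 14:45:50') * 1000;

As a further sanity check on the table above: a pv of roughly 100 per window matches the generator's rate of 10 records per second over a 10-second window, and uv never exceeds 50 because uids are drawn from 1 to 50.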

8. Common Issues

  1. Queries in sql-client fail because the Kafka connector jar is missing
# Run a SQL query
Flink SQL> select * from user_log;
# The resulting error
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Solution

# Download the Kafka SQL connector jar matching your Flink version and place it in Flink's lib directory
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.0/flink-sql-connector-kafka-1.17.0.jar -P ${FLINK_HOME}/lib/
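# Restart the Flink cluster and sql-client afterwards; jars in lib are only picked up at startup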

9. References and Acknowledgements

Flink 实时统计历史 pv、uv (Flink real-time historical PV/UV)
Flink Cumulate Window

