1. Environment Versions

| Environment | Version |
|---|---|
| Flink | 1.17.0 |
| Kafka | 2.12 |
| MySQL | 5.7.33 |
2. MySQL Table DDL
create table user_log
(
    id      int auto_increment comment 'primary key'
        primary key,
    uid     int    not null comment 'user id',
    event   int    not null comment 'user event',
    logtime bigint null comment 'log time (epoch millis)'
)
    comment 'user log table, used as the data source for verification';
3. User Log Class

Create a new Maven project. The UserLog class defines the schema shared by the Kafka messages and the MySQL table.
/**
 * User log POJO, shared by the generator and the consumer job.
 */
@Data
public class UserLog {
    // user id
    private int uid;
    // user event
    private int event;
    // log time
    private Date logtime;

    // date string (yyyyMMdd), used when aggregating by day
    public String getFormatDate() {
        return DateUtil.format(logtime, "yyyyMMdd");
    }

    // time string truncated to the minute, used when aggregating by minute
    public String getFormatTime() {
        return DateUtil.format(logtime, "yyyy-MM-dd HH:mm") + ":00";
    }
}
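As a quick sanity check of the two helper getters, here is a small hypothetical test class (not part of the original project; imports omitted as in the listings above, and it only assumes hutool's DateUtil.parse) that prints the keys later used for grouping:

public class UserLogFormatCheck {
    public static void main(String[] args) {
        UserLog userLog = new UserLog();
        userLog.setUid(1);
        userLog.setEvent(1);
        // fixed timestamp so the expected output is deterministic
        userLog.setLogtime(DateUtil.parse("2025-08-13 10:36:30"));
        System.out.println(userLog.getFormatDate()); // 20250813            -> key for per-day aggregation
        System.out.println(userLog.getFormatTime()); // 2025-08-13 10:36:00 -> key for per-minute aggregation
    }
}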
4. User Data Generator
/**
 * User data generator: produces random UserLog records and writes them to Kafka and MySQL.
 */
public class UserLogGenerator {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Custom data generator source
        DataGeneratorSource<UserLog> dataGeneratorSource = new DataGeneratorSource<>(
                // GeneratorFunction implementation
                new GeneratorFunction<Long, UserLog>() {
                    // random data generator
                    public RandomDataGenerator generator;

                    @Override
                    public void open(SourceReaderContext readerContext) throws Exception {
                        generator = new RandomDataGenerator();
                    }

                    @Override
                    public UserLog map(Long aLong) throws Exception {
                        UserLog userLog = new UserLog();
                        // random user id
                        userLog.setUid(generator.nextInt(1, 50));
                        // random user event
                        userLog.setEvent(generator.nextInt(1, 2));
                        // random log time within +/- 2 seconds of now
                        userLog.setLogtime(DateUtil.offset(new DateTime(), DateField.MILLISECOND, generator.nextInt(-2000, 2000)));
                        return userLog;
                    }
                },
                // total number of records to emit
                // 60 * 60 * 10,
                120,
                // records emitted per second
                RateLimiterStrategy.perSecond(10),
                // return type: wrap UserLog into a TypeInformation
                TypeInformation.of(UserLog.class)
        );
        DataStreamSource<UserLog> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");
        // print the generated records (for debugging)
        // dataGeneratorSourceStream.print();
        // write to Kafka
        KafkaSink<UserLog> kafkaSink = KafkaSink.<UserLog>builder()
                .setBootstrapServers("hadoop01:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<UserLog>builder()
                                .setTopic("userLog")
                                .setValueSerializationSchema((SerializationSchema<UserLog>) userLog -> JSONUtil.toJsonStr(userLog).getBytes())
                                .build()
                ).build();
        dataGeneratorSourceStream.sinkTo(kafkaSink);
        // write to MySQL for later verification
        SinkFunction<UserLog> jdbcSink = JdbcSink.sink(
                "insert into user_log (uid, event, logtime) values (?, ?, ?)",
                new JdbcStatementBuilder<UserLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UserLog userLog) throws SQLException {
                        preparedStatement.setInt(1, userLog.getUid());
                        preparedStatement.setInt(2, userLog.getEvent());
                        preparedStatement.setLong(3, userLog.getLogtime().getTime());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.31.116:3306/demo")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("root")
                        .build()
        );
        dataGeneratorSourceStream.addSink(jdbcSink);
        env.execute();
    }
}
5. Counting PV and UV per Minute or per Day with the DataStream API
/**
 * Computes PV and UV from the Kafka topic.
 */
public class UserLogPVUVCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop01:9092")
                .setTopics("userLog")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();
        DataStreamSource<String> kafkaSourceStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource");
        // kafkaSourceStream.print();
        // deserialize the Kafka JSON records
        SingleOutputStreamOperator<UserLog> userLogStream = kafkaSourceStream.map(s -> JSONUtil.toBean(s, UserLog.class));
        // Compute PV and UV. To aggregate by day instead of by minute, change getFormatTime to getFormatDate,
        // comment out the one-minute window and uncomment the one-day window (see the per-day sketch after this class).
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> userPVUVStream =
                userLogStream.keyBy((KeySelector<UserLog, String>) UserLog::getFormatTime)
                        // one-day window; the offset starts the window 8 hours earlier to align with UTC+8
                        // .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
                        // one-minute window
                        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                        // fire every 10 seconds to refresh the running result
                        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                        // evict already-counted elements after each firing, so the next firing only sees new records
                        .evictor(TimeEvictor.of(Time.seconds(0), true))
                        // compute PV and UV
                        .process(new MyProcessWindowFunction());
        userPVUVStream.print();
        env.execute();
    }
}
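For per-day statistics, as the comments above note, only the key selector and the window change. A minimal sketch of that variant, replacing the keyBy/window chain inside main():

// Per-day variant: key by date string and use a 1-day tumbling window shifted by -8 hours for UTC+8
SingleOutputStreamOperator<Tuple3<String, String, Integer>> dailyPVUVStream =
        userLogStream.keyBy((KeySelector<UserLog, String>) UserLog::getFormatDate)
                .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
                // still refresh the running result every 10 seconds
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                .evictor(TimeEvictor.of(Time.seconds(0), true))
                .process(new MyProcessWindowFunction());

Note that the 1-minute state TTL configured in MyProcessWindowFunction below is sized for minute windows; for a daily window it would likely need to be lengthened (to longer than a day), otherwise idle UV entries could expire mid-window.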
/**
 * Custom window function: keeps cumulative PV and UV per key in keyed state.
 */
public class MyProcessWindowFunction extends ProcessWindowFunction<UserLog, Tuple3<String, String, Integer>, String, TimeWindow> {
    // UV: distinct user ids seen so far, stored as map keys
    private transient MapState<Integer, String> uvState;
    // PV: cumulative event count
    private transient ValueState<Integer> pvState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // TTL so that per-key state is cleaned up once the window key stops being updated
        StateTtlConfig ttlConfig = StateTtlConfig
                // expire 1 minute after the last write
                .newBuilder(Time.minutes(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        // enable TTL on the descriptors before obtaining the state handles
        ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv", Integer.class);
        MapStateDescriptor<Integer, String> uvStateDescriptor = new MapStateDescriptor<>("uv", Integer.class, String.class);
        pvStateDescriptor.enableTimeToLive(ttlConfig);
        uvStateDescriptor.enableTimeToLive(ttlConfig);
        pvState = this.getRuntimeContext().getState(pvStateDescriptor);
        uvState = this.getRuntimeContext().getMapState(uvStateDescriptor);
    }

    @Override
    public void process(String s, ProcessWindowFunction<UserLog, Tuple3<String, String, Integer>, String, TimeWindow>.Context context, Iterable<UserLog> iterable, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
        // because of the evictor, the iterable only contains elements that arrived since the last firing
        Integer pv = 0;
        Iterator<UserLog> iterator = iterable.iterator();
        while (iterator.hasNext()) {
            pv = pv + 1;
            Integer userId = iterator.next().getUid();
            uvState.put(userId, null);
        }
        // accumulate PV across firings
        pvState.update((pvState.value() == null ? 0 : pvState.value()) + pv);
        // UV is the number of distinct user ids recorded in the map state
        int uv = 0;
        Iterator<Integer> uvIterator = uvState.keys().iterator();
        while (uvIterator.hasNext()) {
            uvIterator.next();
            uv = uv + 1;
        }
        collector.collect(Tuple3.of(s, "uv", uv));
        collector.collect(Tuple3.of(s, "pv", pvState.value()));
    }
}
6. Data Verification
- Start UserLogGenerator
- Start UserLogPVUVCount

Console output of UserLogPVUVCount:
(2025-08-13 10:37:00,uv,45)
(2025-08-13 10:37:00,pv,118)
(2025-08-13 10:36:00,uv,2)
(2025-08-13 10:36:00,pv,2)
- Verify with queries in MySQL

Convert the window boundaries to epoch millisecond timestamps:

| Timestamp | Datetime | Epoch (ms) |
|---|---|---|
| w_start | 2025-08-13 10:36:00 | 1755052560000 |
| w_end | 2025-08-13 10:37:00 | 1755052620000 |
# UV of the 10:36:00 window; should match the Flink output
select count(distinct uid) from user_log where logtime < 1755052620000 and logtime >= 1755052560000;
# UV of the 10:37:00 window (all remaining records from the short run)
select count(distinct uid) from user_log where logtime >= 1755052620000;
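PV can be cross-checked the same way with a plain row count over the same range (not part of the original verification, added here as a suggested extra check):

select count(*) from user_log where logtime < 1755052620000 and logtime >= 1755052560000;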
7. POM File
<project>
<groupId>dblab</groupId>
<artifactId>demo</artifactId>
<modelVersion>4.0.0</modelVersion>
<name> </name>
<packaging>jar</packaging>
<version>1.0</version>
<repositories>
<repository>
<id>central-repos</id>
<name>Central Repository</name>
<url>https://repo.maven.apache.org/maven2</url>
</repository>
<repository>
<id>alimaven</id>
<name>aliyun maven</name>
<url>https://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<properties>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-connector-files</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-csv</artifactId>-->
<!-- <version>${flink.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.1-1.17</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.39</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>