Flink Real-Time Traffic Statistics: Hourly PV Monitoring with Window Functions and a Redis Sink (Study Notes)

Published: 2025-07-20

Task:

Use Flink to count website page views and write the results to Redis.

Use window functions and operators to compute hourly PV (page views), design the format of the aggregated results, and store them in Redis (using a sink to write the processed results to the Redis database).

Steps:

1. Installing Redis on the virtual machine

(1) Download the Redis package

cd /opt/software # a directory for installation packages; choose your own

wget http://download.redis.io/releases/redis-6.2.6.tar.gz

# Download Redis 6.2.6; change the version number as needed

(2) Extract the package with tar

tar -zxvf redis-6.2.6.tar.gz

Extraction produces a directory named redis-6.2.6.

(3) Install the gcc toolchain

yum install -y gcc gcc-c++ make

(4) Build and install Redis

cd redis-6.2.6

make # compile Redis; this can take a while depending on VM performance

make install # installs the binaries to /usr/local/bin by default

(5) Configure Redis — no config file is installed by default, so create the directory and copy the sample file:

mkdir /etc/redis # directory for the config file

cp /opt/software/redis-6.2.6/redis.conf /etc/redis/ # copy the sample config; adjust the path to where you extracted the archive

As shown in the screenshot, Redis is installed and can be accessed normally.

Also disable the VM's firewall (or open port 6379) so that Redis is reachable from outside the VM.

2. Code

Project setup

In IntelliJ IDEA, choose File -> New -> Project, select Maven, and follow the wizard to create a new Maven project. Add the following dependencies to pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>flink-pv-redis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.6</flink.version>
        <redis.clients.version>3.8.0</redis.clients.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Flink core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Redis connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Redis client -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.clients.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>

    <!-- Aliyun mirror -->
    <repositories>
        <repository>
            <id>aliyunmaven</id>
            <name>Aliyun public repository</name>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Define the data type

Create a Java class for UserBehavior. Each line of UserBehavior.csv holds five comma-separated fields: userId, itemId, categoryId, behavior, and timestamp.

package Bean;

public class UserBehavior {
    private Long userId;
    private Long itemId;
    private Integer categoryId;
    private String behavior;  // behavior type: "pv" = page view
    private Long timestamp;   // timestamp in milliseconds

    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Integer getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(Integer categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

}
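Each CSV line maps onto this class field by field. A minimal, self-contained parsing sketch (the sample line and the lightweight nested stand-in class are illustrative, not taken from the dataset; a production job should also guard against malformed lines):

```java
// Sketch: parsing one line of UserBehavior.csv into its five fields.
public class UserBehaviorParseDemo {
    // Minimal stand-in for Bean.UserBehavior so this sketch compiles on its own
    static class UserBehavior {
        final long userId; final long itemId; final int categoryId;
        final String behavior; final long timestamp;
        UserBehavior(long u, long i, int c, String b, long t) {
            userId = u; itemId = i; categoryId = c; behavior = b; timestamp = t;
        }
    }

    static UserBehavior parseLine(String line) {
        String[] p = line.split(",");
        return new UserBehavior(
                Long.parseLong(p[0]),    // userId
                Long.parseLong(p[1]),    // itemId
                Integer.parseInt(p[2]),  // categoryId
                p[3],                    // behavior ("pv", "buy", ...)
                Long.parseLong(p[4]));   // timestamp
    }

    public static void main(String[] args) {
        UserBehavior ub = parseLine("543462,1715,1464116,pv,1511658000");
        System.out.println(ub.userId + " " + ub.behavior); // 543462 pv
    }
}
```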

Write the Flink program

Create a main class, for example PvStatisticsToRedis.java, that computes hourly PV and writes the results to Redis.

import Bean.UserBehavior;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class PvStatisticsToRedis {
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the CSV file; drop the header row by keeping only lines that start with a digit
        // (DataStream has no skip() method)
        DataStream<UserBehavior> userBehaviorDataStream = env.readTextFile("/mnt/UserBehavior(1).csv")
                .filter(line -> !line.isEmpty() && Character.isDigit(line.charAt(0)))
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String line) throws Exception {
                        String[] parts = line.split(",");
                        return new UserBehavior(
                                Long.parseLong(parts[0]),     // userId
                                Long.parseLong(parts[1]),     // itemId
                                Integer.parseInt(parts[2]),   // categoryId
                                parts[3],                     // behavior
                                Long.parseLong(parts[4]));    // timestamp
                    }
                });

        // Keep only "pv" (page view) events
        SingleOutputStreamOperator<Tuple2<String, Long>> pvStream = userBehaviorDataStream
                .filter(behavior -> "pv".equals(behavior.getBehavior()))
                .map(behavior -> Tuple2.of("pv", behavior.getUserId()))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Key by the constant "pv" and count events per 1-hour tumbling window
        KeyedStream<Tuple2<String, Long>, String> keyedStream = pvStream.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Map<String, Object>> pvCountStream = keyedStream
                .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                .process(new ProcessWindowFunction<Tuple2<String, Long>, Map<String, Object>, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx, Iterable<Tuple2<String, Long>> elements,
                                        Collector<Map<String, Object>> out) {
                        long count = 0;
                        for (Tuple2<String, Long> ignored : elements) {
                            count++; // every event is one page view
                        }
                        Map<String, Object> result = new HashMap<>();
                        result.put("window_end_time", ctx.window().getEnd());
                        result.put("pv_count", count);
                        out.collect(result);
                    }
                });

        // Render each window result as a string
        SingleOutputStreamOperator<String> resultStream = pvCountStream.map(new MapFunction<Map<String, Object>, String>() {
            @Override
            public String map(Map<String, Object> value) throws Exception {
                return "window_end_time: " + value.get("window_end_time") + ", pv_count: " + value.get("pv_count");
            }
        });

        // Configure the Redis connection
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();

        // Build the Redis sink: LPUSH every result onto the "pv_statistics" list
        RedisSink<String> redisSink = new RedisSink<>(conf, new RedisMapper<String>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.LPUSH);
            }

            @Override
            public String getKeyFromData(String data) {
                return "pv_statistics";
            }

            @Override
            public String getValueFromData(String data) {
                return data;
            }
        });

        // Write the results to Redis
        resultStream.addSink(redisSink);

        // Also print to the console (optional)
        resultStream.addSink(new PrintSinkFunction<>());

        // Execute the Flink job
        env.execute("PV Statistics to Redis");
    }
}

In the resources directory, add a log4j.properties file for logging:

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

Run result:

3. Writing results to Redis with a sink

Core idea

Flink's RedisSink writes data to Redis through a custom RedisMapper, which mainly specifies:

Redis command: e.g. SET (plain key-value), HSET (hash), LPUSH (list).

Key: identifies the record (for hourly PV statistics, window_end_time can serve as the key).

Value: the concrete statistic to store (e.g. the PV count).

Implementation steps

Assuming the result is an hourly PV count, one storage format is:

Redis key: pv_statistics:{window_end_time} (where window_end_time is the window-end timestamp, truncated to the hour).

Redis value: the PV total for that hour (e.g. 12345).

Data structure: String (stored with SET), convenient for later queries and aggregation. (The mapper implementation shown later instead stores a hash pv:hour via HSET, with the hour as the field; both layouts work.)
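The key design above can be sketched in plain Java: align an event timestamp to its one-hour tumbling window and build the pv_statistics:{window_end_time} key. This is a minimal illustration (the sample timestamp is arbitrary, and `keyFor` is a hypothetical helper, not part of the job code):

```java
// Sketch: deriving the Redis key for the hourly-PV design above.
// An event falls into the tumbling window [start, start + 1h); the key uses the window end.
public class PvKeyDemo {
    static final long HOUR_MS = 3600_000L;

    static String keyFor(long eventTimeMs) {
        long windowStart = eventTimeMs - (eventTimeMs % HOUR_MS); // floor to the hour
        long windowEnd = windowStart + HOUR_MS;
        return "pv_statistics:" + windowEnd;
    }

    public static void main(String[] args) {
        // 123 s past an exact hour boundary (1511658000000 ms)
        System.out.println(keyFor(1511658123000L)); // pv_statistics:1511661600000
    }
}
```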

RedisMapper is the core interface between Flink and Redis; three methods must be implemented:

getCommandDescription(): specifies the Redis command (e.g. SET).

getKeyFromData(): extracts the Redis key from a result record.

getValueFromData(): extracts the Redis value from a result record.

Implementation (this snippet additionally needs java.text.SimpleDateFormat and java.util.Date imports):

RedisMapper<PvResult> redisMapper = new RedisMapper<PvResult>() {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "pv:hour");
    }
    @Override
    public String getKeyFromData(PvResult data) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH");
        String key = sdf.format(new Date(data.getWindowStart()));
        System.out.println("Redis Key: " + key);
        return key;
    }
    @Override
    public String getValueFromData(PvResult data) {
        return String.valueOf(data.getCount());
    }
};
The PvResult POJO, in a separate file:

package Bean;
public class PvResult {
    private long windowStart; // window start time (ms)
    private long windowEnd;   // window end time (ms)
    private long count;       // PV count for this window
    public PvResult() {}

    public PvResult(long windowStart, long windowEnd, long count) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.count = count;
    }
    // Getters & Setters
    public long getWindowStart() { return windowStart; }
    public void setWindowStart(long windowStart) { this.windowStart = windowStart; }
    public long getWindowEnd() { return windowEnd; }
    public void setWindowEnd(long windowEnd) { this.windowEnd = windowEnd; }
    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
}
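To see what hash field the mapper's getKeyFromData() produces, its SimpleDateFormat logic can be run standalone. Note this sketch pins the timezone to UTC so the output is deterministic; the mapper snippet above uses the JVM's default timezone, so its actual field values depend on the machine's locale settings:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Sketch: the yyyyMMddHH field derivation used by getKeyFromData(), timezone pinned to UTC.
public class PvHourFieldDemo {
    static String hourField(long windowStartMs) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH");
        sdf.setTimeZone(TimeZone.getTimeZone("UTC")); // pinned for reproducible output
        return sdf.format(new Date(windowStartMs));
    }

    public static void main(String[] args) {
        // 1511658000000 ms = 2017-11-26 01:00:00 UTC
        System.out.println(hourField(1511658000000L)); // 2017112601
    }
}
```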

Configure the Redis connection

Configure the connection details (host, port, password, and so on) via FlinkJedisPoolConfig:

FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder()
        .setHost("192.168.100.20") // Redis IP of the virtual machine
        .setPort(6379)
        .build();

Note: this connects to the Redis instance on the virtual machine, so replace the IP with your VM's actual address.

Result:

