上一节成功实现了使用FlinkKafkaConsumer消费Kafka数据,并将数据输出到控制台;本节将继续把计算结果写入Redis。
pom.xml
在pom.xml中引入Redis连接器依赖(flink-connector-redis)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Flink modules, all pinned to 1.17.1. -->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.17.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.17.1</version>
</dependency>
<!-- NOTE(review): the three Maven plugin-development artifacts below are not referenced by
     the Java code shown in this article; confirm whether they are still needed. -->
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-plugin-api</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugin-tools</groupId>
<artifactId>maven-plugin-annotations</artifactId>
<version>3.2</version>
</dependency>
<dependency>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
<version>3.0.8</version>
</dependency>
<!-- JUnit, test scope only. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.2</version>
<scope>test</scope>
</dependency>
<!-- MyBatis coordinates -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.4.5</version>
</dependency>
<!-- MySQL JDBC driver coordinates (runtime scope) -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
<scope>runtime</scope>
</dependency>
<!-- Kafka connector that provides FlinkKafkaConsumer. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.1</version>
</dependency>
<!-- Kafka client used directly by the stand-alone test producer.
     NOTE(review): flink-connector-kafka presumably brings its own kafka-clients version
     transitively; confirm that pinning 2.7.0 here is compatible with it. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
<!-- Redis connector that provides RedisSink, RedisMapper and FlinkJedisPoolConfig.
     NOTE(review): this artifact is versioned 1.1.0 and carries a Scala 2.11 suffix while
     every other Flink module here is 1.17.1; verify that the combination resolves and runs. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
    <plugins>
        <!--
          Builds a self-contained (uber) jar during the package phase.
          The filters configuration below is a maven-shade-plugin setting; the previous
          maven-plugin-plugin declaration had no goal bound and does not understand filters,
          so the configuration never took effect.
        -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <!-- Applies to every artifact merged into the shaded jar. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!-- Drop signature files of signed dependencies: they no longer
                                         match the repackaged jar and would make it fail to load. -->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <!-- Compiles sources for Java 8. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
</project>
KafkaTestProducer.java 生产数据并写入Kafka
同上一节,具体代码
package org.example.snow.demo5;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * Stand-alone test producer that sends 50 string key/value records
 * ({@code key-i} / {@code value-i}) to the Kafka topic {@code xue},
 * pausing 200 ms between records.
 *
 * @author snowsong
 */
public class KafkaTestProducer {

    /**
     * Builds the producer configuration, sends the demo records and closes the producer.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while pausing between sends
     */
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Initial connection address of the Kafka cluster.
        props.put("bootstrap.servers", "172.16.1.173:9092");
        // Serializers that turn the String key and value into byte arrays.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even when sleep() is interrupted or
        // send() throws; previously close() was only reached on the normal path.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 50; i++) {
                String key = "key-" + i;
                String value = "value-" + i;
                ProducerRecord<String, String> record = new ProducerRecord<>("xue", key, value);
                producer.send(record);
                System.out.println("send: " + key);
                Thread.sleep(200);
            }
        }
    }
}
启动服务类
Flink消费Kafka,并将结果存入redis。
设置FlinkJedisPoolConfig
// Configure the Redis connection pool: set the Redis server host and port, then build the config object.
// REDIS_SERVER and REDIS_PORT are constants declared in the StartApp class in the full listing.
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig
.Builder()
.setHost(REDIS_SERVER)
.setPort(REDIS_PORT)
.build();
// Create the RedisSink that writes records to Redis; MyRedisMapper supplies the command, key and value.
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
// Attach the RedisSink to the stream (wordData) as its sink.
wordData.addSink(redisSink);
MyRedisMapper
它实现了 RedisMapper 接口,用于自定义 Redis 数据的映射规则。MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。
public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>> {
/**
* 获取当前命令的描述信息。
*
* @return 返回Redis命令的描述信息对象,其中包含了命令的类型为LPUSH。
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
/**
* 从给定的Tuple2数据中获取键。
*
* @param data 一个包含两个字符串元素的Tuple2对象
* @return 返回Tuple2对象的第一个元素,即键
*/
@Override
public String getKeyFromData(Tuple2<String,String> data) {
return data.f0;
}
/**
* 从给定的元组中获取第二个元素的值。
*
* @param data 一个包含两个字符串元素的元组
* @return 元组中的第二个元素的值
*/
@Override
public String getValueFromData(Tuple2<String,String> data) {
return data.f1;
}
StartApp的完整代码如下:
package org.example.snow.demo5;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import java.util.Properties;
/**
 * Flink streaming job that reads string records from the Kafka topic {@code xue},
 * pairs each record with the fixed key {@code l_words}, and writes the pairs to Redis
 * with the {@code LPUSH} command.
 *
 * @author snowsong
 */
public class StartApp {

    // NOTE(review): "0.0.0.0" is normally a bind address rather than a destination;
    // confirm it resolves to the intended Redis host in the target environment.
    private static final String REDIS_SERVER = "0.0.0.0";
    private static final int REDIS_PORT = 6379;

    /**
     * Wires the Kafka source, the mapping step and the Redis sink, then runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connection settings for the Kafka client.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "172.16.1.173:9092");

        // Parameterised with <String> (the element type produced by SimpleStringSchema)
        // instead of raw types, so the map() call below is checked by the compiler.
        FlinkKafkaConsumer<String> flinkKafkaConsumer =
                new FlinkKafkaConsumer<>("xue", new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);

        // Map every incoming string to a (key, value) pair.
        SingleOutputStreamOperator<Tuple2<String, String>> wordData =
                dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
                    /**
                     * Wraps the input string in a tuple.
                     *
                     * @param value the incoming string
                     * @return a tuple whose first field is {@code "l_words"} and whose
                     *         second field is the input string
                     * @throws Exception declared by the {@code MapFunction} contract
                     */
                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        return new Tuple2<>("l_words", value);
                    }
                });

        // Configure the Redis connection pool with the server host and port.
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost(REDIS_SERVER)
                .setPort(REDIS_PORT)
                .build();

        // Create the Redis sink and attach it as the sink of the stream.
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        wordData.addSink(redisSink);

        env.execute();
    }

    /**
     * Maps the {@code Tuple2} records of the stream to Redis commands.
     * Implements {@code RedisMapper} to define the command, key and value used for each record.
     */
    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {

        /**
         * Describes the Redis command issued for every record.
         *
         * @return a command description whose command type is {@code LPUSH}
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }

        /**
         * Extracts the Redis key from a record.
         *
         * @param data a tuple of two strings
         * @return the first tuple field ({@code f0}), used as the key
         */
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        /**
         * Extracts the Redis value from a record.
         *
         * @param data a tuple of two strings
         * @return the second tuple field ({@code f1}), used as the value
         */
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
}