6.1 本章概述
本章内容介绍基于 Flink 1.14.x 版本开发流计算案例。这个案例中我们将 kafka 作为数据源,启动 Flink 任务以后,将会监听 kafka 的特定的一个或多个 TOPIC,并根据消息内容进行计算。
这是一种实时计算场景,即数据一旦流入 kafka ,就触发计算条件,也就是 flink 官方一直强调的 “流批一体” 的概念的一种体现。这里与批计算的差别也非常明显,即无需等待凑足数据以后再批量执行。
我们的例子非常简单:
- flink 任务启动后,将写入 kafka 中的字符串打印到控制台;
- flink 任务启动后,将写入 kafka 的 json 格式数据进行反序列化,转换为 实体类,然后将满足特定条件的实体打印到控制台。
相关内容可以概述为:
- 明确 flink 与 对应 kafka 、jdk 版本之间的适配关系;
- 明确 flink 开发时,
env.fromSource
与env.addSource
的区别; - 明确 flink 监听 kafka topic 的基本方法;
- 了解 flink 监听 kafka 中的topic未创建这种情况,以及对应的处理方法;
- 了解 flink 的
MultipleParameterTool
的用法(用于任务的启动参数); - 了解 kafka 客户端的使用方法;
- 明确 flink 监听 kafka 中消息的反序列操作(实际业务场景中也是直接操作实体类,而不是字符串)。
6.2 效果展示
6.2.1 打印生产者写入 kafka 的字符串
flink-kafka 简单例子
6.2.2 将生产者写入的字符串反序列化为实体类
flink 接收kafka消息并反序列化
6.3 环境准备
相关环境基础包括:
- Oracle JDK / OpenJDK 1.8.x ;
- maven 环境,开发时需要通过 maven 下载相关包;
- kafka
6.3.1 检查 jdk 版本
确保是 1.8 版本
$ java -versio
$ javac -version
6.3.2 本地启动 kafka
确保 kafka 的版本为 2.x
。
cd
到 kafka 所在目录,启动 zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
接着再打开一个命令行窗口,启动 kafka
$ bin/kafka-server-start.sh config/server.properties
注意
:以上内容均是基于 macOS 以及 linux 的命令,windows 下基本一致,请查阅相关资料。
6.3.2 启动 kafka 生产者客户端
cd
到 kafka 所在目录
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
接下来将会基于此控制台与 flink 进行交互,即输入字符串,flink job 读取字符串。
6.4 代码编写
具体依赖请参考源码仓库:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>quick-start-kafka</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.binary.version>2.12</scala.binary.version>
<lombok.version>1.18.30</lombok.version>
<flink.version>1.14.6</flink.version>
<slf4j.version>2.0.9</slf4j.version>
<logback.version>1.3.11</logback.version>
<junit.version>4.13.2</junit.version>
</properties>
<dependencies>
<!-- flink 相关 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- 编译工具 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- log 相关 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<scope>provided</scope>
</dependency>
<!-- test 相关 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- Flink Kafka Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.48</version>
</dependency>
</dependencies>
</project>
6.4.1 打印生产者写入 kafka 的字符串
package cn.smileyan.demos;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* flink 使用 kafka 作为数据源的简单例子
* @author Smileyan
*/
@Slf4j
public class FlinkKafkaExample {
/**
* 参数解释:
* -bs broker 地址
* -kcg kafka consumer group
* -it kafka 输入数据 topic
* -ct 是否自动创建 topic
* -pt topic 分区数
* -rf topic 副本数
*/
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final MultipleParameterTool cmd = MultipleParameterTool.fromArgs(args);
final String bootstrapServer = cmd.get("bs", "localhost:9092");
final String kafkaConsumerGroup = cmd.get("kcg", "flink-consumer");
final String inputTopic = cmd.get("it", "quickstart-events");
final boolean createTopic = cmd.getBoolean("ct", false);
log.info("broker is {} and topic is {}", bootstrapServer, inputTopic);
// 如果 topic 不存在,并且开启了由 flink 任务创建 TOPIC。默认不开启,一般情况下,部署人员应当根据实际情况设置不同topic的并行度,副本数
if (createTopic) {
final int partitions = cmd.getInt("pt", 1);
final short replicationFactor = cmd.getShort("rf", (short) 1);
createTopic(bootstrapServer, inputTopic, partitions, replicationFactor);
}
final KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setGroupId(kafkaConsumerGroup)
.setStartingOffsets(OffsetsInitializer.latest())
.setBootstrapServers(bootstrapServer)
.setTopics(inputTopic)
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
final DataStreamSource<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
kafkaStream.print();
env.execute("Flink Kafka Example");
}
/**
* 如果 TOPIC 不存在则创建该 TOPIC
* @param bootstrapServer kafka broker 地址
* @param topic 想要创建的 TOPIC
* @param partitions 并行度
* @param replicationFactor 副本数
*/
public static void createTopic(String bootstrapServer,
String topic,
int partitions,
int replicationFactor) throws ExecutionException, InterruptedException {
Properties adminProperties = new Properties();
adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
try (AdminClient adminClient = AdminClient.create(adminProperties)) {
if (!adminClient.listTopics().names().get().contains(topic)) {
NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
log.info("created topic: {}", topic);
}
}
}
}
6.4.2 将生产者写入的字符串反序列化为实体类
package cn.smileyan.demos;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* 实体类序列化
* @author smileyan
*/
@Slf4j
public class FlinkKafkaEntityExample {
/**
* 参数解释:
* -bs broker 地址
* -kcg kafka consumer group
* -it kafka 输入数据 topic
* -ct 是否自动创建 topic
* -pt topic 分区数
* -rf topic 副本数
*/
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final MultipleParameterTool cmd = MultipleParameterTool.fromArgs(args);
final String bootstrapServer = cmd.get("bs", "localhost:9092");
final String kafkaConsumerGroup = cmd.get("kcg", "flink-consumer");
final String inputTopic = cmd.get("it", "quickstart-events");
final boolean createTopic = cmd.getBoolean("ct", false);
log.info("broker is {} and topic is {}", bootstrapServer, inputTopic);
// 如果 topic 不存在,并且开启了由 flink 任务创建 TOPIC。默认不开启,一般情况下,部署人员应当根据实际情况设置不同topic的并行度,副本数
if (createTopic) {
final int partitions = cmd.getInt("pt", 1);
final short replicationFactor = cmd.getShort("rf", (short) 1);
createTopic(bootstrapServer, inputTopic, partitions, replicationFactor);
}
final KafkaSource<Student> kafkaSource = KafkaSource.<Student>builder()
.setGroupId(kafkaConsumerGroup)
.setStartingOffsets(OffsetsInitializer.latest())
.setBootstrapServers(bootstrapServer)
.setTopics(inputTopic)
.setValueOnlyDeserializer(new CommonDeserializationSchema<>(Student.class))
.build();
final DataStreamSource<Student> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 过滤掉反序列化失败的对象,只保留正确的对象
SingleOutputStreamOperator<Student> out1 = kafkaStream.filter(Objects::nonNull)
.map(student -> {
log.info("filter none objects is {}", student);
return student;
});
// 只选择年纪小于 10 的对象
out1.filter(student -> student.getAge() != null && student.getAge() < 10)
.map(student -> {
log.info("filter age < 10: {}", student);
return student;
});
env.execute("Flink Kafka Example");
}
/**
* 如果 TOPIC 不存在则创建该 TOPIC
* @param bootstrapServer kafka broker 地址
* @param topic 想要创建的 TOPIC
* @param partitions 并行度
* @param replicationFactor 副本数
*/
public static void createTopic(String bootstrapServer,
String topic,
int partitions,
int replicationFactor) throws ExecutionException, InterruptedException {
Properties adminProperties = new Properties();
adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
try (AdminClient adminClient = AdminClient.create(adminProperties)) {
if (!adminClient.listTopics().names().get().contains(topic)) {
NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
log.info("created topic: {}", topic);
}
}
}
@Data
static class Student {
private String name;
private Integer age;
}
}
package cn.smileyan.demos;
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* 将字节码数据进行序列化
* @author smileyan
* @param <O> 实体类
*/
@Slf4j
public class CommonDeserializationSchema<O> implements DeserializationSchema<O> {
private final Class<O> clazz;
public CommonDeserializationSchema(Class<O> clazz) {
this.clazz = clazz;
}
@Override
public O deserialize(byte[] message) {
try {
String str = new String(message, StandardCharsets.UTF_8);
log.info("kafka received message: {}", str);
return JSON.parseObject(str, clazz);
} catch (Exception e) {
log.error(e.getMessage());
}
return null;
}
@Override
public boolean isEndOfStream(O nextElement) {
return false;
}
@Override
public TypeInformation<O> getProducedType() {
return TypeInformation.of(clazz);
}
}
6.5 相关问题
6.5.1 jdk / flink / kafka 的版本适配问题
jdk:1.8.x
flink: 1.14.x,不能使用 1.15 以及以上的版本,因为依赖的 jdk 版本发生了变化(jdk 11)
kafka: 2.x.x ,不能使用 3.x,因为依赖的 jdk 版本发生了变化(jdk11)。
6.5.2 env.fromSource 以及 env.addSource 的区别
Flink 中 env.fromSource()
和 env.addSource()
方法都是用于创建数据流(DataStream
),即从不同的数据源引入数据到 Flink 流处理或批处理作业中。虽然它们的目的相似,但具体的使用方式和语境有所不同:
env.addSource(sourceFunction)
方法签名与参数:
addSource
是StreamExecutionEnvironment
类的一个方法,它接收一个实现了SourceFunction
接口的对象作为参数。SourceFunction
是 Flink 提供的一个通用接口,用于定义数据源的逻辑,包括如何初始化数据源、如何产生数据以及如何正确清理资源。用途:
addSource
主要用于自定义数据源或者使用 Flink 提供的某些特定数据源,这些数据源可能没有提供更高级别的抽象或者封装。通过实现SourceFunction
,开发者可以完全控制数据读取的细节,如从文件、网络套接字、自定义服务等非标准或非常规数据源读取数据。灵活性与复杂性:
使用addSource
的方式提供了极大的灵活性,因为可以直接编写代码来处理数据源的各种特性和行为。然而,这也意味着需要更多手动编码工作,包括处理错误恢复、并行化(如果支持)、数据分区等复杂任务。对于容错和并行读取的支持通常需要在SourceFunction
实现中集成相应的机制。
env.fromSource(source)
方法签名与参数:
fromSource
方法同样属于StreamExecutionEnvironment
类,但其参数通常是某个具体数据源类的实例,而非SourceFunction
接口。这里的source
参数往往代表一个已经封装好的、针对特定数据源类型的高级抽象,如FlinkKafkaConsumer
、SocketTextStreamFunction
等。用途:
fromSource
通常用于直接使用 Flink 内置或社区提供的对常见数据源(如 Apache Kafka、文件系统、数据库连接器等)的预封装支持。这些封装好的数据源类通常会隐藏底层复杂性,提供友好的配置选项,并且内置了对容错、并行读取等特性的支持。便捷性与标准化:
使用fromSource
方法更为便捷,因为它针对常见的数据源类型提供了开箱即用的解决方案。开发者无需从头实现复杂的SourceFunction
,只需配置必要的参数(如 Kafka 主题名、数据库连接信息等)即可快速接入数据源。这种做法遵循 Flink 的最佳实践,确保了与 Flink 生态系统的良好集成以及对数据源特性的有效利用。
总结:
env.addSource(sourceFunction)
适用于需要自定义数据源逻辑、处理非标准数据源或对数据源控制有特殊需求的情况。使用时需要自行实现SourceFunction
,处理数据读取、错误恢复、并行化等细节。env.fromSource(source)
适用于对接已知、常见的数据源,如 Kafka、文件、数据库等,利用 Flink 提供的预封装数据源类。这种方式简化了开发过程,提供了更好的容错性和易用性,但可能不支持所有定制化需求。flink 官方文档中,更加推荐使用
env.fromSource
处理我们前面提到的场景。
6.6 本章小结
本章举了两个例子,介绍 Flink 流计算中最常见的场景:基于 Kafka 通讯。实际业务场景中我们将这个通讯过程称为 “下发任务”。比如后端同事开发一个接口,触发后将实际业务需求打包成为一个实体类(Task),写入 kafka,大数据平台(Flink)通过监听这个 kafka 消息进行驱动计算过程,并且将算法结果输出到 kafka 或者 elastic search 等。
由于实际业务场景、现场部署环境的差别,我们通过引入 MultipleParameterTool 来调整相关参数,从而更加灵活。
最后还有序列化的过程,即将 json 格式的字符串,转换为我们需要的实体类,进而完成更加复杂的操作。
感谢阅读 ~
感谢点赞 ~