package utils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Package utils.KafkaUtil
* @Author xuang
* @Date 2025/5/13 16:20
* @description: kafka工具类
*/
public class KafkaUtil {
public static KafkaSink<String> getKafkaProduct(String servers, String topic) {
return KafkaSink.<String>builder()
.setBootstrapServers(servers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build();
}
public static DataStreamSource<String> getKafkaConsumer(StreamExecutionEnvironment env, String servers, String topic) {
// 配置 KafkaSource
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(servers)
.setTopics(topic)
.setGroupId("flink-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 将 KafkaSource 添加到作业
return env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
}
}
概述
KafkaUtil
是一个 Apache Flink 工具类,封装了 Kafka 作为数据源(Source)和数据汇(Sink)的常用操作,简化了 Flink 与 Kafka 集成的开发工作。
功能
实现细节
Kafka Sink 配置
Kafka Source 配置
最佳实践
Kafka Sink - 将 Flink 数据流写入 Kafka
Kafka Source - 从 Kafka 读取数据作为 Flink 数据源
使用
SimpleStringSchema
作为值的序列化器支持多服务器配置
需要指定目标 topic
使用
SimpleStringSchema
作为值的反序列化器从最早偏移量开始消费(
earliest()
)默认消费者组 ID 为 "flink-group"
不使用 watermark 策略
消费者组管理
对于生产环境,建议在调用处自定义消费者组 ID,而不是使用默认的 "flink-group"序列化扩展
对于复杂数据类型,可以扩展此类支持自定义序列化/反序列化器容错配置
生产环境中应考虑添加以下配置:重试策略
事务配置(精确一次语义)
检查点配置
安全配置
如果 Kafka 集群启用了安全认证,需要添加以下配置:SSL/TLS
SASL 认证
ACL 权限