Flink-1.19.2报错及解决方案

发布于:2025-08-11 ⋅ 阅读:(15) ⋅ 点赞:(0)

一、序列化问题
12:54:24.270 [main] INFO  o.a.f.a.j.t.TypeExtractor - [analyzePojo,2102] - No fields were detected for class java.util.concurrent.ConcurrentHashMap so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
12:54:24.282 [main] INFO  o.a.f.c.d.f.FromElementsGeneratorFunction - [serializeElements,98] - Serializing elements using  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@7c93933
12:54:24.283 [main] INFO  o.a.f.a.j.t.r.k.KryoSerializer - [getKryoInstance,492] - Kryo serializer scala extensions are not available.

ConcurrentHashMap 无法作为 POJO 类型

​原因​:ConcurrentHashMap 的字段(如 threshold、loadFactor)不符合 Flink 的 POJO 规范(需公共字段或标准 getter/setter)。Flink 只能将其视为 GenericType,使用低效的 Kryo 序列化。
​影响​:序列化性能下降,且不支持 Schema 演进(如字段增减会导致状态恢复失败)

替换为 Flink 原生类型​:
避免直接使用 ConcurrentHashMap,改用 Tuple2 或符合 POJO 规范的自定义类存储键值对

示例:
// 错误示例
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 正确示例
DataStream<Tuple2<String, Integer>> stream = env.fromElements(Tuple2.of("key", 1));

显式声明类型信息​:

若必须使用 ConcurrentHashMap,需通过 returns() 指定类型信息,强制启用 Kryo 序列化:
DataStream<ConcurrentHashMap<String, Integer>> stream = env.fromCollection(mapList).returns(Types.GENERIC(ConcurrentHashMap.class));

java.time.LocalDateTime 无法作为 POJO 类型

​原因​:LocalDateTime 是 Java 内置类型,其内部字段(如 hour、minute)非公共字段,不满足 POJO 规范
​影响​:与 ConcurrentHashMap 相同,需回退到 Kryo 序列化,影响性能和状态兼容性。

注册 Flink 内置时间模块​:
在 Flink 配置中添加 JavaTimeModule,为 LocalDateTime 提供专用序列化器

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerTypeWithKryoSerializer(LocalDateTime.class, new JavaTimeModule());

​替换为 java.sql.Timestamp
若无需时区操作,优先使用 Flink 原生支持的 Timestamp 类型
Timestamp eventTime = Timestamp.valueOf("2025-08-07 14:30:45.123");

​Kryo 的 Scala 扩展不可用​

​原因​:未添加 Flink 的 Scala 扩展依赖(flink-scala),导致 Kryo 无法优化 Scala 集合类型(如 List、Map)的序列化
​影响​:Scala 集合类型使用低效的 Java 序列化,增加 CPU 开销和状态大小。

添加依赖​(Maven):
在 pom.xml 中添加与 Scala 版本匹配的 flink-scala 依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId> <!-- 匹配 Scala 2.12 -->
    <version>1.17.1</version>
</dependency>

​配置类加载顺序​:
在 flink-conf.yaml 中设置类加载策略,避免用户代码覆盖 Flink 核心类:
classloader.resolve-order: parent-first

二、按照要求添加了flink-scala依赖,还报下面的错误 
No fields were detected for class java.util.concurrent.ConcurrentHashMap so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
Exception in thread "main" java.lang.NoClassDefFoundError: com/twitter/chill/KryoBase

1. NoClassDefFoundError: com/twitter/chill/KryoBase
​原因​:
Flink 的 Scala 序列化扩展依赖于 Twitter Chill 库(com.twitter.chill),但你的项目未引入该依赖。
flink-scala 仅提供基础集成,Chill 库才是增强 Kryo 对 Scala 类型支持的核心组件。
​解决方法​:
​添加 Chill 依赖​(Maven):
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>chill_${scala.binary.version}</artifactId> <!-- 例如 chill_2.12 -->
    <version>0.10.0</version> <!-- 兼容 Flink 1.17+ -->
</dependency>

验证依赖传递​:
运行 mvn dependency:tree 检查是否包含 chill 和 kryo 的 JAR 包(如 chill-java、kryo-serializers)

2. ConcurrentHashMap 无法作为 POJO 类型

原因​:
ConcurrentHashMap 的字段不符合 Flink POJO 规范(无公共字段/标准 getter/setter),Flink 只能将其视为 GenericType,使用低效的 Kryo 序列化

解决方法​:
​替换为 Flink 原生类型​:
优先使用 Tuple、POJO 类或 DataStream<MapEntry<K,V>> 替代 ConcurrentHashMap。

// 错误用法
DataStream<ConcurrentHashMap<String, Integer>> stream = ... 
// 正确用法:转换为二元组流
DataStream<Tuple2<String, Integer>> stream = env.fromElements(Tuple2.of("key", 1));

显式注册 Kryo 序列化器​(若必须使用 ConcurrentHashMap):

env.getConfig().registerTypeWithKryoSerializer(
    ConcurrentHashMap.class, 
    new KryoSerializer<>(ConcurrentHashMap.class, env.getConfig())
);

​3. 依赖冲突与版本兼容性

Scala 版本一致性​:
确保所有依赖的 Scala 二进制版本(如 _2.11/_2.12)与 Flink 和 Chill 完全匹配。例如:

<properties>
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.17.1</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 其他依赖保持相同 Scala 版本 -->
</dependencies>

排除冲突的 Kryo 版本​:
若存在多个 Kryo 版本(如 Flink 自带 Kryo 与用户依赖冲突),在 Maven 中排除冲突:

<dependency>
    <groupId>com.example</groupId>
    <artifactId>conflicting-lib</artifactId>
    <version>1.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.esotericsoftware</groupId>
            <artifactId>kryo</artifactId>
        </exclusion>
    </exclusions>
</dependency>

检查依赖树​:

mvn dependency:tree | grep -E "chill|kryo|scala"

输出应包含:
├─ com.twitter:chill_2.12:jar:0.10.0
├─ org.apache.flink:flink-scala_2.12:jar:1.17.1
└─ com.esotericsoftware:kryo:jar:4.0.2 (由 Flink 传递引入)

2、​启用序列化调试​:
在 Flink 配置中添加:
env.getConfig().enableForceKryo(); // 强制使用 Kryo
env.getConfig().setGlobalJobParameters(new Configuration().set(SerializationConfig.ALLOWED_OPTIONS, "Kryo"));

3、​日志验证​:
成功解决后,日志应显示:
Kryo serializer scala extensions are now available  // Scala 扩展已激活
Serializing elements using org.apache.flink...KryoSerializer  // ConcurrentHashMap 使用 Kryo 序列化

三、StreamExecutionEnvironment的registerTypeWithKryoSerializer是被废弃的状态,那么实现相同的功能该如何写呢

方案一:通过 TypeInformation 注册自定义序列化器(推荐)​​
Flink 推荐使用 TypeInformation 系统声明类型信息,并绑定自定义序列化器,避免直接操作 Kryo。
​代码示例​:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.serialization.SerializerConfig;

// 1. 为自定义类型创建 TypeInformation
TypeInformation<MyCustomType> typeInfo = Types.POJO(MyCustomType.class);

// 2. 在 SerializerConfig 中注册序列化器
SerializerConfig serializerConfig = new SerializerConfig();
serializerConfig.registerTypeWithKryoSerializer(
    MyCustomType.class, 
    MyCustomKryoSerializer.class  // 需继承 com.esotericsoftware.kryo.Serializer
);

// 3. 将配置应用到 ExecutionConfig
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setSerializerConfig(serializerConfig);

优势​:

类型安全,减少运行时错误
支持 Flink 自动推断 POJO 结构(如字段类型、顺序)

方案二:使用 ExecutionConfig 的新注册接口

ExecutionConfig 提供了更灵活的注册方法,替代废弃的 registerTypeWithKryoSerializer。

ExecutionConfig config = env.getConfig();

// 注册自定义 Kryo 序列化器类
config.registerTypeWithKryoSerializer(
    MyCustomType.class, 
    MyCustomKryoSerializer.class
);

// 或注册序列化器实例(避免反射开销)
MyCustomKryoSerializer serializerInstance = new MyCustomKryoSerializer();
config.registerTypeWithKryoSerializer(
    MyCustomType.class, 
    serializerInstance
);
适用场景​:
需精细控制序列化逻辑(如自定义 Protobuf/Thrift 序列化器)
需复用序列化器实例以减少初始化开销。

​方案三:集成第三方序列化框架(如 Protobuf/Thrift)​​
对于复杂类型(如 Protobuf 生成的类),可直接注册 Flink 兼容的序列化器。
​步骤​:
​添加依赖​(Maven):
<!-- Protobuf 序列化支持 -->
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>chill-protobuf</artifactId>
    <version>0.10.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.esotericsoftware</groupId>
            <artifactId>kryo</artifactId>
        </exclusion>
    </exclusions>
</dependency>
​注册序列化器​:
env.getConfig().registerTypeWithKryoSerializer(
    MyProtobufMessage.class,
    ProtobufSerializer.class  // 来自 chill-protobuf
);
优势​:
直接利用成熟序列化框架(如 Protobuf 的高效二进制编码)
避免手动实现序列化逻辑,减少错误风险。


网站公告

今日签到

点亮在社区的每一天
去签到