文章目录
-
-
-
- 1.kafka stream 使用介绍
- 2. kafka stream的基本操作
- 3.KTable
- 4.Serialization(序列化)
- 5.Joins(关联)
- 6.有状态操作(stateful Operations)
- 7.窗口(windowing)
- 8. 时间概念(Time concepts)
- 9. Processor API
- 10.kafka 流式测试(kafka stream testing)
- 11. kafka stream 错误处理
- 12.kafka stream 实现原理(Internals)
- 13.有状态容错(kafka stream Stateful Fault Tolerance)
- 14.交互式查询(Interactive Que)
-
-
1.kafka stream 使用介绍
kafka streaming engine
Kafka Streams 是对生产者和消费者的一种抽象,它让你可以忽略底层细节,专注于处理 Kafka 数据。由于它是声明式的,用 Kafka Streams 编写的处理代码比用底层 Kafka 客户端编写的相同代码要简洁得多。
Kafka Streams 是一个 Java 库:编写代码,创建 JAR 文件,然后启动独立的应用程序,该应用程序与 Kafka 之间进行记录的流式传输(它不会和 broker 运行在同一个节点上)。可以在从笔记本电脑到大型服务器的任何设备上运行Kafka Streams。
kafka处理事件的方法:
1.处理事件采用生产者消费者客户端实现:
2.处理事件采用kafka streams
连接器(connector是对生产者和消费者的一种抽象。需要将数据库记录导出到 Kafka,可以配置一个源连接器(source connector)来监听特定的数据库表,当有记录进入时,该连接器会将这些记录提取出来并发送到 Kafka。 sink 连接器的作用则相反:例如,如果想将记录写入像 MongoDB 这样的外部存储,sink连接器会在记录进入主题时从该主题读取记录,并将其转发到你的 MongoDB 实例
采用kafka stream处理事件比采用kafka consumer 和producer客户端实现处理事件代码简洁许多
2. kafka stream的基本操作
Event Stream(事件流):
事件代表与某个动作相对应的数据,例如通知或状态转移。事件流是事件记录的一个无界集合。
Key-Value Pairs(键值对):
Apache Kafka 以键值对的形式工作,在事件流中,具有相同键的记录彼此之间没有任何关系。例如,下图显示了四条独立的记录,尽管其中两个键是相同的:
kafka stream processor topology(拓扑):
kafka stream processor topology是一个DAG(有向图),每个 Kafka Streams 拓扑都有一个source processor,用于从 Kafka 读取记录。在其下方是任意数量的子处理器,这些子处理器执行某种操作。一个子节点可以有多个父节点,一个父节点也可以有多个子节点。所有这些最终都流向一个 sink processor,该处理器将数据写入Kafka。
实现kafka stream 应用的步骤:
KStream是Kafka Streams DSL 的一部分,也是主要流主要的使用构成
kafka stream主要操作:
kafka stream 基本操作代码实现:
3.KTable
update streams(流更新):如果有一条新记录的键与现有记录的键相同,那么现有记录将会被覆盖。
ktable
使用KTable 时,键是必需的,使用 KStream时则不需要。通过覆盖记录,即使给定相同的源记录,KTable也会创建与KStream完全不同的数据结构。
ktable的操作:
与 KStream 一样,mapValues 用于转换值,而 map 允许同时转换键和值。
与 KStream 一样,过滤操作允许提供一个谓词,只有与该谓词匹配的记录才会被转发到拓扑中的下一个节点。
Global KTable
KTable 和 GlobalKTable 主要区别在于:KTable 会在 Kafka Streams 实例之间对数据进行分片(shards),而 GlobalKTable 会向每个实例分发一份完整的数据副本。通常,会将 GlobalKTable与查找数据一起使用。此外,在GlobalKTable和 KStream 之间进行连接时还存在一些差异;
kafka stream KTable基本操作代码实现:
4.Serialization(序列化)
序列化对于Apache Kafka而言十分重要,Kafka broker仅处理字节。Kafka以字节形式存储记录,当消费者发出获取请求时,Kafka会以字节形式返回记录。实际上,broker对其记录一无所知,它只是将这些记录追加到文件末尾,仅此而已。
自定义Serdes
已经存在的serdes:
5.Joins(关联)
stream join代码实现
6.有状态操作(stateful Operations)
kafka stream Aggregation 代码实现
7.窗口(windowing)
窗口
窗口化允许按时间对有状态操作进行分桶,否则聚合数据将无休止地累积。窗口提供给定时间范围内聚合数据的快照,并且可以设置为跳跃、滚动、会话或滑动。
跳跃窗口
跳跃窗口受时间限制:可以定义窗口的大小,但窗口会以小于窗口大小的增量递增,因此最终会出现窗口重叠的情况。例如,窗口大小可能为 30 秒,递增幅度为 5 秒。数据点可以属于多个窗口。
滚动窗口:一种特殊的跳跃窗口
前进大小与窗口大小相同。只需定义一个 30 秒的窗口大小。30 秒结束后,你会得到一个时长为 30 秒的新窗口。不会得到重复的结果。
会话窗口:由用户活动定义
对于会话窗口,需要定义一个不活动间隔(inactivityGap)。只要在这个不活动间隔内有记录进入,会话就会持续延长。所以从理论上讲,如果正在跟踪某件事,并且它是一个非常活跃的键,那么会话将会不断扩大。
滑动窗口
滑动窗口受时间约束,但其端点由用户活动决定。因此,需要创建流并设置两个记录之间的最大时间差,以使它们能够包含在第一个窗口中。
窗口宽限期
除了由行为驱动的会话窗口外,所有窗口都具有宽限期的概念。宽限期是窗口大小的扩展。具体来说,它允许将时间戳大于窗口结束时间(但小于窗口结束时间加上宽限期)的事件纳入窗口计算中
kafka stream windowing(窗口)代码实现
8. 时间概念(Time concepts)
时间概念:
时间戳(timestamps)是 Apache Kafka的关键组件,同样驱动着Kafka Streams 的行为。可以将时间戳配置为按照事件时间event time(默认)或日志追加时间(log-append time)。
Event time:
使用事件时间,如果不添加自己的时间戳,生产者会自动创建一个带有生产者环境当前时间的时间戳。
Log-Append Time:
使用日志附加时间,当记录到broker时,broker将记录附加到日志时用其自己的时间戳(broker环境的当前时间)覆盖生产者记录的时间戳。
时间戳驱动kafka stream的操作
在窗口模块中窗口操作是由记录时间戳驱动的,而不是由挂钟时间驱动的。在Kafka Streams 中,首先选择所有分区中最早的时间戳进行处理,并使用TimeStampExtractor接口从当前记录中获取时间戳。
默认行为是使用ConsumerRecord中的时间戳,该记录的时间戳由producer或broker设置。TimeStampExtractor 的默认实现是FailOnInvalidTimestamp,这意味着如果获取的时间戳小于零,它将抛出异常。如果使用嵌入在记录键或值本身中的时间戳,可以提供自定义的TimeStampExtractor。
kafka Stream Time
根据定义,流时间是目前为止所见到的最大时间戳(timestamps),只能向前移动,不能向后倒退。如果一个乱序的记录到达(意味着该记录的时间戳早于当前的流时间,但仍处于窗口时间加宽限期内),流时间将保持不变。 迟到的记录其时间戳超出了窗口时间和宽限期的总和。记录的延迟是通过流时间减去事件时间戳来确定的。
kafka stream time 代码实现
9. Processor API
processor API
Processor API 可以访问状态存储,以进行自定义的有状态操作。在 “有状态操作” 模块中,了解了状态和 DSL,DSL使用的是底层状态存储,而无法直接访问这些存储。借助Processor API,可以创建一个传入的存储,因此拥有对该存储的完全访问权:可以提取所有记录,可以执行范围查询,还可以进行各种各样的操作。也可以通过编程方式调用提交,并且可以控制调用的频率。
Punctuation(标点)
Punctuation能够调度任意操作。当调度一个标点生成器时,它既可以使用流时间处理,也可以使用挂钟时间处理。
流时间处理(Stream-Time Processing)
在流时间处理中,可以为想要定期触发的任意操作调度标点 ,例如,每 30 秒一次 , 并且该操作是由记录上的时间戳驱动的。
挂钟时间处理(Wall-Clock-Time Processing)
可以根据挂钟时间来调度标点。在底层,Kafka Streams使用一个消费者,而消费者会调用 poll () 来获取记录。挂钟时间随着 poll () 调用而推进。因此**,挂钟时间的推进在一定程度上取决于从 poll () 循环返回所花费的时间**。
使用Processor API构建流式应用通常按以下方方式构建:
创建节点时,需要为每个节点提供一个名称,会将一个节点的名称用作另一个节点的父节点名称。如前所述,一个节点可以有多个子节点,但你可以有选择地将记录转发到某些节点
首先,创建一个实例并添加一个源节点。
然后,添加一个自定义处理器,其上级节点是源节点。
最后,创建一个汇聚节点,其上级节点是自定义处理器。对于这些节点中的任何一个,都可以有多个上级节点和多个子节点 。
Processor Api 代码:
10.kafka 流式测试(kafka stream testing)
kafka stream 测试代码
11. kafka stream 错误处理
kafka stream 错误分类:
- entry (consumer) errors(入口错误):这种类型的错误会在记录传入时发生,通常是网络错误或反序列化错误
- processing (user logic)errors(处理错误):通常情况下,任何与提供的逻辑相关的异常最终都会逐渐显现并导致应用程序关闭。如:类型不匹配
- exit (producer) errors(退出错误):这种类型的错误发生在向 Kafka topic写入记录时,通常与网络或序列化错误有关。常见的错误是:RecordTooLargeException。
task.timeout.config 的配置,会在发生错误时启动一个计时器,这样 Kafka Streams就可以尝试推进其他任务的进度。失败的任务会不断重试,直到达到超时时间,此时该任务最终会失败。
kafka stream 错误处理代码:
12.kafka stream 实现原理(Internals)
任务(Tasks):
Kafka Streams采用任务作为工作单元的概念。它是Kafka Streams应用实例中最小的工作单元。任务的数量由输入分区的数量决定。例如,如果Kafka Streams应用只订阅了一个主题,而该topic有六个分区,那么Kafka Streams应用就会有六个任务。但如果有多个topics,应用会采用这些topics中最大的分区数
线程(Threads)
任务被分配给StreamThread(流线程)执行。默认的Kafka Streams应用程序有一个StreamThread。因此,如果有五个任务和一个 StreamThread,该StreamThread会依次处理每个任务的记录。不过,在 Kafka Streams中,可以拥有与任务数量相同的线程。所以对于五个任务,你可以将应用程序配置为拥有五个线程。每个任务会得到自己的线程,而任何剩余的线程则处于空闲状态。
实例(Instances)
关于任务分配,应用实例与任务类似。如果你有一个包含五个分区的输入主题,你可以启动五个具有相同应用 ID 的 Kafka Streams 实例,每个应用会被分配并处理一个任务。启动新的应用与增加线程一样,都能提高吞吐量。和线程的情况类似,如果启动的应用实例数量多于任务数量,多余的实例将处于空闲状态,但可用于故障转移。其优势在于这种行为是动态的,无需关闭任何东西。可以随时启动实例,也可以关闭实例。
消费者组协议(consumer group protocol)
由于Kafka Streams在内部使用 Kafka消费者,因此它继承了消费者组协议的动态扩展特性。所以,当一个成员离开组时,它会将资源重新分配给其他活跃成员。当一个新成员加入组时,它会从现有成员那里获取资源并分配给新成员。正如前面所提到的,这一切都可以在运行时完成,无需关闭任何当前正在运行的应用程序。
因此,应该根据需要设置足够多的实例,直到所有任务都得到处理。然后,在流量减少时,可以关闭一些实例,资源会被自动重新分配。
13.有状态容错(kafka stream Stateful Fault Tolerance)
有状态容错:
Kafka Streams中的状态存储要么是持久化的,要么是内存中的。这两种类型都以变更日志主题作为后盾,以保证持久性。当 Kafka Streams应用程序启动时,它会检测有状态节点,如果确定数据缺失,就会从变更日志主题中进行恢复。内存存储在重启时不会保留记录,因此重启后需要从变更日志主题中完全恢复。相比之下,持久化状态存储可能几乎不需要恢复,甚至完全不需要恢复。
changelog topic: restore state
有状态操作的完全恢复需要时间。出于这个原因,Kafka Streams提供了备用任务(stand-by tasks)。当将 num.standby.replicas 设置为大于默认值 0 时,Kafka Streams会指定另一个应用实例作为备用实例。备用实例通过从变更日志读取数据,使镜像存储与原始存储保持同步。当主实例出现故障时,备用实例会立即接管。
压缩机制(compaction)
变更日志主题采用压缩机制,即每个键的最旧记录会被删除,只保留最新的记录。这意味着变更日志主题不会无限制地增大。
14.交互式查询(Interactive Que)
交互性查询:
Kafka Streams中的有状态操作代表了事件流的当前状态,包括聚合aggregate、归约reduce,甚至还包括 KTable,因为KTable使用更新流。
对于事件流来说,一种典型模式是报表 :类似仪表盘应用之类的东西。可以进行长期的分析,但也希望能实时了解当下正在发生的情况,尤其是在聚合和计数方面。例如,这能让实现告警功能。
报表通常需要流处理系统将其内容写入外部数据库,然后UI层会查询该数据库以用于实时使用。
Kafka Streams能够直接查询有状态操作或表的现有状态,无需 SQL层。可以通过交互式查询来实现这一点。这些查询是实时的,可以在它们发生时进行查看,而且不需要将其中间结果写入外部数据库。交互式查询提供了操作的实时物化视图,从而简化了架构。
开启交互式查询(enabling interactive queries)
KTables 和aggregations 聚合操作是交互式查询的目标。要启用它们,可以通过Materialized 对象为状态存储命名,或者使用Stores工厂类;Stores 类有几个方法可用于创建状态存储。还需要提供服务层(通常是一个 REST API),方法是设置 application.server 配置,指定主机和端口。请注意,每个实例会与所有其他具有相同应用程序ID的应用程序共享元数据。