golang kafka sarama source code analysis

Published: 2024-04-17

Some theory

1. A topic supports multiple partitions. Within a consumer group each partition is consumed by exactly one consumer, while a single consumer may consume several partitions.
2. The partition assignment used after a consumer group rebalance is decided on the consumer side: one member of the group is elected leader and computes the assignment plan.
3. Every message has a unique offset. Kafka guarantees ordering within a single partition, because messages are appended to a partition sequentially and consumers fetch them by offset.
4. Auto-commit vs. manual commit: with auto-commit the SDK starts a goroutine that periodically commits the offsets of messages already marked as processed; it does not mean an offset is committed the moment a message is fetched. With manual commit the application code commits offsets itself. Committing synchronously after every single message slows consumption down; consider processing a batch of messages and committing once per batch (see the sketch after this list).
5. Consumption model: Kafka is pull-based; consumers periodically fetch messages from the brokers.
6. Service deployments, restarts, and pod scale-up/down in k8s change the number of members in a consumer group, which triggers a rebalance. During the rebalance the group briefly stops fetching (a stop-the-world pause for consumption), so messages pile up. This kind of rebalance cannot be avoided; the length of the pause depends on how long the rollout takes.
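A minimal sketch of the batch-commit idea from point 4, assuming AutoCommit.Enable is set to false; the handler type and the batchSize of 100 are hypothetical, everything else uses the ConsumerGroupSession / ConsumerGroupClaim interfaces shown below. It can be dropped into the manual-commit example near the end of this post in place of exampleConsumerGroupHandler.

// batchCommitHandler marks every message but only issues a blocking Commit once per batch.
type batchCommitHandler struct{}

func (batchCommitHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (batchCommitHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (batchCommitHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	const batchSize = 100 // hypothetical batch size
	count := 0
	for msg := range claim.Messages() {
		// ... process msg ...
		sess.MarkMessage(msg, "") // only updates the in-memory offset manager
		count++
		if count%batchSize == 0 {
			sess.Commit() // one synchronous OffsetCommit request per batch
		}
	}
	sess.Commit() // flush whatever is still marked when the claim ends
	return nil
}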

Source code analysis

Consumer group interfaces

type ConsumerGroup interface {
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
	Errors() <-chan error
	Close() error
}

type ConsumerGroupHandler interface {
	Setup(ConsumerGroupSession) error
	Cleanup(ConsumerGroupSession) error
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

type ConsumerGroupClaim interface {
	Topic() string
	Partition() int32
	InitialOffset() int64
	HighWaterMarkOffset() int64
	Messages() <-chan *ConsumerMessage
}

type ConsumerGroupSession interface {
	Claims() map[string][]int32
	MemberID() string
	GenerationID() int32
	MarkOffset(topic string, partition int32, offset int64, metadata string)
	Commit()
	ResetOffset(topic string, partition int32, offset int64, metadata string)
	MarkMessage(msg *ConsumerMessage, metadata string)
	Context() context.Context
}

Message fetching

Data for multiple partitions can be fetched in a single request; the response is then split back out per partition.

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	// ...
	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)

	return bc
}

response, err := bc.fetchNewMessages()

func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	request := &FetchRequest{
		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
	}
	// ...

	for child := range bc.subscriptions {

		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)

		fmt.Printf("fetchNewMessages topic=%s partition=%d offset=%d fetchSize=%d\n",
			child.topic, child.partition, child.offset, child.fetchSize)
	}

	return bc.broker.Fetch(request)
}
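The MinBytes, MaxWaitTime and per-partition fetchSize used above come straight from the consumer configuration. A small sketch of the relevant knobs, assuming the same Shopify/sarama config style as the examples near the end of this post (and a time import):

config := sarama.NewConfig()
config.Consumer.Fetch.Min = 1                        // MinBytes in the FetchRequest: broker waits until this many bytes are available
config.Consumer.Fetch.Default = 1024 * 1024          // initial fetchSize per partition, in bytes
config.Consumer.MaxWaitTime = 500 * time.Millisecond // MaxWaitTime in the FetchRequest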


// subscriptionConsumer ensures we will get nil right away if no new subscriptions are available
func (bc *brokerConsumer) subscriptionConsumer() {
	<-bc.wait // wait for our first piece of work

	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Either way, the signal just hasn't propagated to our goroutine yet.
			<-bc.wait
			continue
		}

		response, err := bc.fetchNewMessages()
		fmt.Printf("[%s]subscriptionConsumer.fetchNewMessages...\n", time.Now())

		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			// deliver the response to every partition consumer's feeder
			fmt.Printf("subscriptionConsumer write %s %d %+v\n", child.topic, child.partition, response)
			child.feeder <- response

		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}
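The acks field here is a sync.WaitGroup: the broker consumer hands the same FetchResponse to every subscribed partition consumer, then blocks until each of them has acknowledged it before issuing the next fetch. A stripped-down sketch of that broadcast-and-wait pattern in plain Go (not sarama code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	feeders := []chan string{make(chan string), make(chan string)}
	var acks sync.WaitGroup

	// one worker per "partition", playing the role of responseFeeder
	for i, feeder := range feeders {
		go func(i int, feeder chan string) {
			for resp := range feeder {
				fmt.Printf("partition %d got %q\n", i, resp)
				acks.Done() // tell the broadcaster this partition is done with the response
			}
		}(i, feeder)
	}

	// plays the role of subscriptionConsumer: broadcast, then wait for all acks
	for _, resp := range []string{"fetch-1", "fetch-2"} {
		acks.Add(len(feeders))
		for _, feeder := range feeders {
			feeder <- resp
		}
		acks.Wait() // the next fetch starts only after every partition has consumed this one
	}
}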

Message parsing and handling

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	child := &partitionConsumer{
		consumer:  c,
		conf:      c.conf,
		topic:     topic,
		partition: partition,
		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
		feeder:    make(chan *FetchResponse, 1),
		trigger:   make(chan none, 1),
		dying:     make(chan none),
		fetchSize: c.conf.Consumer.Fetch.Default,
	}

	if err := child.chooseStartingOffset(offset); err != nil {
		return nil, err
	}

	var leader *Broker
	var err error
	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
		return nil, err
	}

	if err := c.addChild(child); err != nil {
		return nil, err
	}

	go withRecover(child.dispatcher)
	// each partition gets its own goroutine to process responses
	go withRecover(child.responseFeeder)
	fmt.Printf("\nConsumePartition go %s %d %d\n", topic, partition, offset)

	child.broker = c.refBrokerConsumer(leader)
	child.broker.input <- child

	return child, nil
}
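ConsumePartition is also exposed through sarama's low-level consumer API, so the one-goroutine-per-partition model can be observed directly, without a consumer group. A minimal sketch, assuming a local broker and the test3 topic used elsewhere in this post:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// one PartitionConsumer (and internally one dispatcher/responseFeeder pair) per partition
	pc, err := consumer.ConsumePartition("test3", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		fmt.Printf("partition:%d offset:%d value:%s\n", msg.Partition, msg.Offset, msg.Value)
	}
}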


Automatic offset commits


func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	conf := client.Config()
	om := &offsetManager{
		client: client,
		conf:   conf,
		group:  group,
		poms:   make(map[string]map[int32]*partitionOffsetManager),

		memberID:   memberID,
		generation: generation,

		closing: make(chan none),
		closed:  make(chan none),
	}
	if conf.Consumer.Offsets.AutoCommit.Enable {
		om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)
		go withRecover(om.mainLoop)
	}

	return om, nil
}
func (om *offsetManager) mainLoop() {
	defer om.ticker.Stop()
	defer close(om.closed)

	for {
		select {
		case <-om.ticker.C:
			om.Commit()
		case <-om.closing:
			return
		}
	}
}

func (om *offsetManager) Commit() {
	om.flushToBroker()
	om.releasePOMs(false)
}

func (om *offsetManager) flushToBroker() {
	req := om.constructRequest()
	if req == nil {
		return
	}

	broker, err := om.coordinator()
	if err != nil {
		om.handleError(err)
		return
	}

	resp, err := broker.CommitOffset(req)
	if err != nil {
		fmt.Printf("broker.CommitOffset fail %v\n", err)
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	om.handleResponse(broker, req, resp)
}
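Whether the ticker-driven mainLoop above runs at all is controlled by the auto-commit settings. A short config fragment (slotting into the examples near the end of this post, assuming the time package is imported), in case only the interval needs changing:

config := sarama.NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = true              // start the offsetManager's ticker goroutine
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // how often mainLoop calls Commit()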

Marking offsets

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}
func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset > pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}
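MarkMessage stores msg.Offset+1: what gets committed is the offset of the next message to read, not the one just processed, so after a restart consumption resumes without re-delivering the last handled message. Calling the session directly is equivalent; a one-line sketch using the ConsumerGroupSession interface shown earlier:

sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "") // same effect as sess.MarkMessage(msg, "")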

func (om *offsetManager) constructRequest() *OffsetCommitRequest {
	var r *OffsetCommitRequest
	var perPartitionTimestamp int64
	if om.conf.Consumer.Offsets.Retention == 0 {
		perPartitionTimestamp = ReceiveTime
		r = &OffsetCommitRequest{
			Version:                 1,
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	} else {
		r = &OffsetCommitRequest{
			Version:                 2,
			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	}

	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.lock.Lock()
			if pom.dirty {
				r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
			}
			pom.lock.Unlock()
		}
	}

	// only commit if at least one partition has newly marked offsets
	if len(r.blocks) > 0 {
		return r
	}

	return nil
}

Consumer group rebalancing

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {

	// look up the broker that acts as the group coordinator
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	// Join consumer group (JoinGroup request to the coordinator)
	join, err := c.joinGroupRequest(coordinator, topics)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, join.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)

	// the group is already rebalancing
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, join.Err
	}

	// one consumer in the group acts as leader and computes the partition assignment plan
	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	if join.LeaderId == join.MemberId {
		members, err := join.GetMembers()
		if err != nil {
			return nil, err
		}

		// assign partitions to members
		plan, err = c.balance(members)
		if err != nil {
			return nil, err
		}
	}

	// sync back to Kafka; only the leader includes the assignment plan
	// Sync consumer group
	groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch groupRequest.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, groupRequest.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, groupRequest.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, groupRequest.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32 // topic -> partitions
	// if this member was assigned any partitions
	if len(groupRequest.MemberAssignment) > 0 {
		members, err := groupRequest.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics
		c.userData = members.UserData

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
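Which strategy c.balance applies on the leader is configurable. A minimal sketch, assuming the Shopify/sarama config used in the examples below (range is the default; round-robin and sticky are also built in):

config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin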

Usage examples

Consumer: auto-commit

package main

import (
	"context"
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0

	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Consumer.Offsets.AutoCommit.Enable = true // auto-commit
	config.Consumer.Return.Errors = true

	var (
		brokers = []string{"localhost:9092"}
		groupID = "g1"
		topics  = []string{"test3"}
	)

	group, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = group.Close() }()

	// Track errors
	go func() {
		for err := range group.Errors() {
			fmt.Println("ERROR", err)
		}
	}()

	// Iterate over consumer sessions.
	ctx := context.Background()
	for {

		handler := exampleConsumerGroupHandler{}

		// `Consume` should be called inside an infinite loop, when a
		// server-side rebalance happens, the consumer session will need to be
		// recreated to get the new claims
		err := group.Consume(ctx, topics, handler)
		if err != nil {
			panic(err)
		}
	}
}

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(se sarama.ConsumerGroupSession) error {
	fmt.Printf("Setup %q %+v", se.MemberID(), se.Claims())
	return nil
}
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Message topic:%q partition:%d offset:%d ts:%s val:%s\n",
			msg.Topic, msg.Partition, msg.Offset, msg.Timestamp, msg.Value)

		//time.Sleep(time.Second * 10)
		sess.MarkMessage(msg, "")

		//sess.Commit()
		//fmt.Printf("\n\nafter commit\n")
	}
	return nil
}

Consumer: manual commit

package main

import (
	"context"
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0

	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Consumer.Offsets.AutoCommit.Enable = false
	config.Consumer.Return.Errors = true

	var (
		brokers = []string{"localhost:9092"}
		groupID = "g1"
		topics  = []string{"test3"}
	)

	group, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = group.Close() }()

	// Track errors
	go func() {
		for err := range group.Errors() {
			fmt.Println("ERROR", err)
		}
	}()

	// Iterate over consumer sessions.
	ctx := context.Background()
	for {

		handler := exampleConsumerGroupHandler{}

		// `Consume` should be called inside an infinite loop, when a
		// server-side rebalance happens, the consumer session will need to be
		// recreated to get the new claims
		err := group.Consume(ctx, topics, handler)
		if err != nil {
			panic(err)
		}
	}
}

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(se sarama.ConsumerGroupSession) error {
	fmt.Printf("Setup %q %+v", se.MemberID(), se.Claims())
	return nil
}
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Message topic:%q partition:%d offset:%d ts:%s val:%s\n",
			msg.Topic, msg.Partition, msg.Offset, msg.Timestamp, msg.Value)

		//time.Sleep(time.Second * 10)
		sess.MarkMessage(msg, "")
		sess.Commit()
	}
	return nil
}

Producer: synchronous send

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/Shopify/sarama"
)

var (
	logger = log.New(os.Stderr, "", log.LstdFlags)
)

func main() {

	var (
		brokers = []string{"localhost:9092"}
		topic   = "test3"
	)

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true

	/*
		NoResponse   RequiredAcks = 0:  no broker response at all; the TCP ACK is all you get.
		WaitForLocal RequiredAcks = 1:  respond once the partition leader has committed locally.
		WaitForAll   RequiredAcks = -1: respond only after all in-sync replicas have committed.
	*/
	config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas before responding

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		fmt.Printf("Failed to open Kafka producer: %s", err)
		return
	}
	defer func() {
		if err := producer.Close(); err != nil {
			logger.Println("Failed to close Kafka producer cleanly:", err)
		}
	}()

	message := &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder("k1"),
		Value: sarama.StringEncoder("v1"),
	}
	partition, offset, err := producer.SendMessage(message)
	if err != nil {
		fmt.Printf("Failed to produce message: %s", err)
		return
	}
	fmt.Printf("produce %d/%d\n", partition, offset)
}

shell

Console producer
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test3


Create a topic with 3 partitions
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test


Check consumer lag: the bigger the gap between CURRENT-OFFSET and LOG-END-OFFSET (the LAG column), the larger the backlog
[root@localhost kafka_2.12-2.5.1]# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group g1

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST             CLIENT-ID
g1              test3           0          4               4               0               sarama-c0869d1a-9a8e-400d-a6f5-887ca95587d5 /0:0:0:0:0:0:0:1 sarama
g1              test3           1          4               4               0               sarama-c0869d1a-9a8e-400d-a6f5-887ca95587d5 /0:0:0:0:0:0:0:1 sarama
g1              test3           2          3               3               0               sarama-c0869d1a-9a8e-400d-a6f5-887ca95587d5 /0:0:0:0:0:0:0:1 sarama
g1              test            0          4               4               0               -                                           -                -