暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang源码分析:sarama kafka client(part II:消费者)

        这一讲,我们接着介绍下sarama kafka client的消费者的实现,先从例子开始:

    package main


    import (
    "fmt"
    "log"
    "sync"


    "github.com/Shopify/sarama"
    )


    // 消费者练习


    func main() {
    // 生成消费者 实例
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
    log.Print(err)
    return
    }
    // 拿到 对应主题下所有分区
    partitionList, err := consumer.Partitions("test")
    if err != nil {
    log.Println(err)
    return
    }


    var wg sync.WaitGroup
    wg.Add(1)
    // 遍历所有分区
    for partition := range partitionList {
    //消费者 消费 对应主题的 具体 分区 指定 主题 分区 offset return 对应分区的对象
    pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
    if err != nil {
    log.Println(err)
    return
    }


    // 运行完毕记得关闭
    defer pc.AsyncClose()


    // 去出对应的 消息
    // 通过异步 拿到 消息
    go func(sarama.PartitionConsumer) {
    defer wg.Done()
    for msg := range pc.Messages() {
    fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
    }
    }(pc)
    }
    wg.Wait()
    }
    复制

    分三个部分:

    1,sarama.NewConsumer ,创建一个consumer

    2,consumer.ConsumePartition 从指定topic,指定分区消费消息

    3, msg := range pc.Messages() 获取消息

      如果不想自己获取并逐个消费所有分区,也可以使用 consumer group,由消费组来分配分区:

      package main


      import (
      "context"
      "fmt"
      "os"
      "os/signal"
      "sync"


      "github.com/Shopify/sarama"
      )


      type consumerGroupHandler struct {
      name string
      }


      func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
      func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
      func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
      for msg := range claim.Messages() {
      fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
      // 手动确认消息
      sess.MarkMessage(msg, "")
      }
      return nil
      }


      func handleErrors(group *sarama.ConsumerGroup, wg *sync.WaitGroup) {
      wg.Done()
      for err := range (*group).Errors() {
      fmt.Println("ERROR", err)
      }
      }


      func consume(group *sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
      fmt.Println(name + "start")
      wg.Done()
      ctx := context.Background()
      for {
      topics := []string{"test"}
      handler := consumerGroupHandler{name: name}
      err := (*group).Consume(ctx, topics, handler)
      fmt.Println("consume group end")
      if err != nil {
      panic(err)
      }
      }
      }


      func main() {
      var wg sync.WaitGroup
      config := sarama.NewConfig()
      config.Consumer.Return.Errors = false
      config.Version = sarama.V0_10_2_0
      client, err := sarama.NewClient([]string{"localhost:9092"}, config)
      defer client.Close()
      if err != nil {
      panic(err)
      }
      group1, err := sarama.NewConsumerGroupFromClient("c1", client)
      if err != nil {
      panic(err)
      }
      group2, err := sarama.NewConsumerGroupFromClient("c2", client)
      if err != nil {
      panic(err)
      }
      group3, err := sarama.NewConsumerGroupFromClient("c3", client)
      if err != nil {
      panic(err)
      }
      defer group1.Close()
      defer group2.Close()
      defer group3.Close()
      wg.Add(3)
      go consume(&group1, &wg, "c1")
      go consume(&group2, &wg, "c2")
      go consume(&group3, &wg, "c3")
      wg.Wait()
      signals := make(chan os.Signal, 1)
      signal.Notify(signals, os.Interrupt)
      select {
      case <-signals:
      }
      }
      复制

      我们从NewConsumerGroup作为入口开始源码分析

      consumer_group.go

        // NewConsumerGroup (abridged excerpt of sarama's consumer_group.go) creates a
        // Client from the broker addresses and config, then builds the consumerGroup
        // on top of it. Error checks and the return statement are elided in this excerpt.
        func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
        client, err := NewClient(addrs, config)
        c, err := newConsumerGroup(groupID, client)
        }
        复制

        先创建一个client,然后生成一个consumerGroup 对象:

          // ConsumerGroup is the interface returned by NewConsumerGroup (excerpt).
          type ConsumerGroup interface {
            // Consume refreshes metadata for topics and starts a group session that
            // passes each claimed partition to handler's ConsumeClaim.
            Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
          // Errors returns a read channel of errors that occurred during the consumer life-cycle.
          // By default, errors are logged and not returned over this channel.
          // If you want to implement any custom error handling, set your config's
          // Consumer.Return.Errors setting to true, and read from this channel.
            Errors() <-chan error
          // Close stops the ConsumerGroup and detaches any running sessions. It is required to call
          // this function before the object passes out of scope, as it will otherwise leak memory.
          Close() error
          }
          复制

            // consumerGroup is the concrete ConsumerGroup implementation (excerpt).
            type consumerGroup struct {
            client Client


            config *Config
            // consumer is the partition-level Consumer created in newConsumerGroup via
            // NewConsumerFromClient; each claim calls its ConsumePartition.
            consumer Consumer
            groupID string
            memberID string
            // errors: NOTE(review) presumably the channel exposed by Errors() — confirm.
            errors chan error


            lock sync.Mutex
            // closed / closeOnce: NOTE(review) presumably used by Close to signal shutdown exactly once — confirm.
            closed chan none
            closeOnce sync.Once


            userData []byte
            }
            复制
              // newConsumerGroup (abridged) creates the underlying Consumer from the shared
              // client; the rest of the construction is elided in this excerpt.
              func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
                consumer, err := NewConsumerFromClient(client) 
              }
              复制

              创建consumerGroup的同时会创建consumer对象:

              consumer.go

                // NewConsumerFromClient wraps client in a nopCloserClient, then builds the consumer.
                // NOTE(review): the wrapper's name suggests that closing this consumer leaves
                // the shared client open — confirm against sarama.
                func NewConsumerFromClient(client Client) (Consumer, error) {
                cli := &nopCloserClient{client}
                return newConsumer(cli)
                }
                复制
                  // newConsumer constructs the consumer; its body is elided in this article.
                  func newConsumer(client Client) (Consumer, error) {
                  }
                  复制
                    // consumer is the partition-level Consumer implementation (excerpt).
                    type consumer struct {
                    conf *Config
                    // children indexes partitionConsumers by topic, then by partition (filled in ConsumePartition).
                    children map[string]map[int32]*partitionConsumer
                    // brokerConsumers holds one shared, reference-counted brokerConsumer per broker.
                    brokerConsumers map[*Broker]*brokerConsumer
                    client Client
                    // lock: NOTE(review) presumably guards children and brokerConsumers — confirm.
                    lock sync.Mutex
                    }
                    复制

                    创建完ConsumerGroup后我们就开始消费了,对应的接口是Consume

                      // Consume (abridged) loads metadata for topics, opens a new session (with a
                      // retry budget of Consumer.Group.Rebalance.Retry.Max), and starts a background
                      // goroutine for that session. Error handling and the return are elided here.
                      func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
                         c.client.RefreshMetadata(topics...)// load metadata
                         sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
                         // NOTE(review): presumably watches the topics' partition counts for changes — confirm.
                         go c.loopCheckPartitionNumbers(topics, sess)
                      }
                      复制

                      RefreshMetadata用于获取对应元数据信息,代码在client.go

                        // RefreshMetadata (abridged; the computation of deadline is elided) delegates
                        // to tryRefreshMetadata with the configured Metadata.Retry.Max attempts.
                        func (client *client) RefreshMetadata(topics ...string) error {
                        return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
                        }
                        复制
                          // tryRefreshMetadata (abridged) picks a broker via client.any(), sends it a
                          // MetadataRequest for topics, and applies the response with updateMetadata.
                          // The retry loop and error handling are elided in this excerpt.
                          func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
                          broker = client.any()
                              req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
                          response, err := broker.GetMetadata(req)
                            shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
                          }
                          复制

                                  每个 partition 与 consumer 的分配关系称作一个 “claim”;消费组(ConsumerGroup)在一轮分配(generation)中的生命周期称作一个 session。session 会在 ctx 退出或发生 partition rebalance 时结束。session 要求客户端与 coordinator 保持心跳:原版 Kafka 客户端为此提供了 session.timeout.ms 配置,客户端需要在该时间范围内向 coordinator 发送心跳,否则会被视为已经退出,从而触发 Rebalance。

                            // newSession (abridged) builds one group session: it locates the group's
                            // coordinator, sends JoinGroup, sends SyncGroup with the assignment plan for
                            // the joined generation, then creates the session from the resulting claims.
                            // Computing plan/claims and the retry logic are elided in this excerpt.
                            func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
                            coordinator, err := c.client.Coordinator(c.groupID)
                            join, err := c.joinGroupRequest(coordinator, topics)
                            groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
                            return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
                            }
                            复制

                              // newConsumerGroupSession (abridged) creates the offset manager for this
                              // member/generation, starts the heartbeat loop, and launches one goroutine
                              // per claimed partition. Construction of sess and the return are elided.
                              func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
                              offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
                                  // heartbeats keep this session alive with the group coordinator
                                  go sess.heartbeatLoop() 
                                   // start consuming
                              for topic, partitions := range claims {
                              for _, partition := range partitions {
                                      // NOTE(review): presumably balanced by a Done inside consume (elided) — confirm.
                                      sess.waitGroup.Add(1)
                              go func(topic string, partition int32) {
                              sess.consume(topic, partition)
                              }(topic, partition)
                              }
                              }
                              }
                              复制
                                // consumerGroupSession holds the state of one group generation (excerpt).
                                type consumerGroupSession struct {
                                parent *consumerGroup
                                memberID string
                                generationID int32
                                handler ConsumerGroupHandler


                                // claims maps each topic to the partitions assigned to this member.
                                claims map[string][]int32
                                offsets *offsetManager
                                ctx context.Context
                                cancel func()


                                // waitGroup tracks the per-partition consume goroutines.
                                waitGroup sync.WaitGroup
                                releaseOnce sync.Once
                                // hbDying, hbDead: NOTE(review) presumably stop/stopped signals for heartbeatLoop — confirm.
                                hbDying, hbDead chan none
                                }
                                复制

                                调用了 sess.consume(topic, partition) 这个接口:

                                  // consume (abridged) handles one claimed partition: it creates a claim that
                                  // starts at offset (the offset lookup is elided) and passes it to the user's
                                  // ConsumeClaim.
                                  func (s *consumerGroupSession) consume(topic string, partition int32) {
                                  // create new claim
                                  claim, err := newConsumerGroupClaim(s, topic, partition, offset)
                                  s.handler.ConsumeClaim(s, claim)
                                  }
                                  复制
                                    // newConsumerGroupClaim (abridged) starts a PartitionConsumer for the
                                    // partition through the parent group's consumer.
                                    func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
                                    pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
                                    }
                                    复制
                                      // consumerGroupClaim embeds PartitionConsumer, so claim.Messages() is the
                                      // underlying partition consumer's Messages channel.
                                      type consumerGroupClaim struct {
                                      topic string
                                      partition int32
                                      offset int64
                                      PartitionConsumer
                                      }
                                      复制

                                      调用了ConsumePartition消费对应的partition

                                      consumer.go

                                        // ConsumePartition (abridged; code from addChild and refBrokerConsumer is
                                        // inlined below) creates a partitionConsumer, resolves its starting offset
                                        // and leader, registers it, starts its two goroutines, and subscribes it to
                                        // the leader's brokerConsumer.
                                        func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
                                        child := &partitionConsumer
                                        if err := child.chooseStartingOffset(offset); err != nil
                                        if leader, err = c.client.Leader(child.topic, child.partition); err != nil
                                        // addChild: register child under c.children[topic][partition]
                                        if err := c.addChild(child); err != nil
                                        c.children[child.topic] = topicChildren
                                        topicChildren[child.partition] = child
                                        // dispatcher tracks leader changes; responseFeeder delivers fetched messages
                                        go withRecover(child.dispatcher)
                                        go withRecover(child.responseFeeder)
                                        // refBrokerConsumer: one brokerConsumer per broker, shared and reference-counted
                                        child.broker = c.refBrokerConsumer(leader)
                                        bc := c.brokerConsumers[broker]
                                        bc.refs++
                                              // subscribe child to the broker's fetch loop
                                              child.broker.input <- child
                                        }
                                        复制

                                        创建了一个partitionConsumer对象:

                                          type partitionConsumer struct {
                                          highWaterMarkOffset int64 must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG


                                          consumer *consumer
                                          conf *Config
                                          broker *brokerConsumer
                                          messages chan *ConsumerMessage
                                          errors chan *ConsumerError
                                          feeder chan *FetchResponse


                                          preferredReadReplica int32


                                          trigger, dying chan none
                                          closeOnce sync.Once
                                          topic string
                                          partition int32
                                          responseResult error
                                          fetchSize int32
                                          offset int64
                                          retries int32
                                          }
                                          复制

                                          同时起了两个协程,这两个协程是核心


                                          1,先看dispatcher,主要是维护订阅者信息

                                                // dispatcher (abridged) waits for notifications on child.trigger and, for
                                                // each one, re-resolves the partition's broker via dispatch.
                                                // NOTE(review): braces and the closing parenthesis of close(child.feeder)
                                                // were lost in this excerpt. In sarama the unrefBrokerConsumer /
                                                // removeChild / close(feeder) cleanup appears to run after the trigger
                                                // loop ends, not inside the dispatch-error branch — verify against the
                                                // sarama version being analysed.
                                                func (child *partitionConsumer) dispatcher() 
                                            for range child.trigger
                                            if err := child.dispatch(); err != nil {
                                            child.consumer.unrefBrokerConsumer(child.broker)
                                            child.consumer.removeChild(child)
                                            close(child.feeder
                                            复制

                                            看下dispatcher协程里的dispatch方法

                                               // dispatch (abridged) refreshes the topic's metadata, picks the preferred
                                               // broker for the partition, and re-subscribes child to that broker's
                                               // brokerConsumer by sending child on its input channel.
                                               func (child *partitionConsumer) dispatch() error 
                                              if err := child.consumer.client.RefreshMetadata(child.topic); err != nil
                                              broker, err := child.preferredBroker()
                                              child.broker = child.consumer.refBrokerConsumer(broker)
                                              child.broker.input <- child
                                              复制

                                              先获得一个brokerConsumer 对象

                                                // brokerConsumer runs the fetch loop for all partitionConsumers whose
                                                // leader is this broker (excerpt).
                                                type brokerConsumer struct {
                                                consumer *consumer
                                                broker *Broker
                                                // input receives partitionConsumers asking to (re)subscribe.
                                                input chan *partitionConsumer
                                                // newSubscriptions carries batches from subscriptionManager to subscriptionConsumer.
                                                newSubscriptions chan []*partitionConsumer
                                                // subscriptions is the set of partitions included in each fetch.
                                                subscriptions map[*partitionConsumer]none
                                                wait chan none
                                                // acks counts FetchResponses handed to children that are not yet processed.
                                                acks sync.WaitGroup
                                                // refs counts the partitionConsumers sharing this brokerConsumer.
                                                refs int
                                                }
                                                复制
                                                  // refBrokerConsumer returns the shared brokerConsumer for broker. Only the
                                                  // creation path is shown; the map lookup and refs++ are the lines inlined
                                                  // in the ConsumePartition excerpt.
                                                  func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
                                                  bc = c.newBrokerConsumer(broker)
                                                  }
                                                  复制
                                                    // newBrokerConsumer (abridged) creates a brokerConsumer and starts its
                                                    // subscriptionManager and subscriptionConsumer goroutines.
                                                    func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer 
                                                    go withRecover(bc.subscriptionManager)
                                                    go withRecover(bc.subscriptionConsumer)
                                                    复制

                                                    起了两个协程:

                                                      // subscriptionManager (abridged; the enclosing for/select is elided)
                                                      // buffers partitionConsumers that arrive on input and hands the whole
                                                      // batch to subscriptionConsumer over newSubscriptions, then clears the buffer.
                                                      func (bc *brokerConsumer) subscriptionManager(){
                                                      case event, ok := <-bc.input:
                                                             buffer = append(buffer, event)
                                                            case bc.newSubscriptions <- buffer
                                                             buffer = nil
                                                       }
                                                      复制

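                                                      input 中有新的订阅请求时,会先 append 到本地的 buffer 切片里。newSubscriptions 不是带缓冲的 channel,而是一个元素类型为 slice([]*partitionConsumer)的 channel。subscriptionManager 通过它把攒好的一批订阅一次性交给 subscriptionConsumer,然后清空 buffer。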

                                                        // subscriptionConsumer (abridged) loops over batches from newSubscriptions:
                                                        // it applies the batch, fetches from the broker, hands the response to each
                                                        // subscribed child's feeder, waits on acks until all children are done,
                                                        // and then calls handleResponses.
                                                        func (bc *brokerConsumer) subscriptionConsumer() 
                                                        for newSubscriptions := range bc.newSubscriptions {
                                                        bc.updateSubscriptions(newSubscriptions)
                                                        response, err := bc.fetchNewMessages()
                                                                  bc.acks.Add(len(bc.subscriptions))
                                                                  child.feeder <- response
                                                                  bc.acks.Wait()
                                                                  bc.handleResponses()
                                                        复制

                                                        每次从 newSubscriptions 收到订阅变化后,都会先 updateSubscriptions,再调用 fetchNewMessages 拉取消息。然后把响应放到各个 partitionConsumer 的 feeder 里面,并通过 acks 等待它们处理完毕。

                                                          // fetchNewMessages (abridged; request construction is elided) adds one block
                                                          // per subscribed partition, at that partition's current offset and fetch
                                                          // size, and sends a single Fetch request to the broker.
                                                          func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
                                                          for child := range bc.subscriptions {
                                                          request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
                                                          }
                                                          return bc.broker.Fetch(request)
                                                          }
                                                          复制

                                                          Fetch就是请求broker,获取消息

                                                            // Fetch (abridged) sends request to the broker and decodes the reply into a
                                                            // FetchResponse; the return statement is elided in this excerpt.
                                                            func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
                                                              response := new(FetchResponse)
                                                              err := b.sendAndReceive(request, response)
                                                            }
                                                            复制

                                                            2,接着看下responseFeeder协程

                                                               func (child *partitionConsumer) responseFeeder() {
                                                                feederLoop:
                                                              从broker获取消息的大循环
                                                              for response := range child.feeder
                                                              for i, msg := range msgs {
                                                              case child.messages <- msg:
                                                              child.broker.input <- child
                                                              continue feederLoop
                                                               }
                                                              复制

                                                              这是整个consumer的消息大循环,不断从feeder里面消费消息,放到messages里面,处理完毕以后将自己放回broker的input里面。

                                                              subscriptionManager 会从 input 里面把它取出来,交给 subscriptionConsumer 再去 Kafka 拉取消息,这样就完成了完整的消息循环。


                                                              最后看下Messages接口

                                                                 // Messages returns the channel that responseFeeder fills with fetched messages.
                                                                 func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
                                                                 return child.messages
                                                                 }
                                                                复制

                                                                 很简单,就是返回 messages 这个 channel,调用方从中读取处理好的消息。


                                                                总结下:

                                                                    partitionConsumer 会启动 dispatcher 和 responseFeeder 两个 goroutine:

                                                                      1,dispatcher goroutine 偏元信息、控制侧,用于发现 broker 的变化。它会侦听 child.trigger 这个 channel 的通知,来发现 Partition 的 Leader 变化;而 trigger 的通知来自 brokerConsumer 对象。

                                                                最后 child.broker.input<- child 这一句,相当于使 partitionConsumer 加入 brokerConsumer 的订阅。

                                                                2, responseFeeder 用于跟踪消息的到来,偏数据侧。

                                                                 child.feeder 这个 channel 的数据也来自 brokerConsumer。responseFeeder 负责处理 brokerConsumer 发来的 FetchResponse,并把其中的消息转发给 messages chan。

                                                                值得留意有一个配置项目 child.conf.Consumer.MaxProcessingTime,默认值为 100ms,看注释它的意思是如果朝 messages chan 写入超过 100ms 仍未成功,则停止再向 Broker 发送 fetch 请求。


                                                                文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                                评论