暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka高吞吐量揭秘

BearKang 2021-06-28
2280

Kafka是高吞吐低延迟的高并发、高性能的消息中间件。一般来说,Kafka集群可以处理每秒几十万、甚至上百万的写入请求。Kafka是怎样做到如此高的吞吐量和性能的呢?

1 批量消息发送。

2 页缓存技术以及顺序写磁盘

3 零拷贝

批量消息发送

Kafka会启动一个生产者时, 启动一个叫sender的io线程,发送消息时,实际仅返回了添加进消息累加器的Future,如果ack条件为0,那么实际发送仅仅会添加到消息累加器即为结束了,发送流程大致有一下几个步骤。




    this.errors = this.metrics.sensor("errors");
        this.sender = this.newSender(logContext, kafkClient, this.metadata);
        String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
        this.ioThread = new KafkaThread(ioThreadName,this.sender, true);
    this.ioThread.start();
    config.logUnused();
        AppInfoParser.registerAppInfo("kafka.producer",this.clientId, this.metrics, time.milliseconds());
    this.log.debug("Kafka producer started");


    Kafka在消息发送时,主要在代码层面做了一个批量的优化,即批量消息的发送。


        public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
      if (!this.recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
      return null;
      } else {
      Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
      this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(this.magic(), this.recordsBuilder.compressionType(), key, value, headers));
      this.lastAppendTime = now;
      FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, (long)this.recordCount, timestamp, checksum, key == null ? -1 : key.length, value == null ? -1 : value.length, Time.SYSTEM);
      this.thunks.add(new ProducerBatch.Thunk(callback, future));
      ++this.recordCount;
      return future;
      }
      }


      发送消息的任务则完全由sender线程完成,sender的run方法核心如下


          public void run() {
             this.log.debug("Starting Kafka producer I/O thread.");


        while (this.running) {
        try {
        this.runOnce();
        } catch (Exception var5) {
                      this.log.error("Uncaught error in kafka producer I/O thread: ", var5);
        }
        }
        }


            private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
             Iterator var4 = collated.entrySet().iterator();
        while(var4.hasNext()) {
                    Entry<Integer, List<ProducerBatch>> entry =(Entry)var4.next();
                    this.sendProduceRequest(now, (Integer)entry.getKey(), this.acks, this.requestTimeoutMs,(List)entry.getValue());
                }
        }


        sender线程在running情况下,不停轮询就绪的的分区和批次,然后将消息按批次发送到Broker,这个过程如下图所示:



        Broker页缓存和磁盘顺序写


        Kafka消息是持久化在磁盘上的,因此Kafka才有了持久化保证。但是我们知道磁盘io是比较耗时的操作,因此大多数中间件都会尽量避免直接和磁盘打交道,Kafka也是如此,在写入数据的时候,Kafka会先写入page cache里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把page cache里的数据真的刷入磁盘文件中(刷盘策略),另一方面,就是kafka写数据的时候,它是以磁盘顺序写的方式来写的。也就是说,仅仅将数据追加到文件的末尾,而不是在文件的随机位置来修改数据。这种追加写的方式在性能上要比随机写效率高很多,因此Broker端的写入实际上基本可以认为是写内存。

        Broker零拷贝



        传统发送磁盘数据,数据需要经过磁盘->page cache->用户进程->Socket,经过三次拷贝,而大名鼎鼎的零拷贝只需要将数据文件从磁盘->page cache->Socket,而且从page cache拷贝到socket缓冲区只需要拷贝文件fd即可,因此可以减少两次文件的拷贝和内核到用户态的切换,因此性能得到大幅度提升。

        总结

        Kafka实现高吞吐量的原因是有很多方面原因的,其中包括消息的批量发送(批次大小和间隔可配置),页缓存技术和磁盘顺序写(Broker适当调大内存给更多的页缓存使用是有帮助的),消费时的零拷贝技术等。





        文章转载自BearKang,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论