
Kafka Fault Tolerance and High Availability Testing, Part 1

民生运维人 2022-01-08

Environment Preparation

  • Prepare the Docker images required for testing

    The tests use the latest release of each of three major Confluent series, so that differences between versions can be checked under identical test scenarios.

    • docker pull confluentinc/cp-zookeeper:7.0.0
    • docker pull confluentinc/cp-kafka:5.5.6
      • Corresponds to Apache Kafka® 2.5.1
    • docker pull confluentinc/cp-kafka:6.2.1
      • Corresponds to Apache Kafka® 2.8.0
    • docker pull confluentinc/cp-kafka:7.0.0
      • Corresponds to Apache Kafka® 3.0.0
  • Prepare the tools for building the test environment

    • Install the test tool blockade; for details see https://blockade.readthedocs.io/en/latest/install.html
  • Prepare the test scripts

    https://github.com/zhan-yl/ChaosTestingCode.git

  • Test preparation

    • Definition of data loss:

      The producer has received an acknowledgment from the Kafka side, so it regards that data as successfully written and will not retry those messages. If some failure on the Kafka side then makes this already-acknowledged data unrecoverable (excluding an unrecoverable failure of every replica), we define that as data loss for the purposes of these tests.

    • Test method:

      Client side: count the number of messages that were successfully sent to Kafka and acknowledged (a minimal producer sketch follows at the end of this preparation section).

      Kafka side: create a fresh single-partition topic for each run, then use kafka.tools.GetOffsetShell to read the number of messages Kafka actually stored.

      Test process: simulate various production failures while the client is writing data.

      Comparison: if the client's count of successfully sent messages is greater than the number of messages Kafka actually stored, data loss has occurred.

    • Test procedure

      The entire cluster is re-initialized for every test. The Kafka and ZooKeeper nodes run in containers, and after the cluster is created a single-partition Kafka topic test1 is created. The same comparison tests are run across the different Kafka versions.

      In one window, continuously write a large volume of data to the Kafka topic.

      In another window, inject various failures into the cluster's broker nodes: node crashes, slow networks, network partitions, and so on.

      Finally, check whether the data actually written to the Kafka cluster matches expectations.

    • Test assumptions:

      Messages that fail to send are not retried in these tests, to avoid skewing the result counts; in production, however, retrying failed sends is a mandatory part of the workflow.

    • Starting the environment for the first time:

      blockade up

    • Throughout the tests, ZooKeeper and Kafka use local storage, kept under ~/ChaosTestingCode/Kafka/cluster/volumes/
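The producer used in all of the scenarios below is invoked as python producer.py <count> <delay> <acks> <topic> (for example, python producer.py 100000 0.0001 0 test1). As a rough illustration of the client-side counting described above, here is a minimal sketch assuming the confluent_kafka Python client (the rdkafka messages in the producer output suggest librdkafka underneath); the repository's actual producer.py may differ, and the broker addresses below are assumptions:

# Minimal sketch of the test producer, assuming the confluent_kafka Python client.
# The repository's producer.py may differ; broker addresses here are assumptions.
# Usage mirrors the article: python producer.py <count> <delay> <acks> <topic>
import sys
import time

from confluent_kafka import Producer

msg_count = int(sys.argv[1])
delay = float(sys.argv[2])      # presumably a per-message delay in seconds
acks = sys.argv[3]              # "0", "1" or "all"
topic = sys.argv[4]

delivered = 0
failed = 0

def on_delivery(err, msg):
    # Invoked once per message with the broker acknowledgment or the error.
    global delivered, failed
    if err is None:
        delivered += 1
    else:
        failed += 1
        print(err)

producer = Producer({
    "bootstrap.servers": "172.17.0.3:9092,172.17.0.4:9092,172.17.0.5:9092",  # assumed
    "acks": acks,
    "message.send.max.retries": 0,   # no client retries, per the test assumptions above
})

for i in range(msg_count):
    producer.produce(topic, value=str(i), on_delivery=on_delivery)
    producer.poll(0)                 # serve delivery callbacks
    time.sleep(delay)

producer.flush()                     # wait for all outstanding acknowledgments
print(f"Sent: {msg_count}")
print(f"Delivered: {delivered}")
print(f"Failed: {failed}")

The Kafka-side count is read with kafka.tools.GetOffsetShell, which the print-hw.sh helper presumably wraps; it reports the latest offset of the single partition in the form topic:partition:offset, as seen in the outputs below.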

Test Scenario 1:

Scenario setup:

ACKS=0: the producer client does not wait for any acknowledgment from Kafka before treating data as successfully sent; during the test the topic leader fails over.

Procedure:

The client sends 100,000 messages to Kafka with acks set to 0; while it is writing, the topic leader is killed to force a leader switch.

Commands:

  • Test version: confluentinc/cp-kafka:5.5.6

Commands run in window 1 (the blockade kill command is issued while window 2 is in the middle of writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          5da02d8887d5    UP      172.17.0.3      NORMAL                
kafka2          34238c04862a    UP      172.17.0.4      NORMAL                
kafka3          fd202d94bd05    UP      172.17.0.5      NORMAL                
zk1             61fc580b7a7e    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        
The current leader is node 2; simulate a crash of node 2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          5da02d8887d5    UP      172.17.0.3      NORMAL                
kafka2          34238c04862a    DOWN                    UNKNOWN               
kafka3          fd202d94bd05    UP      172.17.0.5      NORMAL                
zk1             61fc580b7a7e    UP      172.17.0.2      NORMAL             

The new leader is now node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 2,3,1 Isr: 3,1
        
Check the data actually stored in Kafka
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:99947


Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 0 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
%6|1638169471.598|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9092/bootstrap]: 172.17.0.4:9092/2: Disconnected (after 19087ms in state UP)
%3|1638169471.598|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9092/bootstrap]: 172.17.0.4:9092/2: Connect to ipv4#172.17.0.4:9092 failed: Connection refused (after 0ms in state CONNECT)
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent: 100000
Delivered: 100000
Failed: 0

  • Test version: confluentinc/cp-kafka:6.2.1

Commands run in window 1 (the blockade kill command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

  • Test version: confluentinc/cp-kafka:7.0.0

Commands run in window 1 (the blockade kill command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    Although the connection was broken and a leader switch occurred, none of the client's 100,000 sends were reported as failed, yet Kafka actually stored only 99,947 messages, so 53 messages were lost.

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 test results

  • Test version: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 test results

Test Scenario 2:

Scenario setup:

ACKS=1: the producer client treats data as successfully sent once it receives an acknowledgment from the topic leader on the Kafka side; during the test the topic leader fails over.

Procedure:

The client sends 100,000 messages to Kafka with acks set to 1; while it is writing, the topic leader is killed to force a leader switch.

Commands:

  • Test version: confluentinc/cp-kafka:5.5.6

Commands run in window 1 (the blockade kill command is issued while window 2 is in the middle of writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          aa0f36c550e6    UP      172.17.0.3      NORMAL                
kafka2          7eaea8c1d88f    UP      172.17.0.4      NORMAL                
kafka3          d1489c809d78    UP      172.17.0.5      NORMAL                
zk1             1d533d285fec    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        
The current leader is node 3; simulate a crash of node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          aa0f36c550e6    UP      172.17.0.3      NORMAL                
kafka2          7eaea8c1d88f    UP      172.17.0.4      NORMAL                
kafka3          d1489c809d78    DOWN                    UNKNOWN               
zk1             1d533d285fec    UP      172.17.0.2      NORMAL                
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 3,1,2 Isr: 1,2
        
Check the amount of data actually stored in Kafka
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:99944


Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 1 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
%4|1638172063.335|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Disconnected (after 17386ms in state UP)
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
... ...
KafkaError{code=_TRANSPORT,val=-195,str="Local: Broker transport failure"}
%6|1638172063.336|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9092/bootstrap]: 172.17.0.5:9092/3: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
Success: 39944 Failed: 56
Success: 49944 Failed: 56
Success: 59944 Failed: 56
Success: 69944 Failed: 56
Success: 79944 Failed: 56
Success: 89944 Failed: 56
Success: 99944 Failed: 56
Sent: 100000
Delivered: 99944
Failed: 56

  • Test version: confluentinc/cp-kafka:6.2.1

Commands run in window 1 (the blockade kill command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

  • Test version: confluentinc/cp-kafka:7.0.0

Commands run in window 1 (the blockade kill command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    Because of the broken connection and the leader switch, 56 of the client's 100,000 sends failed; Kafka actually stored 99,944 messages, so no data was lost.

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 test results

  • Test version: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 test results

Test Scenario 3:

Scenario setup:

ACKS=all: the producer client treats data as successfully sent only after it receives acknowledgments from all of the topic's brokers on the Kafka side; during the test the topic leader fails over.

Procedure:

The client sends 1,000,000 messages to Kafka using ten concurrent producer processes, with acks set to all; while they are writing, the topic leader is killed to force a leader switch. A sketch of the concurrent launcher follows.
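The tests drive this with concurrent-producer-silent.sh. As a purely illustrative sketch (the launcher logic and the parsing of the per-producer summary lines are assumptions, not the repository's actual script), ten producer.py processes could be run in parallel and their acknowledged counts summed:

# Hypothetical sketch of a concurrent launcher: run ten producer.py processes
# in parallel and sum their acknowledged counts. The output parsing assumes the
# "Delivered: N" summary line shown in the single-producer runs above.
import subprocess

PROCESSES = 10
cmd = ["python", "producer.py", "100000", "0.0001", "all", "test1"]

procs = [subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) for _ in range(PROCESSES)]

acknowledged_total = 0
for p in procs:
    out, _ = p.communicate()                 # wait for this producer to finish
    for line in out.splitlines():
        if line.startswith("Delivered:"):
            acknowledged_total += int(line.split(":")[1])

print("Runs complete")
print(f"Acknowledged total: {acknowledged_total}")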

Commands:

  • Test version: confluentinc/cp-kafka:5.5.6

Commands run in window 1 (the blockade kill command is issued while window 2 is in the middle of writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          9e5f2c18923c    UP      172.17.0.3      NORMAL                
kafka2          f6bc773f5325    UP      172.17.0.4      NORMAL                
kafka3          3d6fa9e2182b    UP      172.17.0.5      NORMAL                
zk1             281f10b871ff    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade kill kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka3 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 1,3,2 Isr: 3,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:998903


Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ concurrent-producer-silent.sh 100000 0.0001 all test1    
Runs complete
Acknowledged total: 998833

  • Test version: confluentinc/cp-kafka:6.2.1

Commands run in window 1 (the blockade kill command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

  • Test version: confluentinc/cp-kafka:7.0.0

Commands run in window 1 (the blockade kill command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    After the leader switch, Kafka actually stored 998,903 messages while the clients received acknowledgments for 998,833. Kafka persisted more data than it acknowledged to the clients: 70 messages were reported back as failed but were nevertheless written to the Kafka cluster. No data was lost.

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 test results

  • Test version: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 test results

Test Scenario 4:

Scenario setup:

ACKS=1: the producer treats data as successfully sent once it receives an acknowledgment from the topic leader on the Kafka side; during the test the topic leader is network-isolated from the other Kafka nodes and from zk. The original leader can no longer communicate with the other Kafka nodes or with zk, but it can still communicate with the client.

Procedure:

The client sends 100,000 messages to Kafka with acks set to 1; while it is writing, the topic leader is network-isolated.

Commands:

  • Test version: confluentinc/cp-kafka:5.5.6

Commands run in window 1 (the blockade partition command is issued while window 2 is in the middle of writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          d8552041dd61    UP      172.17.0.3      NORMAL                
kafka2          7a45eba05d55    UP      172.17.0.4      NORMAL                
kafka3          029c6d8ad695    UP      172.17.0.5      NORMAL                
zk1             6f3918e245ea    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        
The current leader is node 3; network-isolate node 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          d8552041dd61    UP      172.17.0.3      NORMAL     2          
kafka2          7a45eba05d55    UP      172.17.0.4      NORMAL     2          
kafka3          029c6d8ad695    UP      172.17.0.5      NORMAL     1          
zk1             6f3918e245ea    UP      172.17.0.2      NORMAL     2

The partitioned node 029c6d8ad695 can no longer communicate with the other nodes
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 029c6d8ad695 bash -c "ping 172.17.0.2"            
PING 172.17.0.2 (172.17.0.2) 56(84) bytes of data.
^C
--- 172.17.0.2 ping statistics ---
4 packets transmitted, 0 received, 100% packet loss, time 3053ms

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 029c6d8ad695 bash -c "ping 172.17.0.3"
PING 172.17.0.3 (172.17.0.3) 56(84) bytes of data.
^C
--- 172.17.0.3 ping statistics ---
4 packets transmitted, 0 received, 100% packet loss, time 3069ms

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 029c6d8ad695 bash -c "ping 172.17.0.4"
PING 172.17.0.4 (172.17.0.4) 56(84) bytes of data.
^C
--- 172.17.0.4 ping statistics ---
6 packets transmitted, 0 received, 100% packet loss, time 5103ms

The remaining nodes have held a new election; the new leader is node 1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh  kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 3,1,2 Isr: 1,2
        
Check, from the new leader, the data the Kafka cluster actually received
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:50740

Check the old leader's logs:
The old leader kafka3 can no longer reach the other nodes. It tries to shrink the ISR to just 3 (itself), but because it cannot reach zk the operation cannot complete, and it keeps waiting for the ZooKeeper connection.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka3 | tail -50
[2021-11-30 08:33:03,949] WARN Client session timed out, have not heard from server in 19797ms for sessionid 0x1000cfa943c0005 (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:03,949] INFO Client session timed out, have not heard from server in 19797ms for sessionid 0x1000cfa943c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:05,713] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:17,254] INFO [Partition test1-0 broker=3] Shrinking ISR from 3,1,2 to 3. Leader: (highWatermark: 49804, endOffset: 100000). Out of sync replicas: (brokerId: 1, endOffset: 50740) (brokerId: 2, endOffset: 49804). (kafka.cluster.Partition)
[2021-11-30 08:33:22,056] WARN Client session timed out, have not heard from server in 18005ms for sessionid 0x1000cfa943c0005 (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:22,056] INFO Client session timed out, have not heard from server in 18005ms for sessionid 0x1000cfa943c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:22,157] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-11-30 08:33:23,200] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-11-30 08:33:41,205] WARN Client session timed out, have not heard from server in 19049ms for sessionid 0x1000cfa943c0005 (org.apache.zookeeper.ClientCnxn)
...

Check the other nodes' logs:
Fetching from the old leader kafka3 fails, so broker 1 stops replicating from it and removes the fetcher. After the new election it becomes the new leader, starts serving from offset 50740, and the leader epoch advances to 1.
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka1 | tail -50
 ...
[2021-11-30 08:28:57,308] INFO [RequestSendThread controllerId=1] Controller 1 connected to kafka2:19092 (id: 2 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
[2021-11-30 08:32:50,827] INFO [Controller id=1] Newly added brokers: , deleted brokers: 3, bounced brokers: , all live brokers: 1,2 (kafka.controller.KafkaController)
[2021-11-30 08:32:50,828] INFO [RequestSendThread controllerId=1] Shutting down (kafka.controller.RequestSendThread)
[2021-11-30 08:32:50,829] INFO [RequestSendThread controllerId=1] Stopped (kafka.controller.RequestSendThread)
[2021-11-30 08:32:50,829] INFO [RequestSendThread controllerId=1] Shutdown completed (kafka.controller.RequestSendThread)
[2021-11-30 08:32:50,832] INFO [Controller id=1] Broker failure callback for 3 (kafka.controller.KafkaController)
[2021-11-30 08:32:50,851] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-11-30 08:32:50,852] INFO [Partition test1-0 broker=1] test1-0 starts at leader epoch 1 from offset 50740 with high watermark 49804. Previous leader epoch was 0. (kafka.cluster.Partition)
[2021-11-30 08:32:50,858] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,863] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1277039458, epoch=11432) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-11-30 08:32:50,863] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,864] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,882] INFO [Controller id=1] Updated broker epochs cache: Map(1 -> 26, 2 -> 53) (kafka.controller.KafkaController)
[2021-11-30 08:32:56,368] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2021-11-30 08:32:56,369] INFO [Controller id=1] Starting replica leader election (PREFERRED) for partitions  triggered by AutoTriggered (kafka.controller.KafkaController)
[2021-11-30 08:37:51,249] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2021-11-30 08:37:56,370] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2021-11-30 08:37:56,370] INFO [Controller id=1] Starting replica leader election (PREFERRED) for partitions  triggered by AutoTriggered (kafka.controller.KafkaController)

Broker 2 stops replicating from the old leader broker 3 and starts fetching from the new leader broker 1 at offset 49794
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka2 | tail -50
...
[2021-11-30 08:32:50,859] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-11-30 08:32:50,863] INFO [ReplicaFetcherManager on broker 2] Added fetcher to broker 1 for partitions Map(test1-0 -> (offset=49794, leaderEpoch=1)) (kafka.server.ReplicaFetcherManager)
[2021-11-30 08:32:50,864] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Starting (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,865] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,868] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1851347235, epoch=11294) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-11-30 08:32:50,871] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,871] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-11-30 08:32:50,883] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Truncating to 49804 has no effect as the largest offset in the log is 49803 (kafka.log.Log)
[2021-11-30 08:38:01,701] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)


Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 1 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent: 100000
Delivered: 100000
Failed: 0

  • Test version: confluentinc/cp-kafka:6.2.1

Commands run in window 1 (the blockade partition command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

  • Test version: confluentinc/cp-kafka:7.0.0

Commands run in window 1 (the blockade partition command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    The original topic leader, broker 3, was network-isolated while data was being written, but it did not realize it had lost contact with the other nodes and with zk, and it kept acknowledging the producer's write requests. The producer received no failure notifications. In the end Kafka actually stored 50,740 messages and 0 sends were reported back as failed, so 49,260 messages were lost (100,000 - 50,740 - 0 = 49,260). With acks=1, network isolation of the leader broker leads to large-scale data loss.

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 test results

  • Test version: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 test results

Test Scenario 5:

Scenario setup:

ACKS=all: the producer treats data as successfully sent only after it receives acknowledgments from all of the topic's brokers on the Kafka side; during the test the topic leader is network-isolated from the other Kafka nodes and from zk. The original leader can no longer communicate with the other Kafka nodes or with zk, but it can still communicate with the client.

Procedure:

The client sends 100,000 messages to Kafka with acks set to all; while it is writing, the topic leader is network-isolated.

Commands:

  • Test version: confluentinc/cp-kafka:5.5.6

Commands run in window 1 (the blockade partition command is issued while window 2 is in the middle of writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          5c098b68a8d9    UP      172.17.0.3      NORMAL                
kafka2          62a38ec7c939    UP      172.17.0.4      NORMAL                
kafka3          f965e49b96bd    UP      172.17.0.5      NORMAL                
zk1             6a94658d45bf    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          5c098b68a8d9    UP      172.17.0.3      NORMAL     2          
kafka2          62a38ec7c939    UP      172.17.0.4      NORMAL     2          
kafka3          f965e49b96bd    UP      172.17.0.5      NORMAL     1          
zk1             6a94658d45bf    UP      172.17.0.2      NORMAL     2          
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 3,1,2 Isr: 1,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka1 19092 test1
test1:0:36577

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka3 | tail -50
[2021-12-01 06:18:34,639] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-12-01 06:18:34,706] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-01 06:18:34,712] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 37 ms (kafka.log.Log)
[2021-12-01 06:18:34,719] INFO Created log for partition test1-0 in /var/lib/kafka/data/test1-0 with properties {} (kafka.log.LogManager)
[2021-12-01 06:18:34,719] INFO [Partition test1-0 broker=3] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2021-12-01 06:18:34,720] INFO [Partition test1-0 broker=3] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-12-01 06:18:34,722] INFO [Partition test1-0 broker=3] test1-0 starts at leader epoch 0 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)
[2021-12-01 06:18:35,814] INFO I/O exception (java.net.NoRouteToHostException) caught when processing request to {}->http://support-metrics.confluent.io:80: No route to host (Host unreachable) (org.apache.http.impl.execchain.RetryExec)
[2021-12-01 06:18:35,814] INFO Retrying request to {}->http://support-metrics.confluent.io:80 (org.apache.http.impl.execchain.RetryExec)
[2021-12-01 06:18:38,887] ERROR Could not submit metrics to Confluent: No route to host (Host unreachable) (io.confluent.support.metrics.utils.WebClient)
[2021-12-01 06:18:38,887] ERROR Failed to submit metrics to Confluent via insecure endpoint=http://support-metrics.confluent.io/anon -- giving up (io.confluent.support.metrics.submitters.ConfluentSubmitter)
[2021-12-01 06:20:04,723] WARN Client session timed out, have not heard from server in 12006ms for sessionid 0x1000e51d60c0005 (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:04,723] INFO Client session timed out, have not heard from server in 12006ms for sessionid 0x1000e51d60c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:06,760] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:24,778] WARN Client session timed out, have not heard from server in 19954ms for sessionid 0x1000e51d60c0005 (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:24,778] INFO Client session timed out, have not heard from server in 19954ms for sessionid 0x1000e51d60c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:26,534] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:28,772] INFO [Partition test1-0 broker=3] Shrinking ISR from 3,1,2 to 3. Leader: (highWatermark: 36577, endOffset: 36613). Out of sync replicas: (brokerId: 1, endOffset: 36577) (brokerId: 2, endOffset: 36597). (kafka.cluster.Partition)
[2021-12-01 06:20:42,894] WARN Client session timed out, have not heard from server in 18014ms for sessionid 0x1000e51d60c0005 (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:42,894] INFO Client session timed out, have not heard from server in 18014ms for sessionid 0x1000e51d60c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:20:42,996] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-12-01 06:20:44,577] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:21:02,592] WARN Client session timed out, have not heard from server in 19597ms for sessionid 0x1000e51d60c0005 (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:21:02,592] INFO Client session timed out, have not heard from server in 19597ms for sessionid 0x1000e51d60c0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-12-01 06:21:03,809] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
...

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka1 | tail -50
...
[2021-12-01 06:20:12,383] INFO [Controller id=1] Newly added brokers: , deleted brokers: 3, bounced brokers: , all live brokers: 1,2 (kafka.controller.KafkaController)
[2021-12-01 06:20:12,385] INFO [RequestSendThread controllerId=1] Shutting down (kafka.controller.RequestSendThread)
[2021-12-01 06:20:12,385] INFO [RequestSendThread controllerId=1] Stopped (kafka.controller.RequestSendThread)
[2021-12-01 06:20:12,385] INFO [RequestSendThread controllerId=1] Shutdown completed (kafka.controller.RequestSendThread)
[2021-12-01 06:20:12,388] INFO [Controller id=1] Broker failure callback for 3 (kafka.controller.KafkaController)
[2021-12-01 06:20:12,408] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-12-01 06:20:12,414] INFO [Partition test1-0 broker=1] test1-0 starts at leader epoch 1 from offset 36577 with high watermark 36577. Previous leader epoch was 0. (kafka.cluster.Partition)
[2021-12-01 06:20:12,421] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,427] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1322674560, epoch=8051) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-01 06:20:12,428] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,428] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,436] INFO [Controller id=1] Updated broker epochs cache: Map(1 -> 26, 2 -> 53) (kafka.controller.KafkaController)
[2021-12-01 06:22:52,664] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2021-12-01 06:22:52,665] INFO [Controller id=1] Starting replica leader election (PREFERRED) for partitions  triggered by AutoTriggered (kafka.controller.KafkaController)

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade logs kafka2 | tail -50
...
[2021-12-01 06:20:12,412] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test1-0) (kafka.server.ReplicaFetcherManager)
[2021-12-01 06:20:12,420] INFO [ReplicaFetcherManager on broker 2] Added fetcher to broker 1 for partitions Map(test1-0 -> (offset=36577, leaderEpoch=1)) (kafka.server.ReplicaFetcherManager)
[2021-12-01 06:20:12,420] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Starting (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,421] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,423] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=347527133, epoch=7998) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:108)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:304)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:119)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-01 06:20:12,426] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,437] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Truncating to offset 36577 (kafka.log.Log)
[2021-12-01 06:20:12,439] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Scheduling segments for deletion List() (kafka.log.Log)
[2021-12-01 06:20:12,440] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-12-01 06:20:12,444] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Loading producer state till offset 36577 with message format version 2 (kafka.log.Log)
[2021-12-01 06:20:12,445] INFO [ProducerStateManager partition=test1-0] Writing producer snapshot at offset 36577 (kafka.log.ProducerStateManager)
[2021-12-01 06:21:12,440] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Deleting segments List() (kafka.log.Log)


Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
...
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Success: 36577 Failed: 63423
Sent: 100000
Delivered: 36577
Failed: 63423

  • Test version: confluentinc/cp-kafka:6.2.1

Commands run in window 1 (the blockade partition command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

  • Test version: confluentinc/cp-kafka:7.0.0

Commands run in window 1 (the blockade partition command is issued while window 2 is in the middle of writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          2642be3c590c    UP      172.17.0.3      NORMAL                
kafka2          b226d706b431    UP      172.17.0.4      NORMAL                
kafka3          6b225d0834be    UP      172.17.0.5      NORMAL                
zk1             dd0128271974    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    TopicId: nO2rXLtXQue3-tqS5skJiQ PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          2642be3c590c    UP      172.17.0.3      NORMAL     1          
kafka2          b226d706b431    UP      172.17.0.4      NORMAL     2          
kafka3          6b225d0834be    UP      172.17.0.5      NORMAL     2          
zk1             dd0128271974    UP      172.17.0.2      NORMAL     2          
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1    TopicId: nO2rXLtXQue3-tqS5skJiQ PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 1,3,2 Isr: 3,2
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka2 19092 test1
test1:0:89649


Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60350ms, timeout #0)
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60344ms, timeout #1)
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60338ms, timeout #2)
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60332ms, timeout #3)
%5|1638342943.254|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60327ms, timeout #4)
...
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Success: 59638 Failed: 10362
Success: 69638 Failed: 10362
Success: 79638 Failed: 10362
Success: 89638 Failed: 10362
Sent: 100000
Delivered: 89638
Failed: 10362

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    Unlike scenario 4 with acks=1, when the leader is partitioned away the producer receives explicit send-failure notifications.

    The producer received acknowledgments for 36,577 messages and Kafka successfully stored 36,577 messages, so no data was lost.

    In other words, with ACKS set to all, a leader switch caused either by a node failure or by a network partition of the leader does not lose acknowledged data. The sketch after these conclusions illustrates how a production client can then resend the messages that were explicitly reported as failed.

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 test results

  • Test version: confluentinc/cp-kafka:7.0.0

    In this version the leader switch after the partition fault happens faster than in the two earlier versions, so the client manages to get more data into Kafka. Otherwise there is no difference from the confluentinc/cp-kafka:5.5.6 test results.
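Because retries were deliberately disabled in these tests (see the test assumptions above) but are required in production, the following hedged sketch, again assuming the confluent_kafka Python client and assumed broker addresses, shows how a production client running with acks=all can resend the messages whose delivery was explicitly reported as failed. It illustrates the idea only and is not code from the test repository.

# Hedged sketch (not from the test repository): with acks=all every message that
# Kafka did not persist comes back with an explicit error, so the application can
# collect those messages and send them again.
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "172.17.0.3:9092,172.17.0.4:9092,172.17.0.5:9092",  # assumed
    "acks": "all",
    "retries": 5,                       # let the client retry transient errors itself
})

failed_values = []

def on_delivery(err, msg):
    # Remember the payloads that were definitively not acknowledged.
    if err is not None:
        failed_values.append(msg.value())

for i in range(100000):
    producer.produce("test1", value=str(i), on_delivery=on_delivery)
    producer.poll(0)
producer.flush()

# Application-level resend of the explicitly failed messages. A real system would
# bound this loop and surface anything that still cannot be delivered.
while failed_values:
    batch, failed_values = failed_values, []
    for value in batch:
        producer.produce("test1", value=value, on_delivery=on_delivery)
    producer.flush()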

Test Scenario 6:

Scenario setup:

ACKS=1: the producer treats data as successfully sent once it receives an acknowledgment from the topic leader on the Kafka side; during the test the topic leader is network-isolated from zk. The original leader can no longer communicate with zk, but it can still communicate with the client.

Procedure:

The client sends 100,000 messages to Kafka with acks set to 1; while it is writing, the topic leader is network-isolated from zk.

Commands:

  • Test version: confluentinc/cp-kafka:5.5.6

Commands run in window 1 (the blockade partition command is issued while window 2 is in the middle of writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          b3d312073984    UP      172.17.0.3      NORMAL                
kafka2          563f88a79dc1    UP      172.17.0.4      NORMAL                
kafka3          5073f138ee27    UP      172.17.0.5      NORMAL                
zk1             3101398f990c    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        
The current leader is 2. The network is split into two partitions, one containing kafka1, kafka2, kafka3 and the other containing kafka1, kafka3, zk1, so the leader loses connectivity only to zk and all of its other communication remains normal
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka1,kafka2,kafka3 kafka1,kafka3,zk1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          b3d312073984    UP      172.17.0.3      NORMAL     2          
kafka2          563f88a79dc1    UP      172.17.0.4      NORMAL     1          
kafka3          5073f138ee27    UP      172.17.0.5      NORMAL     3          
zk1             3101398f990c    UP      172.17.0.2      NORMAL     4   

After the network partition, the original leader has lost contact only with ZooKeeper
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 563f88a79dc1 bash -c "ping 172.17.0.2"            
PING 172.17.0.2 (172.17.0.2) 56(84) bytes of data.
^C
--- 172.17.0.2 ping statistics ---
6 packets transmitted, 0 received, 100% packet loss, time 5111ms

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 563f88a79dc1 bash -c "ping 172.17.0.3"
PING 172.17.0.3 (172.17.0.3) 56(84) bytes of data.
64 bytes from 172.17.0.3: icmp_seq=1 ttl=64 time=0.049 ms
64 bytes from 172.17.0.3: icmp_seq=2 ttl=64 time=0.055 ms
64 bytes from 172.17.0.3: icmp_seq=3 ttl=64 time=0.056 ms
^C
--- 172.17.0.3 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2052ms
rtt min/avg/max/mdev = 0.049/0.053/0.056/0.006 ms
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ docker exec -it 563f88a79dc1 bash -c "ping 172.17.0.5"
PING 172.17.0.5 (172.17.0.5) 56(84) bytes of data.
64 bytes from 172.17.0.5: icmp_seq=1 ttl=64 time=0.056 ms
64 bytes from 172.17.0.5: icmp_seq=2 ttl=64 time=0.080 ms
64 bytes from 172.17.0.5: icmp_seq=3 ttl=64 time=0.109 ms
^C
--- 172.17.0.5 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2037ms
rtt min/avg/max/mdev = 0.056/0.081/0.109/0.024 ms

The remaining followers hold a new election; the new leader is 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka1 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 2,3,1 Isr: 3,1
      
Check the data Kafka actually stored, from the new leader
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:78427


Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 1 test1  
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent: 100000
Delivered: 100000
Failed: 0

  • Test version: confluentinc/cp-kafka:6.2.1

Commands run in window 1 (the blockade partition command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

  • Test version: confluentinc/cp-kafka:7.0.0

Commands run in window 1 (the blockade partition command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    This scenario is similar to scenario 4. Once the original leader loses its connection to zk, zk marks it as offline and a new election is triggered. The original leader keeps accepting write requests and tries to shrink the ISR to just itself, but because it cannot reach zk that operation cannot complete.

    The difference is that the other Kafka nodes stop sending fetch requests to the original leader not because they cannot reach it, but because a new leader has been elected.

    The producer received acknowledgments for all 100,000 messages, while Kafka actually stored 78,427, so 21,573 messages were lost.

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 test results

  • Test version: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 test results

Test Scenario 7:

Scenario setup:

ACKS=all: the producer treats data as successfully sent only after it receives acknowledgments from all of the topic's brokers on the Kafka side; during the test the topic leader is network-isolated from zk.

Procedure:

The client sends 100,000 messages to Kafka with acks set to all; while it is writing, the topic leader is network-isolated from zk.

Commands:

  • Test version: confluentinc/cp-kafka:5.5.6

Commands run in window 1 (the blockade partition command is issued while window 2 is in the middle of writing data):

zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ reset-cluster.sh;create-topic.sh kafka1 test1
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          825c158231f4    UP      172.17.0.3      NORMAL                
kafka2          fe64d2227f35    UP      172.17.0.4      NORMAL                
kafka3          4f57c56a3893    UP      172.17.0.5      NORMAL                
zk1             bf63b423c3ea    UP      172.17.0.2      NORMAL                
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
Created topic test1.
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2
        
The current leader is 1. The network is split into two partitions, one containing kafka1, kafka2, kafka3 and the other containing kafka2, kafka3, zk1, so the leader loses connectivity only to zk and all of its other communication remains normal
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade partition kafka1,kafka2,kafka3 kafka2,kafka3,zk1
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
kafka1          825c158231f4    UP      172.17.0.3      NORMAL     1          
kafka2          fe64d2227f35    UP      172.17.0.4      NORMAL     2          
kafka3          4f57c56a3893    UP      172.17.0.5      NORMAL     3          
zk1             bf63b423c3ea    UP      172.17.0.2      NORMAL     4  

The remaining followers hold a new election; the new leader is 3
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-topic-details.sh kafka2 test1
Topic: test1    PartitionCount: 1       ReplicationFactor: 3    Configs: 
        Topic: test1    Partition: 0    Leader: 3       Replicas: 1,3,2 Isr: 3,2
     
Check the data Kafka actually stored, from the new leader
zhanyl@zyl:~/ChaosTestingCode/Kafka/cluster$ print-hw.sh kafka3 19092 test1
test1:0:90057


Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

zhanyl@zyl:~/ChaosTestingCode/Kafka/client$ python producer.py 100000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60167ms, timeout #0)
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60162ms, timeout #1)
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60156ms, timeout #2)
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60150ms, timeout #3)
%5|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out ProduceRequest in flight (after 60144ms, timeout #4)
%4|1638348015.783|REQTMOUT|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Timed out 29 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1638348015.783|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: 29 request(s) timed out: disconnect (after 92087ms in state UP)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
...
Success: 70043 Failed: 9957
Success: 80043 Failed: 9957
Success: 90043 Failed: 9957
Sent: 100000
Delivered: 90043
Failed: 9957

  • Test version: confluentinc/cp-kafka:6.2.1

Commands run in window 1 (the blockade partition command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

  • Test version: confluentinc/cp-kafka:7.0.0

Commands run in window 1 (the blockade partition command is issued while window 2 is in the middle of writing data):

No difference from the results with confluentinc/cp-kafka:5.5.6

Commands run in window 2 (executed after the Kafka cluster has been set up and the topic created):

No difference from the results with confluentinc/cp-kafka:5.5.6

Scenario conclusions:

  • Test version: confluentinc/cp-kafka:5.5.6

    This scenario is similar to scenario 5. Once the original leader loses its connection to zk, zk marks it as offline and a new election is triggered. The original leader keeps accepting write requests and tries to shrink the ISR to just itself, but because it cannot reach zk that operation cannot complete.

    The difference is that the other Kafka nodes stop sending fetch requests to the original leader not because they cannot reach it, but because a new leader has been elected.

    The producer received acknowledgments for 90,043 messages, while Kafka actually stored 90,057, so no data was lost.

  • Test version: confluentinc/cp-kafka:6.2.1

    No difference from the confluentinc/cp-kafka:5.5.6 test results

  • Test version: confluentinc/cp-kafka:7.0.0

    No difference from the confluentinc/cp-kafka:5.5.6 test results

About the Author

詹玉林 (Zhan Yulin), engineer in the Open Source Software Support Group, Information Technology Department, China Minsheng Bank

