
Understanding the Kafka Client in Depth: Handling Server Responses and Timed-Out Requests

大数据记事本 2020-11-15
1. Scenario Analysis
    The previous article analyzed how the client sends messages to the server. Once a request has been sent out, there are two possible outcomes:
  • The server receives the request and returns a response, which itself comes in two forms:
    • The messages are written successfully and a normal response is returned
    • An error occurs and an error response is returned
  • The server never returns a response and the request eventually times out
    How does the Kafka client handle each of these cases? This article analyzes the process in detail.
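
    Before diving into the source, it helps to keep the user-facing side in mind: whichever outcome above occurs, it ultimately surfaces through the callback passed to KafkaProducer.send(). A minimal sketch (the broker address, topic name and serializer settings are illustrative assumptions):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerCallbackDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");
                // The Sender thread invokes this callback once the machinery described below has
                // produced a result: metadata is set on success, exception is set on failure
                // (including requests that timed out and were completed with an error).
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Send failed: " + exception);
                    } else {
                        System.out.println("Written to " + metadata.topic() + "-"
                                + metadata.partition() + " at offset " + metadata.offset());
                    }
                });
            }
        }
    }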
2. Overview Diagram

3. Source Code Walkthrough
    In the message-sending flow covered previously, once the client has established a network connection to the target node, it removes the OP_CONNECT event registered on the corresponding KafkaChannel and registers an OP_READ event, which is used to read the response returned by the server.
    Messages are sent through the call chain Sender.run -> Sender.runOnce -> NetworkClient.poll -> Selector.poll; the actual network I/O happens in Selector.poll, so that is where we pick up.
    Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
    ...
    pollSelectionKeys(readyKeys, false, endSelect);

    1.1 selectionKeys() returns the set of SelectionKeys that are ready, and pollSelectionKeys() then processes them. That method iterates over all the selection keys and handles each one according to the events registered on it. Since the client needs to read responses here, we focus on the read-event logic, which calls attemptRead:

    // for a readable key, the incoming response is handled here
    attemptRead(key, channel);
    private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
        // only read if the channel is ready, the key is readable (or the channel has buffered bytes),
        // and there is no staged receive for this channel yet;
        // hasStagedReceive(channel) checks whether this connection already has a received
        // but not-yet-processed response
        if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
                && !explicitlyMutedChannels.contains(channel)) {
            // NetworkReceive represents a response coming back from the server
            // (on the wire it has the same framing as a request)
            NetworkReceive networkReceive;
            // channel.read() keeps reading data until no complete response is available
            while ((networkReceive = channel.read()) != null) {
                // madeReadProgressLastPoll marks whether this poll() call managed to read
                // buffered data, i.e. whether a NetworkReceive was produced
                madeReadProgressLastPoll = true;
                // stash each complete response into the stagedReceives queue for this channel
                addToStagedReceives(channel, networkReceive);
            }
            if (channel.isMute()) {
                outOfMemory = true; // channel has muted itself due to memory pressure
            } else {
                madeReadProgressLastPoll = true;
            }
        }
    }

    In this method, KafkaChannel.read() returns a NetworkReceive response object; the while loop keeps reading responses and hands each one to addToStagedReceives, which stores it in the stagedReceives structure.

    stagedReceives is a Map that associates each node connection with a queue of its pending responses:

    Map<KafkaChannel, Deque<NetworkReceive>>

    private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
        // a KafkaChannel represents one network connection; each Kafka node maps to one channel
        /**
         * If stagedReceives already contains this channel, fetch its response queue and
         * append the NetworkReceive to it; otherwise create a new queue first and then
         * add the response object.
         */
        if (!stagedReceives.containsKey(channel))
            stagedReceives.put(channel, new ArrayDeque<>());

        Deque<NetworkReceive> deque = stagedReceives.get(channel);
        deque.add(receive);
    }
    1.2 At the end of Selector.poll, addToCompletedReceives() moves the NetworkReceive objects queued in stagedReceives into the completedReceives collection:

    addToCompletedReceives();
    private void addToCompletedReceives() {
        // a non-empty stagedReceives means at least one response has been received
        if (!this.stagedReceives.isEmpty()) {
            Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
            // iterate over every channel that has staged responses
            while (iter.hasNext()) {
                Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
                KafkaChannel channel = entry.getKey();
                if (!explicitlyMutedChannels.contains(channel)) {
                    // the NetworkReceive queue for this KafkaChannel
                    Deque<NetworkReceive> deque = entry.getValue();
                    // take one response off the queue and add it to completedReceives
                    addToCompletedReceives(channel, deque);
                    if (deque.isEmpty())
                        iter.remove();
                }
            }
        }
    }
    1.3 Back in NetworkClient.poll:
    public List<ClientResponse> poll(long timeout, long now) {
        ...
        // TODO Step 1: possibly build a request to fetch metadata
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            // TODO Step 2: send requests and do the actual network I/O (plain Java NIO underneath)
            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        long updatedNow = this.time.milliseconds();
        // collect the responses produced by this poll
        List<ClientResponse> responses = new ArrayList<>();
        // for requests that expect a response, this adds nothing here
        handleCompletedSends(responses, updatedNow);
        // build ClientResponse objects from the completed receives and add them to responses
        handleCompletedReceives(responses, updatedNow);

        handleDisconnections(responses, updatedNow);

        handleConnections();

        handleInitiateApiVersionRequests(updatedNow);
        // handle requests that have timed out
        handleTimedOutRequests(responses, updatedNow);
        // TODO Step 3: process the responses
        // (for a cluster-metadata request, the response carries the cluster metadata)
        completeResponses(responses);

        return responses;
    }

    After selector.poll returns, a new List<ClientResponse> named responses is created and handleCompletedReceives is called. Its logic is as follows:

    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        // iterate over the NetworkReceive objects in the completedReceives collection
        for (NetworkReceive receive : this.selector.completedReceives()) {
            // the id of the broker the response came from
            String source = receive.source();
            // remove the request that has now been answered from this node's queue in inFlightRequests
            InFlightRequest req = inFlightRequests.completeNext(source);
            // parse the binary payload returned by the server
            Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                throttleTimeSensor, now);
            if (log.isTraceEnabled()) {
                log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
                    req.header.apiKey(), req.header.correlationId(), responseStruct);
            }
            AbstractResponse body = AbstractResponse.
                    parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
            maybeThrottle(body, req.header.apiVersion(), req.destination, now);
            // TODO if this is a metadata response
            if (req.isInternalRequest && body instanceof MetadataResponse)
                metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
            else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
                handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
            else
                // build a ClientResponse (body is the response content) and add it to responses
                responses.add(req.completed(body, now));
        }
    }

    The logic of this method:

    1. Iterate over the NetworkReceive response objects gathered in the previous step (completedReceives)

    2. Determine which node each response came from and remove the oldest request sent to that node from inFlightRequests (it has now been answered)

    3. Parse the response returned by the server, since it arrives in binary form

    4. Build a ClientResponse object from the parsed response and store it in the responses collection
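
    Step 2 works because a broker answers the requests it receives on one connection in the order they were sent, so the next completed response always corresponds to the oldest in-flight request for that node. A simplified stand-in for that bookkeeping (illustrative only, not the actual InFlightRequests class):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    // Minimal model of per-node in-flight request tracking.
    class SimpleInFlightRequests<R> {
        private final Map<String, Deque<R>> requests = new HashMap<>();

        // a newly sent request is added at the front of the node's deque
        void add(String nodeId, R request) {
            requests.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addFirst(request);
        }

        // a completed response always matches the oldest outstanding request,
        // which sits at the back of the deque
        R completeNext(String nodeId) {
            return requests.get(nodeId).pollLast();
        }
    }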

    1.4 Also in NetworkClient.poll, timed-out requests are handled by handleTimedOutRequests:

    handleTimedOutRequests(responses, updatedNow);
    private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
        // ids of the nodes that have at least one timed-out request
        List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
        for (String nodeId : nodeIds) {
            // close connection to the node
            this.selector.close(nodeId);
            log.debug("Disconnecting from node {} due to request timeout.", nodeId);

            processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
        }

        // we disconnected, so we should probably refresh our metadata
        if (!nodeIds.isEmpty())
            metadataUpdater.requestUpdate();
    }

    First, collect the ids of all nodes that have timed-out requests. A request counts as timed out when: current time - request creation time > request timeout (30 seconds by default).

    List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
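
    The check itself is simple arithmetic; a minimal sketch of the condition (names here are made up for clarity, not the exact Kafka internals):

    // A request has timed out once it has been in flight longer than request.timeout.ms.
    final class TimeoutCheck {
        static boolean hasTimedOut(long nowMs, long createdTimeMs, long requestTimeoutMs) {
            return nowMs - createdTimeMs > requestTimeoutMs;   // requestTimeoutMs defaults to 30,000 ms
        }
    }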

    Then the client closes its connection to every node that has timed-out requests:

    this.selector.close(nodeId);

    Next, processDisconnection is executed:

    private void processDisconnection(List<ClientResponse> responses,
                                      String nodeId,
                                      long now,
                                      ChannelState disconnectState) {
        // mark this node's connection state as DISCONNECTED
        connectionStates.disconnected(nodeId, now);
        apiVersions.remove(nodeId);
        nodesNeedingApiVersionsFetch.remove(nodeId);
        // for a timed-out request the state passed in is LOCAL_CLOSE, so the switch falls through to default
        switch (disconnectState.state()) {
            case AUTHENTICATION_FAILED:
                AuthenticationException exception = disconnectState.exception();
                connectionStates.authenticationFailed(nodeId, now, exception);
                metadataUpdater.handleFatalException(exception);
                log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId,
                    disconnectState.remoteAddress(), exception.getMessage());
                break;
            case AUTHENTICATE:
                log.warn("Connection to node {} ({}) terminated during authentication. This may happen " +
                    "due to any of the following reasons: (1) Authentication failed due to invalid " +
                    "credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS " +
                    "traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.",
                    nodeId, disconnectState.remoteAddress());
                break;
            case NOT_CONNECTED:
                log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
                break;
            default:
                break; // Disconnections in other states are logged at debug level in Selector
        }
        // clear every in-flight request queued for this node
        for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
            log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected",
                request.header.apiKey(), request.request, request.header.correlationId(), nodeId);
            // isInternalRequest is false for ordinary (e.g. produce) requests
            if (!request.isInternalRequest)
                // build a ClientResponse with no response body and add it to responses
                responses.add(request.disconnected(now, disconnectState.exception()));
            else if (request.header.apiKey() == ApiKeys.METADATA)
                metadataUpdater.handleDisconnection(request.destination);
        }
    }

    The logic of this method:

    1. Mark the disconnected node's connection state as DISCONNECTED

    2. Clear all in-flight requests for that node from inFlightRequests

    3. For each cleared in-flight request, build a ClientResponse without a response body and add it to the responses collection

    Finally, a metadata refresh is requested:

    metadataUpdater.requestUpdate();

    At this point, every request, whether it received a response from the server or timed out, has been wrapped in a ClientResponse object and stored in the responses collection.
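
    The two kinds of ClientResponse can be told apart by their flags rather than by a body. A small illustration using the public accessors that also appear in handleProduceResponse below (the classify helper itself is hypothetical):

    import org.apache.kafka.clients.ClientResponse;
    import org.apache.kafka.common.requests.AbstractResponse;

    final class ResponseKind {
        // Hypothetical helper: labels a ClientResponse the same way handleProduceResponse does.
        static String classify(ClientResponse response) {
            if (response.wasDisconnected()) {
                return "disconnected/timed out: no response body";     // built via request.disconnected(...)
            } else if (response.hasResponse()) {
                AbstractResponse body = response.responseBody();       // built via req.completed(body, ...)
                return "normal response: " + body.getClass().getSimpleName();
            } else {
                return "no body expected (e.g. acks=0)";
            }
        }
    }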

    1.5 Back in NetworkClient.poll, completeResponses processes the responses, both those returned by the server and those generated for timed-out requests:

    completeResponses(responses);
    private void completeResponses(List<ClientResponse> responses) {
        // iterate over the response objects
        for (ClientResponse response : responses) {
            try {
                // process each response
                response.onComplete();
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }
    public void onComplete() {
        // if a callback was bound to this response
        if (callback != null)
            // let the callback's onComplete method process the response
            callback.onComplete(this);
    }
    So processing a response simply means running the callback bound to it.
    That raises a question: when was this callback bound? Let's go back and look at how the ClientResponse objects were created:
• A normal response is created by InFlightRequest.completed

• A timed-out response is created by InFlightRequest.disconnected

    Neither call passes in a callback argument, so the callback used is the one stored on the InFlightRequest itself. That callback is supplied when the InFlightRequest is constructed, and the InFlightRequest is created when the produce request is prepared for sending, in NetworkClient.doSend:

    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        ...
        // wrap the outgoing request in an InFlightRequest
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        ...
    }
    The callback here is a field of the clientRequest argument, so we keep tracing to where the ClientRequest is created: Sender.sendProduceRequest, reached via Sender.runOnce() -> sendProducerData() -> sendProduceRequests() -> sendProduceRequest. The relevant part of that method:
    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        ...
        // build the callback for this request
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };

        String nodeId = Integer.toString(destination);
        // TODO build the request that carries the produce data: ClientRequest
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
                requestTimeoutMs, callback);
    }
    The callback is built right here; its actual work is done by handleProduceResponse:
    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
        // the request header of the response
        RequestHeader requestHeader = response.requestHeader();
        long receivedTimeMs = response.receivedTimeMs();

        int correlationId = requestHeader.correlationId();
        // special case: the broker connection was lost while the request was in flight;
        // timed-out responses take this branch
        if (response.wasDisconnected()) {
            log.trace("Cancelled request with header {} due to node {} being disconnected",
                requestHeader, response.destination());
            for (ProducerBatch batch : batches.values())
                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);
        // the request could not be sent because of an API version mismatch
        } else if (response.versionMismatch() != null) {
            log.warn("Cancelled request {} due to a version mismatch with node {}",
                response, response.destination(), response.versionMismatch());
            for (ProducerBatch batch : batches.values())
                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now, 0L);
        // normal responses take this branch
        } else {
            log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
            // if we have a response, parse it (the usual case)
            if (response.hasResponse()) {
                ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
                // iterate over the per-partition responses: the leaders of several partitions may live
                // on the same node, so one request can carry batches for multiple partitions
                for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                    TopicPartition tp = entry.getKey();
                    // the response for this partition
                    ProduceResponse.PartitionResponse partResp = entry.getValue();
                    // the batch that was sent to this partition
                    ProducerBatch batch = batches.get(tp);
                    // complete the batch for this partition
                    completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
                }
                this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
            } else {
                // acks == 0: no response is expected, the client fires and forgets,
                // so complete the batches directly
                for (ProducerBatch batch : batches.values()) {
                    completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now, 0L);
                }
            }
        }
    }
    For a timed-out response it calls:

    completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);

    For a normal response it calls:

    completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
    Either way, completeBatch ends up doing the work:
    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                               long now, long throttleUntilTimeMs) {
        // the error carried by the partition response
        Errors error = response.error;
        // message too large: split the batch and re-enqueue the pieces
        if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
                (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
            ...
        // any other error
        } else if (error != Errors.NONE) {
            // if the batch can be retried
            if (canRetry(batch, response, now)) {
                log.warn(
                    "Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                    correlationId,
                    batch.topicPartition,
                    this.retries - batch.attempts() - 1,
                    error);
                if (transactionManager == null) {
                    // put the failed batch back into the accumulator queue so it will be re-sent
                    reenqueueBatch(batch, now);
                } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
                    log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
                        batch.topicPartition, batch.producerId(), batch.baseSequence());
                    reenqueueBatch(batch, now);
                } else {
                    failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
                        "batch but the producer id changed from " + batch.producerId() + " to " +
                        transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
                }
            // a duplicate sequence number means the write already succeeded, so complete the batch normally
            } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
                completeBatch(batch, response);
            // the batch cannot be retried: either retries are disabled or the retry count is exhausted
            } else {
                // build a RuntimeException describing the failure
                final RuntimeException exception;
                // the topic is not authorized
                if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                    exception = new TopicAuthorizationException(batch.topicPartition.topic());
                // the cluster is not authorized
                else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                    exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
                else
                    exception = error.exception();
                // fail the batch: invoke the record callbacks, mark the batch state and release its memory
                failBatch(batch, response, exception, batch.attempts() < this.retries);
            }
            // invalid-metadata errors trigger a metadata refresh
            if (error.exception() instanceof InvalidMetadataException) {
                // unknown topic or partition
                if (error.exception() instanceof UnknownTopicOrPartitionException) {
                    log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                        "topic-partition may not exist or the user may not have Describe access to it",
                        batch.topicPartition);
                } else {
                    log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
                        "to request metadata update now", batch.topicPartition, error.exception().toString());
                }
                // request a metadata update
                metadata.requestUpdate();
            }
        // no error
        } else {
            completeBatch(batch, response);
        }
        if (guaranteeMessageOrder)
            this.accumulator.unmutePartition(batch.topicPartition, throttleUntilTimeMs);
    }
• For a message-too-large error, the batch is split into smaller batches, which are put back into the accumulator; the corresponding request is removed from inFlightRequests and its memory is released

• For a retriable error, the batch is retried: the attempt counter is updated, the batch is put back into its Deque in the accumulator, and the corresponding request is removed from inFlightRequests (the retry decision is sketched below)

• If the batch cannot be retried, a RuntimeException matching the error type is built and failBatch is called. A batch cannot be retried in two cases:

  • Retries are disabled

  • The retry count has been exhausted

• For an invalid-metadata error, a metadata refresh is requested

• If there is no error, completeBatch(batch, response) is executed
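
    The retry decision mentioned above is made by Sender.canRetry. A rough, simplified sketch of its shape (the real method also consults the transaction manager and the delivery timeout; this stand-alone version uses plain parameters instead of the internal ProducerBatch type):

    import org.apache.kafka.common.errors.RetriableException;

    // Simplified sketch of the retry check used in completeBatch (illustrative only).
    final class RetryDecision {
        static boolean canRetry(int attempts, int retries, boolean batchDone, Exception error) {
            return attempts < retries                        // the retry budget is not exhausted
                && !batchDone                                // the batch has not already been marked done
                && error instanceof RetriableException;      // only retriable errors are retried
        }
    }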

    Now let's look at failBatch and completeBatch in detail.
    failBatch:
    private void failBatch(ProducerBatch batch,
                           long baseOffset,
                           long logAppendTime,
                           RuntimeException exception,
                           boolean adjustSequenceNumbers) {
        if (transactionManager != null) {
            transactionManager.handleFailedBatch(batch, exception, adjustSequenceNumbers);
        }

        this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);

        // mark the batch as done (failed or succeeded) if it has not been marked yet
        if (batch.done(baseOffset, logAppendTime, exception)) {
            // return the batch's memory to the buffer pool and remove it from inFlightBatches
            maybeRemoveAndDeallocateBatch(batch);
        }
    }

    Here batch.done assigns the batch its final state; because exception is non-null, that state is FAILED. The records in the batch are then walked through and the callback bound to each record at produce time is invoked with the exception.

    Finally, the batch is removed from inFlightBatches and the memory it occupied is released.

    completeBatch:
    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
        if (transactionManager != null) {
            transactionManager.handleCompletedBatch(batch, response);
        }

        // mark the batch as successfully completed
        if (batch.done(response.baseOffset, response.logAppendTime, null)) {
            // remove the batch and release its memory
            maybeRemoveAndDeallocateBatch(batch);
        }
    }
    Here the batch is marked SUCCEEDED; the records in the batch are then walked through and the callback bound at produce time is invoked for the success case.

    Finally, the batch is removed from inFlightBatches and the memory it occupied is released.

    Summary:

    • Through the registered OP_READ event the client keeps reading responses from the server, wrapping each one in a NetworkReceive object and placing it in the stagedReceives structure
    • The NetworkReceive objects are then moved from stagedReceives into the completedReceives collection
    • Each NetworkReceive taken from completedReceives is parsed, wrapped in a ClientResponse object and added to the responses collection
    • For a timed-out request, the client first closes the connection to the target node and marks the node DISCONNECTED, then builds a ClientResponse without a response body and adds it to responses
    • Whether a request received a response or timed out, it is removed from the InFlightRequests structure in the end
    • The client iterates over responses and invokes the callback of each response object
    • That callback was bound when the request was built, and it reacts differently depending on the outcome:
      • For a retriable error, the batch is put back into the accumulator queue to be re-sent
      • For a non-retriable error, a RuntimeException is built, the batch is marked FAILED and its memory is released
      • For an invalid-metadata error, a metadata refresh is requested
      • For a message-too-large error, the batch is split and the pieces are put back into the accumulator queue
      • For a normal response, the batch is marked SUCCEEDED and its memory is released
    • Once a batch has been given its final state, it is removed from inFlightBatches and the callbacks bound to its records at produce time are invoked
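
    The timeout and retry behaviour summarized above is driven by a handful of producer configs. A small sketch of the most relevant ones (the values shown are the documented Kafka 2.x defaults, except acks, which is an illustrative choice):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class ProducerTimeoutConfig {
        // Settings that govern the behaviour described in this article.
        static Properties timeoutRelatedConfigs() {
            Properties props = new Properties();
            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);   // per-request timeout checked by handleTimedOutRequests
            props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);  // retry budget consulted by canRetry (default since 2.1)
            props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // overall bound on send + retries
            props.put(ProducerConfig.ACKS_CONFIG, "all");                 // illustrative; with acks=0 no response is expected at all
            return props;
        }
    }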