服务端收到请求,并返回响应。响应也有两种情况: 正常写入消息,返回正常的响应 发生异常,返回异常的响应 服务端未返回响应,直到超时
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
...
pollSelectionKeys(readyKeys, false, endSelect);
复制
1.1 通过selectionKeys()方法,获取了所有准备好的SelectionKey的集合,然后通过pollSelectionKeys()方法进行处理。该方法主要遍历所有的selectionKey,然后根据注册的不同事件进行处理。这里客户端要读取响应,那么重点看读事件对应的逻辑操作,这里调用了attemptRead方法:
//如果是处理返回的响应,走这个方法
attemptRead(key, channel);
复制
private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
//如果是读请求
//hasStagedReceive(channel)判断指定那个channel连接是否有接收到但是还未处理的响应
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
&& !explicitlyMutedChannels.contains(channel)) {
//接收服务端的响应(本质也是一个请求)
//NetworkReceive代表的就是服务端返回来的响应
NetworkReceive networkReceive;
//channel.read方法不断读取数据
while ((networkReceive = channel.read()) != null) {
//madeReadProgressLastPoll用来标记前一次对poll方法的调用是否能够读到已经缓存的数据,即NetworkReceive是否为null
madeReadProgressLastPoll = true;
//不断地读取数据,将这个响应放到stagedReceive队列中
addToStagedReceives(channel, networkReceive);
}
if (channel.isMute()) {//如果channel中内存满了
outOfMemory = true; //channel has muted itself due to memory pressure.
} else {
madeReadProgressLastPoll = true;
}
}
}
复制
在该方法中可以看到,KafkaChannel.read()方法返回NetworkReceive响应对象,通过while循环不断地读取响应,然后通过addToStagedReceives方法将该响应对象放到stagedReceive结构中:
stagedReceive是一个Map结构,存放了节点连接和对应的响应队列:
Map<KafkaChannel, Deque<NetworkReceive>>
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
//channel代表的就是一个网络的连接,一台kafka节点对应一个channel连接
/**
* 如果stagesReceives结构中已经有指定的channel,那么就拿到对应的响应队列,将NetworkReceive响应对象放进去
* 如果没有指定的channel,就创建一个新的队列,然后把响应对象放进去
*/
if (!stagedReceives.containsKey(channel))
stagedReceives.put(channel, new ArrayDeque<>());
Deque<NetworkReceive> deque = stagedReceives.get(channel);
deque.add(receive);
}
复制
addToCompletedReceives();
复制
private void addToCompletedReceives() {
//如果stagedReceives不为空,说明已经接收到了响应
if (!this.stagedReceives.isEmpty()) {
Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
//遍历
while (iter.hasNext()) {
Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
KafkaChannel channel = entry.getKey();
//
if (!explicitlyMutedChannels.contains(channel)) {
//获取KafkaChanenl对应到NetworkReceive队列
Deque<NetworkReceive> deque = entry.getValue();
//从这个队列放中取出一个响应对象放到completedReceives集合中
addToCompletedReceives(channel, deque);
if (deque.isEmpty())
iter.remove();
}
}
}
}
复制
public List<ClientResponse> poll(long timeout, long now) {
...
//TODO 步骤一:封装一个拉取元数据的请求
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
//TODO 步骤二:发送请求,进行复杂的网络操作,这里用的就是java的NIO
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
long updatedNow = this.time.milliseconds();
//新建一个响应的集合
List<ClientResponse> responses = new ArrayList<>();
//这里默认什么都不执行
handleCompletedSends(responses, updatedNow);
//将构建的ClientResponse放到responses集合中
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
//处理超时的请求
handleTimedOutRequests(responses, updatedNow);
//TODO 步骤三:处理响应
// 如果是获取集群元数据的请求,那么获取的响应中就包含集群元数据
completeResponses(responses);
return responses;
}
复制
执行完selector.poll方法后,新建了一个ClientResponse类型的集合response,然后调用了handleCompletedReceives方法,逻辑如下:
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
//遍历completedReceives集合中的响应对象NetworkReceive
for (NetworkReceive receive : this.selector.completedReceives()) {
//获取broker id
String source = receive.source();
//从inFlightRequests集合中对应的inFlightRequest队列中移除已经获取响应的请求
InFlightRequest req = inFlightRequests.completeNext(source);
//解析服务端返回的请求
Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
throttleTimeSensor, now);
if (log.isTraceEnabled()) {
log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
req.header.apiKey(), req.header.correlationId(), responseStruct);
}
AbstractResponse body = AbstractResponse.
parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
maybeThrottle(body, req.header.apiVersion(), req.destination, now);
//TODO 如果是关于元数据信息的响应
if (req.isInternalRequest && body instanceof MetadataResponse)
metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
else
//构建ClientResponse对象并放到responses集合中,body就是响应的内容
responses.add(req.completed(body, now));
}
}
复制
该方法的逻辑是:
遍历上一步获取到的NetworkReceive集合(completedReceives)中的响应对象
获取响应对应的节点,将发往该节点最早的请求从inFlightRequests移除(因为已经收到了响应)
解析服务端返回的响应,因为是二进制的
根据返回的响应构建ClientResponse对象,并存放到responses集合中
handleTimedOutRequests(responses, updatedNow);
复制
private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
//获取有超时请求的节点id的集合
List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
for (String nodeId : nodeIds) {
// close connection to the node
//关闭和该节点的连接
this.selector.close(nodeId);
log.debug("Disconnecting from node {} due to request timeout.", nodeId);
processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
}
// we disconnected, so we should probably refresh our metadata
if (!nodeIds.isEmpty())
metadataUpdater.requestUpdate();
}
复制
获取所有有超时请求的节点id,这里判断是否超时的标准是:当前时间-请求的创建时间 > 请求的超时时间(默认30秒)
List<String> nodeIds = this.inFlightRequests.nodesWithTimedOutRequests(now);
复制
关闭客户端和有超时请求的节点的连接:
this.selector.close(nodeId);
复制
执行processDisconnection方法:
private void processDisconnection(List<ClientResponse> responses,
String nodeId,
long now,
ChannelState disconnectState) {
//将这个节点的连接状态修改为DISCONNECTED
connectionStates.disconnected(nodeId, now);
apiVersions.remove(nodeId);
nodesNeedingApiVersionsFetch.remove(nodeId);
//根据传入都参数,状态是LOCAL_CLOSE,走default,break
switch (disconnectState.state()) {
case AUTHENTICATION_FAILED:
AuthenticationException exception = disconnectState.exception();
connectionStates.authenticationFailed(nodeId, now, exception);
metadataUpdater.handleFatalException(exception);
log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId,
disconnectState.remoteAddress(), exception.getMessage());
break;
case AUTHENTICATE:
log.warn("Connection to node {} ({}) terminated during authentication. This may happen " +
"due to any of the following reasons: (1) Authentication failed due to invalid " +
"credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS " +
"traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.",
nodeId, disconnectState.remoteAddress());
break;
case NOT_CONNECTED:
log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
break;
default:
break; // Disconnections in other states are logged at debug level in Selector
}
//清空inFlightRequests中该节点所有批次
for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected",
request.header.apiKey(), request.request, request.header.correlationId(), nodeId);
//isInternalRequest默认为false
if (!request.isInternalRequest)
//构建一个没有响应体的响应并添加到responses集合中
responses.add(request.disconnected(now, disconnectState.exception()));
else if (request.header.apiKey() == ApiKeys.METADATA)
metadataUpdater.handleDisconnection(request.destination);
}
}
复制
该方法的逻辑是:
将关闭连接的节点的状态改为DISCONNECTED
清空inFlightRequests中该节点对应的所有inFlightRequest请求
每个inFlightRequest请求构建一个没有响应体的ClientResponse对象,并放入responses集合
更新元数据:
metadataUpdater.requestUpdate();
复制
至此,不管是服务的返回响应的请求,还是超时的请求,都封装了一个ClientResponse对象,并保存到了responses集合中。
1.5 在NetworkClient.poll方法中,通过调用completeResponses方法处理响应(包括服务端返回的响应和发送超时的响应)
completeResponses(responses)
复制
private void completeResponses(List<ClientResponse> responses) {
//遍历响应对象
for (ClientResponse response : responses) {
try {
//对响应进行处理
response.onComplete();
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
复制
public void onComplete() {
//如果绑定了回调函数
if (callback != null)
//调用回调函数的onComplete方法对响应进行处理
callback.onComplete(this);
}
复制
对于正常返回的响应,是调用了InFlightRequest.completed方法
对于超时的响应,是调用了InFlightRequest.disconnected方法
这两个方法调用的共同点是并没有传入callback参数,所以用的就是InFlightRequest的callback属性。这样来看,这个callback回调函数是构建InFlightRequest对象时给定的,而这个对象是在构建发送消息的请求时创建的。所以找到创建InFlightRequest对象的方法,在NetWorkClient.doSend,如下:
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
...
//封装一个inFlightRequest请求
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);
...
}
复制
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
...
//构建请求的回调函数
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
String nodeId = Integer.toString(destination);
//TODO 构建发送数据的请求:ClientRequest
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
requestTimeoutMs, callback);
}
复制
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
//获取响应的请求头
RequestHeader requestHeader = response.requestHeader();
long receivedTimeMs = response.receivedTimeMs();
int correlationId = requestHeader.correlationId();
//特殊情况,真正要发送请求了,但是broker失去连接了
//超时的响应走这里
if (response.wasDisconnected()) {
log.trace("Cancelled request with header {} due to node {} being disconnected",
requestHeader, response.destination());
for (ProducerBatch batch : batches.values())
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);
//如果是版本不匹配导致无法发送请求的响应
} else if (response.versionMismatch() != null) {
log.warn("Cancelled request {} due to a version mismatch with node {}",
response, response.destination(), response.versionMismatch());
for (ProducerBatch batch : batches.values())
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now, 0L);
//正常的响应走这里
} else {
log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
// if we have a response, parse it
//如果有响应,解析,正常情况走的都是这个分支
if (response.hasResponse()) {
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
//遍历每个分区的响应,因为不同分区的leader副本可能在同一个节点,那么发送请求时就有多个分区的批次,这里获取每个分区对应的响应
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey();
//获取每个分区的响应
ProduceResponse.PartitionResponse partResp = entry.getValue();
//获取每个分区的批次对象
ProducerBatch batch = batches.get(tp);
//处理对应分区的响应
completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
}
this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
} else {
//如果ack=0,即不需要返回响应,即客户端发送完就不管了,不管有没有响应
for (ProducerBatch batch : batches.values()) {
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now, 0L);
}
}
}
}
复制
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);
复制
completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
复制
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
long now, long throttleUntilTimeMs) {
//获取响应中的error对象
Errors error = response.error;
//消息过大异常
if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
(batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
...
//如果有其它异常
} else if (error != Errors.NONE) {
//如果可以重试
if (canRetry(batch, response, now)) {
log.warn(
"Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
correlationId,
batch.topicPartition,
this.retries - batch.attempts() - 1,
error);
if (transactionManager == null) {
//重新把发送失败的批次加入到队列中
reenqueueBatch(batch, now);
} else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
batch.topicPartition, batch.producerId(), batch.baseSequence());
reenqueueBatch(batch, now);
} else {
failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
"batch but the producer id changed from " + batch.producerId() + " to " +
transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
}
} else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
completeBatch(batch, response);
//如果无法重试:1。不允许重试;2。重试次数超了
} else {
//构建一个RuntimeException实例
final RuntimeException exception;
//如果响应中带有Topic没有权限的异常
if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
exception = new TopicAuthorizationException(batch.topicPartition.topic());
//如果响应中带有Cluster没有权限的异常
else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
else
exception = error.exception();
//调用回调函数处理响应,标记批次的状态并释放内存
failBatch(batch, response, exception, batch.attempts() < this.retries);
}
//如果是元数据无效的异常
if (error.exception() instanceof InvalidMetadataException) {
//未知topic或者partition异常
if (error.exception() instanceof UnknownTopicOrPartitionException) {
log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
"topic-partition may not exist or the user may not have Describe access to it",
batch.topicPartition);
} else {
log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
"to request metadata update now", batch.topicPartition, error.exception().toString());
}
//更新元数据
metadata.requestUpdate();
}
//如果没有异常
} else {
completeBatch(batch, response);
}
if (guaranteeMessageOrder)
this.accumulator.unmutePartition(batch.topicPartition, throttleUntilTimeMs);
}
复制
如果是消息过大的异常,则对大的消息进行切分,放回缓存中,然后移除inFlightRequests中对应的请求并释放内存
如果是可重试的异常,则进行重试。更新已经重试的次数,将批次放回缓存中对应的Deque中,然后移除inFlightRequests中对应的请求
如果无法重试,则根据不同的异常类型封装RuntimeException对象,然后调用failBach方法。无法重试有两种情况:
不允许重试
超过了重试的次数
如果是元数据无效的异常,则更新元数据。
如果没有异常,则执行completeBatch方法
private void failBatch(ProducerBatch batch,
long baseOffset,
long logAppendTime,
RuntimeException exception,
boolean adjustSequenceNumbers) {
if (transactionManager != null) {
transactionManager.handleFailedBatch(batch, exception, adjustSequenceNumbers);
}
this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
//如果这个批次还未标记状态,则标记状态(失败或者成功)
if (batch.done(baseOffset, logAppendTime, exception)) {
//归还内存池的内存并从inFlightBatches集合中移除
maybeRemoveAndDeallocateBatch(batch);
}
}
复制
这里的batch.done是给批次标记一个状态,由于这里exception不为null,所以标记为FAILED,即这个批次失败了。然后遍历批次中的消息,执行我们生产消息时绑定的那个回调函数在有异常情况下的逻辑。
最后将该批次对应的从inFlightBatches中移除,并释放批次占用的内存。
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
if (transactionManager != null) {
transactionManager.handleCompletedBatch(batch, response);
}
//标记批次的状态:成功
if (batch.done(response.baseOffset, response.logAppendTime, null)) {
//移除批次并释放内存
maybeRemoveAndDeallocateBatch(batch);
}
}
复制
最后将该批次对应的从inFlightBatches中移除,并释放批次占用的内存。
总结:
客户端通过注册的OP_READ事件,不断读取服务端返回的响应,将读取到的响应封装成NetworkReceive对象并放入stagedReceive结构 从stagedReceive结构中取出NetworkReceive对象,放入completedReceives集合中 从completedReceives集合中取出NetworkReceive对象,进行解析,并封装成ClientResongse对象,放入responses集合 对于超时的请求,首先断开和目标节点的网络连接,标记该节点为DISCONNECTED状态,然后封装一个没有响应体的ClientResponse对象,放入responses集合 不管是收到响应的请求还是超时的请求,最后都从InFlightRequests结构中将该请求移除 遍历responses中的响应对象,执行其回调方法 回调方法是在构建请求的时候绑定的,针对不同的结果执行不同的逻辑: 对于可重试的异常,进行重试,将该批次放回缓存队列进行重新发送 对于不可重试的异常,封装一个RuntimeException异常对象,然后释放内存,标记该批次为FAILED 对于元数据无效的异常,则重新更新元数据 对于消息过大的异常,则进行批次切分,重新放回缓存队列 对于正常的响应,则直接释放掉内存,标记该批次为SUCCEEDED 对于已经完成状态标记的批次,将该批次从InFlightBatches中移除,然后遍历批次中的消息,执行生产消息时绑定的回调函数