最近在自己的服务器上用docker构建了一个Elasticsearch服务,发现原来的TransportClient相关的api都已经废弃了,目前都提倡使用RestClient。为了更好地使用它,利用空闲时间对RestClient的源码进行如下分析。
RestClient的构造过程
创建过程代码示例:
RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost("192.168.1.100", 9200));
RestClient restClient = restClientBuilder.build();
restClient.performRequest(new Request(...))
复制
从上面的代码示例可以看出RestClient的实例化是依赖于RestClientBuilder的build方法,也就是应用了builder模式。HttpHost实例的构造方法入参为ip和端口。
关于RestClient的builder方法:
public static RestClientBuilder builder(HttpHost... hosts) {
-----------
List<Node> nodes = Arrays.stream(hosts).map(Node::new).collect(Collectors.toList());
return new RestClientBuilder(nodes);
}
复制
方法中主要的操作是将HttpHost映射成Node实例,Node中存放的是HttpHost中的元数据信息。我们来简单地看一下RestClientBuilder的构造方法:
RestClientBuilder(List<Node> nodes) {
----------
this.nodes = nodes;
}
复制
将nodes列表赋值给RestClientBuilder实例。我们接着来看下RestClientBuilder的属性列表:
// 默认的连接超时 毫秒数
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000;
public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10;
public static final int DEFAULT_MAX_CONN_TOTAL = 30;
private static final Header[] EMPTY_HEADERS = new Header[0];
private final List<Node> nodes;
private Header[] defaultHeaders = EMPTY_HEADERS;
private RestClient.FailureListener failureListener;
private HttpClientConfigCallback httpClientConfigCallback;
private RequestConfigCallback requestConfigCallback;
private String pathPrefix;
private NodeSelector nodeSelector = NodeSelector.ANY;
private boolean strictDeprecationMode = false;
复制
DEFAULTCONNECTTIMEOUT_MILLIS参数用来控制默认的连接超时时间,为1000ms;
DEFAULTSOCKETTIMEOUT_MILLIS参数用来控制socket的默认超时时间为30000ms;
DEFAULTMAXCONNPERROUTE参数用来表示每个路由(我的理解为每个Node或主分片)最大连接数,默认为10;
DEFAULTMAXCONN_TOTAL参数用来表示默认的最大连接数,默认为30;
EMPTY_HEADERS参数用来表示空的header,也是默认的header;
nodes代表配置的可以连接的各个节点;
failureListener代表RestClient的失败监听器;
httpClientConfigCallback用来代表客户端配置的回调;
requestConfigCallback表示请求配置的回调;
pathPrefix表示路径前缀;
nodeSelector表示节点选择器;
strictDeprecationMode表示脚本模式是否过期。
org.elasticsearch.client.RestClientBuilder#build方法
直接上代码:
public RestClient build() {
if (failureListener == null) {
failureListener = new RestClient.FailureListener();
}
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
(PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient);
RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
httpClient.start();
return restClient;
}
复制
这个方法主要包括三步:1. 创建CloseableHttpAsyncClient;2.用RestClient来代理CloseableHttpAsyncClient;3. 启动httpClient。
1. 创建CloseableHttpAsyncClient
AccessController.doPrivileged的作用是授予特权,跳过其他checkpermission的检查。在它里面执行的是创建httpClient的方法,createHttpClient方法的代码如下:
private CloseableHttpAsyncClient createHttpClient() {
//default timeouts are all infinite
// 设置请求的一些基本配置
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
.setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
if (requestConfigCallback != null) {
requestConfigBuilder =
// 设置客户端的请求配置
requestConfigCallback.customizeRequestConfig(requestConfigBuilder);
}
try {
HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build())
//default settings for connection pooling may be too constraining // 设置一些配置
.setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL)
.setSSLContext(SSLContext.getDefault())
.setTargetAuthenticationStrategy(new PersistentCredentialsAuthenticationStrategy());
if (httpClientConfigCallback != null) {
httpClientBuilder =
// 如果客户端有传入configCallback则使用客户端的config来设置httpClientBuilder
httpClientConfigCallback.customizeHttpClient(httpClientBuilder);
}
final HttpAsyncClientBuilder finalBuilder = httpClientBuilder;
return AccessController.doPrivileged((PrivilegedAction<CloseableHttpAsyncClient>) finalBuilder::build);
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("could not create the default ssl context", e);
}
}
复制
这里主要的操作是先构建requestConfigBuilder,如果客户端没有做requestConfigCallback配置则使用默认的配置,如果有requestConfigCallback配置则使用客户端的配置来构建requestConfigBuilder;然后使用requestConfigBuilder来构建httpClientBuilder,如果客户端没有做httpClientConfigCallback配置则使用默认配置来设置httpClientBuilder,否则使用客户端的配置。最后用最终的finalBuilder来构建CloseableHttpAsyncClient实例,来看下具体的finalBuilder的build方法的关键代码:
// org.apache.http.impl.nio.client.HttpAsyncClientBuilder#build:
public CloseableHttpAsyncClient build() {
PublicSuffixMatcher publicSuffixMatcher = this.publicSuffixMatcher;
if (publicSuffixMatcher == null) {
publicSuffixMatcher = PublicSuffixMatcherLoader.getDefault();
}
// 连接管理器
NHttpClientConnectionManager connManager = this.connManager;
if (connManager == null) {
SchemeIOSessionStrategy sslStrategy = this.sslStrategy;
if (sslStrategy == null) {
------------------
sslStrategy = new SSLIOSessionStrategy(
sslcontext, supportedProtocols, supportedCipherSuites, hostnameVerifier);
}
// 连接的reactor线程
final ConnectingIOReactor ioreactor = IOReactorUtils.create(
defaultIOReactorConfig != null ? defaultIOReactorConfig : IOReactorConfig.DEFAULT, threadFactory);
// 池化的连接管理器
final PoolingNHttpClientConnectionManager poolingmgr = new PoolingNHttpClientConnectionManager(
ioreactor,
RegistryBuilder.<SchemeIOSessionStrategy>create()
.register("http", NoopIOSessionStrategy.INSTANCE)
.register("https", sslStrategy)
.build());
--------------
connManager = poolingmgr;
---------------
// 客户端执行器
final MainClientExec exec = new MainClientExec(
httpprocessor,routePlanner, redirectStrategy,targetAuthStrategy,
proxyAuthStrategy,userTokenHandler);
ThreadFactory threadFactory = null;
NHttpClientEventHandler eventHandler = null;
if (!this.connManagerShared) {
threadFactory = this.threadFactory;
if (threadFactory == null) {
// 默认的线程创建工厂
threadFactory = Executors.defaultThreadFactory();
}
eventHandler = this.eventHandler;
if (eventHandler == null) {
// 异步的请求执行器
eventHandler = new HttpAsyncRequestExecutor();
}
}
return new InternalHttpAsyncClient(
connManager, reuseStrategy, keepAliveStrategy,
threadFactory, eventHandler,exec,
cookieSpecRegistry,authSchemeRegistry,defaultCookieStore,
defaultCredentialsProvider,defaultRequestConfig);
复制
这里会对client的一些核心组件进行初始化,比如连接管理器,一些重用和连接保活的策略、线程工厂、事件处理器、执行器、cookie注册和存储策略及一些授权的操作。
继续来看InternalHttpAsyncClient的构造方法,直接来看构造方法主要是一些赋值的操作,这里主要需要关注的是它调用的父类org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase#CloseableHttpAsyncClientBase的构造方法:
public CloseableHttpAsyncClientBase(
final NHttpClientConnectionManager connmgr,
final ThreadFactory threadFactory,
final NHttpClientEventHandler handler) {
super();
this.connmgr = connmgr;
if (threadFactory != null && handler != null) {
this.reactorThread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
try {
final IOEventDispatch ioEventDispatch = new InternalIODispatch(handler);
connmgr.execute(ioEventDispatch);
} catch (final Exception ex) {
log.error("I/O reactor terminated abnormally", ex);
} finally {
status.set(Status.STOPPED);
}
}
});
} else {
this.reactorThread = null;
}
this.status = new AtomicReference<Status>(Status.INACTIVE);
}
复制
CloseableHttpAsyncClientBase的初始status的值为INACTIVE。CloseableHttpAsyncClientBase是通过nio的reactor模式来处理各个IO请求操作的。它的reactorThread会通过线程工厂进行创建,它就是对整个处理请求分发的线程。InternalIODispatch实例中有一个handler,这个handler为HttpAsyncRequestExecutor实例。connmgr.execute(ioEventDispatch)方法:
// org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager#execute:
@Override
public void execute(final IOEventDispatch eventDispatch) throws IOException {
this.ioreactor.execute(eventDispatch);
}
复制
它的ioreactor对象就是上面在创建CloseableHttpAsyncClient对象时传入connectionManager的,创建部分的代码如下:
final ConnectingIOReactor ioreactor = IOReactorUtils.create(
defaultIOReactorConfig != null ? defaultIOReactorConfig : IOReactorConfig.DEFAULT, threadFactory);
复制
真正的IO的reactor操作都是由它来处理的。针对ioreactor.execute方法我们再继续跟进一步看看org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor#execute方法:
@Override
public void execute(
final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
Args.notNull(eventDispatch, "Event dispatcher");
// 状态锁
synchronized (this.statusLock) {// 用的是对象的监视器锁 moniterin和moniterout标识
if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
this.status = IOReactorStatus.SHUT_DOWN;
// 唤醒所有在锁上等待的
this.statusLock.notifyAll();
return;
}
Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,
"Illegal state %s", this.status);
this.status = IOReactorStatus.ACTIVE;
// Start I/O dispatchers
for (int i = 0; i < this.dispatchers.length; i++) {
// 设置dispatcher
final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
// 设置异常处理handler
dispatcher.setExceptionHandler(exceptionHandler);
// 对dispatchers数组进行初始化
this.dispatchers[i] = dispatcher;
}
for (int i = 0; i < this.workerCount; i++) {
final BaseIOReactor dispatcher = this.dispatchers[i];
// 设置worker,对workers数组进行初始化
this.workers[i] = new Worker(dispatcher, eventDispatch);
// 设置线程,对threads数组进行初始化,这里需要注意的是传入的线程都是worker线程
this.threads[i] = this.threadFactory.newThread(this.workers[i]);
}
}
try {
for (int i = 0; i < this.workerCount; i++) {
// 如果当前reactor线程的状态不是active则直接返回
if (this.status != IOReactorStatus.ACTIVE) {
return;
}
// 启动数组中所有的线程,启动的是worker
this.threads[i].start();
}
for (;;) {//无限循环
final int readyCount;
try {
// reactor线程进行select操作,默认select超时时间为1s
// 在没有事件的情况下readyCount的值为0
readyCount = this.selector.select(this.selectTimeout);
} catch (final InterruptedIOException ex) {
------------------------------
if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
// 如果当前reactor线程是active状态,则执行processEvents操作
processEvents(readyCount);
}
// Verify I/O dispatchers
for (int i = 0; i < this.workerCount; i++) {
final Worker worker = this.workers[i];
final Throwable ex = worker.getThrowable();
if (ex != null) {
throw new IOReactorException(
"I/O dispatch worker terminated abnormally", ex);
}
}
if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
break;
}
}
-------------------
} finally {
// 由于上面有个无限循环,执行到这里时就会执行一些shutdown的操作
doShutdown();
synchronized (this.statusLock) {
this.status = IOReactorStatus.SHUT_DOWN;
this.statusLock.notifyAll();
}
}
}
复制
这里主要是处理reactor线程的一些初始化和selector轮询操作,包括线程池,dispatcher池和worker池的初始化。关于线程池这里有一点需要特别注意的就是,这里的threads数组中是worker线程,我们来看看worker线程中的操作:
@Override
public void run() {
try {
this.dispatcher.execute(this.eventDispatch);
-------------
复制
org.apache.http.impl.nio.reactor.BaseIOReactor#execute方法:
@Override
public void execute(
final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
Args.notNull(eventDispatch, "Event dispatcher");
this.eventDispatch = eventDispatch;
execute();
}
复制
无参的execute方法代码如下:
protected void execute() throws InterruptedIOException, IOReactorException {
this.status = IOReactorStatus.ACTIVE;
try {
for (;;) {
final int readyCount;
try {
// 进行select操作
readyCount = this.selector.select(this.selectTimeout);
}
---------------------
// Process selected I/O events
if (readyCount > 0) {
// 当有ready事件时进行相应的event处理,当然这是针对一些老的请求
processEvents(this.selector.selectedKeys());
}
// Validate active channels
validate(this.selector.keys());
// Process closed sessions
processClosedSessions();
// If active process new channels
if (this.status == IOReactorStatus.ACTIVE) {
// 对于新进入的请求
processNewChannels();
}
-------------------------
复制
这里我们只针对比较关键的点进行分析,我总结了一下主要有两点:1. 针对老的Channel,在selector监听到有ready事件时,会调用processEvents方法来进行处理;2. 针对新创建的Channel,会调用processNewChannels方法进行处理。
1. processEvents方法:
直接上代码:
private void processEvents(final Set<SelectionKey> selectedKeys) {
for (final SelectionKey key : selectedKeys) {
processEvent(key);
}
selectedKeys.clear();
}
protected void processEvent(final SelectionKey key) {
final IOSessionImpl session = (IOSessionImpl) key.attachment();
try {
if (key.isAcceptable()) {
acceptable(key);
}
if (key.isConnectable()) {
connectable(key);
}
if (key.isReadable()) {
session.resetLastRead();
readable(key);
}
if (key.isWritable()) {
session.resetLastWrite();
writable(key);
}
}
------------------------
}
复制
这里主要是对已经准备好的key进行处理,会调用相应的acceptable、connectable、readable、writable方法进行连接和读写的处理,我们简单地来看下对应的处理方法:
//org.apache.http.impl.nio.reactor.BaseIOReactor#readable:
@Override
protected void readable(final SelectionKey key) {
final IOSession session = getSession(key);
try {
// Try to gently feed more data to the event dispatcher
// if the session input buffer has not been fully exhausted
// (the choice of 5 iterations is purely arbitrary)
for (int i = 0; i < 5; i++) {
// 因为考虑到input buffer可能一次接收不完整全部的数据,所以这里会处理5次来保证处理完整数据包
this.eventDispatch.inputReady(session);
if (!session.hasBufferedInput()
|| (session.getEventMask() & SelectionKey.OP_READ) == 0) {
break;
}
}
if (session.hasBufferedInput()) {
this.bufferingSessions.add(session);
}
------------------------------------
}
// org.apache.http.impl.nio.reactor.BaseIOReactor#writable
protected void writable(final SelectionKey key) {
final IOSession session = getSession(key);
try {
// 执行输出,也就是把服务端的响应信息输出到用户端
this.eventDispatch.outputReady(session);
------------------------
复制
这里主要对监听到的读写事件进行相应的处理,关于如何处理的部分在之后再专门进行梳理,这里就先分析到这里。
2. processNewChannels方法:
直接上代码:
private void processNewChannels() throws IOReactorException {
ChannelEntry entry;
while ((entry = this.newChannels.poll()) != null) {
final SocketChannel channel;
final SelectionKey key;
try {
channel = entry.getChannel();
channel.configureBlocking(false);
key = channel.register(this.selector, SelectionKey.OP_READ);
--------------
session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
session.setSocketTimeout(timeout);
---------------------
this.sessions.add(session);
final SessionRequestImpl sessionRequest = entry.getSessionRequest();
if (sessionRequest != null) {
sessionRequest.completed(session);
}
key.attach(session);
sessionCreated(key, session);
--------------------
复制
这里的主要操作是当有新连接请求进入时,也就是newChannels队列中有新的客户端Channel加入时会取出相应的channel,然后向各个worker线程的selector上注册OP_READ事件(这点注意与reactor线程的select区别开来),并设置相应的session。这里有一点需要注意下,在sessionRequest的completed方法中会有相应的客户端回调操作,见代码:
//org.apache.http.impl.nio.reactor.SessionRequestImpl#completed
public void completed(final IOSession session) {
Args.notNull(session, "Session");
if (this.completed) {
return;
}
this.completed = true;
synchronized (this) {
this.session = session;
if (this.callback != null) {
this.callback.completed(this);
}
notifyAll();
}
}
复制
在这里会回调callback中的completed方法。在客户端请求中一般都是包装成一个Future对象,里面会在completed方法中做响应到达时的一些处理。
在看完上面两点之后,我们继续来看BaseIOReactor#execute方法。在其中还有一个无限循环,主要执行reactor线程的select操作(对空轮询操作没有处理,关于空轮询有兴趣的可以看下netty中对于空轮询的处理措施),这里我们进入到processEvents(readyCount)方法来看下reactor线程的具体处理逻辑:
//org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor#processEvents:
@Override
protected void processEvents(final int readyCount) throws IOReactorException {
//处理当前session域中的请求,实际上是消费请求队列
processSessionRequests();
if (readyCount > 0) {//如果readyCount的值大于0,代表有select到响应
final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
for (final SelectionKey key : selectedKeys) {
// 根据selectKey 处理具体的io事件
processEvent(key);
}
selectedKeys.clear();
}
final long currentTime = System.currentTimeMillis();
if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
this.lastTimeoutCheck = currentTime;
final Set<SelectionKey> keys = this.selector.keys();
processTimeouts(keys);
}
}
复制
在processEvents方法主要用于处理session请求队列中的请求任务,并通过向reactor线程的selector中注册相应的selectionKey事件来处理相应的IO事件。
processSessionRequests方法:
private void processSessionRequests() throws IOReactorException {
SessionRequestImpl request;
// 一个循环着从请求队列中取请求任务
while ((request = this.requestQueue.poll()) != null) {
if (request.isCompleted()) {//如果请求已完成
continue;
}
final SocketChannel socketChannel;
try {//打开channel
socketChannel = SocketChannel.open();
} catch (final IOException ex) {
-----------
}
try {
validateAddress(request.getLocalAddress());
validateAddress(request.getRemoteAddress());
socketChannel.configureBlocking(false);
prepareSocket(socketChannel.socket());
if (request.getLocalAddress() != null) {
final Socket sock = socketChannel.socket();
sock.setReuseAddress(this.config.isSoReuseAddress());
// 将socket与address进行bind
sock.bind(request.getLocalAddress());
}
//使用socketChannel与address进行连接
final boolean connected = socketChannel.connect(request.getRemoteAddress());
if (connected) {
final ChannelEntry entry = new ChannelEntry(socketChannel, request);
addChannel(entry);
continue;
}
------------------------------
final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
try {
// 注册OP_CONNECT事件
final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,requestHandle);
request.setKey(key);
} catch (final IOException ex) {
------------
复制
在这个方法中主要处理requestQueue中的请求任务,对未处理的请求创建相应的SocketChannel进行处理。
processEvent()方法:
private void processEvent(final SelectionKey key) {
try {
// key是否可连接
if (key.isConnectable()) {
// 获取channel
final SocketChannel channel = (SocketChannel) key.channel();
// Get request handle
final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
// 获取sessionRequest
final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
// Finish connection process
try {
// 结束连接进程
channel.finishConnect();
} catch (final IOException ex) {
sessionRequest.failed(ex);
}
key.cancel();
key.attach(null);
// 如果请求没有完成
if (!sessionRequest.isCompleted()) {
// 添加一个ChannelEntry到dispatchers中去
addChannel(new ChannelEntry(channel, sessionRequest));
} else {
try {
channel.close();
} catch (final IOException ignore) {
-----------------
复制
实际处理请求事件的方法,会在连接建立后进行连接事件的处理。
2. 用RestClient来包装CloseableHttpAsyncClient
可以理解成包装,其实也是一种静态代理的方式。主要是将CloseableHttpAsyncClient实例作为RestClient的一个属性,从而实现相应方法的代理。
3. 启动HttpClient
主要是通过CloseableHttpAsyncClient的start()方法来处理启动操作,代码如下:
// org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase#start方法
@Override
public void start() {
if (this.status.compareAndSet(Status.INACTIVE, Status.ACTIVE)) {
if (this.reactorThread != null) {
this.reactorThread.start();
}
}
}
复制
在这里会启动reactor线程。
客户端部分
用户调用restClient.performRequest()来执行请求,我们来简单地分析下请求执行的流程,直接看代码:
//org.elasticsearch.client.RestClient#performRequest(org.elasticsearch.client.Request)
public Response performRequest(Request request) throws IOException {
InternalRequest internalRequest = new InternalRequest(request);
return performRequest(nextNodes(), internalRequest, null);
}
private Response performRequest(final NodeTuple<Iterator<Node>> nodeTuple,
final InternalRequest request,
Exception previousException) throws IOException {
RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
HttpResponse httpResponse;
try {
httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get();
} catch(Exception e) {
-------
复制
会调用restClient的execute方法来进行请求的produce和响应的异步消费。
来看下具体的流程:
//org.apache.http.impl.nio.client.InternalHttpAsyncClient#execute方法
public <T> Future<T> execute(
final HttpAsyncRequestProducer requestProducer,
final HttpAsyncResponseConsumer<T> responseConsumer,
final HttpContext context,
final FutureCallback<T> callback) {
-----------------
@SuppressWarnings("resource")
final DefaultClientExchangeHandlerImpl<T> handler = new DefaultClientExchangeHandlerImpl<T>(
this.log,requestProducer, responseConsumer,
localcontext,future,this.connmgr,
this.connReuseStrategy,this.keepaliveStrategy,this.exec);
try {
handler.start();
} catch (final Exception ex) {
handler.failed(ex);
}
return new FutureWrapper<T>(future, handler);
}
// org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl#start
public void start() throws HttpException, IOException {
final HttpHost target = this.requestProducer.getTarget();
final HttpRequest original = this.requestProducer.generateRequest();
if (original instanceof HttpExecutionAware) {
((HttpExecutionAware) original).setCancellable(this);
}
// 提交请求到队列前的准备工作
this.exec.prepare(target, original, this.state, this);
// 请求连接
requestConnection();
}
//org.apache.http.impl.nio.client.AbstractClientExchangeHandler#requestConnection
final void requestConnection() {
// 获取对应的http路由,也就是找到对应的分片节点
final HttpRoute route = this.routeRef.get();
---------------------------
discardConnection();
this.validDurationRef.set(null);
this.routeTrackerRef.set(null);
this.routeEstablished.set(false);
final Object userToken = this.localContext.getUserToken();
final RequestConfig config = this.localContext.getRequestConfig();
this.connectionFutureRef.set(this.connmgr.requestConnection(
route,userToken,config.getConnectTimeout(),
config.getConnectionRequestTimeout(),TimeUnit.MILLISECONDS,
// 这个就是上面提到的completed回调,会在response回来之后进行相应的回调操作,但是需要注意的是它不是最直接的回调,它是在leaseFuture回调之后触发的二级回调
new FutureCallback<NHttpClientConnection>() {
@Override
public void completed(final NHttpClientConnection managedConn) {
connectionAllocated(managedConn);
}
@Override
public void failed(final Exception ex) {
connectionRequestFailed(ex);
}
@Override
public void cancelled() {
connectionRequestCancelled();
}
}));
}
复制
上面的流程可以看出,真正的produce request的操作发生在requestConnection()方法中,这个方法会调用this.connmgr.requestConnection,关于org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager#requestConnection方法:
@Override
public Future<NHttpClientConnection> requestConnection(
final HttpRoute route,
final Object state,
final long connectTimeout,
final long leaseTimeout,
final TimeUnit tunit,
final FutureCallback<NHttpClientConnection> callback) {
----------------------
final BasicFuture<NHttpClientConnection> resultFuture = new BasicFuture<NHttpClientConnection>(callback);
--------------------
final Future<CPoolEntry> leaseFuture = this.pool.lease(route, state,
connectTimeout, leaseTimeout, tunit != null ? tunit : TimeUnit.MILLISECONDS,
new FutureCallback<CPoolEntry>() {
@Override
public void completed(final CPoolEntry entry) {
--------------------
final NHttpClientConnection managedConn = CPoolProxy.newProxy(entry);
if (!resultFuture.completed(managedConn)) {
pool.release(entry, true);
}
}
-------------------------------
});
return new Future<NHttpClientConnection>() {
----------------------------------
@Override
public NHttpClientConnection get() throws InterruptedException, ExecutionException {
return resultFuture.get();
}
---------------------------
};
}
复制
这个方法中的核心是this.pool.lease方法,也是我们接下来要分析的重点,围绕着它的是一系列的Future,并用leaseFuture包装了外面传进来的callback Future,这里我们主要分析下org.apache.http.nio.pool.AbstractNIOConnPool#lease(T, java.lang.Object, long, long, java.util.concurrent.TimeUnit, org.apache.http.concurrent.FutureCallback
public Future<E> lease(
final T route, final Object state,
final long connectTimeout, final long leaseTimeout, final TimeUnit timeUnit,
final FutureCallback<E> callback) {
------------------------------------
final BasicFuture<E> future = new BasicFuture<E>(callback);
final LeaseRequest<T, C, E> leaseRequest = new LeaseRequest<T, C, E>(route, state,
connectTimeout >= 0 ? timeUnit.toMillis(connectTimeout) : -1,
leaseTimeout > 0 ? timeUnit.toMillis(leaseTimeout) : 0,
future);
this.lock.lock();
try {
final boolean completed = processPendingRequest(leaseRequest);
// 如果请求还未完成,则放入leasingRequests队列
if (!leaseRequest.isDone() && !completed) {
this.leasingRequests.add(leaseRequest);
}
if (leaseRequest.isDone()) {
// 如果请求已经完成,则放入完成队列
this.completedRequests.add(leaseRequest);
}
} finally {
this.lock.unlock();
}
// 执行callback
fireCallbacks();
return new Future<E>() {
@Override
public E get() throws InterruptedException, ExecutionException {
return future.get();
}
--------省略部分代码--------------
};
}
复制
继续来看org.apache.http.nio.pool.AbstractNIOConnPool#processPendingRequest方法(由于方法比较长,只截取比较重要部分来分析):
private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
---------------------------------------
// 根据route获取对应route节点的连接池
final RouteSpecificPool<T, C, E> pool = getPool(route);
E entry;
for (;;) {//无限循环,直到获取到一个可用的entry
//从连接池中去一个空闲的连接,优先取state相同的。state默认是null
entry = pool.getFree(state);
if (entry == null) {//获取到的为null的时候直接跳出循环
break;
}
//不为null时需要判断entry是否关闭或过期
if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
// 关闭entry
entry.close();
// 从可用列表中移除
this.available.remove(entry);
// 从pool中释放
pool.free(entry, false);
} else {
// 如果没有关闭或过期,直接break
break;
}
}
if (entry != null) {
// 从可用列表中移除
this.available.remove(entry);
//添加到leased列表中
this.leased.add(entry);
// 将请求标识为已完成状态
request.completed(entry);
//这些onReuse和onLease也是一些客户端回调
onReuse(entry);
onLease(entry);
return true;
}
// New connection is needed
// 当需要创建新的连接时会进入到这里,主要进行一些统计列表的维护
final int maxPerRoute = getMax(route);
// Shrink the pool prior to allocating a new connection
final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
if (excess > 0) {
for (int i = 0; i < excess; i++) {
final E lastUsed = pool.getLastUsed();
if (lastUsed == null) {
break;
}
lastUsed.close();
this.available.remove(lastUsed);
pool.remove(lastUsed);
}
}
if (pool.getAllocatedCount() < maxPerRoute) {
final int totalUsed = this.pending.size() + this.leased.size();
final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
if (freeCapacity == 0) {
return false;
}
final int totalAvailable = this.available.size();
if (totalAvailable > freeCapacity - 1) {
if (!this.available.isEmpty()) {
final E lastUsed = this.available.removeLast();
lastUsed.close();
final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
otherpool.remove(lastUsed);
}
}
------------------------------------------
// 在这里创建sessionRequest
final SessionRequest sessionRequest = this.ioReactor.connect(
// 这里需要特别注意sessionRequestCallback,它是在AbstractNIOConnPool的构造方法中初始化的,初始化是一个new InternalSessionRequestCallback()实例
remoteAddress, localAddress, route, this.sessionRequestCallback);
// 放入request的attach中
request.attachSessionRequest(sessionRequest);
final long connectTimeout = request.getConnectTimeout();
-------------------------
// 添加到pending列表中
this.pending.add(sessionRequest);
pool.addPending(sessionRequest, request.getFuture());
return true;
}
return false;
}
// org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor#connect
@Override
public SessionRequest connect(
final SocketAddress remoteAddress,
final SocketAddress localAddress,
final Object attachment,
final SessionRequestCallback callback) {
Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
"I/O reactor has been shut down");
final SessionRequestImpl sessionRequest = new SessionRequestImpl(
remoteAddress, localAddress, attachment, callback);
sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
// 向request队列中添加一个任务,这个队列会在reactor线程中进行消费
this.requestQueue.add(sessionRequest);
// 唤醒在selector上阻塞的reactor线程
this.selector.wakeup();
return sessionRequest;
}
复制
上面的整个流程做的事情无非是如下几点:
池中可用数量、pending数量、lease数量的列表维护;
如果从池中取到了相同状态的连接Entry,会返回true,这里会进入org.apache.http.nio.pool.AbstractNIOConnPool#lease(T, java.lang.Object, long, long, java.util.concurrent.TimeUnit, org.apache.http.concurrent.FutureCallback
)的剩下的方法中:
try {
final boolean completed = processPendingRequest(leaseRequest);
if (!leaseRequest.isDone() && !completed) {
this.leasingRequests.add(leaseRequest);
}
if (leaseRequest.isDone()) {
// 如果标识为completed,则会将该请求加入到completedRequests队列中
this.completedRequests.add(leaseRequest);
}
} finally {
this.lock.unlock();
}
// 触发对completedRequests队列的消费
fireCallbacks();
复制
如果标识为completed,则会将该请求加入到completedRequests队列中,并触发对completedRequests队列的消费,消费部分代码如下:
private void fireCallbacks() {
LeaseRequest<T, C, E> request;
while ((request = this.completedRequests.poll()) != null) {
final BasicFuture<E> future = request.getFuture();
final Exception ex = request.getException();
final E result = request.getResult();
boolean successfullyCompleted = false;
if (ex != null) {
future.failed(ex);
} else if (result != null) {
if (future.completed(result)) {
successfullyCompleted = true;
}
} else {
future.cancel();
}
if (!successfullyCompleted) {
// 在release方法中会将有些请求从leased列表迁移到available列表中
release(result, true);
}
}
}
// future.completed(result)方法中的入参result为一个NHttpClientConnection连接对象,future对象为org.apache.http.impl.nio.client.AbstractClientExchangeHandler#requestConnection方法中定义的 new FutureCallback<NHttpClientConnection>() ,它的completed方法定义如下:
@Override
public void completed(final NHttpClientConnection managedConn) {
connectionAllocated(managedConn);
}
//org.apache.http.impl.nio.client.AbstractClientExchangeHandler#connectionAllocated方法
----
synchronized (context) {
context.setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, this);
if (managedConn.isStale()) {
failed(new ConnectionClosedException("Connection closed"));
} else {
// 使用连接发送请求
managedConn.requestOutput();
}
}
------
///org.apache.http.impl.nio.conn.CPoolProxy#requestOutput方法
@Override
public void requestOutput() {
final NHttpClientConnection conn = getConnection();
if (conn != null) {
conn.requestOutput();
}
}
// org.apache.http.impl.nio.NHttpConnectionBase#requestOutput方法
@Override
public void requestOutput() {
this.session.setEvent(EventMask.WRITE);
}
// org.apache.http.impl.nio.reactor.IOSessionImpl#setEvent方法
@Override
public synchronized void setEvent(final int op) {
if (this.status == CLOSED) {
return;
}
if (this.interestOpsCallback != null) {
// update the current event mask
this.currentEventMask |= op;
// local variable
final InterestOpEntry entry = new InterestOpEntry(this.key, this.currentEventMask);
// add this operation to the interestOps() queue
this.interestOpsCallback.addInterestOps(entry);
} else {
final int ops = this.key.interestOps();
this.key.interestOps(ops | op);
}
this.key.selector().wakeup();
}
复制
可以看出,从available列表中取出的复用的连接,会自己通过setEvent方法向selector上注册自己的WRITE key。能这样做的原因是,一个连接在进入available列表之前都经历了通过reactor线程分配一个worker线程然后维护着一个配置好的IOSessionImpl的过程。
新连接的创建,会加入一个sessionRequest到requestQueue中,这个队列会在reactor线程中进行消费。同时会唤醒在selector上阻塞的reactor线程。在sessionRequest中会设置请求的回调方法,它是一个InternalSessionRequestCallback实例,简单看下代码:
class InternalSessionRequestCallback implements SessionRequestCallback {
@Override
public void completed(final SessionRequest request) {
requestCompleted(request);
}
}
protected void requestCompleted(final SessionRequest request) {
if (this.isShutDown.get()) {
return;
}
@SuppressWarnings("unchecked")
final
T route = (T) request.getAttachment();
this.lock.lock();
try {
this.pending.remove(request);
final RouteSpecificPool<T, C, E> pool = getPool(route);
final IOSession session = request.getSession();
try {
// 创建 conn
final C conn = this.connFactory.create(route, session);
// 创建entry
final E entry = pool.createEntry(request, conn);
if (pool.completed(request, entry)) {
this.leased.add(entry);
onLease(entry);
} else {
this.available.add(entry);
if (this.ioReactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
// 处理下一个pending状态的请求
processNextPendingRequest();
}
}
} catch (final IOException ex) {
pool.failed(request, ex);
}
} finally {
this.lock.unlock();
}
// 触发回调
fireCallbacks();
}
复制
流程图
上面的代码分析可能略显得比较乱,下面分别针对请求进入和响应回来的场景进行详细的流程图分析如下:
总结
整体流程采取的是NIO+Reactor模式来处理网络IO通信问题,而且在这个的基础上增加了一个池,通过池一方面可以弹性配置连接的大小,另一方面可以客户端做一些实现来对连接池的状态进行监控。所以在Elasticsearch的restClient使用中,为什么使用单例而且能保持比较高的性能,这篇文章应该给出了答案。