前两天看到一个自称30多岁的Java开发发的讨论帖说很少用线程池,听完之后不免有些诧异,心想这也太随意了吧!这不,现在就安排上线程池的文章!当然了,本篇文章不是以会用线程池为目的,而是以搞懂线程池为目的!相信在看完这篇文章后你不仅会用线程池了,还能真真正正的了解到线程池的底层运行原理。
看完本篇文章后你将会从源码级别明白线程池的整个执行流程,比如:
线程池运行状态的表示和当前线程数表示?
任务提交时线程池做了哪些工作?
何时新建线程?何时执行拒绝策略?
线程池是如何从阻塞队列获取待执行的任务的?
keepAliveTime是通过什么逻辑实现的?
执行结果是通过什么方式返回的?
有界队列无界队列分析实现?
前排提醒,由于讲的比较细致,导致本篇文章篇幅较长,还请耐心看完。如果耐心看完却还没有学会可以顺着网线来打我。

|为什么需要线程池
以传统阻塞式Socket IO编程为例,在不使用线程池的情况下编写是这样的

稍作分析便知道,短时间内如果有大量客户端连接进来,将会创建大量线程。这将会导致程序崩溃甚至系统宕机。这样的程序是极其脆落的,一次恶意攻击便可产生非常严重的影响。此外,线程的创建、销毁、切换的开销都比较大,这将造成程序性能的下降。
那现在我们便可以得到结论了,频繁的手动创建线程有以下问题:
创建的线程数量得不到控制,可能导致系统资源耗尽、系统宕机。
线程的创建、销毁、线程间的切换都比较耗费性能,可能比处理任务所所花费的时间更多,反而会使程序运行效率下降。
那线程池又怎样理解呢?线程池可以看做是线程的集合。在没有任务时线程处于空闲状态,当请求到来:线程池给这个请求分配⼀个空闲的线程,任务完成后回到线程池中等待下次任务(⽽不是销毁)。这样就实现了线程的重⽤。并且我们可以控制这个线程集合所创建的最大线程数,这样就不会出现线程资源耗尽问题了。
阿里巴巴Java开发手册关于线程池是这样说的:
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明:使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
|显式创建线程池
线程池的创建方式有很多种,最常使用也是最推荐使用的方式是使用new ThreadPoolExecutor显式创建线程池。
直接new一个ThreadPoolExecutor对象即可实现线程池的创建:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
在这里列出来的是一个最全参数的ThreadPoolExecutor构造方法,在大多数情况下ThreadFactory和RejectedExecutionHandler可以省略,直接使用默认值。接下来详细介绍一下这些参数。
corePoolSize:核心线程数
maximumPoolSize:最大线程数
keepAliveTime:允许线程空闲时间
unit:上述空闲时间的时间单位
workQueue:指定阻塞队列
threadFactory:指定线程工厂
handler:设定任务拒绝策略
核心线程数&最大线程数&空闲时间
如果当前运行的线程数少于核心线程数corePoolSize,当有新任务到来时,直接创建一个新的线程出来。
如果当前运行的线程数大于核心线程数corePoolSize,且阻塞队列workQueue没满,则会将新任务放入阻塞队列。
如果当前运行的线程数大于核心线程数corePoolSize,且阻塞队列workQueue已满,且小于最大线程数maximumPoolSize,则会开辟新的线程。
如果当前运行的线程数大于核心线程数corePoolSize,且阻塞队列workQueue已满,且达到最大线程数maximumPoolSize,则执行拒绝策略。
默认情况下,如果当前线程数大于核心线程数corePoolSize,则超出核心线程数的那部分线程如果空闲时间超过keepAliveTime设定的时间将会被回收(可以理解为这部分线程是借的,不用了就及时归还。核心线程也可以设置成超时回收,通过设置allowCoreThreadTimeOut实现)。
阻塞队列
阻塞队列主要分为有界队列、无界队列、同步移交队列、延时队列这几种。
1)有界队列:队列的容量有上限,达到上限后创建新的线程(对应上面的第三条)。

ArrayBlockingQueue、指定了容量的LinkedBlockingQueue都是有界队列。


2)无界队列:队列的容量无上限,即使用无界队列的线程池实际运行的线程数总是小于等于核心线程数。调用LinkedBlockingQueue无参构造器构造出来的队列为无界队列。
3)延时工作队列:用于实现预执行、定时执行任务,队列DelayedWorkQueue也是无界的。
4)同步移交队列:虽然叫队列,但却没有容量,是无缓冲阻塞队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
指定线程工厂
1)可以不指定,不指定的情况下默认使用Executors类的静态内部类
DefaultThreadFactory当作默认线程工厂。
2)指定线程工厂的最常用的场景是指定线程名。
指定任务拒绝策略
RejectedExecutionHandler接口有以下4个实现类

抛出异常
使⽤调⽤者的线程来处理
丢掉最老的任务
静默丢掉这个任务
默认拒绝策略是抛出异常

|快捷创建线程池
使用Executors提供的静态工厂方法快捷创建,在工厂方法内部实则也是调用的new ThreadPoolExecutor。
下面介绍一下Executors提供的几个线程池的默认实现:
SingleThreadPool:
核心线程数、最大线程数为1,使用单个线程运行。

CachedThreadPool:
核心线程数为0,最大线程数不限制,队列为同步移交队列。如果没有空闲线程,则直接创建一个新的线程。一旦创建的线程空闲时间超过60s,就将其回收。具有弹性伸缩能力,但有可能出现短时间内创建过多线程而导致系统资源耗尽问题。

FixedThreadPool:
核心线程数和最大线程数相等,阻塞队列为无界队列。

SchemaThreadPool:
使用延迟队列创建具有预定执行/定时执行功能的线程池。核心线程数可以自定义,最大线程数不限制(实际上创建的线程数不会超过核心线程数,因为DelayedWorkQueue为无界队列,可以一直往里面添加元素,容量不够了就扩容)。


看上去比第一种直接new ThreadPoolExecutor方便多了不过,不建议这样用。阿里巴巴Java开发手册对这部分的说明如下:
【强制】线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式创建,这样的处理方式能够让编写代码的工程师更加明白线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下。
1)FixedThreadPool和SingleThreadPool允许请求队列的长度为Integer.MAX_VALUE,可能堆积大量的请求,从而导致OOM。
2)CachedThreadPool和ScheduledThreadPool允许创建线程数量为Integer.MAX_VALUE,可能会创建大量线程,从而导致OOM。




说明:我认为阿里巴巴开发手册对ScheduledThreadPool的表述并不准确。ScheduledThreadPool不会因为最大线程数设置成Integer.MAX_VALUE而出现OOM问题,因为队列DelayedWorkQueue会自动扩容,实为无界队列。倒是会出现因为可能堆积大量的请求在无界队列导致OOM问题。

|提交单任务
提交单个任务使用submit方法,submit方法有以下3个不同形式

第一个方法提交一个实现了Runnable接口的任务,返回一个Future<?>对象,可以使用这样一个对象来调用isDone、 cancel 或 isCancelled。但是, get 方法在完成的时候只是简单地返回 null。
第二个版本的 Submit 也提交一个 Runnable, 并且 Future 的 get 方法在完成的时候返回指定的 result 对象。
第三个版本的 Submit 提交一个 Callable, 并且返回的 Future 对象的get方法得到的是代码块的返回结果。
|Callable与Future
前面看到了使用submit提交任务时可以提交实现了Runnable接口的任务,也可以提交实现了Callable接口的任务,submit方法返回一个Future对象。Runnable可能大家都很熟悉,Callable和Future有些人可能会感到陌生,顺便说一说吧。
Callable与Runnable的最大不同是Callable能够返回任务代码块的执行结果。
Runnable接口,run方法无返回值

Callable接口,call方法有返回值

使用Future对象的get方法获取代码块的返回值,该方法是阻塞的。

看一个例子,就明白了:
public class SimpleApplication {public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));Future<List<Integer>> future1 = pool.submit(new Counter(100));Future<List<Integer>> future2 = pool.submit(new Counter(50));try {List<Integer> result1 = future1.get();List<Integer> result2 = future2.get();System.out.println("得到的结果1:" + "大小:" + result1.size() + ",生成的数据:" + result1);System.out.println("得到的结果2:" + "大小:" + result2.size() + ",生成的数据:" + result2);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}pool.shutdown();}/*** 生成特定个数的随机数*/static class Counter implements Callable<List<Integer>> {private final int size;public Counter(int size) {this.size = size;}@Overridepublic List<Integer> call() throws Exception {if (size < 0) {return new ArrayList<>();}ArrayList<Integer> arrayList = new ArrayList<>(size);Random random = new Random();for (int i = 0; i < size ; i++) {arrayList.add(random.nextInt());}return arrayList;}}}
执行结果

|提交任务组
提交任务组使用invokeAny或invokeAll方法,两者的区别是invokeAny只返回某一个任务的执行结果,至于返回的是哪一个任务的执行结果,并不能确定,也许是第一个执行完的那个任务;invokeAll则通过返回一个List<Future>对象返回所有任务的执行结果。

使用invokeAll的伪代码如下:
List<Callable<T>> tasks = ...;List<Future<T>> results = pool.invokeAll(tasks);for (Future<T> result : results) {result.get();}
这个写法儿是有些问题的,因为如果恰好第一个任务的执行所需的时间比较长,则不得不进行等待。更好的做法儿是使用ExecutorCompletionService将ThreadPoolExecutor包装一下,这样程序就能优先得到先执行完的结果了。

其中主要关心的是
构造函数:传递一个实现了Executor接口的对象。
submit: 提交一个任务给执行器
take:移除下一个已完成的结果,如果没有结果可用则阻塞
pool:移除下一个已完成的结果,如果没有结果可用则返回null
伪代码如下:
ExecutorCompletionService<T> service = new ExecutorCompletionService<>(pool);for (Callable<T> task : tasks) {service.submit(task);}for (int i = 0; i < tasks.size(); i++) {service.take().get();}
|关闭线程池
关闭线程池有2种方式

当用完一个线程池的时候, 调用 shutdown。该方法启动该池的关闭序列。被关闭的执行器不再接受新的任务(当前阻塞队列中的任务仍会被执行)。当所有任务都完成以后,线程池中的线程死亡。另一种方法是调用shutdownNow。该池取消尚未开始的所有任务并试图中断正在运行的线程。
调用shutdown,线程池将进入SHUTDOWN状态;调用shutdownNow,线程池将会进入STOP状态,这一部分到下面源码分析处详细解释。
|线程池关键源码分析
【1】线程池运行状态
线程池有RUNNING、SHUTDOWN、STOP、TIDYING、YERMINATED,这几种状态,在代码中用一个AtomicInteger类型的变量ctl的高3位来表示线程池状态,低29位表示当前线程数。

源码注释如下:

翻译一下:
RUNNING:接受新的任务,处理阻塞队列中的任务。
SHUTDOWN:不接受新的任务,处理阻塞队列中未处理的任务。
STOP:不接受新的任务,不处理阻塞队列中未处理的任务,中断正在运行的任务。
TIDYING:当所有任务已经处理完,任务计数为0,此时线程池状态将变为TIDYING,此时将会调用函数terminated()。
TERMINATED:函数terminated()执行完后会进入此状态。
变为SHUTDOWN状态,通过调用函数shutdown()实现;变为STOP状态,通过调用shutdownNow()实现;由SHUTDOWN状态变为TIDYING状态,是在工作队列以及阻塞队列的任务全部处理完后;由STOP状态变为TIDYING状态,是在工作队列任务全部处理完后;由TIDYING变为TERMINATED是在执行完函数terminated()后。
线程池状态可以用下面的图示表示:

【2】submit代码分析
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}
submit方法是由ThreadPoolExecutor的父类AbstractExecutorService实现的,该方法会将实现了Runnable接口或实现了Callable接口的对象封装成RunnableFuture对象(很重要,后面会用到),然后调用execute方法进一步处理。
【3】executor代码分析
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}
execute方法由ThreadPoolExecutor实现,该方法的运行逻辑如下:
1)判断当前线程数是否小于核心线程数,如果小于,则执行addWorker方法,并将要提交的任务传递进去。如果addWorker方法执行成功,则直接返回,否则继续往下执行判断。
2)判断当前线程池是否为RUNNING状态,如果为RUNNING状态则将任务放入阻塞队列,当放入阻塞队列后再判断一次线程的运行状态(二次判断的目的是在多线程环境下,该操作并不是原子的,线程的状态很有可能被改变。所以需要在任务入队后再判断一次,以在必要时回滚或在没有线程运行的情况下创建新的线程。如果没有二次判断,则有可能造成任务成功提交了,但却运行不了的情况。因为有二次判断,所以当任务在运行不了的情况下至少还会给出提示信息),如果此时不是RUNNING状态,则将刚刚加入阻塞队列的任务从阻塞队列中移除,并执行拒绝策略。如果线程处于run状态,但当前线程数为0,则调用addWorker方法,此时addWorker方法的第一个参数传的是null。
3)如果workQueue.offer执行失败(队列满了),则执行addWorker方法,,并将要提交的任务传递进去,并判断addWorker方法是否执行成功(比如在线程池非RUNNING状态下会执行失败,或者达到最大线程数会执行失败),如果执行失败,则执行拒绝策略。
【4】workQueue.offer代码分析
前面提到了有界队列,无界队列,并指出哪些队列是有界的,哪些队列是无界的,这些都是通过分析的源码workQueue.offer分析出来的。
以队列ArrayBlockQueue为例:
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}}
方法内部使用了使用了可重入锁保证了线程安全,ArrayBlockQueue内部实际上是一个循环数组。当判断到队列中的元素个数等于数组大小时,便认为队列已经满了,返回false。

而这个iterms的大小则是在构造函数中指定的:

所以,我们分析出了ArrayBlockQueue是有界的。对其他队列的分析方式和此一样,不妨你自己分析一下。
【5】addWorker代码分析
前面的execute方法调用的核心方法是addWorker方法,所以来分析一下addWorker方法做了什么。
先看看注释怎么说

翻译过来就是:
检查是否可以针对当前池状态和给定的边界(核心或最大值)添加新的工作程序。如果是这样,则会相应地调整工作程序计数,并且如果可能,会创建并启动一个新的工作程序,并运行firstTask作为其第一个任务。如果池已停止或有资格关闭,则此方法返回false。如果在询问时线程工厂无法创建线程,则它还会返回false。如果线程创建失败(由于线程工厂返回null或由于异常(通常在Thread.start()中为OutOfMemoryError)),我们将进行干净的回滚。
这部分源码我直接用注释的方式进行分析。
private boolean addWorker(Runnable firstTask, boolean core) {//对代码块进行标记retry:for (;;) {int c = ctl.get();//获取高3位的线程运行状态int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);//线程数超限,直接返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//线程计数执行原子加1操作,成功直接跳出最外面那层循环if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctl//状态改变了,跳出内层循环,重新执行外层循环,重新获取状态if (runStateOf(c) != rs)continue retry;//状态未改变,一直在内层循环重试CAS加1操作// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//将任务封装成Worker对象w = new Worker(firstTask);//获取到Worker对象创建的Thread对象final Thread t = w.thread;if (t != null) {//上锁,保证线程安全final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());//线程池处于运行状态 或 线程池处于关闭状态且task为nullif (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//添加到任务队列,任务队列用Set存储workers.add(w);int s = workers.size();//有一个变量记录了线程池运行过程中创建的最大线程数,在这里更新的是这个变量if (s > largestPoolSize)largestPoolSize = s;//任务添加成功workerAdded = true;}} finally {//解锁mainLock.unlock();}if (workerAdded) {//启动线程t.start();workerStarted = true;}}} finally {//如果任务没有添加成功执行失败处理逻辑if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
看完代码后,我们可以得出addWorker方法的作用是做一系列状态判断,并在状态符合要求时,直接开辟一个新的线程运行任务,并通过将任务添加到workers中实现对任务的管理。
addWorker方法最后执行了t.start()启动了线程,但启动的线程所执行的代码块并不是所提交的任务而是执行了addWorker对象的runWorker方法,真正核心的代码也正在此处。

【6】addWorker.runWorker代码分析
还是老规矩,先看注释再分析代码

翻译一下:
主循环。反复从队列中获取任务并执行它们,同时解决许多问题:
1)我们可以从初始任务开始,在这种情况下,我们不需要获取第一个任务。否则,只要池正在运行,我们就会从getTask获得任务。如果返回null,则工作器由于池状态或配置参数更改而退出。其他退出是由于外部代码中的异常引发而导致的,在这种情况下,completedAbruptly成立,这通常导致processWorkerExit替换此线程。
2)在运行任何任务之前,先获取锁,以防止任务执行时其他池中断,然后确保除非池正在停止,否则此线程不会设置其中断。
3)每个任务运行之前都会调用beforeExecute,这可能会引发异常,在这种情况下,我们将导致线程死掉(中断带有completelyAbruptly true的循环)而不处理该任务。
4)假设beforeExecute正常完成,我们运行任务,收集其引发的任何异常以发送给afterExecute。我们分别处理RuntimeException,Error(规范保证我们可以捕获它们)和任意Throwables。因为我们无法在Throwable.run中抛出Throwable,所以我们将它们包装在Errors中(输出到线程的UncaughtExceptionHandler)。任何抛出的异常也会保守地导致线程死亡。
5) task.run完成后,我们调用afterExecute,这也可能引发异常,这也将导致线程死亡。根据JLS Sec 14.20,此异常是即使task.run抛出也会生效的异常。
提取一下有效信息:
0)该方法正常情况下会一直运行(是一个循环)
1)该方法会执行fastTask或通过getTask从阻塞队列中获取任务。
2)在执行任务前后会执行beforeExecute和afterExecute
3)getTask()返回null时,线程结束运行(注意,这里下面会用到)
通过加注释的方式对代码进行分析:
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//while循环,当firstTask执行完成后或本来就为空时//会通过getTask方法从阻塞队列中获取新的任务执行//如果getTask()结果为null,则跳出循环,结束线程运行while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//默认什么都不做//可以通过重写此方法实现扩展beforeExecute(wt, task);Throwable thrown = null;try {//真正执行的代码块//这里是在submit方法上封装的FutureTask对象的run方法task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//默认什么都不做//可以通过重写此方法实现扩展afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
在目标任务执行前后执行的beforeExecute和afterExecute默认什么都不做,可以通过重写这两个方法实现自定义。


【7】addWorker.getTask代码分析
runWorker方法通过getTask方法从阻塞队列中获取待执行的任务,下面就来看看getTask方法的实现吧。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

这一部分是getTask的核心代码,还记得前面说过的keepAliveTime吗?其实keepAliveTime的逻辑就是在这里实现的!
核心线程允许超时标志位为true或当前线程数超出了核心线程数,timed标志置位:

当超时标志置位时,通过设置超时参数,从队列中获取元素,获取不到直接返回null,对应的线程将会结束执行。

【8】FutureTask代码分析
前面在分析submit时,我们知道了submit方法会将传进来的任务对象封装成FutureTask对象。在分析runWorker时也看到了执行的任务代码块实际上是前面封装的FutureTask对象的run方法。那我们来看看FutureTask对象的run方法到底做了什么吧。

看看run源码

通过get方法阻塞获取执行结果

代码分析到这里就可以了,已经把线程池的大部分处理逻辑串起来了。强烈建议在看文章的同时,自己去跟读代码。
|线程池实战
ThreadPoolExecutor实战:
下面演示一个使用ThreadPoolExecutor以及Future的一个较为复杂的例子,实现了在指定目录下的文件中查找是否含有指定的关键字,并计算出符合条件的文件个数。
public class Application {private static final ExecutorService pool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));public static void main(String[] args) {try (Scanner in = new Scanner(System.in)) {System.out.println("请输入要检索的目录:");String directory = in.nextLine();System.out.println("请输入要检索的关键字:");String keyword = in.nextLine();MatchCounter counter = new MatchCounter(new File(directory), keyword);Future<Integer> future = pool.submit(counter);System.out.println(future.get() + "个匹配的文件");System.out.println("largest pool size:" + ((ThreadPoolExecutor)pool).getLargestPoolSize());pool.shutdown();} catch (Exception e) {e.printStackTrace();}}/*** 搜索关键字出现的文件数*/static class MatchCounter implements Callable<Integer> {private final File directory;private final String keyword;public MatchCounter(File directory, String keyword) {this.directory = directory;this.keyword = keyword;}@Overridepublic Integer call() {int count = 0;File[] files = directory.listFiles();if (files == null) {return 0;}List<Future<Integer>> results = new ArrayList<>();for (File file : files) {if (file.isDirectory()) {MatchCounter counter = new MatchCounter(file, keyword);Future<Integer> future = pool.submit(counter);results.add(future);} else {if (search(file)) {count++;}}}for (Future<Integer> result : results) {try {count += result.get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}return count;}private boolean search(File file) {try (Scanner in = new Scanner(file, "UTF-8")) {boolean found = false;while (in.hasNextLine()) {String line = in.nextLine();if (line.contains(keyword)) {System.out.println("检索到的文件:" + file.getName());return true;}}return false;} catch (IOException e) {return false;}}}}
执行结果

ScheduledThreadPoolExecutor实战:
下面代码实现了预期执行任务以及定时执行任务的功能,实现起来较为简单:
public class ScheduledApplication {private static final ScheduledThreadPoolExecutor sPool = new ScheduledThreadPoolExecutor(4);public static void main(String[] args) {Scanner scanner = new Scanner(System.in);//每1秒执行一次sPool.scheduleAtFixedRate(new TimeReport(), 1000, 1000, TimeUnit.MILLISECONDS);System.out.println("2s后执行预期任务:" + new Date());sPool.schedule(new FutureTask(), 2000, TimeUnit.MILLISECONDS);if (scanner.hasNext()) {sPool.shutdown();}}static class TimeReport implements Runnable {@Overridepublic void run() {System.out.println(new Date());}}static class FutureTask implements Runnable {@Overridepublic void run() {System.out.println("预期执行:" + new Date());}}}
程序执行结果:

|写在最后
关于线程池的使用、线程池源码分析到这里就结束了,如有纰漏之处还望及时指出,有任何疑问也可以直接给公众号发消息,我看到后会回复的。由于这篇文章比较长,看完是需要很大的耐心的。相信,如果你能认真的看到这里一定会有所收获的。




