暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

SpringBoot如何使用多线程执行任务

程序餐厅 2020-10-13
750

1,SpringBoot中可以使用@EnableAsync和@Async配合来开启异步

package com.cy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync //表示开启异步
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
    }
}
复制

2,在业务层需要异步执行的方法上添加@Async(线程池中配置好的线程方法)

如果使用默认线程池则不需要写括号里面的

3,自定义线程池配置

@Configuration
public class SpringThreadPoolConfig {
@Value("${async-thread-pool.corePoolSize}")
private int corePoolSize=2;
@Value("${async-thread-pool.maxPoolSize}")
private int maxPoolSize=5;
@Value("${async-thread-pool.keepAliveSeconds}")
private int keepAliveSeconds=60*5;
@Value("${async-thread-pool.queueCapacity}")
private int queueCapacity=5;

/**创建线程工厂对象:目的是创建线程时为线程起个名字*/
private ThreadFactory threadFactory=new ThreadFactory() {
private AtomicLong count=new AtomicLong(1);//CAS算法
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"db-service-thread-name-"+count.getAndIncrement());
}
};

@Bean
public Executor asyncExecutor() {
System.out.println("corePoolSize="+corePoolSize);
//构建阻塞式队列
BlockingQueue<Runnable> workQueue=
new LinkedBlockingDeque<>(queueCapacity);
//构建线程池对象(tomcat中默认用的线程池也是这个类型)
ThreadPoolExecutor executor=
new ThreadPoolExecutor(corePoolSize,
maxPoolSize,
keepAliveSeconds,
TimeUnit.SECONDS,
workQueue,
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
复制

Yml文件中的线程配置

#spring async pool
async-thread-pool:
corePoolSize: 5
maxPoolSize: 10
keepAliveSeconds: 30
queueCapacity: 30
复制

注意,当放多线程执行的方法需要返回值时,需要添加Futrue

(如下示例为一个查询业务,则返回值类型为Futrue的泛型)

@Transactional(propagation = Propagation.REQUIRES_NEW)
@Async("asyncExecutor")
@Override
public Future<PageObject<SysLog>> findPageObjects(
String name, Integer pageCurrent) {
//1.验证参数合法性
//1.1验证pageCurrent的合法性,
//不合法抛出IllegalArgumentException异常
if(pageCurrent==null||pageCurrent<1)
throw new IllegalArgumentException("当前页码不正确");
//2.基于条件查询总记录数
//2.1) 执行查询

int rowCount=sysLogDao.getRowCount(name);
//2.2)验证查询结果,假如结果为0不再执行如下操作
if(rowCount==0)
throw new ServiceException("系统没有查到对应记录");
//3.基于条件查询当前页记录(pageSize定义为2)
//3.1)定义pageSize
int pageSize=2;
//3.2)计算startIndex
int startIndex=(pageCurrent-1)*pageSize;
//3.3)执行当前数据的查询操作
List<SysLog> records=
sysLogDao.findPageObjects(name, startIndex, pageSize);
//4.对分页信息以及当前页记录进行封装
//4.1)构建PageObject对象
PageObject<SysLog> pageObject=new PageObject<>();
//4.2)封装数据
pageObject.setPageCurrent(pageCurrent);
pageObject.setPageSize(pageSize);
pageObject.setRowCount(rowCount);
pageObject.setRecords(records);
pageObject.setPageCount((rowCount-1)/pageSize+1);
//5.返回封装结果。
return new AsyncResult<PageObject<SysLog>>(pageObject);
}
复制

此时返回值Controller时不可以直接获取到数据的,需要从Futre中取出来

取出时注意处理两个异常

异常1:ExecutionException封装了正在执行的线程抛出的任何异常,所以如果你的线程是做某种IO导致抛出IOException异常的,那么它会被包装在一个ExecutionException中并被重新抛出。

异常2:InterruptedException不是任何出错的迹象。在那里给你一种让你的线程知道什么时候停止的方法,以便他们完成当前的工作并优雅地退出。

文章转载自程序餐厅,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论