前言
在开发过程中,我们会遇到很多使用线程池的业务场景,例如定时任务使用的就是ScheduledThreadPoolExecutor。而有些时候使用线程池的场景就是会将一些可以进行异步操作的业务放在线程池中去完成,例如在生成订单的时候给用户发送短信,生成订单的结果不应该被发送短信的成功与否所左右,也就是说生成订单这个主操作是不依赖于发送短信这个操作,所以我们就可以把发送短信这个操作置为异步操作。而要想完成异步操作,一般使用的一个是消息服务器MQ,一个就是线程池。今天我们就来看看在Java中常用的Spring框架中如何去使用线程池来完成异步操作,以及分析背后的原理。

在Spring4中,Spring中引入了一个新的注解@Async,这个注解让我们在使用Spring完成异步操作变得非常方便。

在SpringBoot环境中,要使用@Async注解,我们需要先在启动类上加上@EnableAsync注解。这个与在SpringBoot中使用@Scheduled注解需要在启动类中加上@EnableScheduling是一样的道理(当然你使用古老的XML配置也是可以的,但是在SpringBoot环境中,建议的是全注解开发),具体原理下面会分析。加上@EnableAsync注解后,如果我们想在调用一个方法的时候开启一个新的线程开始异步操作,我们只需要在这个方法上加上@Async注解,当然前提是,这个方法所在的类必须在Spring环境中。

项目实况介绍

项目中,我需要将700w条数据,定时任务加入到mysql表中,去掉日志打印和一些其他因素的影响,入库时间还是需要8个小时以上,严重影响后续的一系列操作,所以我才用@Async注解,来实现异步入库,开了7个线程,入库时间缩短为1.5个小时,大大提高效率,以下是详细介绍,一级一些需要注意的坑.

需要写个配置文件两种方式

第一种方式

@Configuration
@EnableAsync //启用异步任务
public class ThreadConfig {
@Bean
public ThreadPoolTaskExecutor executor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(15);
//配置最大线程数
executor.setMaxPoolSize(30);
//配置队列大小
executor.setQueueCapacity(1000);
//线程的名称前缀
executor.setThreadNamePrefix("Executor-");
//线程活跃时间(秒)
//executor.setKeepAliveSeconds(60);
//等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
//设置拒绝策略
//executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}

第二种方式

@Configuration
@EnableAsync
public class ExecutorConfig { @Value("${thread.maxPoolSize}")
private Integer maxPoolSize;
@Value("${thread.corePoolSize}")
private Integer corePoolSize;
@Value("${thread.keepAliveSeconds}")
private Integer keepAliveSeconds;
@Value("${thread.queueCapacity}")
private Integer queueCapacity;
@Bean
public ThreadPoolTaskExecutor asyncExecutor(){
ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(corePoolSize);//核心数量
taskExecutor.setMaxPoolSize(maxPoolSize);//最大数量
taskExecutor.setQueueCapacity(queueCapacity);//队列
taskExecutor.setKeepAliveSeconds(keepAliveSeconds);//存活时间
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);//设置等待任务完成后线程池再关闭
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//设置拒绝策略
taskExecutor.initialize();//初始化
return taskExecutor;
}
}

配置文件

#线程池
thread:
corePoolSize: 5
maxPoolSize: 10
queueCapacity: 100
keepAliveSeconds: 3000

springboot默认是不开启异步注解功能的,所以,要让springboot中识别@Async,则必须在入口文件中,开启异步注解功能

package com.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync; //开启异步注解功能
@EnableAsync
@SpringBootApplication
public class SpringbootTaskApplication { public static void main(String[] args) {
SpringApplication.run(SpringbootTaskApplication.class, args);
} }

这里有个坑!

如果遇到报错:需要加上    proxyTargetClass = true

The bean 'xxxService' could not be injected as a'com.xxxx.xxx.xxxService' because it is a JDK dynamic proxy that implements:
xxxxxx
Action:
Consider injecting the bean as one of its interfaces orforcing the use of CGLib-based proxiesby setting proxyTargetClass=true on @EnableAsync and/or @EnableCaching.
package com.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync; //开启异步注解功能
@EnableAsync(proxyTargetClass = true)
@SpringBootApplication
public class SpringbootTaskApplication { public static void main(String[] args) {
SpringApplication.run(SpringbootTaskApplication.class, args);
} }

当我service层处理完逻辑,吧list分成7个小list然后调用异步方法(异步方法的参数不用管,没影响,只截取核心代码)

List<List<DistributedPredictDTO>> partition = Lists.partition(userList, userList.size() / 7);
for (List<DistributedPredictDTO> distributedPredictDTOS : partition) {
       //调用异步方法
threadService.getI(beginDate, endDate, tableName, distributedPredictDTOS, hMap, i);
}
@Slf4j
@Service
public class ThreadServiceImpl {
@Resource
ResourcePoolUrlProperties properties;
@Resource
private MonitorDao monitorDao;
@Async
Integer getI(String beginDate, String endDate, String tableName, List<DistributedPredictDTO> userList, Map<String, String> hMap, int i) {
log.info("我开始执行");
for (DistributedPredictDTO e : userList) {
String responseStr;
HashMap<String, String> pMap = Maps.newHashMap();
pMap.put("scheduleId", e.getScheduleId());
pMap.put("scheduleName", e.getScheduleName());
pMap.put("distribsunStationId", e.getLabel());
pMap.put("distribsunStationName", e.getValue());
pMap.put("beginTime", beginDate);
pMap.put("endTime", endDate);
try {
if ("180".equals(properties.getNewPowerSys().getDistributedPredictUrl().substring(17, 20))) {
pMap = null;
}
responseStr = HttpClientUtil.doPost(properties.getNewPowerSys().getDistributedPredictUrl(), hMap, pMap);
} catch (Exception exception) {
throw new RuntimeException(e.getValue() + "的功率预测接口异常" + hMap + pMap);
}
if (org.springframework.util.StringUtils.isEmpty(responseStr)) {
log.info(e + "数据为空");
continue;
}
JSONObject resJson = JSONObject.parseObject(responseStr);
JSONObject obj = (JSONObject) resJson.get("obj");
JSONArray tableData = (JSONArray) obj.get("tabledata"); final List<DistributedUserPower> userPowers = Lists.newArrayList();
for (Object o : tableData) {
final DistributedUserPower distributedUserPower = new DistributedUserPower();
distributedUserPower.setData(((JSONObject) o).get("data").toString());
distributedUserPower.setData2(((JSONObject) o).get("data2").toString());
distributedUserPower.setDataTime(((JSONObject) o).get("time").toString());
distributedUserPower.setUserId(e.getLabel());
distributedUserPower.setUserName(e.getValue());
distributedUserPower.setAreaName(e.getScheduleName());
distributedUserPower.setCreateTime(DateUtils.getDate());
userPowers.add(distributedUserPower);
}
monitorDao.saveBatch(userPowers, tableName);
i++;
}
return i;
}

这里有两个坑!

第一个坑:

  我调用的异步方法在当前类中,则直接导致

@Async注解失效
正确操作,异步方法不要和同步调用方法写在同一个类中,应该重新调用其他类

第二个坑:

如果出现这个报错:

Null return value from advice does not mat

问题分析

代码中采用异步调用,AOP 做来一层切面处理,底层是通过 JDK 动态代理实现

不管采用 JDK 还是 CGLIB 代理,返回值必须是包装类型,所以才会导致上诉的报错信息

处理方案
将异步方法的返回值修改为基本类型的对应包装类型即可,如 int -> Integer

5分钟测试效果图:

最后一张是7线程:

spring-boot @Async注解 解决异步多线程入库的问题的更多相关文章

  1. 使用Spring中@Async注解实现异步调用

    异步调用? 在解释异步调用之前,我们先来看同步调用的定义:同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果. 异步调用则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕,继 ...

  2. Spring使用@Async注解

    本文讲述@Async注解,在Spring体系中的应用.本文仅说明@Async注解的应用规则,对于原理,调用逻辑,源码分析,暂不介绍.对于异步方法调用,从Spring3开始提供了@Async注解,该注解 ...

  3. Spring boot 使用WebAsyncTask处理异步任务

    上文介绍了基于 @Async 注解的 异步调用编程,本文将继续引入 Spring Boot 的 WebAsyncTask 进行更灵活异步任务处理,包括 异步回调,超时处理 和 异常处理. 正文 1. ...

  4. Spring Boot常用注解总结

    Spring Boot常用注解总结 @RestController和@RequestMapping注解 @RestController注解,它继承自@Controller注解.4.0之前的版本,Spr ...

  5. Spring Boot 常用注解汇总

    一.启动注解 @SpringBootApplication @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documen ...

  6. 3个Spring Boot核心注解,你知道几个?

    Spring Boot 核心注解讲解 Spring Boot 最大的特点是无需 XML 配置文件,能自动扫描包路径装载并注入对象,并能做到根据 classpath 下的 jar 包自动配置. 所以 S ...

  7. Spring Boot@Component注解下的类无法@Autowired的问题

    title: Spring Boot@Component注解下的类无法@Autowired的问题 date: 2019-06-26 08:30:03 categories: Spring Boot t ...

  8. Spring boot 基于注解方式配置datasource

    Spring boot 基于注解方式配置datasource 编辑 ​ Xml配置 我们先来回顾下,使用xml配置数据源. 步骤: 先加载数据库相关配置文件; 配置数据源; 配置sqlSessionF ...

  9. 【SpringBoot】15. Spring Boot核心注解

    Spring Boot核心注解 1 @SpringBootApplication 代表是Spring Boot启动的类 2 @SpringBootConfiguration 通过bean对象来获取配置 ...

  10. spring boot纯注解开发模板

    简介 spring boot纯注解开发模板 创建项目 pom.xml导入所需依赖 点击查看源码 <dependencies> <dependency> <groupId& ...

随机推荐

  1. C++实现的哈希搜索

    C++实现的哈希搜索 程序内容 Complete a text searching engine using hash table. 完成一个文本搜索引擎,使用哈希表 程序设计 程序流程图 程序代码 ...

  2. 错误 是否保存对以下各项的更改 devenv.sin

    描述: 打开VS2012项目时,提示 是否保存对以下各项的更改 devenv.sin google了一下,没找到...纠结.百度了一下,竟然有的...擦一直以为google很给力,看来对于中文的解析不 ...

  3. [转载]启用 VIM 中的 Python 自动补全及提示功能

    转载: http://zhongwei-leg.iteye.com/blog/941474 周围的同事不喜欢使用 VIM 写 Python 代码的原因之一就是,VIM 不能像 Visual Studi ...

  4. php变量布尔值验证

    使用 PHP 函数对变量 $x 进行比较 表达式 gettype() empty() is_null() isset() boolean : if($x) $x = ""; str ...

  5. Android开发技巧——自定义控件之增加状态

    Android开发技巧--自定义控件之增加状态 题外话 这篇本该是上周四或上周五写的,无奈太久没写博客,前几段把我的兴头都用完了,就一拖再拖,直到今天.不想把这篇拖到下个月,所以还是先硬着头皮写了. ...

  6. DVWA 1.9 通关秘籍

    DVWA 1.9 通关秘籍   本文来源:i春秋社区-分享你的技术,为安全加点温度    DVWA (Dam Vulnerable Web Application) DVWA是用PHP+Mysql编写 ...

  7. [Laravel] Laravel的基本数据库操作部分

    [laravel] laravel的数据库配置 找到程序目录结构下.env文件 配置基本的数据库连接信息 DB_HOST=127.0.0.1 DB_PORT=3306 DB_DATABASE=blog ...

  8. MIME 设置

    1,打开iis7,选择你要设置网站,打开mime类型选项 2,找到.rar的mime类型,复制他的类型 3,复制后选项添加,在文件扩展名那一栏填入.*,然后在下面的mime类型复制你刚复制的appli ...

  9. 每天一个linux命令(7):whereis 命令

    whereis命令只能用于程序名的搜索,而且只搜索二进制文件(参数-b).man说明文件(参数-m)和源代码文件(参数-s).如果省略参数,则返回所有信息. 和find相比,whereis查找的速度非 ...

  10. Unix环境高级编程(十)信号续

    1.signal函数 Unix系统的信号机制最简单的接口是signal函数,函数原型如下: #include <signal.h> typedef void (*sighandler_t) ...