您的当前位置:首页正文

Springboot之多线程多任务并行+线程池处理

2022-05-02 来源:年旅网
Springboot之多线程多任务并⾏+线程池处理

最近项⽬中做到⼀个关于批量发短信的业务,如果⽤户量特别⼤的话,不能使⽤单线程去发短信,只能尝试着使⽤多任务来完成!我们的项⽬使⽤到了⽅式⼆,即Future的⽅案

Java 线程池

Java通过Executors提供四种线程池,分别为:

newCachedThreadPool创建⼀个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若⽆可回收,则新建线程。newFixedThreadPool 创建⼀个定长线程池,可控制线程最⼤并发数,超出的线程会在队列中等待。newScheduledThreadPool 创建⼀个定长线程池,⽀持定时及周期性任务执⾏。

newSingleThreadExecutor 创建⼀个单线程化的线程池,它只会⽤唯⼀的⼯作线程来执⾏任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执⾏。优点

重⽤存在的线程,减少对象创建、消亡的开销,性能佳。

可有效控制最⼤并发线程数,提⾼系统资源的使⽤率,同时避免过多资源竞争,避免堵塞。提供定时执⾏、定期执⾏、单线程、并发数控制等功能。

⽅式⼀(CountDownLatch)

public class StatsDemo {

final static SimpleDateFormat sdf = new SimpleDateFormat( \"yyyy-MM-dd HH:mm:ss\");

final static String startTime = sdf.format(new Date());

/**

* IO密集型任务 = ⼀般为2*CPU核⼼数(常出现于线程中:数据库数据交互、⽂件上传下载、⽹络数据传输等等) * CPU密集型任务 = ⼀般为CPU核⼼数+1(常出现于线程中:复杂算法) * 混合型任务 = 视机器配置和复杂度⾃测⽽定 */

private static int corePoolSize = Runtime.getRuntime().availableProcessors(); /**

* public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, * TimeUnit unit,BlockingQueue workQueue) * corePoolSize⽤于指定核⼼线程数量 * maximumPoolSize指定最⼤线程数

* keepAliveTime和TimeUnit指定线程空闲后的最⼤存活时间

* workQueue则是线程池的缓冲队列,还未执⾏的线程会在队列中等待 * 监控队列长度,确保队列有界

* 不当的线程池⼤⼩会使得处理速度变慢,稳定性下降,并且导致内存泄露。如果配置的线程过少,则队列会持续变⼤,消耗过多内存。

* ⽽过多的线程⼜会 由于频繁的上下⽂切换导致整个系统的速度变缓——殊途⽽同归。队列的长度⾄关重要,它必须得是有界的,这样如果线程池不堪重负了它可以暂时拒绝掉新的请求。 * ExecutorService 默认的实现是⼀个⽆界的 LinkedBlockingQueue。 */

private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue(1000));

public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(5); //使⽤execute⽅法

executor.execute(new Stats(\"任务A\ executor.execute(new Stats(\"任务B\ executor.execute(new Stats(\"任务C\ executor.execute(new Stats(\"任务D\ executor.execute(new Stats(\"任务E\ latch.await();// 等待所有⼈任务结束

System.out.println(\"所有的统计任务执⾏完成:\" + sdf.format(new Date())); }

static class Stats implements Runnable { String statsName; int runTime;

CountDownLatch latch;

public Stats(String statsName, int runTime, CountDownLatch latch) { this.statsName = statsName; this.runTime = runTime; this.latch = latch; }

public void run() { try {

System.out.println(statsName+ \" do stats begin at \"+ startTime); //模拟任务执⾏时间 Thread.sleep(runTime);

System.out.println(statsName + \" do stats complete at \"+ sdf.format(new Date())); latch.countDown();//单次任务结束,计数器减⼀ } catch (InterruptedException e) { e.printStackTrace(); } } }}

  

⽅式⼆(Future)

重点是和springboot整合,采⽤注解bean⽅式⽣成ThreadPoolTaskExecutor@Bean

//spring依赖包

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configuration

public class GlobalConfig {

/**

* 默认线程池线程池 *

* @return Executor */

@Bean

public ThreadPoolTaskExecutor defaultThreadPool() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核⼼线程数⽬

executor.setCorePoolSize(16); //指定最⼤线程数

executor.setMaxPoolSize(64); //队列中最⼤的数⽬

executor.setQueueCapacity(16); //线程名称前缀

executor.setThreadNamePrefix(\"defaultThreadPool_\");

//rejection-policy:当pool已经达到max size的时候,如何处理新任务

//CALLER_RUNS:不在新线程中执⾏任务,⽽是由调⽤者所在的线程来执⾏ //对拒绝task的处理策略

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //线程空闲后的最⼤存活时间

executor.setKeepAliveSeconds(60); //加载

executor.initialize(); return executor; }}

使⽤

//通过注解引⼊配置

@Resource(name = \"defaultThreadPool\")private ThreadPoolTaskExecutor executor;

//使⽤Future⽅式执⾏多任务 //⽣成⼀个集合

List futures = new ArrayList<>();

//获取后台全部有效运营⼈员的集合

List adminUserDOList = adminManagerService.GetUserToSentMsg(null);

for (AdminUserMsgResponse response : adminUserDOList) { //并发处理

if (response.getMobile() != null) {

Future future = executor.submit(() -> { //发送短信

mobileMessageFacade.sendCustomerMessage(response.getMobile(), msgConfigById.getContent()); });

futures.add(future); } }

//查询任务执⾏的结果

for (Future future : futureList) {

while (true) {//CPU⾼速轮询:每个future都并发轮循,判断完成状态然后获取结果,这⼀⾏,是本实现⽅案的精髓所在。即有10个future在⾼速轮询,完成⼀个future的获取结果,就关闭⼀个轮询

if (future.isDone()&& !future.isCancelled()) {//获取future成功完成状态,如果想要限制每个任务的超时时间,取消本⾏的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使⽤即可。 Integer i = future.get();//获取结果

System.out.println(\"任务i=\"+i+\"获取完成!\"+new Date()); list.add(i);

break;//当前future获取结果完毕,跳出while } else {

Thread.sleep(1);//每次轮询休息1毫秒(CPU纳秒级),避免CPU⾼速轮循耗空CPU---》新⼿别忘记这个 } } }

完毕

因篇幅问题不能全部显示,请点此查看更多更全内容