Springboot详解线程池与多线程及阻塞队列的应用详解

目录

  • 一、案例场景
  • 二、使用类
  • 三、本例说明
    • 1.接收web请求
    • 2.后台任务处理
    • 3.关系说明
  • 四、代码
    • 1.OrderController
    • 2.FlowStarter流程启动器
    • 3.FlowManager流程管理器
    • 4.StepContainer线程池容器
    • 5.StepExecutor线程执行器
    • 6.StepHandler业务处理handler
    • 7.阻塞队列
      • 7.1 FlowQueue
      • 7.2 QueueUtils
      • 7.3 ConstantUtils
    • 8.任务模型
      • 8.1 StepModel
      • 8.2 StepResult
    • 9.业务数据模型
      • 9.1 OrderInfo
      • 9.2 ResultObj
    • 10.测试
      • 10.1 web请求
      • 10.2 后台任务日志
版本:Spring Boot 2.6.3

一、案例场景 1>web端接收restful请求生成任务A,并把任务放入队列Queue_A。
2>线程池A的任务线程从队列Queue_A取出任务,处理完成后放入Queue_B。
3>线程池B的任务线程从Queue_B取出任务,处理完成后入库。
本例就使用两个任务步骤,按需扩展延长任务链。

二、使用类 java.util.LinkedHashMap,双向链表。
java.util.concurrent.BlockingQueue,阻塞队列接口。
java.util.concurrent.LinkedBlockingQueue,阻塞队列实现类。
java.util.concurrent.CountDownLatch,线程计数器。
java.util.concurrent.locks.ReentrantLock,可重入锁。

三、本例说明
1.接收web请求
OrderController接收web请求,业务数据封装成任务对象,并写入队列QUEUE_A。Web请求结束,立即返回。

2.后台任务处理
FlowStarter流程启动器
管理FlowManager,创建流程管理器和启动流程管理器。创建线程池容器StepContainer,指定队列、线程池线程数量,以及业务处理Handler。
FlowManager流程管理器
管理线程池容器StepContainer。创建线程池容器,启动线程池容器,关闭线程池容器,线程池容器之间数据传递。使用LinkedHashMap维护一个流程中的多个线程池容器。
【Springboot详解线程池与多线程及阻塞队列的应用详解】StepContainer线程池容器
创建线程池,启动线程执行器(Executor),初始化业务处理Handler,读写队列。使用LinkedHashMap维护一个流程中的多个StepExecutor。
StepExecutor线程执行器
执行抽象公用业务逻辑。实现线程Runnable接口。调用StepHandler的实现类的execute执行具体业务逻辑。
StepHandler业务处理器handler
具体业务在StepHandler的实现类的execute中实现。
任务模型对象StepModel和执行结果对象StepResult
每个具体业务数据必须包装成任务模型对象StepModel,执行结果包装成执行结果对象StepResult,才能在线程池和队列中流转。

3.关系说明
一个FlowStarter可以启动一个或者多个FlowManager。支持一对多和一对一,按需扩展。
一个FlowManager对应一个业务流程。一个业务流程可以拆分为多个步骤。一个步骤对应一个线程池容器StepContainer。一个线程池容器StepContainer,启动多个线程执行器StepExecutor。效果就是并发执行任务。
一个业务流程拆分成若干个步骤,每个步骤之间数据流转,使用任务模型StepModel中的状态标识isFinished,isPutInQueueAgain,isPutInQueueNext 字段来分析任务流向。使用StepModel的StepResult的 nextStepName字段来识别具体流向的线程池容器。

四、代码
1.OrderController
OrderController,接收请求、封装任务、写队列。
@Slf4j@RestController@RequestMapping("/order")public class OrderController {@PostMapping("/f1")public Object f1(@RequestBody Object obj) {log.info("OrderController->f1,接收参数,obj = " + obj.toString()); Map objMap = (Map) obj; OrderInfo orderInfo = new OrderInfo(); orderInfo.setUserName((String) objMap.get("userName")); orderInfo.setTradeName((String) objMap.get("tradeName")); orderInfo.setOrderTime(System.currentTimeMillis()); LinkedBlockingQueue queueA = FlowQueue.getBlockingQueue("QUEUE_A"); QueueUtils.putStepPutInQueue(queueA,orderInfo); log.info("OrderController->f1,返回." ); return ResultObj.builder().code("200").message("成功").build(); }}


2.FlowStarter流程启动器
FlowStarter,后台任务线程池和线程启动。实现InitializingBean了接口。那么在spring初始化化bean完成后,就能触发启动线程池和线程。
@Slf4j@Servicepublic class FlowStarter implements InitializingBean {@Overridepublic void afterPropertiesSet() throws Exception {log.info("FlowWorker创建流程."); FlowManager flowManager = new FlowManager(); flowManager.buildContainer(ConstantUtils.STEP_01,5,FlowQueue.getBlockingQueue("QUEUE_A"), Step01Handler.class); flowManager.buildContainer(ConstantUtils.STEP_02,5,FlowQueue.getBlockingQueue("QUEUE_B"), Step02Handler.class); flowManager.startContainers(); log.info("FlowWorker启动流程完成."); }}


3.FlowManager流程管理器
一个FlowManager流程管理器,维护多个线程池容器StepContainer,共同完成一个流程的多个步骤。
public class FlowManager {// 管理器名称private String name; // 管理线程池容器private Map stepContainerMap = new LinkedHashMap<>(); public FlowManager() {}// 创建线程池容器public void buildContainer(String name, int poolSize, BlockingQueue queue,Class handlerClazz) {StepContainer stepWorker = new StepContainer(); stepWorker.createThreadPool(poolSize, queue, handlerClazz); stepWorker.setName(name); stepWorker.setFlowManager(this); this.stepContainerMap.put(name, stepWorker); }// 启动线程池容器public void startContainers() {for (StepContainer stepContainer : this.stepContainerMap.values()) {stepContainer.startRunExecutor(); }}// 关闭线程池容器public void stopContainers() {for (StepContainer stepContainer : this.stepContainerMap.values()) {stepContainer.stopRunExecutor(); }this.stepContainerMap.clear(); }// 任务放入下一个线程池public boolean sendToNextContainer(String nextStepName, Object obj) {if (nextStepName != null && !StringUtils.equals(nextStepName, "")) {if (this.stepContainerMap.containsKey(nextStepName)) {this.stepContainerMap.get(nextStepName).putStepInQueue(obj); return true; } else {return false; }} else {return false; }}public String getName() {return name; }}


4.StepContainer线程池容器
StepContainer线程池容器,维护多个线程执行器StepExecutor,实现多线程异步完成每个独立任务。
@Slf4jpublic class StepContainer {// 线程池名称private String name; // 线程池private ExecutorService threadPool; // 线程数目private int nThreads = 0; // 线程处理业务handler类private Class handlerClazz; // 线程处理业务队列private BlockingQueue queue = null; // 线程池内线程管理private Map stepExecutorMap = new LinkedHashMap<>(); // 线程池运行状态private boolean isRun = false; // 线程池管理器private FlowManager flowManager = null; // 构造函数public StepContainer() {}// 创建线程池public boolean createThreadPool(int nThreads, BlockingQueue queue,Class handlerClazz) {try {this.nThreads = nThreads; this.queue = queue; this.handlerClazz = handlerClazz; this.threadPool = Executors.newFixedThreadPool(this.nThreads, new ThreadFactory() {@Overridepublic Thread newThread(Runnable runnable) {return new Thread(runnable); }}); } catch (Exception e) {e.printStackTrace(); return false; }return true; }// 启动线程public void startRunExecutor() {if (!this.isRun) {if (this.handlerClazz != null) {log.info("线程池: " + this.name + ",启动,加载线程Executor."); StepExecutor stepExecutor; String executorName = ""; for (int num = 0; num < this.nThreads; num++) {try {executorName = this.name + "_" + (num + 1); StepHandler stepHandler = (StepHandler) createStepHandler(this.handlerClazz); stepExecutor = new StepExecutor(executorName, this.queue, stepHandler, this); this.threadPool.execute(stepExecutor); this.stepExecutorMap.put(executorName, stepExecutor); } catch (Exception e) {e.printStackTrace(); }}this.isRun = true; }}}// 关闭线程public void stopRunExecutor() {if (isRun) {Iterator iterator = this.stepExecutorMap.values().iterator(); while (iterator.hasNext()) {StepExecutor stepExecutor = (StepExecutor) iterator.next(); stepExecutor.stop(); }this.stepExecutorMap.clear(); this.isRun = false; }}// 从队列获取任务public StepModel getStepFromQueue() {StepModel stepModel = null; synchronized (this.queue) {try {if (this.queue.size() > 0) {stepModel = this.queue.take(); }} catch (Exception e) {log.info("从队列获取任务异常."); e.printStackTrace(); }}return stepModel; }// 任务放入队列public void putStepInQueue(Object obj) {try {StepModel stepModel = new StepModel(obj); stepModel.setPutInQueueTime(System.currentTimeMillis()); this.queue.put(stepModel); } catch (InterruptedException e) {log.info("任务放入队列异常."); e.printStackTrace(); }}// 重新放入public void putStepInQueueAgain(StepModel stepModel) {stepModel.setFinished(false); stepModel.setPutInQueueNext(false); stepModel.setPutInQueueAgain(false); try {this.queue.put(stepModel); } catch (InterruptedException e) {log.info("任务重新放入队列异常."); e.printStackTrace(); }}// 清空队列public void clearQueue() {if (this.queue != null) {this.queue.clear(); }}// 初始化实例对象public Object createStepHandler(Class clazz)throws InstantiationException, IllegalAccessException {Object object = clazz.newInstance(); return object; }public String getName() {return name; }public void setName(String name) {this.name = name; }public FlowManager getFlowManager() {return flowManager; }public void setFlowManager(FlowManager flowManager) {this.flowManager = flowManager; }}


5.StepExecutor线程执行器
StepExecutor线程执行器,实现Runnable接口。线程执行单元通用逻辑,具体业务逻辑通过调用StepHandler的execute方法实现。
@Slf4jpublic class StepExecutor implements Runnable {// 执行器名称private String name; // 线程执行的任务private StepModel stepModel; // 线程执行的队列private BlockingQueue queue; // 线程执行的业务处理逻辑private Object stepHandler; // 线程运行状态private volatile boolean isRun = false; // 线程开启(True)和关闭(False)private volatile boolean isClose = false; // 线程隶属容器private StepContainer stepContainer; // 线程计数器(关闭线程使用)private CountDownLatch countDownLatch = null; public StepExecutor() {}public StepExecutor(String name, BlockingQueue queue,StepHandler stepHandler, StepContainer stepContainer) {this.name = name; this.queue = queue; this.stepHandler = stepHandler; this.stepContainer = stepContainer; }@Overridepublic void run() {this.isRun = true; this.countDownLatch = new CountDownLatch(1); // 没收到关闭信号,则循环运行while (!this.isClose) {this.stepModel = null; String threadName = "【线程池:" + this.stepContainer.getName() + ",线程:" + Thread.currentThread().getName() + "】"; // 循环运行,为防止中断和卡主,需捕获异常try {StepHandler stepHandler = (StepHandler) this.stepHandler; this.stepModel = this.stepContainer.getStepFromQueue(); if (this.stepModel != null) {log.info(threadName + ",处理任务."); this.stepModel.getStepResultList().clear(); stepHandler.execute(this.stepModel); // 执行完成后结果数据List stepResultList = this.stepModel.getStepResultList(); boolean isFinished = this.stepModel.isFinished(); boolean isPutInQueueAgain = this.stepModel.isPutInQueueAgain(); boolean isPutInQueueNext = this.stepModel.isPutInQueueNext(); if (isFinished && !isPutInQueueAgain && !isPutInQueueNext) {log.info(threadName + ",任务结束."); }if (!isFinished && isPutInQueueAgain && !isPutInQueueNext) {log.info(threadName + ",任务在本步骤未完成,重新放队列."); this.stepContainer.putStepInQueueAgain(this.stepModel); }if (!isFinished && !isPutInQueueAgain && isPutInQueueNext) {int resultNum = stepResultList.size(); if (resultNum > 0) {for (StepResult stepResult : stepResultList) {log.info(threadName + ",任务在本步骤已经完成,发送给下一个线程池: " + stepResult.getNextStepName() + ",执行."); this.stepContainer.getFlowManager().sendToNextContainer(stepResult.getNextStepName(),stepResult.getResult()); }}}} else {threadToSleep(1000 * 3L); }} catch (Exception e) {log.info("执行器异常."); e.printStackTrace(); this.stepContainer.putStepInQueueAgain(this.stepModel); }}// 跳出循环后,线程计数减1this.countDownLatch.countDown(); this.isRun = false; }public void stop() {this.isClose = true; if (this.countDownLatch != null) {while (this.countDownLatch.getCount() > 0L) {try {this.countDownLatch.await(); } catch (InterruptedException e) {log.info("线程关闭异常."); e.printStackTrace(); }}}this.isClose = false; }public void threadToSleep(long time) {try {Thread.sleep(time); } catch (Exception e) {log.info("线程休眠异常."); e.printStackTrace(); }}}


6.StepHandler业务处理handler
StepHandler是StepExecutor线程执行器,具体执行业务逻辑的入口。
StepHandler抽象类
每个具体的实现类都继承抽象的StepHandler。
public abstract class StepHandler {public StepHandler() {}public abstract void execute(StepModel stepModel); }

Step01Handler
Step01Handler是StepHandler实现类,从队列中取任务执行,执行完成后放入下一个业务处理器Step02Handler。
@Slf4jpublic class Step01Handler extends StepHandler { @Override public void execute(StepModel stepModel) {log.info("Step01Handler执行开始,stepModel: " + stepModel.toString()); OrderInfo orderInfo = (OrderInfo) stepModel.getObj(); List stepResultList = stepModel.getStepResultList(); try {log.info("Step01Handler执行,处理订单."); String orderNo = UUID.randomUUID().toString().replace("-", "").toUpperCase(); orderInfo.setOrderNo(orderNo); orderInfo.setPlatformType("线上"); orderInfo.setOrderSource("Web"); stepModel.setFinished(false); stepModel.setPutInQueueNext(true); stepModel.setPutInQueueAgain(false); stepResultList.add(new StepResult(ConstantUtils.STEP_02, orderInfo)); } catch (Exception e) {stepModel.setFinished(false); stepModel.setPutInQueueNext(false); stepModel.setPutInQueueAgain(true); stepResultList.add(new StepResult(ConstantUtils.STEP_01, orderInfo)); }log.info("Step01Handler执行完成,stepModel: " + stepModel.toString()); }}

Step02Handler
Step02Handler是StepHandler实现类,从队列中取任务执行。
@Slf4jpublic class Step02Handlerextends StepHandler{@Overridepublic void execute(StepModel stepModel) {log.info("Step02Handler执行开始,stepModel: " + stepModel.toString()); OrderInfo orderInfo = (OrderInfo) stepModel.getObj(); List stepResultList = stepModel.getStepResultList(); try {orderInfo.setEndTime(System.currentTimeMillis()); stepModel.setFinished(true); stepModel.setPutInQueueNext(false); stepModel.setPutInQueueAgain(false); log.info("Step02Handler执行,入库."); } catch (Exception e) {stepModel.setFinished(true); stepModel.setPutInQueueNext(false); stepModel.setPutInQueueAgain(false); }log.info("Step02Handler执行完成,stepModel: " + stepModel.toString()); }}


7.阻塞队列
BlockingQueue是线程安全的阻塞队列。

7.1 FlowQueue FlowQueue,管理本例使用的两个阻塞队列。
public class FlowQueue { private static final LinkedBlockingQueue queueA = new LinkedBlockingQueue(); private static final LinkedBlockingQueue queueB = new LinkedBlockingQueue(); public static LinkedBlockingQueue getBlockingQueue(String queueName) {LinkedBlockingQueue queue = null; switch (queueName) {case "QUEUE_A":queue = queueA; break; case "QUEUE_B":queue = queueB; break; }return queue; }}


7.2 QueueUtils QueueUtils,队列简易工具。
@Slf4jpublic class QueueUtils { public static StepModel getStepFromQueue(LinkedBlockingQueue queue) {StepModel stepModel = null; try {if (queue.size() > 0) {stepModel = queue.take(); }} catch (Exception e) {log.info("读队列异常."); e.printStackTrace(); }return stepModel; } public static void putStepPutInQueue(LinkedBlockingQueue queue, Object obj) {try {StepModel stepModel = new StepModel(obj); stepModel.setPutInQueueTime(System.currentTimeMillis()); queue.put(stepModel); } catch (Exception e) {log.info("写队列异常."); e.printStackTrace(); } } public static int getQueueSize(LinkedBlockingQueue queue) {int size = 0; try {size = queue.size(); } catch (Exception e) {log.info("获取队列Size异常."); e.printStackTrace(); }return size; }}


7.3 ConstantUtils ConstantUtils,管理常量,即线程池名称。
public class ConstantUtils { public static final String STEP_01 = "STEP_01_THREAD_POOL"; public static final String STEP_02 = "STEP_02_THREAD_POOL"; }


8.任务模型
任务模型,即具体需要处理对象,封装成线程使用的任务模型,这样可以把业务和流程框架解耦。

8.1 StepModel StepModel,任务模型封装。
@Datapublic class StepModel {// 任务对象private Object obj; // 任务执行结果private List stepResultList; // 任务接收时间private long putInQueueTime; // 任务完成标识private boolean isFinished = false; // 任务重新放入队列标识private boolean isPutInQueueAgain = false; // 任务放入下一个队列标识private boolean isPutInQueueNext = false; public StepModel(Object object) {this.obj = object; this.stepResultList = new ArrayList<>(); }}


8.2 StepResult StepResult,执行结果模型封装。
@Datapublic class StepResult { // 目标线程池名 private String nextStepName; // 执行结果 private Object result; public StepResult(String nextStepName,Object result){this.nextStepName = nextStepName; this.result = result; }}


9.业务数据模型
业务数据模型,即生成具体需要处理的数据,在传入给线程池的线程执行前,需要封装成任务模型。

9.1 OrderInfo OrderInfo,本例要处理的业务数据模型。
@Data@NoArgsConstructorpublic class OrderInfo {private String userName; private String orderNo; private String tradeName; private String platformType; private String orderSource; private long orderTime; private long endTime; }


9.2 ResultObj ResultObj,web请求返回的统一封装对象。
@Data@NoArgsConstructor@AllArgsConstructor@Builderpublic class ResultObj {private String code; private String message; }


10.测试
包括web请求和后台任务

10.1 web请求 请求URL: http://127.0.0.1:8080/server/order/f1
入参:
{
"userName": "HangZhou0614",
"tradeName": "Vue进阶教程"
}
返回值:
{
"code": "200",
"message": "成功"
}

10.2 后台任务日志 日志输出:
Springboot详解线程池与多线程及阻塞队列的应用详解
文章图片

到此这篇关于Springboot详解线程池与多线程及阻塞队列的应用详解的文章就介绍到这了,更多相关Springboot线程池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    推荐阅读