Java高并发BlockingQueue重要的实现类二

DelayQueue

DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed元素。
存放到DelayDeque的元素必须继承Delayed接口。Delayed接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了激活日期,该接口强制执行下列两个方法:
  • CompareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法
  • getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定
DelayQueue使用场景
  • 关闭空闲链接。服务器中,有很多客户端链接,空闲一段时间后需要关闭。
  • 缓存超过了缓存时间,就需要从缓存中移除。
DelayQueue超时订单处理案例
package com.rumenz.learn.delayqueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; //DelayQueue里面的元素必须实现Delayedpublic class Item implements Delayed {private Long expireTime; private T data; public Item(Long expireTime, T data) { this.expireTime = expireTime+System.currentTimeMillis(); this.data = https://www.it610.com/article/data; }@Override public long getDelay(TimeUnit unit) { long d = unit.convert(this.expireTime - System.currentTimeMillis(),unit); return d; }@Override public int compareTo(Delayed o) { long d=getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS); if(d==0){ return 0; } return d>0?1:-1; }public Long getExpireTime() { return expireTime; }public void setExpireTime(Long expireTime) { this.expireTime = expireTime; }public T getData() { return data; }public void setData(T data) { this.data = https://www.it610.com/article/data; } }// 订单实体类 package com.rumenz.learn.delayqueue; public class OrderItem { private Double orderAmount; private String orderNo; //0未支付 1支付了 private Integer orderStatus; public OrderItem(Double orderAmount, String orderNo, Integer orderStatus) { this.orderAmount = orderAmount; this.orderNo = orderNo; this.orderStatus = orderStatus; }public Double getOrderAmount() { return orderAmount; }public void setOrderAmount(Double orderAmount) { this.orderAmount = orderAmount; }public String getOrderNo() { return orderNo; }public void setOrderNo(String orderNo) { this.orderNo = orderNo; }public Integer getOrderStatus() { return orderStatus; }public void setOrderStatus(Integer orderStatus) { this.orderStatus = orderStatus; } }//package com.rumenz.learn.delayqueue; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Iterator; import java.util.Map; import java.util.Random; import java.util.concurrent.*; public class DelayQueueExample { //3个线程 1个线程下单 1个线程支付1个线程关闭超时订单订单支付超时时间为10s public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(3); DelayQueue> delayeds = new DelayQueue<>(); ConcurrentMap map = new ConcurrentHashMap<>(); //下单线程 executorService.execute(()->{ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Integer orderNo=100; while (true){ try{ Thread.sleep(3000); Integer amount = new Random().nextInt(1000); OrderItem orderItem=new OrderItem(amount.doubleValue(), String.valueOf(orderNo), 0); Item item=new Item<>(10*1000L,orderItem); Date date=new Date(); date.setTime(item.getExpireTime()); System.out.println("=======================下单=========================="); System.out.println("生成订单时间:"+simpleDateFormat.format(new Date())); System.out.println("订单编号:"+orderNo); System.out.println("订单金额:"+orderItem.getOrderAmount()); System.out.println("支付过期时间:"+simpleDateFormat.format(date)); System.out.println("========================下单========================="); map.put(String.valueOf(orderNo),orderItem); orderNo++; delayeds.offer(item); }catch (Exception e){ e.printStackTrace(); }}}); //支付线程 executorService.execute(()->{ while (true){ try { //随机等待 再支付 Thread.sleep(new Random().nextInt(15)*1000); String orderNo=""; Iterator iterator = map.entrySet().iterator(); if(iterator.hasNext()){ OrderItem orderItem = iterator.next().getValue(); orderItem.setOrderStatus(1); orderNo=orderItem.getOrderNo(); System.out.println("-----------------------支付订单-----------------------"); System.out.println("订单支付"+orderNo); System.out.println("支付金额"+orderItem.getOrderAmount()); System.out.println("-----------------------支付订单-----------------------"); } map.remove(orderNo); }catch (Exception e){ e.printStackTrace(); } }}); //关系过期的订单 executorService.execute(()->{ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); while (true){ try{ Item item = delayeds.take(); OrderItem data = https://www.it610.com/article/item.getData(); Date date=new Date(); date.setTime(item.getExpireTime()); if(data.getOrderStatus()==0){ System.out.println("########################过期订单########################"); System.out.println("订单编号:"+data.getOrderNo()); System.out.println("订单金额:"+data.getOrderAmount()); System.out.println("订单到期支付时间:"+simpleDateFormat.format(date)); System.out.println("########################过期订单########################"); }map.remove(data.getOrderNo()); }catch (Exception e){ e.printStackTrace(); } } }); executorService.shutdown(); } }

SynchronousQueue
它是一个特殊的队列交做同步队列,特点是当一个线程往队列里写一个元素,写入操作不会理解返回,需要等待另外一个线程来将这个元素拿走。同理,当一个读线程做读操作的时候,同样需要一个相匹配写线程的写操作。这里的 Synchronous指的就是读写线程需要同步,一个读线程匹配一个写线程,同理一个写线程匹配一个读线程。 不像ArrayBlockingQueueLinkedBlockingDeque之类的阻塞队列依赖AQS实现并发操作,SynchronousQueue直接使用CAS实现线程的安全访问。
较少使用到 SynchronousQueue 这个类,不过它在线程池的实现类 ScheduledThreadPoolExecutor 中得到了应用。
public class SynchronousQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { //内部栈 static final class TransferStack extends Transferer {} //内部队列 static final class TransferQueue extends Transferer {} public SynchronousQueue() {this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); } }

SynchronousQueue代码演示
package com.rumenz.learn.synchronousqueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class SynchronousQueueExample { public static void main(String[] args) { SynchronousQueue queue = new SynchronousQueue<>(); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(()->{ try { System.out.println(Thread.currentThread().getName()+"put 1"); queue.put("1"); System.out.println(Thread.currentThread().getName()+"put 2"); queue.put("2"); System.out.println(Thread.currentThread().getName()+"put 3"); queue.put("3"); System.out.println(Thread.currentThread().getName()+"put 4"); queue.put("4"); }catch (Exception e){ e.printStackTrace(); }}); executorService.execute(()->{ try{ TimeUnit.SECONDS.sleep(1); System.out.println("获取数据:"+queue.take()); TimeUnit.SECONDS.sleep(1); System.out.println("获取数据:"+queue.take()); TimeUnit.SECONDS.sleep(1); System.out.println("获取数据:"+queue.take()); TimeUnit.SECONDS.sleep(1); System.out.println("获取数据:"+queue.take()); }catch (Exception e){ e.printStackTrace(); }}); executorService.shutdown(); }}

关注微信公众号:【入门小站】,关注更多知识点
【Java高并发BlockingQueue重要的实现类二】Java高并发BlockingQueue重要的实现类二
文章图片

    推荐阅读