设计模式|观察者(observer)模式(二) —— 实现线程安全的监听器

1. observer模式 VS 事件监听模式

  • 【设计模式|观察者(observer)模式(二) —— 实现线程安全的监听器】之前有总结过Java的时间监听机制:java的事件监听
  • 在上一篇博客中,也明确指出observer模式又叫 source - listener模式,即事件监听模式
  • 仔细对比observer模式和事件监听模式,不难发现:subject对应事件源,observer对应listener,subject状态变化时向observer传递的数据对应event
  • 下图来自博客:设计模式之 —— 观察者模式进阶:监听器模式,很好地展示了各部分之间的对应关系
    设计模式|观察者(observer)模式(二) —— 实现线程安全的监听器
    文章图片

  • 可以说,事件监听模式,就是observer模式的另一种应用形式
2 事件监听模式的不同实现方法
  • 博客java的事件监听中,通过实现EventListener接口,实现了对动物进食事件的监听
  • 还可以使用匿名内部类、lambda表达式自定义监听器
2.1 匿名内部类
  • 使用匿名内部类实现对就诊时叫号的监听
    // 自定义事件监听器接口 public interface CallEventListener { void onCallEvent(CallEvent event); }// 基于EventObject,定义事件 public class CallEvent extends EventObject { public CallEvent(Object source) { super(source); System.out.println(Thread.currentThread().getName() + " ---- 生成callEvent, 事件源: " + source); } }// 定义事件源,支持注册、移除、触发事件监听器 public class Caller { private final int room; private int number; private final List listeners; public Caller(int room) { this.room = room; this.listeners = new ArrayList<>(); }public void addCallEventListener(CallEventListener listener) { if (listener != null && !listeners.contains(listener)) { System.out.println(Thread.currentThread().getName() + " ---- 开始添加listener"); listeners.add(listener); } }public void deleteCallEventListener(CallEventListener listener) { listeners.remove(listener); }public void call(int number) { System.out.println(Thread.currentThread().getName() + " ---- 开始叫号" + number); if (number != this.number) { this.number = number; // 自身作为source,新建新建CallEvent,通知所有注册的listener CallEvent callEvent = new CallEvent(this); System.out.println(Thread.currentThread().getName() + " ---- 触发callEvent, 事件源: " + this); for (CallEventListener listener : listeners) { listener.onCallEvent(callEvent); } } }@Override public String toString() { return "Caller@" + this.hashCode() + "{" + "room=" + room + ", number=" + number + '}'; }public int getRoom() { return room; }public int getNumber() { return number; } }// 使用匿名内部类注册事件监听器,测试整个程序 public class Main { public static void main(String[] args) { Caller caller = new Caller(3); // 通过匿名内部类,注册监听器 caller.addCallEventListener(new CallEventListener() { @Override public void onCallEvent(CallEvent event) { if (event.getSource() instanceof Caller) { Caller source = (Caller) event.getSource(); if (source.getNumber() == 2) { System.out.printf(Thread.currentThread().getName() + " ---- 我是%d号病人,马上去%d诊室就诊\n", source.getNumber(), source.getRoom()); } } } }); // 开始叫号 caller.call(1); caller.call(2); } }

  • 最终执行结果如下:
    设计模式|观察者(observer)模式(二) —— 实现线程安全的监听器
    文章图片

  • 匿名内部类的实现方式,是Java GUI实现事件监听最常用的实现方式
2.2 使用lambda表达式
  • 从代码可知,CallEventListener就是一个函数式接口,可以使用lambda表达式进行实现
    // 通过lambda表达式,注册监听器 caller.addCallEventListener(event -> { if (event.getSource() instanceof Caller) { Caller source = (Caller) event.getSource(); if (source.getNumber() == 1) { logger.info("我是{}号病人,马上去{}诊室就诊", source.getNumber(), source.getRoom()); } } });

2.3 listener内存泄漏
  • 不知读者是否发现一个问题:通过匿名内部类或者lambda表达式实现的listener,主程序无法获得其引用,也就无法调用事件源Caller的deleteCallEventListener()方法注销listener
  • 这样的实现,将存在上一篇博客提到的Lapsed listener problem带来的内存泄漏问题
  • 可以将addCallEventListener()方法稍作修改,使其返回注册后的listener
    public CallEventListener addCallEventListener(CallEventListener listener) { if (listener != null && !listeners.contains(listener)) { listeners.add(listener); }return listener; }

3. 线程安全的监听器 3.1 线程不安全
  • 以上代码单线程运行,不会出现任何问题,但在多线程环境下就会出现各种意想不到的错误
  • 例如,一个线程在添加listener时,另一个线程在执行叫号操作,新添加的listener可能会收到/收不到这次的叫号通知。
  • 直接实现Patient类,作为listener
    public class Patient implements CallEventListener{ private final int number; public Patient(int number) { this.number = number; }@Override public void onCallEvent(CallEvent event) { if (event.getSource() instanceof Caller) { Caller source = (Caller) event.getSource(); if (source.getNumber() == number) { System.out.printf(Thread.currentThread().getName() + " ---- 我是%d号病人,马上去%d诊室就诊\n", number, source.getRoom()); } } } }

  • 多线程添加listener、叫号,程序执行结果多种多样,甚至执行失败
    public static void main(String[] args) { Caller caller = new Caller(3); // 添加病人的同时,进行叫号操作,病人可能没法收到叫号通知,从而错过叫号 new Thread(() -> caller.addCallEventListener(new Patient(3))).start(); new Thread(() -> caller.addCallEventListener(new Patient(1))).start(); new Thread(() -> caller.call(1)).start(); new Thread(() -> caller.call(2)).start(); new Thread(() -> caller.addCallEventListener(new Patient(2))).start(); }

  • 例如,下面的执行结果中,没有一个病人被成功叫号
    设计模式|观察者(observer)模式(二) —— 实现线程安全的监听器
    文章图片

  • 甚至,可能因为注册listener的同时迭代listener list,出现ConcurrentModificationException异常
3.2 synchronized保证线程安全
  • 多线程同时访问Caller中各方法时,存在线程安全问题
  • 最简单的解决办法,为每个方法添加synchronized关键字,保证多线程间的同步
    public synchronized void addCallEventListener(CallEventListener listener)public synchronized void deleteCallEventListener(CallEventListener listener)public synchronized void call(int number)

  • 期望的执行结果如下:
    设计模式|观察者(observer)模式(二) —— 实现线程安全的监听器
    文章图片

  • 使用synchronized关键字,保证同一时刻只有一个线程访问Caller,不会因为多线程交替执行而产生各种奇怪的执行结果
  • synchronized可以看做是一个重量级锁、互斥锁
    • 进行注册、删除listener(即病人)这样的写操作时,互斥是必要的。
    • 但通知listener这样的读操作(call()方法),没必要互斥,可以多线程同时执行(叫号这样要求有顺序的场景是不行的,都怪自己一开始给错了需求场景
    • 而且,迭代调用所有的listener的事件处理方法(这里为onCallEvent()方法)需要一定的时间:可能是listener很多,也可能是listener执行事件处理方法需要一定的时间
  • 同时,synchronized不保证操作的执行顺序。
    • 例如,实际执行时,Thread A叫1号的操作早于Thread B叫2号的操作
    • 但是由于synchronized锁被其他线程占有,使得两个线程都将阻塞并在同步队列中等待synchronized锁
    • 等到synchronized锁释放后,Thread B因为竞争synchronized锁成功,使得call 2号病人先于call 1号病人
      设计模式|观察者(observer)模式(二) —— 实现线程安全的监听器
      文章图片
3.3 公平的ReentrantReadWriteLock
  • 上面的场景中,注册、删除listener这样的写操作,必须和其他线程的读写操作互斥;通知listener的操作,得按照叫号顺序依次执行
  • 这时,可以考虑使用公平的ReentrantReadWriteLock,满足以上需求,解决synchronized关键字存在的问题
  • 由于作者是个菜鸡,需求场景没给对,导致这里没法基于已有的代码给出一个正确的示例
  • 具体可以参考博客The Observer Pattern Using Java 8中,关于Ordered Notification of Listeners的示例程序
  • 关键思想: 只要Thread 1获取锁的操作早于Thread 2, Thread 1一定能早于Thread 2获取到锁,而非像使用synchronized关键字一样,两个线程靠运气去竞争锁
3.5 另一篇文章推荐
  • Thread safe Observer design pattern in Java,这篇文章及其后面的评论都值得一读
  • 从单线程的observer模式实现为例,探讨了如何一步一步实现一个线程安全的observer模式
    • 单线程存在各种无法预料的问题,例如存放observer的set,多次初始化、以及observer丢失等
    • 基于ConcurrentHashMap创建存储observer的即时初始化(相对 lazy initialization而言) 的set,保证了线程安全,但不保证调用notifyObservers()时,基于最新的observer集合进行迭代
    • 使用synchronized关键字,有两个问题:① 容易因为notifyObservers()迭代通知所有的observer时间太长,其他操作阻塞;② observer执行自身的notify()方法时,如果尝试获取其内部的synchronized锁会被阻塞(暂时不太能理解)
    • 更新synchronized的锁范围:先复制set,然后基于set副本迭代通知observer;其中,对set的复制操作需要加锁
  • 评论中的建议:
    • 基于ConcurrentHashMap创建存储observer的set,改为直接使用CopyOnWriteArraySet,可以免除复制set的操作
    • 使用ReentrantReadWriteLock + 公平队列,仍需复制存储observer的set
5. 后记 5.1 其他
  • 博客The Observer Pattern Using Java 8,还介绍了listener的其他小技巧
  • 技巧一: 一个listener接口需要处理多种事件时(多个事件处理方法),可以考虑基于Listener接口实现Adapter类;用户创建listener时,只需要继承Adapter类,并按照需求实现某个方法,而无需重写所有方法 —— Java AWT中的MouseListenerMouseAdapter就是采用了这种方法
    • JDK 8以后,可以将Listener接口中的方法定义为default方法
    • 将Listener接口改为所有方法体为空的Listener类,但其后期不支持继承多个Listene类(Java的单继承、多实现)
  • 技巧二: 如果listener中的事件处理方法是复杂的,甚至是阻塞的,该如何处理?
    • 为每个listener分配一个线程,避免串行执行带来的副作用
    • 为事件源(observer中叫subject)的notifyListeners()方法分配一个线程
    • Queue the listener function invocations and have a set of threads execute the listener functions,暂时还不是特别懂
5.2 参考链接
  • 设计模式之 —— 观察者模式进阶:监听器模式
  • 虽然没看懂,但起码知道有哪些现成的listener可用:设计模式 - 事件监听者模式 - JDK & Spring & Guava 各有千秋
  • 王者之作1:Thread safe Observer design pattern in Java
  • 王者之作2:The Observer Pattern Using Java 8

    推荐阅读