Java并发编程 - 线程

Java内存模型 Java内存模型(JMM)是一个中间层的模型,是物理内存模型的映射,它为程序员屏蔽了底层的硬件实现细节(CPU缓存一致性及内存屏障等问题),也屏蔽操作系统的内存访问差异,以实现Java程序在各种平台下都能达到一致的内存访问效果。Java内存模型如下图所示:
Java并发编程 - 线程
文章图片

Java线程对变量的所有操作都在各自的工作内存(主内存的副本)中进行,不能直接读写主内存(volatile变量也不例外),也不能读写其他线程的工作内存。
内存可见性
内存可见性是指,一个线程对变量的修改能够立即被其他线程感知到,即能够立即读取到最新值。可见性问题的来源是Java内存模型里各线程的工作内存,同时由于指令重排序的存在,使可见性错误总是违背我们的直觉。为确保多个线程之间对内存写入操作的可见性,必须使用同步机制。比如volatile修饰,或者使用锁。
以下程序:线程1读取变量run来控制循环,线程2用来改变run的值为false。但线程1始终无法读取到线程2对变量run更改后的值,导致线程1的循环无法结束。

public class VisibleTest { static boolean run = true; static int other = 1; public static void main(String[] args) { //线程1 Thread thread1 = new Thread(() -> { int loop = 0; while (true) { // 第10行 if (!run) {break; } loop += 1; other = 2; } System.out.println(String.format("T1 end, run=%s, loop=%s, other=%s, time=%d", run, loop, other, System.currentTimeMillis())); }); thread1.start(); //线程2 Thread thread2 = new Thread(() -> { try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } run = false; }); thread2.start(); // 主线程 System.out.println(String.format("mainT run=%s, other=%s, time=%d",run, other, System.currentTimeMillis())); } }

使线程1可以读取到线程2对变量run的更新值(即让线程1结束循环)的方法,有以下几种:
1.变量run增加volatile修饰;
2.变量other增加volatile修饰;
3.在第10行增加:System.out.println("-"); 使产生IO操作。
4.在第10行增加:Thread.yield(); 让cpu重新选择执行的线程(当前线程也可能被再次选中),以触发线程切换。
5.在第10行增加:一个synchronized方法的调用。以产生获取锁和释放锁的操作。上述方法中的任何一个都可以让线程1结束循环并退出。
这里涉及一个问题,什么时候工作内存的变量会失效?答案是:1.线程释放锁时;2.线程切换时;3.cpu有空闲时间时,比如线程休眠,IO操作。工作内存的变量失效后,线程会从主存读取此变量的值。
Happens-Before规则
编译器生成的指令顺序可以与源代码中的顺序不同。同时,为了提高执行效率,处理器可以采用乱序或并行等方式来执行指令。这些乱序操作的前提是,程序的最终结果与在严格串行环境中执行的结果相同。
JMM为程序中的操作提供了一个Happens-Before规则,它是一个原生的规则,无需使用任何同步手段就已经存在,由JVM保证。如果操作A和操作B满足这个规则,那么操作A一定先于B执行,并且A的执行结果对B可见(即B能观察到A产生的影响)。如果两个操作A和B没有Happens-Before关系,那么JVM可以对它们进行任意地重排序,执行结果是无法预测的。这些规则包括:
1.程序顺序规则。在一个线程内,操作A和B按书写的逻辑顺序执行;
2.监视器锁规则。同一个监视器锁上的解锁操作必须在加锁操作之前执行;
3.volatile变量规则。对volatile变量的写入操作必须在对该变量的读取之前执行;
4.线程启动规则。Thread对象的start方法必须在该线程其他任何操作之前执行;
5.线程结束规则。线程中的任何操作都必须在其他线程能检测到该线程已结束之前执行。该线程已结束的标志是从Thread.join()返回,或者Thread.isAlive()返回false;
6.线程中断规则。当一个线程A调用另一个线程B的interrupt()方法时,此方法必须在线程B检测到中断事件之前执行。
7.对象终结规则。对象的构造函数必须在启动该对象的终结器finalize()之前执行完成;
8.传递性。如果操作A在B之前执行,并且操作B在C之前执行,那么操作A必须在C之前执行。
通过Happens-Before规则,可以分析并发环境下两个操作是否可能存在冲突,是否存在安全性问题。Happens-Before规则与时间先后顺序没有因果关系,在分析线程安全问题时不要受时间顺序的干扰。
工作内存和主存之间的数据同步
问题: 一个线程何时会从主存中去重新读取共享变量的值,又是何时需要将工作内存的值重新刷写到主存中。
前提: 不使用volatile关键字保证内存可见性的情况。因为volatile标识的变量告诉JVM此变量是易变的,使线程工作内存的值总是失效而必须每次都从主存取值。
工作内存的值写入主存的时机:线程结束,线程释放锁,线程切换,抛出异常等情况。
线程重新读取主存的值到工作内存的时机:线程切换,获取锁(虽然锁操作只对同步代码块起到作用,但影响的却是线程执行所使用的所有字段),cpu有空闲时间(比如线程sleep,IO操作等)。
线程 线程是Java里进行CPU资源调度的最小单位。Java Thread类的关键方法都是native方法,意味着Java线程是与操作系统平台相关的。
线程的实现
Java线程如何实现并不受Java虚拟机规范的约束,虚拟机厂商自行决定。目前主流的Java虚拟机的线程模型都是基于操作系统原生线程模型来实现的,Java线程与操作系统的内核线程是1:1的映射关系。JVM不会干涉线程的调度和执行,全权交给操作系统去处理。
Java线程有6种状态(Thread.State枚举类定义):new, runnable(包含ready和running), blocked, timed_waiting, waiting, terminated。线程的状态转换图如下:
Java并发编程 - 线程
文章图片

协程
协程(coroutine),是协同式调度的用户线程(非内核线程),分有栈/无栈协程。
协程的出现是因为操作系统内核线程有其天然缺陷:线程切换和调度的成本高昂,即响应中断、保护和恢复执行现场的成本高昂。系统能容纳的线程数量也有限(在几百和上千的量级)。协程的主要优势是轻量,一个协程的栈大小通常在几百字节到几KB(而线程的默认内存大小是1M),系统能容纳几万量级的协程。
纤程(Fiber),是一种有栈协程的特例实现,由JVM调度而非操作系统调度,由OpenJDK Loom项目开发。
线程的开销
有以下4种开销:
  • 线程创建。第1步new Thread()做初始化操作,第2步thread.start()调用操作系统API创建native thread。同时也有对内存的开销,通过-Xss设置线程堆栈内存大小,默认值1M。当线程被使用时才会真正消耗物理内存,否则只分配给虚拟内存。线程创建和启动约70-80微秒。
  • 线程切换。也叫上下文切换,需要保存和恢复执行现场。当一个新线程被切换进来时,它所需要的数据可能不在当前处理器的缓存中,因此线程在首次运行时会更缓慢。调度器会为每个线程分配一个最小执行时间,以分摊上下文切换的开销,从而提高整体的吞吐量,但缺点是损失了响应性。一次线程切换的耗时约几微秒,切换次数可以通过vmstat命令查看(unix系统)。
  • 内存同步。synchronized和volatile提供的可见性保证中可能会使用内存栅栏(Memory Barrier),它会刷新缓存,使缓存无效,同时会抑制一些编译器优化操作,使大多数操作不能被指令重排,从而对性能带来间接影响。同步还会增加共享内存总线上的通信量,而所有处理器都将共享这条总线,从而影响其他线程的性能。如果是无竞争的同步,那么它对性能的影响微乎其微,可以忽略它。关注重点应该放在有竞争的同步。
  • 阻塞。当在锁上发生竞争时,竞争失败的线程肯定会阻塞。JVM在实现阻塞时可以采用自旋等待(Spin-waiting)或者通过操作系统挂起线程。如果等待时间短,自旋等待的效率高;反之,则线程挂起的效率高。自旋等待增加了CPU时钟的开销,挂起则增加了两次额外的上下文切换开销。
线程创建开销:https://lotabout.me/books/Jav...
上下文切换开销:https://eli.thegreenplace.net...
线程安全 在没有充足同步的情况下,多个线程中的操作的执行顺序是不可预测的,会产生安全性问题。一开始就设计一个线程安全的类非常重要,因为这比以后再将这个类修改为线程安全的类要容易得多。
什么是线程安全性
线程安全性:当多个线程访问某个类时,这个类始终都能表现出正确的行为,那么就称这个类是线程安全的。
正确性的含义是,某个类的行为与其规范完全一致。
哪些是不安全的代码:
  • Race Condition(竞态条件/竞争态势):并发编程中,由于多个线程不恰当的执行时序而出现不正确的结果。常见的race condition比如:多线程执行count++,由于自增运算不是原子操作,它包含3个独立地操作:读取-修改-写入。线程读取到的数值可能是前面3个操作中的任意一步的值,所以它的最终结果是不可预测的。
  • 在没有同步的情况下,编译器、处理器和运行时等都可能对操作的执行顺序进行一些意想不到的调整,即“指令重排”。
  • 非volatile的long和double变量,JVM允许将64位的读操作或写操作分解为两个32位的操作,即如果对该变量的读写操作在不同的线程中执行,那么很可能会读取到某个值的高32位和另一个值的低32位。因此在多线程程序中使用共享且可变的long和double变量是不安全的,保证其安全性需要加锁保护或者用volatile修饰。对象逸出:某个不该发布的对象被发布。
  • 对象逸出导致其他线程拿到半成品对象的引用,从而引发安全性问题。
如何保证安全性
  • 无状态对象一定是线程安全的。
  • 加锁
    每个Java对象都可以用作锁,它称为”内置锁“或监视器锁。内置锁是可重入的。synchronized修饰的方法,它用的锁就是方法所在的对象,静态的synchronized方法以Class对象作为锁。加锁也可以保证变量的内存可见性。
  • 内存可见性
    volatile变量,确保变量的更新操作被其他线程可见,是一种轻量级的同步机制(在大多数处理器架构上,读取volatile变量的开销只比读取普通变量略高一些)。
    编译器和运行时都会注意到volatile变量是共享的,不会在该变量上做指令的重排序,也不会缓存该变量在寄存器或对其他处理器不可见的地方,因此volatile变量的最新值总会被所有线程可见。
  • 对象的安全发布
    发布(Publish)一个对象是指,使对象能够在当前作用域之外的代码中使用。
    逸出(Escape)指某个不该发布的对象被发布了。
    发布内部状态可能会破坏封装性,并使程序难以维持不变性条件。
    不变性条件(invariant):不同变量之间的约束关系。
    前置条件(pre-conditions):在调用该方法或代码块之前,该条件必须为true;
    后置条件(post-conditions):在调用该方法或代码块之后,该条件必须为true;
  • 线程封闭
    线程封闭(Thread Confinement),是指不共享数据,线程独享一份数据。这是实现线程安全性最简单的方式之一。
    线程封闭技术的常见应用是JDBC的Connection对象,它不是线程安全的,但各个服务线程独享一个Connection对象。
    栈封闭,是线程封闭的一种特例,只能通过局部变量才能访问对象。基本类型的局部变量始终封闭在线程内。
    ThreadLocal类,是一种更规范的线程封闭方法。它用于保存一个线程独享的值。ThreadLocal提供了get和set方法。
  • 使用不可变对象
    不可变对象一定是线程安全的。满足以下条件时,对象才是不可变的:1.对象创建以后其状态就不能修改;2.对象的所有域都是final类型;3.对象是正确创建的(对象创建期间,this引用没有逸出)。
    并发任务任务的提交和执行
    任务执行多个并发任务通常使用执行器来执行,它能复用线程资源,把任务的创建和执行分离,使程序员能专注于任务的创建。执行器提供了两种任务提交方法:
    1.提交无返回值的任务,execute(); 任务实现 Runnable接口;
    2.提交有返回值的任务,submit(); 任务实现 Callable接口, 返回值封装在Future接口中;
    执行器的类间关系Java并发编程 - 线程
    文章图片

执行器
  • Executor接口
    抽象了任务的执行者,它会负责创建线程,启动线程,执行任务。接口只有一个方法:execute(Runnable run) - 提交一个无返回值的任务,立即返回;
  • ExecutorService接口
    继承了Executor接口,新增了更多的任务执行必须的方法:
    Future submit(Callable task) - 提交一个有返回值的任务,立即返回。内部调用了execute方法;
    List invokeAll() - 等待提交的所有任务都返回(或到达超时限制)后才返回。返回值为所有任务的future;
    T invokeAny() - 提交的任务中只要一个任务成功后即返回,其余任务均取消。返回值为成功的那个任务的值;
    shutdown() - 平缓的关闭过程:不接受新任务,等待运行中的和已经提交的任务执行完毕。它会立即返回,所以通常需要配合使用awaitTermination()方法,以阻塞等待;
    List shutdownNow() - 尝试取消所有运行中的任务,不再启动队列中尚未开始的任务。返回值是尚未开始执行的任务list。
  • AbstractExecutorService抽象类
    抽象类AbstractExecutorService 实现了 ExecutorService接口。
    ThreadPoolExecutor 继承 AbstractExecutorService 抽象类。
    ThreadPoolExecutor.Worker extends AbstractQueuedSynchronizer implements Runnable,封装执行的任务和线程。线程的复用由runWorker(Worker w)方法完成,它内部使用while循环不断从任务队列(workQueue)取任务给当前线程执行,当取回的任务为null时循环结束,线程也就退出。
    ForkJoinPool 继承 AbstractExecutorService 抽象类,类似于单机版map-reduce工作原理,把任务分解后并发执行再汇总。
  • CompletionService接口
    它只有一个实现类ExecutorCompletionService,用于按任务完成的先后顺序读取任务。其内部实现比较简单,把Executor和BlockingQueue的功能融合在一起。
  • Executors类
    是一个工具类,提供了一系列静态方法创建线程池等;
    例如创建固定线程数量的线程池: ExecutorService execService = Executors.newFixedThreadPool(10);
论文(翻译): https://blog.csdn.net/dhaibo1...
ForkJoinPool:https://cloud.tencent.com/dev...
任务的取消和停止
有时候我们希望提前结束任务或线程,比如设置的超时时间到了,或用户取消了操作,或应用程序需要被快速关闭。线程的立即停止可能会造成共享数据处于不一致的状态(因此没有安全的抢占式方法来立即停止线程,只有协作式的机制),良好的程序设计需要能完善地处理失败,取消和关闭行为。即需要管理好任务的生命周期。
线程中断 线程中断是最合理的“取消”方式,它是一种协作机制,可以通过中断来通知(不是强制)另一个线程停止当前的工作,并转而执行其他工作。当线程A中断线程B时,A仅仅是要求B在执行到某个可以暂停的地方时停止正在执行的操作。另外需要注意,JVM不能保证阻塞方法检测到中断的速度。
Thread的中断方法:
public void interrupt(); // 发出请求中断的信息给目标线程,然后目标线程在下一个合适的时刻(取消点)中断自己。 public boolean isInterrupted(); // 返回目标线程的中断状态 public static boolean interrupted(); // 清除线程的中断状态并返回它之前的值

中断策略指,当发现中断请求时应该做哪些工作,以及多快的速度来响应中断。最合理的中断策略是,尽快退出,在必要时清理,并通知线程所有者已经退出。
哪些任务(代码)可以被中断?
当阻塞方法收到中断请求的时候就会抛出InterruptedException异常,从而被中断执行。并非所有的阻塞方法或阻塞机制都能响应中断,比如等待内置锁的阻塞,等待同步的Socket IO。
非阻塞任务的取消 1.线程内使用共享变量(volatile修饰)isCancel来频繁检测是否任务取消,从而控制任务结束。当外部线程设置共享变量isCancel=true时,线程能很快发现这个更改,从而采取取消措施;
2.线程内频繁检测 Thread.currentThread().isInterrupt(),从而采取取消措施。
不可中断的阻塞 并非所有的阻塞方法或阻塞机制都能响应中断,比如执行同步的socket IO,或等待获得内置锁的阻塞。这时希望取消运行中的线程,可以通过改写Thread类的interrupt方法,将非标准的取消操作(比如关闭socket)封装在线程中。对于内置锁,可以使用lock.lockInterruptibley()方法加锁,这个方法能够在获得锁的同时保持对中断的响应。
计时运行 对任务进行超时设置,如果任务不能在指定的时间内返回,那么调用者就不再等待。需要注意,当这些任务超时后应该立即停止(例如Future.cancel(true)),避免为计算一个不再使用的结果而浪费计算资源。编写的任务是否可取消,在设计时需要留意。
通过Future来取消 Future有一个cancel方法,该方法带有一个boolean型参数:mayInterruptIfRunning。为true,表示如果任务正在运行,那么将被中断;为false,表示若任务还没启动则不要启动它,但如果任务已在运行是不会中断它的。如果你不知道线程的中断策略,就不要中断线程。由Executor创建的线程,其中断策略可以使任务通过中断取消,所以如果任务在Executor中运行,用future.cancel(true)来取消任务是安全的。
线程泄漏 导致线程提前死亡的最主要原因是RuntimeException。当一个线程由于未捕获异常而退出时,JVM会把这个事件报告给UncaughtExceptionHandler处理器,默认处理是把异常栈信息输出到System.err。
为线程池中所有线程设置一个UncaughtExceptionHandler,需要为ThreadPoolExecutor的构造函数提供一个ThreadFactory,在线程创建时的newThread()方法内部进行设置。 但只有execute()提交的任务才能将它抛出的异常交给UncaughtExceptionHandler,submit()提交的任务的异常会封装在Future.get()抛出的ExecutionException里。
@Override public Thread newThread(Runnable runnable) { String threadName = String.format("%s-%04d-%04d", threadNamePrefix, id, threadID.getAndIncrement()); Thread t = new Thread(runnable, threadName); t.setDaemon(isDaemon); //t.setUncaughtExceptionHandler(videoRecUncaughtExpHandler); // 匿名内部类实现未捕获异常处理器 t.setUncaughtExceptionHandler((thread, throwable) -> LOGGER.error("Thread:{} dead, error:{}", thread.getName(), throwable.getMessage())); return t; }

线程池的使用
配置ThreadPoolExecutor ThreadPoolExecutor的构造函数:
public ThreadPoolExecutor( int corePoolSize, //线程池的基本大小,即目标大小 int maximumPoolSize, //线程池的最大大小 long keepAliveTime, //线程数量超过coreSize时,等待此存活时间后仍然空闲则回收 TimeUnit unit, BlockingQueue workQueue, //任务队列 ThreadFactory threadFactory, //负责创建新线程 RejectedExecutionHandler handler); //当任务队列填满后执行的饱和策略

没有任务执行时线程池的大小是coreSize;当任务队列满了,会创建超过coreSize的线程,但最大数量小于maximumPoolSize。
超过coreSize的线程,在等待keepAliveTime之后仍然空闲则会被销毁,使线程数量回到coreSize大小。
设置线程池的大小 线程是昂贵的资源,不能把线程池设置得过大,同时也要关注JVM进程总的线程数量,尽量减少应用的线程数量,提高线程的利用率。线程过多不但占用更多内存,也增加了线程上下文切换的开销,导致cpu的利用非常低效。线程池大小的设置目标是:线程无空闲,任务无等待。
线程池合适的大小(来源为《Java并发编程实战》):
Nthreads = Ncpu * Ucpu* (1 + WaitTime / ComputeTime)

公式解释:cpu的数量(Ncpu) 乘以 cpu的目标利用率( Ucpu) 乘以 (1 + 任务等待时间 与 其计算时间的比值)。即任务等待时间越长(IO密集型),线程池应该越大。相反,任务计算时间越长(计算密集型),线程池应该越小。
推荐引擎服务中,大量的召回线程和排序线程都是在等待子服务返回,属于IO密集型。再结合设计的单机支持的QPS上限,设为Q,每个任务(取用户画像,召回,排序)的平均执行时间为T毫秒,一次请求会触发的任务数量为M,可以粗略估算服务需要的总线程数量为:
N = Q * M * T / 1000

QPS有高低峰,可以使用日均QPS作为Q计算N的值作为corePoolSize。高并发系统需要的线程数量可能很庞大,再结合上面 Nthreads 的理论公式,估算单机需要分配多少cpu。如果cpu过多,则需要调整单机的QPS设计上限。因为根据Amdahl定律,在增加计算资源的情况下,程序在理论上能够实现的最高加速比,取决于程序中可并行组件和串行组件的比重。并不是单机的cpu越多越好。
另外,可以在程序运行时动态设置corePoolSize和maximumPoolSize,ThreadPoolExecutor提供了两个public方法:setCorePoolSize(int), setMaximumPoolSize(int)。
同时线程池也提供了方法在初始化时进行线程池预热:prestartCoreThread(), prestartAllCoreThreads()。
通常,高并发系统中需要进行线程池隔离,防止所有任务提交到一个线程池产生饥饿现象。把重要任务提交到独立的隔离的线程池,从而保证重要任务的执行不受其他任务的影响。
QueueSize设置 queueSize设置的目标是:高峰期能触发创建新的线程使线程数量扩充到maximumPoolSize,其他时段线程数量维持在corePoolSize,同时不会触发饱和策略。
任务队列长度queueSize的设置主要根据:corePoolSize,单位时间产生的任务数量,单个任务处理时间,以及对任务等待时间的上限决定。如果queueSize设置过大,那么无法使线程数量扩充到maximumPoolSize;如果queueSize设置过小,会容易使队列满,从而频繁触发线程池饱和策略(默认是AbortPolicy)。
比如,任务等待时间的上限为100ms,100ms会有1500-4500(低谷和高峰期)个任务提交到线程池。单个任务的处理时间为3-5ms,corePoolSize=90,那么100ms 90个线程能处理 90*100/5 = 1800,或3000个任务。当线程数量扩充到maximumPoolSize=150时,100ms的处理能力为3000-5000任务。那么queueSize设置在4000左右即可。
参考文献 【Java并发编程 - 线程】[1] 周志明.《深入理解Java虚拟机-第3版》 机械工业出版社,2019
[2] Brian Goetz,Tim Peierls等.《Java并发编程实战》 机械工业出版社,2012
[3] Eli Bendersky,上下文切换开销测量:https://eli.thegreenplace.net...,2018

    推荐阅读