Java: 多线程编程
Java多线程编程
Runnable接口
- 所有类,如果希望让它单独作为一个线程运行,可以继承
Thread类并重写void run()方法 - 所有类,如果希望让它单独作为一个线程运行,更常用的方法是实现
Runnable接口以便用多态特性被调用 Runnable接口要求实现void run()方法,当这个类作为线程运行时,运行的就是这个run()方法 使用这种方法,实现了Runnable接口的类称为线程任务对象,传递给Thread运行,创建的Thread对象称为线程对象
Thread类
Thread的常用构造器:Thread():创建空线程Thread(Runnable task, String name):将实现了Runnable接口的方法引用传递给它,并命名为name
java线程包括七个状态:New:新建的线程,尚未执行Runnable:正在执行run()的线程Blocked:因阻塞而被挂起的线程Waiting:因某些原因在等待的线程Timed Waiting:因主动调用Thread.sleep()而计时等待的线程Terminated:终止的线程,run()因各种原因而结束
start():启动一个线程,使其开始以另一个线程的形式执行run()在主线程直接调用某个线程类的run()无法达到多线程的效果,必须通过start()在JVM中登记join():使当前线程等待调用该方法的线程实例结束,再继续运行join(long):使当前线程仅等待有限时间,其它同上interrupt():使该线程的中断标志位置1,可以通过isInterrupted()循环检查,实现中断线程的效果 但interrupt()并不立刻生效,仅仅是发出一个中断请求 而且当外部线程调用该线程的interrupt()方法时,若该线程处于等待状态(例如调用join()或sleep()),则join()或sleep()等会抛出InterruptedException异常volatile关键字:由于JVM的内存模型,在线程修改共享变量时不会立刻写回主内存 而volatile修饰的变量则会使JVM在读取时总是读取最新值、写入时总是立刻写入 但volatile并不保证原子性,在读写含有多个字段的volatile变量时可能会有问题setDaemon(true):一个线程默认是非守护线程,JVM进程会等待所有的非守护线程结束后再结束 但一些线程是无限循环的,可以调用setDaemon(true)将它们设置为守护线程,JVM进程结束时不会关心它们是否结束 守护线程不能占有任何需要显式关闭的资源,守护线程本身无法保证这些资源能在JVM进程结束时关闭
传统线程同步
不同线程在读写同一份资源、或需要相互协作时,就需要考虑线程同步问题 除非资源是只读的,例如不可变类型,则不需要考虑线程同步 大部分标准库中的类为了提高性能,都是非线程安全的,涉及到非线程安全的读写操作时,必须手动添加线程同步代码以保证线程安全
synchronized关键字:- 作用于对象时,会对该对象加锁
- 修饰实例方法时,等价于作用于
this - 修饰静态方法时,等价于作用于所在类的
class实例 - 一般不用该关键字修饰方法,因为会导致加锁混乱、不明确且在很多情况会使两个本不冲突的方法变为冲突
使用
synchronized的代码块无法并发执行,且加锁解锁有额外开销 对某对象加锁,不代表其它线程就无法访问该对象,如果一个线程对该对象加锁而另一个线程并不这样做,则仍存在线程同步问题
JVM的基础原子操作:除long、double以外的任意变量的赋值操作 涉及多行的赋值操作时,仍需要用synchronized修饰代码块可重入锁:
JVM允许同一个线程重复获取同一个锁,其本质是一个信号量,进入/退出synchronized代码块使信号量加/减1死锁的必要条件:互斥、不可抢占、占有并等待、循环等待,只要破坏其中一种条件即可避免死锁
- 破坏循环等待:所有线程获取同一组锁的顺序保持一致,这是最简单的一种方案
- 互斥不可破坏,破坏不可抢占可能导致混乱,破坏占有并等待必须一次性分配需要的资源、资源利用率低且可能导致饥饿
继承自
Object的wait():可使该线程暂时放弃调用wait()的对象锁,进入等待状态直至被唤醒,唤醒后立刻尝试重新请求这个对象锁 需要注意这不是作用于线程类对象的,而是作用于被加锁的资源继承自
Object的notify()和notifyAll():可随机唤醒某一个等待该资源的线程/唤醒全部等待该资源的线程 同上,这两者作用于被加锁的资源,通常后者更安全notify()可能会唤醒同类线程,导致活锁或死锁,例如生产者消费者问题: 两个消费者阻塞→生产者P1唤醒消费者C1→两个生产者阻塞→消费者C1消耗资源,但唤醒同类消费者C2→两个消费者阻塞,至此所有线程均阻塞,造成死锁- 使用
notifyAll(),至少能唤醒一个非同类线程,而其它同类线程应该继续等待,所以wait()应该在循环里而非if语句块中
JVM线程同步原理是对象监视器,Object的wait()和notify()相当于每个变量都可以作为信号量的封装,由于wait()使线程可以短暂释放已获得的锁,使其不需要像操作系统课程上讲的那般麻烦(需要互斥信号量与同步信号量且互斥信号量的PV操作紧贴临界区),而是使同步信号量围绕着互斥资源通过wait()和notify()进行同步 例如在生产者消费者问题中,互斥资源的空/满可以化作while中的条件,充当同步信号量1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// 一个线程同步的生产者消费者队列类
private Queue<Integer> q;
private int size;
private static final MAX_SIZE = 10;
// Consumer
synchronized (this) {
while (size == 0) { // Condition: Empty
wait();
}
e = q.poll(); // consume
--size;
notifyAll();
}
// Provider
synchronized (this) {
while (size == MAX_SIZE) { // Condition: Full
wait();
}
q.offer(e); // provide
++size;
notifyAll();
}
java.util.concurrent.locks与Semaphore
synchronized的加锁机制是悲观锁、重量级锁、阻塞的,这种锁适合竞争激烈的多线程同步场景,线程先必须获得锁(进入或退出Monitor)才能读写对象java.util.concurrent是Java 5开始提供的高级并发包,以下是需要提前了解的一些概念- 乐观锁:线程无需获取锁地尝试修改,在修改时检查是否冲突 理念是估计在读过程中不会有其它线程在写 乐观读锁在读多写少的场景中好处显而易见:一是乐观读锁不排它,减少写锁饥饿的情况;二是因为写少,乐观读大概率成功而减少了悲观锁的阻塞开销
CAS(Compare And Swap或Compare And Set)机制是实现乐观锁的一种方式:线程查询内存中的值和此前读取的值是否相同,若相同则更新,否则失败 若失败则读取内存中的值,循环地进行CAS直至更新成功CAS机制存在ABA问题,即线程T1尝试通过CAS读写时,虽然内存值和此前读取的值一致,但这个内存值A可能被另一个线程先改为B再改为A,在T1看来没有改变过的资源实际上被其它线程更改过 因此实现上对资源的更改会添加时间戳/版本号CAS机制有时会配合自旋的机制: 自旋锁优点在于可避免不必要的上下文切换开销,缺点在于循环导致的CPU忙等 自旋锁仅在CPU多核的并行处理场景中,线程能在忙等中获取其它线程释放的资源时才有效 接下来从完全悲观到完全乐观地介绍java.util.concurrent提供的线程同步机制
Lock接口:可以替代synchronized,是一种显式锁,由代码层面而非语法层面实现加锁和解锁,其实现类是对synchronized的封装Lock是悲观锁lock():显式加锁unlock():需要在finally块中解锁tryLock()和tryLock(long, TimeUnit):Lock支持非阻塞获取锁或有限忙等地获取锁,前者仅尝试一次、后者在有限时间内循环尝试 返回true表示获取成功newCondition():返回一个Condition对象,Lock支持多条件锁,与synchronized单锁相区别ReentrantLock是Lock的实现类之一,译为“可重入锁”- 支持公平锁,在构造时传递
true即可 公平锁即每次向等待队列加入新的线程时,它无法插队,保证每次获取锁的线程是队列中等待时间最长的线程
Condition接口:表示某条件的等待队列,如上文传统的线程同步代码中,由于空/满这两种条件使不同类线程在同一等待队列中,因此会出现死锁现象而必须用notifyAll()来唤醒非同类线程 而Condition和Lock允许在一个锁对象上分配多个不同条件的等待队列,能保证每次唤醒都能唤醒非同类线程await():使线程在该条件上短暂地释放锁,进入等待状态,被唤醒后重新尝试获取await(long, TimeUnit):可以在有限时间内地等待signal():唤醒在该条件上等待的某个线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29private Queue<Integer> q;
private int size;
private static int MAX_SIZE = 10;
private Lock lock = new ReentrantLock();
private final Condition notempty = lock.newCondition(), notfull = lock.newCondition();
// Consumer
lock.lock();
try {
while (size == 0) {
notempty.await();
}
e = q.poll();
--size;
notfull.signal();
} finally {
lock.unlock();
}
// Provider
lock.lock();
try {
while (size == MAX_SIZE) {
notfull.await();
}
q.offer(e);
++size;
notempty.signal();
} finally {
lock.unlock();
}ReadWriteLock接口:有时排它锁过于重量,在多读少写的场景下,希望允许多个线程同时读,这个接口就用于这种场景readLock():获取读锁writeLock():获取读锁 类的内部会维护,若有线程占有读锁,则不会有线程能占有写锁,反之亦然;若没有线程占有写锁,则允许多个线程占有读锁 虽然它实现了读写分离,但它仍是悲观锁,因此也可能导致需要写锁的线程饥饿ReentrantReadWriteLock是ReadWriteLock的实现类之一,是可重入锁
AQS框架:StampedLock类:Java 8开始提供的乐观读锁和读写分离的悲观锁两者的封装锁,在互斥上可替代ReadWriteLock,但要注意它不是可重入锁、不支持公平锁StampedLock和ReadWriteLock一样有readLock()和writeLock()方法,用于获取读锁和写锁 但它们的实现有所区别:它们不返回Lock对象而是在内部就调用了锁的lock()方法,然后返回long类型的版本号stamp- 因此需要使用
unlockRead(stamp)和unlockWrite(stamp)释放掉这个版本号的读写锁 StampedLock还支持乐观读锁,通过tryOptimisticRead()尝试获取乐观读锁的版本号stamp,然后通过validate(stamp)检验,乐观锁不需要解锁,因为本质上并没有加锁操作 如前文所说,乐观锁不排它,乐观读过程中允许其它线程写,如果检验成功,说明加乐观读锁途中没有线程占有死锁 如果检验不成功则有两种选择:自旋地重复乐观读、悲观读- 同
Lock一样支持非阻塞或有限阻塞地获取悲观读锁和悲观写锁 - 支持锁转换:
tryConvertToReadLock(stamp):若stamp有效,原子地,悲观读锁则返回其本身、悲观写锁则释放它并返回悲观读锁、乐观读锁则非阻塞地请求一个悲观读锁tryConvertToWriteLock(stamp):若stamp有效,原子地,悲观读锁且写锁空闲则返回写锁、悲观写锁则返回其本身、乐观读锁则非阻塞地请求一个悲观写锁tryConvertToOptimisticRead(stamp):若stamp有效,原子地,悲观锁则释放它们并返回乐观读锁、有效的乐观锁则返回其本身 - 但
StampedLock不支持基于条件的线程间协作,因此只适用于单个资源的互斥读写场景
Semaphore:信号量可以被最多N个线程获取,但不支持基于条件的线程间协作,用于复数个资源的互斥获取,它也支持公平锁acquire():阻塞地获取该信号量release():需要在finally块中释放tryAcquire()和tryAcquire(long, TimeUnit):非阻塞地或有限阻塞地获取信号量
Callable<T>接口与异步线程池
Callable<T>接口:由于Runnable的run()没有返回值且不允许抛出异常,因此Callable<T>诞生了,是单方法接口,包含方法T call() throws Exceptionjava线程池提供了多线程异步的功能,能更好地利用多核资源 所谓同步,即各个任务有一定的执行顺序,一些任务必须等待其它任务完成后,利用其计算结果才能继续运行;异步是各个任务没有执行顺序,任务通过回调函数或其它手段获取其它任务的计算结果,在这段时间内可以执行其它计算 同步异步与线程个数没有必然联系,单线程异步可以通过检测事件循环实现Future<T>接口是实现多线程异步的核心,表示“能在未来得到计算结果的对象”,包括以下核心方法:get():获取结果,如果其对应的计算任务尚未完成则会使调用get()的线程进入阻塞get(long, TimeUnit):仅等待有限时间地获取结果cancel(true):true表示通过发出中断请求来取消任务,若任务尚未开始或线程响应中断请求后则成功取消,返回true,否则返回falsecancel(false):若任务尚未完成则一定能取消成功,但任务可以继续执行直至结束 成功调用cancel()后,isCancelled()返回true,此后get()会抛出CancellationException异常
ExecutorService接口:- 任务提交:把任务提交给线程池,异步地执行
Future<T> submit(Callable<T> task):提交一个计算任务给线程池执行,立刻获得一个Future<T>对象 - 任务执行:把多个任务提交给线程池,当前线程阻塞,即同步地执行
List<Future<T>> invokeAll(List<Callable<T>>):执行多个计算任务,当前线程阻塞直至它们全部执行完成,返回值顺序和提交的任务顺序一致invokeAll()还允许传递时间参数,有限地阻塞 shutdown():停止接受所有新线程,执行完已提交的线程,然后关闭线程池shutdownNow():停止接受所有新线程,尝试中断地取消所有已提交线程,返回所有未开始的任务
- 任务提交:把任务提交给线程池,异步地执行
- 创建
ExecutorService对象:ExecutorsThreadPoolExecutor
Fork/Join线程池:
其它API
java.util.concurrent.atomic中的类基于CAS机制实现无锁的线程同步,使用乐观锁、非阻塞地封装资源 例如AtomicReferencte<T>、AtomicReferenceArray<T>,除此之外还对三个基本数据类型有更全面的封装:AtomicInteger、AtomicLong、AtomicBoolean如果ABA问题会影响业务逻辑,则应使用AtomicStampedReference或AtomicMarkableReference以下是AtomicReference一系列常用的原子方法:get():获取当前值set(T):volatile地写入新值compareAndSet(T ept, T upd):CAS地更新值,可能失败getAndSet(T):获取旧值,写入新值getAndUpdate()或updateAndGet():提供单元运算函数更新值,返回更新前/更新后的值getAndAccumulate()或accumulateAndGet():提供二元运算函数和加值,返回更新前/更新后的值
- 线程安全的集合:
List的实现类CopyOnWriteArrayListMap的实现类ConcurrentHashMapSet的实现类CopyOnWriteArraySetQueue的实现类ArrayBlockingQueue与LinkedBlockingQueueDeque的实现类LinkedBlockingDeque
CompletableFutureThreadLocal- 虚拟线程: