多线程
线程
- priority
- 线程的优先级是继承于创建它的线程的。
- 优先级只是代表告知了 「线程调度器」该线程的重要度有多大。如果有大量线程都被堵塞,都在等候运 行,调试程序会首先运行具有最高优先级的那个线程。然而,这并不表示优先级较低的线程不会运行(换言之,不会因为存在优先级而导致死锁)。若线程的优先级较低,只不过表示它被准许运行的机会小一些而已。
- daemon
- 只要当前JVM实例中尚存在任何一个非守护线程没有结束,守护线程就全部工作;只有当最后一个非守护线程结束时,守护线程随着JVM一同结束工作。
- thread.setDaemon(true)必须在thread.start()之前设置
- 守护线程最典型的应用就是 GC (垃圾回收器)
- User和Daemon两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果 User Thread已经全部退出运行了,只剩下Daemon Thread存在了,虚拟机也就退出了。
- Thread.yield()
线程的六种状态
-
New (新建)
线程创建,但还没有开始执行(也就是没有执行 start 方法)
-
Runnable (运行)
包括操作系统线程状态中的 Running 和 Ready ,也就是出于此状态的线程有可能正在执行,也有可能正在等待着操作系统为它分配执行时间
-
Blocked (阻塞)
线程被被阻塞了,“阻塞状态”与“等待状态”的区别是“阻塞状态”在等待着获取到一个排它锁。
-
Waiting (无限等待)
处于这种状态的线程不会被分配处理器执行时间,他们要等待被其他线程显示唤醒。以下方式会让线程进入 Waiting 状态
- 没有设置 Timeout 参数的 Object::waiting() 方法;
- 没有设置 Timeout 参数的 Thread::join() 方法;
- LockSupport::park() 方法
-
Timed Waiting(限期等待)
处于这种状态的线程也不会被分配处理器执行时间,不过无需等待被其他线程显示唤醒,在一定时间之后他们会有系统自动唤醒。以下方式会让线程进入 Timed Waiting 状态
- Thread::Sleep(long)
- Object::waiting(long)
- Thread::join(long)
- LockSupport::parkNanos()
- LockSupport::parkUntil()
-
Terminated (结束)
已终止线程的线程状态,线程已经结束执行。
线程间通讯
- 数据共享 (Handler 的本质就是数据共享)
- 文件共享
- 变量共享(其实就是,两个线程使用同一个变量)
- 线程间协作
- Object::waiting()
- Object::notify()
- Condition
线程池
为什么要使用线程池
- 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
- 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
- 方便线程统一管理:命名管理,异常管理,优先级管理,并发数的管理等。线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成 CUP 过度切换(cpu切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场))。
- 提供更强大的功能,延时定时线程池。
线程池参数
- corePoolSize
- maximumPoolSize
- keepAliveTime
- unit
- workQueue
- threadFactory
- handler
常见的几种线程池
- Executors.newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。(Executors.newCachedThreadPool())
- Executors.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
- Executors.newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
- Executors.newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
先了解 BlockingQueue 阻塞队列
入队
- offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false–>不阻塞
- put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断–>阻塞
- offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:–>阻塞、被唤醒等待时间超时、当前线程被中断
出队
- poll():如果没有元素,直接返回null;如果有元素,出队
- take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断–>阻塞
- poll(long timeout, TimeUnit unit):如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:被唤醒、等待时间超时、当前线程被中断
线程池源码解析
```java // 线程池执行任务 public void ThreadPoolExecutor.execute(Runnable command) { if (command == null) throw new NullPointerException();
int c = ctl.get(); // 如果当前线程数少于核心线程数,添加核心线程执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 尝试添加到工作队列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)){ reject(command); }else if (workerCountOf(recheck) == 0){//如果当前线程数为0 ,启动的一个线程 addWorker(null, false); }
}else if (!addWorker(command, false)){//尝试用非核心线程执行 // 没有非核心线程执行失败,执行拒绝策略 reject(command); }
} private boolean ThreadPoolExecutor.addWorker(Runnable firstTask, boolean core) {
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// 检查当前检查是否可以添加新的 Work
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 添加 Work 也就是线程并启动
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted; } // Worker.的run方法只是简单的调用 ThreadPoolExecutor.runWorker public void Worker.run() {
runWorker(this); } //ThreadPoolExecutor.runWorker 执行 work ,优先执行 work.firstTask 如果 firstTask 为 null 调用 getTask 从工作队列中获取新任务,如果没有得到任务则关闭 work final void ThreadPoolExecutor.runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
} }
private Runnable ThreadPoolExecutor.getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } // 上面的代码就是检查一些状态和数量信息
1
2
3
4
5
6
7
8
9
10
11
12
try {
// 如果是当前线程数大于核心线程数,则 timed = true 代表有超时限制,调用阻塞队列的 poll(time) ;如果没有则一直等待调用 take()
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
} } ``` 总结:通过 exexute 方法执行一个任务,如果当前线程数(Worker)小于核心线程数就会直接创建 Worker 并把当前 task 作为 Worker 的 firstTask ,然后 start Worker; 如果当前线程大于等于核心线程就会尝试添加到 workQueue 中,如果添加失败,当前线程数小于最大线程数,就会创建 worker 把当前 task 作为 worker 的 firstTask,调用 start Worker; Worker 运行的逻辑是:启动一个循环,先检查当前 firstTask 是否为 null 如果不为 null 优先执行并置为 null,当 firstTask 为 null 调用 getTask 获取新任务,如果 getTask 返回 null 退出循环,当前线程也就会退出;getTask 方法如果当前线程数大于核心线程数会调用 workQueue.poll(timeout) 方法固定时间内阻塞等待获取task ,如果小于等于核心线程数调用 take 方法无限阻塞等待获取 task
自定义线程池
自定义线程池,各个参数的意义
可以通过 thread.setUncaughtExceptionHandler 设置线程的异常捕获
如果线程设置了异常捕获,线程内发生的异常就不会导致 app 崩溃。
线程池中线程的增长策略
当用户使用线程池创建一个任务,
- 当先存活线程数是否小于核心线程数,如果小于核心线程数则直接创建线程(即使这时候核心线程处于空闲状态也会直接创建新线程),如果没有->
- 等待队列是否已经满了,如果没有满则添加到队列中,如果满了->
- 当前线程数是否小于最大线程数,如果小于最大线程数则直接创建新线程执行(虽然这个任务添加时间比较晚但是执行时间缺比等待队列中的任务要早),如果没有->
- 触发拒绝策略
易错点
- 队列满+核心线程数满才创建线程,并不是核心队列满就创建新线程,也就是说如果队列是无限制的,那么就永远不会开启非核心线程
- 如果核心线程满了,队列未满,刚执行完成的非核心线程会直接从队列获取任务,而不是销毁
- 如果核心线程都在+队列满,新来的任务会立刻开启非核心线程执行,并不会开启线程先执行队列里的任务。
- 核心线程并不是固定的几个线程
线程安全三要素
- 有序性
- 可见性
- 原子性
volatile
- 线程可见性
- 防止指令重排序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class Test {
public static boolean flag = true;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
System.out.println("start");
while (flag){
}
System.out.println("end");
}).start();
Thread.sleep(100);
flag = false;
}
}
// 只会打印 start
class Test {
public static volatile boolean flag = true;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
System.out.println("start");
while (flag){
}
System.out.println("end");
}).start();
Thread.sleep(100);
flag = false;
}
}
//打印 start end
class Test {
public static boolean flag = true;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
System.out.println("start");
while (flag){
System.out.println("doing");
}
System.out.println("end");
}).start();
Thread.sleep(100);
flag = false;
}
}
//打印 start doing... end
这是一个经典《线程可见性》案例,volatile对结果有影响,但是具体是如何影响还不确定,因为当添加《System.out.println(“doing”);》 “end” 也同样被打印出来
为什么双重校验单例模式(DCL)需要添加 volatile 关键字
Object o = new Object() 这行代码具体都干了什么
1
2
3
4
new #2 <java/lang/Object>
dup
invokespecial #1 <java/lang/Object.<init>>
astore_0
- 在堆内存中开辟空间
- 执行构造方法
- 对 o 进行赋值
在不加 volatile 的情况下,JVM 可能会对代码进行指令重排,重排后的执行顺序为
- 在堆内存中开辟空间
- 对 o 进行赋值
- 执行构造方法
在多线程情况下,如果第二步执行完,CUP资原发生切换,在另一线程使用的时候就会产生《还没有执行构造方法,就使用对象的情况》
乐观锁和悲观锁
-
悲观锁
总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。Java中synchronized和ReentrantLock等独占锁就是悲观锁思想的实现。 -
乐观锁
总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量。在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了乐观锁的一种实现方式CAS实现的。
乐观锁一般会使用版本号机制或CAS算法实现。
两种锁的使用场景
从上面对两种锁的介绍,我们知道两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下(多读场景),即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果是多写的情况,一般会经常产生冲突,这就会导致上层应用会不断的进行retry,这样反倒是降低了性能,所以一般多写的场景下用悲观锁就比较合适。
版本号机制
一般是在数据表中加上一个数据版本号version字段,表示数据被修改的次数,当数据被修改时,version值会加一。当线程A要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的version值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功。
举一个简单的例子:
假设数据库中帐户信息表中有一个 version 字段,当前值为 1 ;而当前帐户余额字段( balance )为 $100 。当需要对账户信息表进行更新的时候,需要首先读取version字段。
操作员 A 此时将其读出( version=1 ),并从其帐户余额中扣除 $50( $100-$50 )。 在操作员 A 操作的过程中,操作员B 也读入此用户信息( version=1 ),并从其帐户余额中扣除 $20 ( $100-$20 )。 操作员 A 完成了修改工作,提交更新之前会先看数据库的版本和自己读取到的版本是否一致,一致的话,就会将数据版本号加1( version=2 ),连同帐户扣除后余额( balance=$50 ),提交至数据库更新,此时由于提交数据版本大于数据库记录当前版本,数据被更新,数据库记录 version 更新为 2 。 操作员 B 完成了操作,提交更新之前会先看数据库的版本和自己读取到的版本是否一致,但此时比对数据库记录版本时发现,操作员 B 提交的数据版本号为 2 ,而自己读取到的版本号为1 ,不满足 “ 当前最后更新的version与操作员第一次读取的版本号相等 “ 的乐观锁策略,因此,操作员 B 的提交被驳回。 这样,就避免了操作员 B 用基于 version=1 的旧数据修改的结果覆盖操作员A 的操作结果的可能。
CAS算法
即compare and swap(比较与交换),是一种有名的无锁算法。无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步(Non-blocking Synchronization)。CAS算法涉及到三个操作数
需要读写的内存值 V 进行比较的值 A 拟写入的新值 B 当且仅当 V 的值等于 A时,CAS通过原子方式用新值B来更新V的值,否则不会执行任何操作(比较和替换是一个原子操作)。一般情况下是一个自旋操作,即不断的重试。
乐观锁的缺点
-
ABA 问题
如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然是A值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回A,那CAS操作就会误认为它从来没有被修改过。这个问题被称为CAS操作的 “ABA”问题。
JDK 1.5 以后的 AtomicStampedReference 类就提供了此种能力,其中的 compareAndSet 方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。 -
循环时间长开销大
自旋CAS(也就是不成功就一直循环执行直到成功)如果长时间不成功,会给CPU带来非常大的执行开销。 如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。 -
只能保证一个共享变量的原子操作
CAS 只对单个共享变量有效,当操作涉及跨多个共享变量时 CAS 无效。但是从 JDK 1.5开始,提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作.所以我们可以使用锁或者利用AtomicReference类把多个共享变量合并成一个共享变量来操作。
公平锁和非公平锁
公平锁
多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。
- 优点:所有的线程都能得到资源,不会饿死在队列中。
- 缺点:吞吐量会下降很多,队列里面除了第一个线程,其他的线程都会阻塞,cpu唤醒阻塞线程的开销会很大。
非公平锁
多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。
- 优点:可以减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量。
- 缺点:你们可能也发现了,这样可能导致队列中间的线程一直获取不到锁或者长时间获取不到锁,导致饿死。
可重入锁
可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁。例如
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
new Thread(new Runnable() {
@Override
public void run() {
synchronized (this) {
System.out.println("第1次获取锁,这个锁是:" + this);
int index = 1;
while (true) {
synchronized (this) {
System.out.println("第" + (++index) + "次获取锁,这个锁是:" + this);
}
if (index == 10) {
break;
}
}
}
}
}).start();
为什么设计成重入锁
增加锁使用的灵活性,避免死锁。
可重入锁有
- synchronized
- ReentrantLock
排他锁(写锁)和共享锁(读锁)
ReentrantReadWriteLock
锁申请和释放策略
- 多个线程只申请读锁,都可以申请到
- 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
- 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。
- 要么是一个或多个线程同时有读锁,要么是一个线程有写锁,但是两者不会同时出现。
AQS (AbstractQueuedSynchronizer)
Lock 的实现者通过使用AQS提供的模板实现,AQS则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等等一些底层的实现处理。AQS的核心也包括了这些方面:同步队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现,而这些实际上则是AQS提供出来的模板方法;
需要实现如下方法
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
重要方法
- acquire
ReentrantLock
使用 ReentrantLock 的时候一定要手动释放锁,并且加锁次数和释放次数要一样
- 显示锁
- 可重入锁
- 默认是非公平锁,可以通过参数 ReentrantLock(boolean fair) 设定为公平锁
- 悲观锁
- 可设置超时时间,防止死锁产生
Anroid为什么不允许在子线程中更新UI呢
因为 Android 中的 UI 控件不是线程安全的。如果在多线程中并发访问可能会导致 UI 控件处于不可预期的状态
ViewRootImpl.requestLayout 方法中的线程检测,是一种保护机制;
synchronized 与 Lock
AtomicXXX
利用 Unsafe 相关方法直接访问主内存 value 值
计算通过 Unsafe.getAndAddInt 方法实现
1
2
3
4
5
6
7
8
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = this.getIntVolatile(o, offset);
} while(!this.weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
weakCompareAndSetInt 方法的原子性,是由 C++ 底层来保证的
sleep与wait差异总结
1、来自不同的类:sleep是Thread的静态类方法,谁调用的谁去睡觉,即使在a线程里调用了b的sleep方法,实际上还是a去睡觉,要让b线程睡觉要在b的代码中调用sleep。
2、有没有释放锁(释放资源):sleep不出让系统资源;wait是进入线程等待池等待,出让系统资源,其他线程可以占用CPU。
3、一般wait不会加时间限制,因为如果wait线程的运行资源不够,再出来也没用,要等待其他线程调用notify/notifyAll唤醒等待池中的所有线程,才会进入就绪队列等待OS分配系统资源。sleep(milliseconds)可以用时间指定使它自动唤醒过来,如果时间不到只能调用interrupt()强行打断。
4、sleep必须捕获异常,而wait,notify和notifyAll不需要捕获异常。
Thread.sleep()方法是会让出 CUP 时间片。不是让出持有锁,Thread.sleep 和 锁无关
notify 和 notifyAll
notify 随机唤醒一个 wait , 只有被唤醒的线程,可以参与锁竞争 notifyAll 唤醒所有 wait , 所有 wait 都可以在此参与锁竞争