Java 并发:线程协作 wait/notify/sleep/yield/join
Last modified: 2020-09-13
线程协作方法
Object#wait
: 等待某个条件而使当前线程发生阻塞,条件由另外线程提供,释放对象锁,必须与 synchronized 一起使用(使用前提是必须 “拥有” 对象锁)Object#notify
,Object#notifyAll
: 通知由于 wait 阻塞的特定或者全部线程条件已达成,注意,做两个方法不会主动释放锁,这意味着调用 wait 的线程要继续执行还需要等获取到锁后才会继续执行- Thread#sleep: 线程休眠阻塞固定时间,让出 cpu,不释放对象锁
- Thread#yield: 线程主动让出 cpu 给其他线程使用,无固定时间,也不保证马上就让出,因此在少量场景下使用该方法,而主要用于 debug 和 test
- Thread#join: 当前线程等待另外一个线程执行完后再执行
举例
生产者消费者模型,生产者 Producer 不停产生商品 Goods,消费者 Customer 不停消费,且中间有一个商店 Store 用于存放商品。
商店本质上是一个队列,更好的方法是使用 BlockingQueue
wait/notify:
假设有一个生产者和三个消费者:
public static void execute() {
Store store = new Store(new Producer());
for (int i = 0; i < 3; i++) {
store.welcome(new Customer(i));
}
}
static class Goods {
int id;
public Goods(int id) {
this.id = id;
}
@Override
public String toString() {
return "Goods(" + id + ")";
}
}
static class Store {
List<Goods> goodList = new LinkedList<>();
Producer producer;
int MAX_GOODS_SIZE = 5;
ExecutorService service = Executors.newCachedThreadPool();
boolean closed;
public Store(Producer producer) {
this.producer = producer;
this.producer.store = this;
this.service.execute(producer);
}
public void welcome(Customer customer) {
customer.store = this;
service.execute(customer);
}
public boolean isClosed() {
return closed;
}
public void close() {
closed = true;
service.shutdownNow();
}
public void add(Goods goods) {
goodList.add(goods);
}
public boolean isEmpty() {
return goodList.size() == 0;
}
public boolean isFull() {
return goodList.size() == MAX_GOODS_SIZE;
}
@Override
public String toString() {
return "Store goods[" + goodList.size() + "]";
}
public Goods get() {
return goodList.remove(0);
}
}
static class Producer implements Runnable {
Store store;
int count;
@Override
public void run() {
while (true) {
synchronized (store) {
if (!store.isFull()) {
Goods goods = new Goods(count++);
store.add(goods);
System.out.println("Producer add " + goods + " | " + store);
store.notifyAll();
} else {
try {
store.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (count == 20) {
System.out.println("Producer finish today.");
store.close();
return;
}
} // sync
if ((count & 1) == 0) {
Thread.yield();
}
}
}
}
static class Customer implements Runnable {
Store store;
int id;
public Customer(int id) {
this.id = id;
}
@Override
public String toString() {
return "Customer(" + id + ")";
}
@Override
public void run() {
while (true) {
synchronized (store) {
if (!store.isEmpty()) {
Goods goods = store.get();
System.out.println(this + " consume " + goods + " | " + store);
store.notifyAll();
} else if (store.isClosed()) {
System.out.println("Store closed. " + this + " leave.");
return;
} else {
System.out.println(this + " wait.");
try {
store.wait();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
}
} // sync
Thread.yield();
}
}
}
- 使用 yeild 使线程切换更明显
- 生产者生产完最后一件商品通知完消费者后直接 shutdownNow 线程池
- shutdownNow 和 shutdown 不同,shutdownNow 会对线程进行 interrupt,而 shutdown 会等待线程全部完成。
- 消费者没有设置和生产者一样的退出机制。因此只能通过线程池的 interrupt 来退出
- 消费者收到线程池的 interrupt 后
- 阻塞的会通过异常捕获退出
- 非阻塞的消费完成后通过下一个循环条件退出
对应的显示锁实现为 ReentrantLook$Condition,对应方法为 Condition#await/Condition#signal/Condition#signalAll
wiat 补充
- wait(0) 等于 wait()
- wait(1000),1000 毫秒后自动被 notify,也仍可以通过 notify 提前唤醒
wait 信号量错失
避免像这样的代码:
// run():
while(condition) {
synchronized(obj) {
obj.wait();
}
}
多线程的情况下,当有线程 a 已经被 wait 阻塞的时候,可能有其他线程 b 处于 while(condition) 的位置
此时如果有线程 c 进行了 notify 并改变了 condition 为 false,那么线程 b 会错过 condition 的改变,继续进入 wait 导致的阻塞。同时会对其他想获取锁的线程造成死锁。
合适的写法:
// run():
synchronized(obj) {
while(condition) {
obj.wait();
}
}
Thread#join
当前线程等待另外一个线程执行完后再执行,异步变同步
join 的实现其实是使用了 Object#wait
以下为 Java7 的代码
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
- join() 等于 join(0)
- 假设当前线程调用了 start 的线程 t.join(),那么调用关系就会是:currentThread.run() -> t.join() -> synchronizd(this) { while (isAlive()) { this.wait() } }
代码就会像这样:
void run() { // currentThread
......
synchronized(t) {
while(t.isAlive()) {
t.wait()
}
}
......
}
无疑,currentThread 阻塞,释放了 t 的对象锁,等待其他线程获得 改变条件后再通知 currentThread 继续执行
那么问题来了,t 在 run 执行的过程中并不会必须有 notify 的代码,那 t 在 run 结束时,currentThread 能否从 wait 的阻塞恢复运行呢?
测试代码:
// main.java
T1 t1 = new T1();
T2 t2 = new T2(t1);
t1.start();
t2.start();
Thread.sleep(3000);
System.out.println("main end");
static class T1 extends Thread {
@Override
public void run() {
System.out.println("t1 start");
for (int i = 0; i <= 100000000; i++) {
if (i == 0 || i == 1000
|| i == 10000 || i == 100000
|| i == 1000000 || i == 10000000
|| i == 100000000) {
System.out.println("t1 is running: " + i);
}
}
System.out.println("t1 end");
}
}
static class T2 extends Thread {
T1 t1;
public T2(T1 t1) {
this.t1 = t1;
}
@Override
public void run() {
System.out.println("t2 start");
synchronized (t1) {
try {
System.out.println("t2 call t1.wait in synchronized");
t1.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t2 end");
}
}
目的很明确,就是想知道 t2.run() 里面调用了 t1.wait() 阻塞之后,t1.run() 之后是否会让 t2 恢复运行还是继续阻塞,运行结果:
//output:
t1 start
t1 is running: 0
t1 is running: 1000
t2 start
t2 call t1.wait in synchronized
t1 is running: 10000
t1 is running: 100000
t1 is running: 1000000
t1 is running: 10000000
t1 is running: 100000000
t1 end
t2 end
main end
t2 恢复了!
注意:当前无法从 java 层里面找到任何 notify 导致其恢复的代码,只是可以明确,t.join() 调用后,t 如果已经完成,t.join 会返回,也就是上面测试代码的现象。具体实现也许在 native 层