package com.test;import java.util.Collection;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;// LockTest1.java// 仓库class Depot { private int size; // 仓库的实际数量 private Lock lock; // 独占锁 public Depot() { this.size = 0; this.lock = new ReentrantLock(); } public void produce(int val) { lock.lock(); try { size += val; System.out.printf("%s produce(%d) --> size=%d\n", Thread.currentThread().getName(), val, size); } finally { lock.unlock(); } } public void consume(int val) { lock.lock(); try { size -= val; System.out.printf("%s consume(%d) <-- size=%d\n", Thread.currentThread().getName(), val, size); } finally { lock.unlock(); } }}; // 生产者class Producer { private Depot depot; public Producer(Depot depot) { this.depot = depot; } // 消费产品:新建一个线程向仓库中生产产品。 public void produce(final int val) { new Thread() { public void run() { depot.produce(val); } }.start(); }}// 消费者class Customer { private Depot depot; public Customer(Depot depot) { this.depot = depot; } // 消费产品:新建一个线程从仓库中消费产品。 public void consume(final int val) { new Thread() { public void run() { depot.consume(val); } }.start(); }}public class LockTest1 { public static void main(String[] args) { Depot mDepot = new Depot(); Producer mPro = new Producer(mDepot); Customer mCus = new Customer(mDepot); //4个线程,2个线程操作produce方法,2个线程操作consume方法。4个线程使用同一个锁,每次只能由一个线程执行。 mPro.produce(60); mPro.produce(120); mCus.consume(90); mCus.consume(150); }}
ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。顾名思义,ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。
Condition函数列表复制代码// 造成当前线程在接到信号或被中断之前一直处于等待状态。void await()// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。boolean await(long time, TimeUnit unit)// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。long awaitNanos(long nanosTimeout)// 造成当前线程在接到信号之前一直处于等待状态。void awaitUninterruptibly()// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。boolean awaitUntil(Date deadline)// 唤醒一个等待线程。void signal()// 唤醒所有等待线程。void signalAll()
Condition示例示例1是通过Object的wait(), notify()来演示线程的休眠/唤醒功能。示例2是通过Condition的await(), signal()来演示线程的休眠/唤醒功能。示例3是通过Condition的高级功能。示例1复制代码public class WaitTest1 { public static void main(String[] args) { ThreadA ta = new ThreadA("ta"); synchronized(ta) { // 通过synchronized(ta)获取“对象ta的同步锁” try { System.out.println(Thread.currentThread().getName()+" start ta"); ta.start(); System.out.println(Thread.currentThread().getName()+" block"); ta.wait(); // 等待 System.out.println(Thread.currentThread().getName()+" continue"); } catch (InterruptedException e) { e.printStackTrace(); } } } static class ThreadA extends Thread{ public ThreadA(String name) { super(name); } public void run() { synchronized (this) { // 通过synchronized(this)获取“当前对象的同步锁” System.out.println(Thread.currentThread().getName()+" wakup others"); notify(); // 唤醒“当前对象上的等待线程” } } }}复制代码 示例2复制代码import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;public class ConditionTest1 { private static Lock lock = new ReentrantLock(); private static Condition condition = lock.newCondition(); public static void main(String[] args) { ThreadA ta = new ThreadA("ta"); lock.lock(); // 获取锁 try { System.out.println(Thread.currentThread().getName()+" start ta"); ta.start(); System.out.println(Thread.currentThread().getName()+" block"); condition.await(); // 等待,会释放锁, System.out.println(Thread.currentThread().getName()+" continue"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); // 释放锁 } } static class ThreadA extends Thread{ public ThreadA(String name) { super(name); } public void run() { lock.lock(); // 获取锁 try { System.out.println(Thread.currentThread().getName()+" wakup others"); condition.signal(); // 唤醒“condition所在锁上的其它线程” } finally { lock.unlock(); // 释放锁 } } }}复制代码运行结果:main start tamain blockta wakup othersmain continue通过“示例1”和“示例2”,我们知道Condition和Object的方法有一下对应关系: Object Condition 休眠 wait await唤醒个线程 notify signal唤醒所有线程 notifyAll signalAllCondition除了支持上面的功能之外,它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,"读线程"需要等待。 如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程",而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。 但是,通过Condition,就能明确的指定唤醒读线程。看看下面的示例3,可能对这个概念有更深刻的理解。
满的时候只能减,空的时候只能加。加和减是2个线程组。
package com.test;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;import java.util.concurrent.locks.Condition;// LockTest3.java// 仓库class Depot1 { private int capacity; // 仓库的容量 private int size; // 仓库的实际数量 private Lock lock; // 独占锁 private Condition fullCondtion; // 生产条件 private Condition emptyCondtion; // 消费条件 public Depot1(int capacity) { this.capacity = capacity; this.size = 0; this.lock = new ReentrantLock(); this.fullCondtion = lock.newCondition(); this.emptyCondtion = lock.newCondition(); } public void produce(int val) { lock.lock(); try { // left 表示“想要生产的数量”(有可能生产量太多,需多此生产) while (val > 0) { // 库存已满时,等待“消费者”消费产品。 while (size >= capacity) fullCondtion.await(); //等待,释放锁 // 获取“实际生产的数量”(即库存中新增的数量) // 如果“库存”+“想要生产的数量”>“总的容量”,则“实际增量”=“总的容量”-“当前容量”。(此时填满仓库) // 否则“实际增量”=“想要生产的数量” size += val; System.out.printf("%s produce --> val=%3d, size=%3d\n", Thread.currentThread().getName(), val, size); // 通知“消费者”可以消费了。 emptyCondtion.signal(); } } catch (InterruptedException e) { } finally { lock.unlock(); } } public void consume(int val) { lock.lock(); try { // left 表示“客户要消费数量”(有可能消费量太大,库存不够,需多此消费) while (val > 0) { // 库存为0时,等待“生产者”生产产品。 while (size <= 0) emptyCondtion.await(); // 获取“实际消费的数量”(即库存中实际减少的数量) // 如果“库存”<“客户要消费的数量”,则“实际消费量”=“库存”; // 否则,“实际消费量”=“客户要消费的数量”。 size -= val; System.out.printf("%s consume<-- val=%3d, size=%3d\n", Thread.currentThread().getName(), val, size); fullCondtion.signal(); } } catch (InterruptedException e) { } finally { lock.unlock(); } } public String toString() { return "capacity:"+capacity+", actual size:"+size; }}; // 生产者class Producer1 { private Depot1 depot; public Producer1(Depot1 depot) { this.depot = depot; } // 消费产品:新建一个线程向仓库中生产产品。 public void produce(final int val) { new Thread() { public void run() { depot.produce(val); } }.start(); }}// 消费者class Customer1 { private Depot1 depot; public Customer1(Depot1 depot) { this.depot = depot; } // 消费产品:新建一个线程从仓库中消费产品。 public void consume(final int val) { new Thread() { public void run() { depot.consume(val); } }.start(); }}public class LockTest3 { public static void main(String[] args) { Depot1 mDepot = new Depot1(100); Producer1 mPro = new Producer1(mDepot); Customer1 mCus = new Customer1(mDepot); //5个线程共用同一把锁。2个线程减3个线程加。 mPro.produce(60); mPro.produce(120); mCus.consume(90); mCus.consume(150); mPro.produce(110); }}