2019独角兽企业重金招聘Python工程师标准>>>
- CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作。
CountDownLatch的构造函数接受一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。
当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N编程零。此外还有一个带指定时间的await方法,这个方法等待指定时间后,就不会再阻塞当前线程。
package test;public class ThreadJoinTest {public static void main(String[] args) throws InterruptedException {Thread parser1 = new Thread(new Runnable(){public void run() {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("parser1 finish");}});Thread parser2 = new Thread(new Runnable(){public void run() {System.out.println("parser2 finish");}});parser1.start();parser2.start();parser1.join();parser2.join();System.out.println("all finished");}
}
---------
parser2 finish
parser1 finish
all finishedpackage test;import java.util.concurrent.CountDownLatch;public class CountDownLatchTest {static CountDownLatch c = new CountDownLatch(2);public static void main(String[] args) throws InterruptedException {new Thread(new Runnable(){public void run() {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("parser1 finish");c.countDown();}}).start();new Thread(new Runnable(){public void run() {System.out.println("parser2 finish");c.countDown();}}).start();c.await();System.out.println("all finished");}
}
---------
parser2 finish
parser1 finish
all finished
- CyclicBarrier
CyclicBarrier让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,所有被屏障拦截的线程才会继续运行。
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties , Runnable barrierAction),用于在线程到达屏障是,优先执行barrierAction,方便处理更复杂的业务。package test;import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;public class CyclicBarrierTest {static CyclicBarrier c = new CyclicBarrier(2);public static void main(String args[]) throws InterruptedException, BrokenBarrierException{new Thread(new Runnable(){public void run() {System.out.println("begin");try {Thread.sleep(1000);c.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println("end");}}).start();System.out.println("current thread waiting..");c.await();System.out.println("all arrived");} }------- current thread waiting.. begin end all arrived
package test;import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;public class CyclicBarrierTest2 {static CyclicBarrier c = new CyclicBarrier(2,new A());public static void main(String args[]) throws InterruptedException, BrokenBarrierException{new Thread(new Runnable(){public void run() {System.out.println("begin");try {Thread.sleep(1000);c.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println("end");}}).start();System.out.println("current thread waiting..");c.await();System.out.println("all arrived");}static class A implements Runnable{public void run() {System.out.println("Thread A Running..");}} } ---------- current thread waiting.. begin Thread A Running.. end all arrived
- CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,所以CyclicBarrier能处理更复杂的业务场景。
CyclicBarrier还提供其他的方法,如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断。 - Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,他通过协调各个线程,以保证合理的使用公共资源。
Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接。package test;import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;public class SemaphoreTest {private static final int THREAD_COUNT = 10;private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);private static Semaphore s = new Semaphore(5);public static void main(String[] args) throws InterruptedException {while(true){threadPool.execute(new Runnable(){public void run() {try {s.acquire();System.out.println(Thread.currentThread().getName()+": save data...");Thread.sleep(5000);s.release();} catch (InterruptedException e) {e.printStackTrace();}}});System.out.println("availablePermits: "+s.availablePermits());Thread.sleep(500);}//threadPool.shutdown();} } ---------------------------- availablePermits: 5 pool-1-thread-1: save data... availablePermits: 4 pool-1-thread-2: save data... availablePermits: 3 pool-1-thread-3: save data... availablePermits: 2 pool-1-thread-4: save data... availablePermits: 1 pool-1-thread-5: save data... availablePermits: 0 availablePermits: 0 availablePermits: 0 availablePermits: 0 availablePermits: 0 pool-1-thread-6: save data... availablePermits: 0 pool-1-thread-7: save data... availablePermits: 0 pool-1-thread-8: save data... availablePermits: 0 pool-1-thread-9: save data... availablePermits: 0 pool-1-thread-10: save data... availablePermits: 0 availablePermits: 0 availablePermits: 0 availablePermits: 0 availablePermits: 0 availablePermits: 0 pool-1-thread-1: save data... availablePermits: 0 pool-1-thread-7: save data... availablePermits: 0 pool-1-thread-2: save data... availablePermits: 0 pool-1-thread-9: save data... availablePermits: 0 pool-1-thread-3: save data... availablePermits: 0 availablePermits: 0 availablePermits: 0
在代码中,虽然有10个线程在执行,但是只允许5个并发执行。