预备知识、各种同步工具
synchronized + wait/notify
概括:Java内置的最基础的线程同步机制,基于对象监视器实现。
用途:用于简单的线程互斥和等待通知机制,如传统的生产者-消费者问题。CountDownLatch
概括:一次性的事件等待器,通过倒计时控制线程执行顺序。
用途:等待多个并行任务全部完成后再继续执行,如初始化完成后启动主应用。CompletableFuture
概括:声明式的异步编程框架,支持任务链式编排和组合。
用途:优雅地编排多个异步任务,处理复杂的异步依赖关系,如并行调用多个微服务。Semaphore
概括:基于许可证的资源控制器,限制并发访问数量。
用途:控制对有限资源的并发访问,如数据库连接池、限流器。ReentrantLock + Condition
概括:功能更丰富的显式锁,提供比synchronized更灵活的控制。
用途:需要高级同步特性的场景,如可中断锁、公平锁、多个等待条件。BlockingQueue
BlockingQueue 是 Java 并发包中开箱即用的 “生产者 - 消费者” 解决方案,本质是对 ReentrantLock + Condition 的高级封装(比如 ArrayBlockingQueue 底层就是用这组工具实现的)
一、按序打印(Leetcode 1114)
给你一个类:
publicclassFoo{publicvoidfirst(){print("first");}publicvoidsecond(){print("second");}publicvoidthird(){print("third");}}三个不同的线程 A、B、C 将会共用一个 Foo 实例。
线程 A 将会调用 first() 方法
线程 B 将会调用 second() 方法
线程 C 将会调用 third() 方法
请设计修改程序,以确保 second() 方法在 first() 方法之后被执行,third() 方法在 second() 方法之后被执行。
classFoo{publicFoo(){}publicvoidfirst(RunnableprintFirst)throwsInterruptedException{printFirst.run();}publicvoidsecond(RunnableprintSecond)throwsInterruptedException{printSecond.run();}publicvoidthird(RunnableprintThird)throwsInterruptedException{printThird.run();}}方法一:忙等待
优点:简单
缺点:空转CPU,可能会死循环
classFoo{privatevolatileintx=0;publicFoo(){}publicvoidfirst(RunnableprintFirst)throwsInterruptedException{printFirst.run();x++;}publicvoidsecond(RunnableprintSecond)throwsInterruptedException{while(x!=1){}printSecond.run();x++;}publicvoidthird(RunnableprintThird)throwsInterruptedException{while(x!=2){}printThird.run();}}1)这里x++因为不是原子性操作,如果有多个线程同时调用first/second/third 会出现问题,
2)while循环里面可以加Thread.yield() 减少自旋的 CPU 消耗
classFoo{privateAtomicIntegerx=newAtomicInteger(0);publicFoo(){}publicvoidfirst(RunnableprintFirst)throwsInterruptedException{printFirst.run();x.incrementAndGet();}publicvoidsecond(RunnableprintSecond)throwsInterruptedException{while(x.get()!=1){Thread.yield();}printSecond.run();x.incrementAndGet();}publicvoidthird(RunnableprintThird)throwsInterruptedException{while(x.get()!=2){Thread.yield();}printThird.run();}}方法二:synchronized + wait/notify
classFoo{privateintx=0;privatefinalObjectlock=newObject();publicFoo(){}publicvoidfirst(RunnableprintFirst)throwsInterruptedException{synchronized(lock){printFirst.run();x++;lock.notifyAll();}}publicvoidsecond(RunnableprintSecond)throwsInterruptedException{synchronized(lock){while(x!=1){lock.wait();}printSecond.run();x++;lock.notifyAll();}}publicvoidthird(RunnableprintThird)throwsInterruptedException{synchronized(lock){while(x!=2){lock.wait();}printThird.run();lock.notifyAll();}}}方法三:CountDownLatch
classFoo{privatefinalCountDownLatchlatch1=newCountDownLatch(1);privatefinalCountDownLatchlatch2=newCountDownLatch(1);publicFoo(){}publicvoidfirst(RunnableprintFirst)throwsInterruptedException{printFirst.run();latch1.countDown();}publicvoidsecond(RunnableprintSecond)throwsInterruptedException{latch1.await();printSecond.run();latch2.countDown();}publicvoidthird(RunnableprintThird)throwsInterruptedException{latch2.await();printThird.run();}}方法四:Semaphore
classFoo{privatefinalSemaphoresem2=newSemaphore(0);privatefinalSemaphoresem3=newSemaphore(0);publicFoo(){}publicvoidfirst(RunnableprintFirst)throwsInterruptedException{printFirst.run();sem2.release();}publicvoidsecond(RunnableprintSecond)throwsInterruptedException{sem2.acquire();printSecond.run();sem3.release();}publicvoidthird(RunnableprintThird)throwsInterruptedException{sem3.acquire();printThird.run();}}方法五:ReentrantLock+Condition
classFoo{privatefinalLocklock=newReentrantLock();privatefinalConditioncond1=lock.newCondition();privatefinalConditioncond2=lock.newCondition();privateintflag=0;publicFoo(){}publicvoidfirst(RunnableprintFirst)throwsInterruptedException{lock.lock();try{printFirst.run();flag++;cond1.signal();}finally{lock.unlock();}}publicvoidsecond(RunnableprintSecond)throwsInterruptedException{lock.lock();try{while(flag!=1){cond1.await();}printSecond.run();flag++;cond2.signal();}finally{lock.unlock();}}publicvoidthird(RunnableprintThird)throwsInterruptedException{lock.lock();try{while(flag!=2){cond2.await();}printThird.run();}finally{lock.unlock();}}}方法六:CompletableFuture
classFoo{privateCompletableFuture<Void>cf1=newCompletableFuture<>();privateCompletableFuture<Void>cf2=newCompletableFuture<>();publicvoidfirst(RunnableprintFirst){printFirst.run();cf1.complete(null);// 信号:first完成了}publicvoidsecond(RunnableprintSecond){cf1.join();// 阻塞等待cf1完成printSecond.run();cf2.complete(null);// 信号:second完成了}publicvoidthird(RunnableprintThird){cf2.join();// 阻塞等待cf2完成printThird.run();}}二、交替打印(Leetcode 1115)
给你一个类:
classFooBar{publicvoidfoo(){for(inti=0;i<n;i++){print("foo");}}publicvoidbar(){for(inti=0;i<n;i++){print("bar");}}}两个不同的线程将会共用一个 FooBar 实例:
线程 A 将会调用 foo() 方法,而
线程 B 将会调用 bar() 方法
请设计修改程序,以确保 “foobar” 被输出 n 次。
方法一:忙等待
classFooBar{privateintn;publicFooBar(intn){this.n=n;}privatevolatilebooleanstate=true;publicvoidfoo(RunnableprintFoo)throwsInterruptedException{for(inti=0;i<n;){if(state){printFoo.run();i++;state=false;}else{Thread.yield();}}}publicvoidbar(RunnableprintBar)throwsInterruptedException{for(inti=0;i<n;){if(!state){printBar.run();i++;state=true;}else{Thread.yield();}}}}方法二:synchronized + wait/notify
classFooBar{privateintn;publicFooBar(intn){this.n=n;}privatebooleanstate=true;privatefinalObjectlock=newObject();publicvoidfoo(RunnableprintFoo)throwsInterruptedException{for(inti=0;i<n;i++){synchronized(lock){while(!state){lock.wait();}printFoo.run();state=false;lock.notifyAll();}}}publicvoidbar(RunnableprintBar)throwsInterruptedException{for(inti=0;i<n;i++){synchronized(lock){while(state){lock.wait();}printBar.run();state=true;lock.notifyAll();}}}}方法三:Semaphore
classFooBar{privateintn;publicFooBar(intn){this.n=n;}privatefinalSemaphoresem1=newSemaphore(1);privatefinalSemaphoresem2=newSemaphore(0);publicvoidfoo(RunnableprintFoo)throwsInterruptedException{for(inti=0;i<n;i++){sem1.acquire();printFoo.run();sem2.release();}}publicvoidbar(RunnableprintBar)throwsInterruptedException{for(inti=0;i<n;i++){sem2.acquire();printBar.run();sem1.release();}}}方法四:ReentrantLock+Condition
classFooBar{privateintn;publicFooBar(intn){this.n=n;}privatefinalLocklock=newReentrantLock();privatefinalConditioncond=lock.newCondition();privatebooleanflag=true;publicvoidfoo(RunnableprintFoo)throwsInterruptedException{for(inti=0;i<n;i++){lock.lock();try{while(!flag){cond.await();}printFoo.run();flag=false;cond.signal();}finally{lock.unlock();}}}publicvoidbar(RunnableprintBar)throwsInterruptedException{for(inti=0;i<n;i++){lock.lock();try{while(flag){cond.await();}printBar.run();flag=true;cond.signal();}finally{lock.unlock();}}}}