Java分布式应用学习笔记05多线程下的并发同步器.doc_第1页
Java分布式应用学习笔记05多线程下的并发同步器.doc_第2页
Java分布式应用学习笔记05多线程下的并发同步器.doc_第3页
Java分布式应用学习笔记05多线程下的并发同步器.doc_第4页
Java分布式应用学习笔记05多线程下的并发同步器.doc_第5页
已阅读5页,还剩7页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

Java分布式应用学习笔记05多线程下的并发同步器刘岩Email:Blog: 1. 前言JDK提供的并发包,除了上一篇提到的用于集合外,还有线程的调度、协作、调度等等功能。上篇提到过,线程之间除了竞争关系,还有协作关系。在高并发环境下有效利用Java并发包解决线程之间协作的特殊场景。在并行计算,尤其是多线程计算的结果集合并的时候都需要用到这些并发同步器。还有一种使用场景,就是跨越多台机器(实机)的多线程进行并行运算,需要将多台机器进行结果集的汇总,合并。其原理核心也是使用这些并发协作包。2. FutureTaskFutureTask是进行并行结果集合并的类,此类是Future接口的实现。在主线程中启动多个线程进行并发计算,之后再根据各个线程的执行结果进行汇总,归并,得出一个总的结果,这个多线程可以是在一台机器上,充分利用多核CPU硬件,在科研单位可能分布式集群环境一起并发计算一个大任务,每个机器相当于一个线程,执行完毕后将反馈的结果返回来进行合并后才是最终的结果。而主线程可以等待分线程的结果,也可以不等待,全凭具体业务需要而定,不过一般情况下还是要等一等分线程的结果才能往下执行的。如果不等分线程,也可以在主线程中不再理会分线程即可。举个实例,比如这时候东方不败要想练成葵花宝典,那么需要前提条件是2个,第一手中得有葵花宝典秘籍,第二就是挥刀自宫。恩,挥刀自宫这个主线程东方不败可以自己完成,夺取葵花宝典可以派别人兄弟童柏雄去干,2条线并行实施,等另一个人取得葵花宝典了,这边主线程也挥刀自宫了,行了,能练了!咱先看代码行吧package threadConcurrent.test;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;/* * 分线程汇总 * author liuyan */public class FutureTaskDemo SuppressWarnings(unchecked)public static void main(String args) / 初始化一个Callable对象和FutureTask对象Callable otherPerson = new OtherPerson();/ 由此任务去执行FutureTask futureTask = new FutureTask(otherPerson);/ 使用futureTask创建一个线程Thread newhread = new Thread(futureTask);System.out.println(newhread线程现在开始启动,启动时间为: + System.nanoTime()+ 纳秒);newhread.start();System.out.println(主线程东方不败,开始执行其他任务);System.out.println(东方不败开始准备小刀,消毒.);/兄弟线程在后台的计算线程是否完成,如果未完成则等待/阻塞while (!futureTask.isDone() try Thread.sleep(500);System.out.println(东方不败:“等兄弟回来了,我就和小弟弟告别颤抖”); catch (InterruptedException e) e.printStackTrace();System.out.println(newhread线程执行完毕,此时时间为 + System.nanoTime();String result = null;try result = (String) futureTask.get(); catch (InterruptedException e) e.printStackTrace(); catch (ExecutionException e) e.printStackTrace();if(OtherPerson:经过一番厮杀取得葵花宝典.equals(result)System.out.println(兄弟,干得好,我挥刀自宫了啊!);elseSystem.out.println(还好我没自宫!否则白白牺牲了);SuppressWarnings(all)class OtherPerson implements Callable Overridepublic Object call() throws Exception / 先休息休息再拼命去!Thread.sleep(5000);String result = OtherPerson:经过一番厮杀取得葵花宝典;System.out.println(result);return result;在这个例子中主线程代表东方不败,分线程代表兄弟童柏雄,主线程派出FutureTask,把它放置于一个线程对象中,之后线程开始启动,分支线程开始工作。主线程也没闲着,继续做自己的事情,消毒,做着心理斗争等等,通过一个阻塞的死循环,等待分线程的状态,调用分线程的futureTask.isDone()方法进行判断,看看兄弟是否执行结束了,结束了通过futureTask.get()将分线程的执行结果取出来,结果出来了,主线程根据分线程的执行结果再做决定。执行后,结果大家都明了。有一点需要说明的是,有可能分线程与主线程的执行不在一台物理机器上,分线程可以使用jms、webservic、rmi甚至socket技术请求远程的类为其服务。分线程根据远程返回的结果再返回给本机器的主线程,之后再做决策。分布式计算的核心原理也是如此,当然分布式计算比这个复杂得多,笔者只是说其核心的实现原理。3. SemaphoreSemaphore是限制多线程共享资源的一个东东,多线程并发访问一个资源的时候,可以限制线程最大使用个数,其他多出来的线程,没办法,耐心等着吧。这个例子在生活中比比皆是,在火车站售票处一共开设了5个窗口,也就表示在同一时间内,火车站的工作人员最多只能为5个人服务,那么高峰时其他人呢,理想的情况下是排队等着,不理想的情况下是,等待的队列没有秩序,有的只是拳头和权势,没有办法,人家的爸爸是李刚,撞人都没事何况是排队买票了,人家说的就是王法。当然了,这个咱们看具体程序。package threadConcurrent.test;import java.util.Random;import java.util.concurrent.Semaphore;/* * 使用Semaphore,限制可以执行的线程数,空闲资源放到队列中等待 * * author liuyan */public class SemaphoreDemo public static void main(String args) Runnable limitedCall = new Runnable() / 随机生成数final Random rand = new Random();/ 限制只有3个资源是活动的,第二个参数为true则是按照标准“队列”结构先进先出final Semaphore available = new Semaphore(5, true);int count = 0;public void run() int time = rand.nextInt(10);int num = count+;try / 请求资源available.acquire();int needTime = time * 2000;System.out.println(乘客 + num + 买票需要 + needTime+ 秒. #);Thread.sleep(needTime);System.out.println(乘客 + num + 买完了 # !);/ 运行完了就释放available.release(); catch (InterruptedException intEx) intEx.printStackTrace();for (int i = 0; i 25; i+)new Thread(limitedCall).start();注释已经写得比较明确了,构建Semaphore的时候,第一个参数代表线程的执行的最大数目,第二个参数是按照队列的形式将未执行的线程放到队列中,当有线程执行完了后,按照先进先出的原则,进行线程的唤醒,执行。即便是main启动了25个线程,那么其余的线程要向执行也要等前面的线程执行完毕后才能有资格执行。要想让线程按规矩执行,首先应该先向资源池申请资源,available.acquire();就是请求资源池给个资源,如果资源池当前有空闲资源,那么线程就可以正常运行了,如果没有,没办法,排队吧啊。线程运行完毕了,要记得归还资源available.release();如果构造Semaphore的时候没指定第二个参数,或者第二个参数为false,估计您有幸能见到我之前说的李刚的儿子的现象!在此不再赘述。4. ScheduledFuture提到Quartz,大家都知道他是一个负责任务调度的开源工具,使用它可以轻易地在某一时段,某一频率执行相关业务功能。如果仅仅是简单的根据某些时间频率执行某些任务,其实到不必屠龙刀杀小鸡,使用ScheduledFuture可以轻松解决此类频率的问题,启动另一个线程来,在某一个时间频率执行代码。这个还是举个例子吧,战争年代巡视城防,赵云带一个小兵去巡视城防,赵云是将军,每5秒钟巡视一次士兵,看看士兵有没有偷懒,士兵比较累,每1秒巡视一次城防,不能睡觉。如下程序package threadConcurrent.test;import static java.util.concurrent.TimeUnit.SECONDS;import java.util.Date;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.ScheduledFuture;/* * 时间频率调度 * author liuyan */public class ScheduledFutureDemo SuppressWarnings(unchecked)public static void main(String args) / 线程池开辟2个线程final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);/ 将军final Runnable general = new Runnable() int count = 0;public void run() System.out.println(Thread.currentThread().getName() + :+ new Date() + 赵云巡视来了 + (+count);/ 士兵final Runnable soldier = new Runnable() int count = 0;public void run() System.out.println(Thread.currentThread().getName() + :+ new Date() + 士兵巡视来了 + (+count);/ 1秒钟后运行,并每隔2秒运行一次final ScheduledFuture beeperHandle1 = scheduler.scheduleAtFixedRate(soldier, 1, 1, SECONDS);/ 5秒钟后运行,并每隔2秒运行一次final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(general, 5, 5, SECONDS);/ 30秒后结束关闭任务,并且关闭Schedulerscheduler.schedule(new Runnable() public void run() beeperHandle1.cancel(true);beeperHandle2.cancel(true);scheduler.shutdown();, 60, SECONDS);程序的注释已经明白了,在此不再赘述。5. CountDownLatch很多资料上都说CountDownLatch是倒数计数器,我觉得这种说法太过专业,其实它就是一个数数的人员。利用它,可以在多线程执行任务完毕后完成进行多线程的等待,便于等待所有的线程之后在干别的事情,这个有点类似于FutureTask,使用上不太一样。这个场景就是一个线程必须要等到其他线程执行完毕后才能往下执行,注意,这里这个线程没必要需要其他线程的执行结果,而是只有一个条件就是其他线程必须执行完毕。咱们依然做个比喻,韦小宝需要8部四十二章经才能去鹿鼎山找寻宝藏,怎么办,他有7个老婆,不能资源浪费啊,这个任务同时进行吧。咱们设计8个线程同时进行,等所有老婆都执行完毕了,每个老婆找齐了四十二章经了,好了,他可以自己去找宝藏了。等等,咱们小宝哥有7个老婆,何来8个线程,这个问题,读者不必较真,举个例子罢了,咱们给他加个老婆不就行了!代码如下package threadConcurrent.test;import java.util.concurrent.CountDownLatch;/* * 分部执行任务 * * author liuyan * */public class CountDownLatchDemo implements Runnable private int id;/ 线程之间不影响,到了终点后继续做自己的事情private CountDownLatch countDownLatch;public CountDownLatchDemo(int id, CountDownLatch countDownLatch) this.id = id;this.countDownLatch = countDownLatch;SuppressWarnings(static-access)Overridepublic void run() try System.out.println(第 + (id + 1) + 小老婆开始查找四十二章经.);Thread.currentThread().sleep(id * 1000);System.out.println(第 + (id + 1) + 本四十二章经找到);/计数器将等待数字-1countDownLatch.countDown();System.out.println(第 + (id + 1) + 小老婆继续干其他事情); catch (InterruptedException e) e.printStackTrace();public static void main(String args) CountDownLatch countDownLatch = new CountDownLatch(8);for (int i = 0; i 8; i+) new Thread(new CountDownLatchDemo(i, countDownLatch).start();try System.out.println(韦小宝等着等着8本四十二章);/ 韦小宝等着等着countDownLatch.await();/ 等待运动员到达终点System.out.println(8本四十二章经找寻完成,可以寻宝了!); catch (InterruptedException e) e.printStackTrace();主线程就当做是韦小宝吧,主线程首先开辟了线程计数器对象,之后就开辟了8个线程,派出了8个小老婆去办事,之后主线程调用countDownLatch.await()阻塞,在家里一直喝着小茶,听着小曲等着8个线程的执行完毕。咱们来看小老婆们,小老婆们就辛苦了,开始找寻经书,之后调用countDownLatch.countDown()方法通知线程计数器减去1,让等待的线程减去1。就好比说有个小老婆找到了四十二章经,用iphone发个彩信将经书的夹缝地图发给韦小宝,韦小宝收到了,恩,这个老婆能干,任务完成,我的等待目标减去1。等到等待线程为0的时候,小宝开始行动了,将手机里的地图通过游戏拼图游戏,一拼凑,大事可成,自己寻宝去!这里大家也看到了CountDownLatch与FutureTask的区别。CountDownLatch侧重的是分线程的完成个数,每次完成一个分线程,等待数目减少一个,等待线程为0的时候,主线程的就不阻塞了,开始往下走。而分线程一旦调用countDownLatch.countDown()方法,就代表分线程任务搞定,主线程就不会因为你的其他事情而不能往下走,完成任务了,小老婆们也可以去旅旅游,休息休息!而FutureTask则是注重执行结果的,主线程需要它的确切结果。所以futureTask执行的call()有返回值。6. CyclicBarrierCyclicBarrier相对于CountDownLatch来说,最大的不同是,分线程具体的执行过程受其他分线程的影响,必须每个分线程都执行完毕了,主线程才继续往下走,而分线程如果在所有分线程执行完毕后还有其他动作,ok,还你自由,不必阻塞了,往下走你的路吧。这个例子是网上的游戏玩家的例子,4个小孩玩游戏,游戏要求必须是4个小孩都得通过第一关,才能开启第二关的关口!否则其他完成第一关的人都得等着其他人完成。这个有点像我们的项目开发,分模块开发,到一定阶段将模块汇总,联调,测试,如果这时候有一个模块没完成,大家等着吧,大家都在那里静静地、盯着你、等着你。package threadConcurrent.test;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class GameBarrier public static void main(String args) CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() Overridepublic void run() System.out.println(所有玩家进入第二关!););for (int i = 0; i 4; i+) new Thread(new Player(i, cyclicBarrier).start();class Player implements Runnable /* * 线程之间需要交互,到一定的条件下,所有线程才能往下走 */private CyclicBarrier cyclicBarrier;private int id;public Player(int id, CyclicBarrier cyclicBarrier) this.cyclicBarrier = cyclicBarrier;this.id = id;Overridepublic void run() try System.out.println(玩家 + id + 正在玩第一关.);cyclicBarrier.await();System.out.println(玩家 + id + 进入第二关.); catch (InterruptedException e) e.printStackTrace(); catch (BrokenBarrierException e) e.printStackTrace();使用cyclicBarrier.await();方法进行等待、阻塞,当所有分线程执行完毕了,主线程开始执行,分线程的自由也解脱了,继续往下走,开始第二关。7. ExchangerExchanger是线程资源交换器,线程A与线程B在某个运行阶段需要互换资源才能完成任务。这就好比2个公司职员叶小钗和一页书。分别在不同的项目组组A和组B,两个组开发者不同的项目,在正常时候叶小钗在组A上班开发者BOSS项目,一页书在项目组B开发ESB中间件产品。而在特殊时期项目组B不需要一页书了,需要叶小钗提供技术支持,就和项目组A要叶小钗,项目组A的leader也不是吃素的,你要叶小钗,没问题,把一页书也得接我们项目组剥削几天!就这样项目组B与项目组A做了这种“交易”(交换),用完了之后,恩看程序吧package threadConcurrent.test;import java.util.concurrent.Exchanger;/* * 资源交换 * author liuyan */pu

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论