java的concurrent用法详解.docx_第1页
java的concurrent用法详解.docx_第2页
java的concurrent用法详解.docx_第3页
java的concurrent用法详解.docx_第4页
java的concurrent用法详解.docx_第5页
已阅读5页,还剩17页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

我们都知道,在JDK1.5之前,Java中要进行业务并发时,通常需要有程序员独立完成代码实现,当然也有一些开源的框架提供了这些功能,但是这些依然没有JDK自带的功能使用起来方便。而当针对高质量Java多线程并发程序设计时,为防止死蹦等现象的出现,比如使用java之前的wait()、notify()和synchronized等,每每需要考虑性能、死锁、公平性、资源管理以及如何避免线程安全性方面带来的危害等诸多因素,往往会采用一些较为复杂的安全策略,加重了程序员的开发负担.万幸的是,在JDK1.5出现之后,Sun大神(Doug Lea)终于为我们这些可怜的小程序员推出了java.util.concurrent工具包以简化并发完成。开发者们借助于此,将有效的减少竞争条件(race conditions)和死锁线程。concurrent包很好的解决了这些问题,为我们提供了更实用的并发程序模型。Executor :具体Runnable任务的执行者。ExecutorService :一个线程池管理者,其实现类有多种,我会介绍一部分。我们能把Runnable,Callable提交到池中让其调度。Semaphore :一个计数信号量ReentrantLock :一个可重入的互斥锁定 Lock,功能类似synchronized,但要强大的多。Future :是与Runnable,Callable进行交互的接口,比如一个线程执行结束后取返回的结果等等,还提供了cancel终止线程。BlockingQueue :阻塞队列。CompletionService : ExecutorService的扩展,可以获得线程执行结果的CountDownLatch :一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。CyclicBarrier :一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点Future :Future 表示异步计算的结果。ScheduledExecutorService :一个 ExecutorService,可安排在给定的延迟后运行或定期执行的命令。接下来逐一介绍Executors主要方法说明newFixedThreadPool(固定大小线程池)创建一个可重用固定线程集合的线程池,以共享的无界队列方式来运行这些线程(只有要请求的过来,就会在一个队列里等待执行)。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。newCachedThreadPool(无界线程池,可以进行自动线程回收)创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。注意,可以使用 ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。newSingleThreadExecutor(单个后台线程)创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool(1) 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。这些方法返回的都是ExecutorService对象,这个对象可以理解为就是一个线程池。这个线程池的功能还是比较完善的。可以提交任务submit()可以结束线程池shutdown()。01importjava.util.concurrent.ExecutorService;02importjava.util.concurrent.Executors;03publicclassMyExecutor extendsThread 04privateintindex;05publicMyExecutor(inti)06this.index=i;0708publicvoidrun()09try10System.out.println(+this.index+ start.);11Thread.sleep(int)(Math.random()*1000);12System.out.println(+this.index+ end.);1314catch(Exception e)15e.printStackTrace();161718publicstaticvoidmain(String args)19ExecutorService service=Executors.newFixedThreadPool(4);20for(inti=0;i0)14System.out.println(顾客+this.id+进入厕所,有空位);1516else17System.out.println(顾客+this.id+进入厕所,没空位,排队);1819position.acquire();20System.out.println(顾客+this.id+获得坑位);21Thread.sleep(int)(Math.random()*1000);22System.out.println(顾客+this.id+使用完毕);23position.release();2425catch(Exception e)26e.printStackTrace();272829publicstaticvoidmain(String args)30ExecutorService list=Executors.newCachedThreadPool();31Semaphore position=newSemaphore(2);32for(inti=0;i10;i+)33list.submit(newMySemaphore(i+1,position);3435list.shutdown();36position.acquireUninterruptibly(2);37System.out.println(使用完毕,需要清扫了);38position.release(2);3940ReentrantLock一个可重入的互斥锁定 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。ReentrantLock 将由最近成功获得锁定,并且还没有释放该锁定的线程所拥有。当锁定没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁定并返回。如果当前线程已经拥有该锁定,此方法将立即返回。可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法来检查此情况是否发生。此类的构造方法接受一个可选的公平参数。当设置为 true时,在多个线程的争用下,这些锁定倾向于将访问权授予等待时间最长的线程。否则此锁定将无法保证任何特定访问顺序。与采用默认设置(使用不公平锁定)相比,使用公平锁定的程序在许多线程访问时表现为很低的总体吞吐量(即速度很慢,常常极其慢),但是在获得锁定和保证锁定分配的均衡性时差异较小。不过要注意的是,公平锁定不能保证线程调度的公平性。因此,使用公平锁定的众多线程中的一员可能获得多倍的成功机会,这种情况发生在其他活动线程没有被处理并且目前并未持有锁定时。还要注意的是,未定时的 tryLock 方法并没有使用公平设置。因为即使其他线程正在等待,只要该锁定是可用的,此方法就可以获得成功。建议总是 立即实践,使用 try 块来调用 lock,在之前/之后的构造中,最典型的代码如下:01classX 02privatefinalReentrantLock lock = newReentrantLock();03/ .04publicvoidm() 05lock.lock(); / block until condition holds06try07/ . method body08 finally09lock.unlock()101112我的例子:01importjava.util.concurrent.ExecutorService;02importjava.util.concurrent.Executors;03importjava.util.concurrent.locks.ReentrantLock;04publicclassMyReentrantLock extendsThread05TestReentrantLock lock;06privateintid;07publicMyReentrantLock(inti,TestReentrantLock test)08this.id=i;09this.lock=test;1011publicvoidrun()12lock.print(id);1314publicstaticvoidmain(String args)15ExecutorService service=Executors.newCachedThreadPool();16TestReentrantLock lock=newTestReentrantLock();17for(inti=0;i10;i+)18service.submit(newMyReentrantLock(i,lock);1920service.shutdown();212223classTestReentrantLock24privateReentrantLock lock=newReentrantLock();25publicvoidprint(intstr)26try27lock.lock();28System.out.println(str+获得);29Thread.sleep(int)(Math.random()*1000);3031catch(Exception e)32e.printStackTrace();3334finally35System.out.println(str+释放);36lock.unlock();373839BlockingQueue支持两个附加操作的 Queue,这两个操作是:检索元素时等待队列变为非空,以及存储元素时等待空间变得可用。BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 额外的元素。没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁定或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。BlockingQueue 实质上不 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。下面的例子演示了这个阻塞队列的基本功能。01importjava.util.concurrent.BlockingQueue;02importjava.util.concurrent.ExecutorService;03importjava.util.concurrent.Executors;04importjava.util.concurrent.LinkedBlockingQueue;05publicclassMyBlockingQueue extendsThread 06publicstaticBlockingQueue queue = newLinkedBlockingQueue(3);07privateintindex;08publicMyBlockingQueue(inti) 09this.index = i;1011publicvoidrun() 12try13queue.put(String.valueOf(this.index);14System.out.println(+ this.index + in queue!);15 catch(Exception e) 16e.printStackTrace();171819publicstaticvoidmain(String args) 20ExecutorService service = Executors.newCachedThreadPool();21for(inti = 0; i 10; i+) 22service.submit(newMyBlockingQueue(i);2324Thread thread = newThread() 25publicvoidrun() 26try27while(true) 28Thread.sleep(int) (Math.random() * 1000);29if(MyBlockingQueue.queue.isEmpty()30break;31String str = MyBlockingQueue.queue.take();32System.out.println(str + has take!);3334 catch(Exception e) 35e.printStackTrace();363738;39service.submit(thread);40service.shutdown();4142-执行结果-0 in queue!1 in queue!2 in queue!3 in queue!0 has take!4 in queue!1 has take!6 in queue!2 has take!7 in queue!3 has take!8 in queue!4 has take!5 in queue!6 has take!9 in queue!7 has take!8 has take!5 has take!9 has take!-CompletionService将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现。01importjava.util.concurrent.Callable;02importjava.util.concurrent.CompletionService;03importjava.util.concurrent.ExecutorCompletionService;04importjava.util.concurrent.ExecutorService;05importjava.util.concurrent.Executors;06publicclassMyCompletionService implementsCallable 07privateintid;0809publicMyCompletionService(inti)10this.id=i;1112publicstaticvoidmain(String args) throwsException13ExecutorService service=Executors.newCachedThreadPool();14CompletionService completion=newExecutorCompletionService(service);15for(inti=0;i10;i+)16completion.submit(newMyCompletionService(i);1718for(inti=0;i10;i+)19System.out.println(completion.take().get();2021service.shutdown();2223publicString call() throwsException 24Integer time=(int)(Math.random()*1000);25try26System.out.println(this.id+ start);27Thread.sleep(time);28System.out.println(this.id+ end);2930catch(Exception e)31e.printStackTrace();3233returnthis.id+:+time;3435CountDownLatch一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await。一下的例子是别人写的,非常形象。01importjava.util.concurrent.CountDownLatch;02importjava.util.concurrent.ExecutorService;03importjava.util.concurrent.Executors;04publicclassTestCountDownLatch 05publicstaticvoidmain(String args) throwsInterruptedException 06/ 开始的倒数锁07finalCountDownLatch begin = newCountDownLatch(1);08/ 结束的倒数锁09finalCountDownLatch end = newCountDownLatch(10);10/ 十名选手11finalExecutorService exec = Executors.newFixedThreadPool(10);1213for(intindex = 0; index 10; index+) 14finalintNO = index + 1;15Runnable run = newRunnable() 16publicvoidrun() 17try18begin.await();/一直阻塞19Thread.sleep(long) (Math.random() * 10000);20System.out.println(No.+ NO + arrived);21 catch(InterruptedException e) 22 finally23end.countDown();242526;27exec.submit(run);2829System.out.println(Game Start);30begin.countDown();31end.await();32System.out.println(Game Over);33exec.shutdown();3435CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。CyclicBarrier一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。示例用法:下面是一个在并行分解设计中使用 barrier 的例子,很经典的旅行团例子:01importjava.text.SimpleDateFormat;02importjava.util.Date;03importjava.util.concurrent.BrokenBarrierException;04importjava.util.concurrent.CyclicBarrier;05importjava.util.concurrent.ExecutorService;06importjava.util.concurrent.Executors;07publicclassTestCyclicBarrier 08/ 徒步需要的时间: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan09privatestaticint timeWalk = 5, 8, 15, 15, 10;10/ 自驾游11privatestaticint timeSelf = 1, 3, 4, 4, 5;12/ 旅游大巴13privatestaticint timeBus = 2, 4, 6, 6, 7;1415staticString now() 16SimpleDateFormat sdf = newSimpleDateFormat(HH:mm:ss);17returnsdf.format(newDate() + : ;1819staticclassTour implementsRunnable 20privateint times;21privateCyclicBarrier barrier;22privateString tourName;23publicTour(CyclicBarrier barrier, String tourName, int times) 24this.times = times;25this.tourName = tourName;26this.barrier = barrier;2728publicvoidrun() 29try30Thread.sleep(times0 * 1000);31System.out.println(now() + tourName + Reached Shenzhen);32barrier.await();33Thread.sleep(times1 * 1000);34System.out.println(now() + tourName + Reached Guangzhou);35barrier.await();36Thread.sleep(times2 * 1000);37System.out.println(now() + tourName + Reached Shaoguan);38barrier.await();39Thread.sleep(times3 * 1000);40System.out.println(now() + tourName + Reached Changsha);41barrier.await();42Thread.sleep(times4 * 1000);43System.out.println(now() + tourName + Reached Wuhan);44barrier.await();45 catch(InterruptedException e) 46 catch(BrokenBarrierException e) 47484950publicstaticvoidmain(String args) 51/ 三个旅行团52CyclicBarrier barrier = newCyclicBarrier(3);53ExecutorService exec = Executors.newFixedThreadPool(3);54exec.submit(newTour(barrier, WalkTour, timeWalk);55exec.submit(newTour(barrier, SelfTour, timeSelf);56/当我们把下面的这段代码注释后,会发现,程序阻塞了,无法继续运行下去。57exec.submit(newTour(barrier, BusTour, timeBus);58exec.shutdown();5960CyclicBarrier最重要的属性就是参与者个数,另外最要方法是await()。当所有线程都调用了await()后,就表示这些线程都可以继续执行,否则就会等待。FutureFuture 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。计算完成后只能使用 get 方法来检索结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future但又不提供可用的结果,则可以声明 Future 形式类型、并返回 null 作为基础任务的结果。这个我们在前面CompletionService已经看到了,这个Future的功能,而且这个可以在提交线程的时候被指定为一个返回对象的。ScheduledExecutorService一个 ExecutorService,可安排在给定的延迟后运行或定期执行的命令。schedule 方法使用各种延迟创建任务,并返回一个可用于取消或检查执行的任务对象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法创建并执行某些在取消前一直定期运行的任务。用 Executor.execute(java.lang.Runnable) 和 ExecutorService 的 submit 方法所提交的命令,通过所请求的

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论