博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
CompletionService实例
阅读量:2352 次
发布时间:2019-05-10

本文共 6702 字,大约阅读时间需要 22 分钟。

 

参考:http://m.blog.csdn.net/article/details?id=51287803

 

Java SE5的java.util.concurrent包中的执行器(Executor)将为你管理Thread对象,从而简化了并发编程。Executor在客户端和执行任务之间提供了一个间接层,Executor代替客户端执行任务。Executor允许你管理异步任务的执行,而无须显式地管理线程的生命周期。Executor在Java SE5/6中时启动任务的优选方法。Executor引入了一些功能类来管理和使用线程Thread,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等

 

创建线程池

Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

 

public static ExecutorService newFixedThreadPool(int nThreads)

创建固定数目线程的线程池。

public static ExecutorService newCachedThreadPool()

创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

public static ExecutorService newSingleThreadExecutor()

创建一个单线程化的Executor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

 

见类图,接口Executor只有一个方法execute,接口ExecutorService扩展了Executor并添加了一些生命周期管理的方法,如shutdown、submit等。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止。

 

Callable,Future用于返回结果

Future<V>代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask<V>实现了Future<V>和Runable<V>。Callable代表一个有返回值得操作。

实例:用ExecutorService  实现对一个大数组并行求和

 

package executor;import java.util.*;import java.util.concurrent.*;/* * 并行计算求和. * 本例中,把一个整数数组的求和分解到每个线程中,每个线程求该数值的部分和, * 然后主程序把各个和再次求和就能得到最后的数字。从这个架构上跟mapreduce有点神似。 *  */public class ExecutorServiceParalelSumdemo {		private int coreCpuNum;       private ExecutorService  executor;           /*      * save the result of each thread's sum calculation     *      */    private List
> tasks = new ArrayList
>(); public ExecutorServiceParalelSumdemo(){ coreCpuNum = Runtime.getRuntime().availableProcessors(); System.out.println("this host has "+coreCpuNum+ " CPU(s)"); //for before Java 8.0 //executor = Executors.newFixedThreadPool(coreCpuNum); //this CPU parallelism API is Java8 or later ONLY executor = Executors.newWorkStealingPool(coreCpuNum); } /* * thread main body */ class CalculatorTask implements Callable
{ int nums[]; int start; int end; public CalculatorTask(final int nums[],int start,int end){ this.nums = nums; this.start = start; this.end = end; } @Override public Long call() throws Exception { long sum =0; for(int i=start;i
nums.length){ end = nums.length; } //create thread tasks CalculatorTask calculator = new CalculatorTask(nums, start, end); //create each future result per thread task FutureTask
task = new FutureTask
(calculator); tasks.add(task); if(!executor.isShutdown()){ //execute() can't return result executor.submit(task); } } return getFinalSum(); } public void close(){ executor.shutdown(); } }

 

CompletionService

在上述例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞,如果我们希望任意任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使用CompletionService。

它与ExecutorService最主要的区别在于submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,内部维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。所以,先完成的必定先被取出。这样就减少了不必要的等待时间。

 

CompletionService版本的求和例子

 

package executor;import java.util.*;import java.util.concurrent.*;public class CompletionServiceDemo {		/*	 * 并行计算求和.	 * 本例中,把一个整数数组的求和分解到每个线程中,每个线程求该数值的部分和,	 * 然后主程序把各个和再次求和就能得到最后的数字。从这个架构上跟mapreduce有点神似。	 * 	 */			private int coreCpuNum;   	    private ExecutorService  executor;	    /*	     * CompletionService与ExecutorService最主要的区别在于	     *前者submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,	     *内部维护一个保存Future对象的BlockingQueue。	     *只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。	     *它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。	     *所以,先完成的必定先被取出。这样就减少了不必要的等待时间。	     * 	     */	    /* 	     * CompletionService has a internal bloking queue to save the result of each 	     * thread's sum calculation. so List
> tasks appears unnecessary now * */ private CompletionService
mcs; /* * save the result of each thread's sum calculation * */ public CompletionServiceDemo(){ coreCpuNum = Runtime.getRuntime().availableProcessors(); System.out.println("this host has "+coreCpuNum+ " CPU(s)"); //for before Java 8.0 //executor = Executors.newFixedThreadPool(coreCpuNum); //this CPU parallelism API is Java8 or later ONLY executor = Executors.newWorkStealingPool(coreCpuNum); mcs=new ExecutorCompletionService<>(executor); } /* * thread main body */ class CalculatorTask implements Callable
{ int nums[]; int start; int end; public CalculatorTask(final int nums[],int start,int end){ this.nums = nums; this.start = start; this.end = end; } @Override public Long call() throws Exception { long sum =0; for(int i=start;i
nums.length){ end = nums.length; } //create thread tasks CalculatorTask mthread = new CalculatorTask(nums, start, end); if(!executor.isShutdown()){ mcs.submit(mthread); } } return getFinalSum(); } public void close(){ executor.shutdown(); } }

 

 

测试main方法:
package executor;public class MainTest {	public static void main(String[] args) {				System.out.println("ExcutorServer thread pool demo show");		int[] nums={12890,345678,2345,5678,865,234,3434,454,4656,67678,678,1234,6789};		ExecutorServiceParalelSumdemo mysum=new ExecutorServiceParalelSumdemo();				System.out.println("result per ExecutorServiceParalelSumdemo = "		                  +mysum.ParallelSum(nums));				System.out.println("CompletionServiceDemo thread pool demo show");		CompletionServiceDemo mcom=new CompletionServiceDemo();		System.out.println("result per CompletionServiceDemo = "		                 +mcom.ParallelSum(nums));	}}
输出:
 
ExcutorServer thread pool demo show
this host has 4 CPU(s)
4 future tasks in pool
result per ExecutorServiceParalelSumdemo = 452613
CompletionServiceDemo thread pool demo show
this host has 4 CPU(s)
result per CompletionServiceDemo = 452613
你可能感兴趣的文章
如何通过OpenFace实现人脸识别框架
查看>>
Angle和XBGoost以及Spark的性能对比
查看>>
IOS CoreImage实现人脸识别
查看>>
Tensorflow的高级封装
查看>>
Storm 1.1.0 集群安装
查看>>
图像压缩算法
查看>>
一张图看懂小程序全生态
查看>>
electron开发
查看>>
NodeJS开发c++扩展模块
查看>>
Electron如何调用NodeJS扩展模块
查看>>
NodeJS通过ffi调用DLL
查看>>
Electron通过ffi调用DLL
查看>>
Node.js & Electron的扩展模块
查看>>
Mysql semi-sync VS group replication, 谁快?
查看>>
Android Looper Message MessageQueue Handler
查看>>
Win10下安装卸载Oracle11g的教程及各种坑
查看>>
Zookeeper
查看>>
更新mysql5.7修改字符集
查看>>
Windows系统护眼色设置
查看>>
JUC多线程&lambda之美&ThreadLocal
查看>>