java CompletionService获取线程异步执行的结果
是什么?为什么?怎么做?
感谢:http://www.cnblogs.com/nayitian/p/3273468.html
一、是什么
想要获取线程异步执行的结果,有两种做法,一种是使用Future接口,另一种是使用CompletionService接口。
CompletionService是java.util.concurrent包提供的一个接口,用来获取线程异步执行的结果。
这个接口看上去效果和Future很像,实现的功能相近,但是实现原理不同,适用场景也不同,不可一概而论。
二、为什么
1.Futrue的缺陷
Future也是java.util.concurrent包提供的一个接口,用来获取线程异步执行的结果。那么CompletionService和Future有什么不同?为什么CompletionService效果更好?
之所以使用CompletionService,是因为Future存在一些缺陷。
如果使用Future,需要自己维护一个集合,保存submit方法返回的Future对象。然后在主线程中遍历这个集合并且使用Future对象的get方法获取异步线程的返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
package testCompletionService; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; public class testCompletionService { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); BlockingQueue<Future<String>> blockingQueue = new LinkedBlockingQueue<Future<String>>(); for (int i = 0; i < 5; i++) { Future<String> future = executorService.submit(new taskA("thread" + i)); blockingQueue.add(future); } for (Future<String> future : blockingQueue) { try { System.out.println(future.get()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } executorService.shutdown(); } } class taskA implements Callable<String> { private String name; public taskA(String name) { // TODO Auto-generated constructor stub this.name = name; } public String call() throws Exception { // TODO Auto-generated method stub System.out.println(name + " start"); int time = new Random().nextInt(10); System.out.println(name + " need " + time + "s"); Thread.sleep(time * 1000); System.out.println(name + " end"); return name + " result"; } } |
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
thread0 start thread1 start thread0 need 8s thread1 need 6s thread1 end thread2 start thread2 need 5s thread0 end thread3 start thread3 need 8s thread0 result thread1 result thread2 end thread4 start thread4 need 9s thread2 result thread3 end thread3 result thread4 end thread4 result |
从输出结果可以看到,thread0需要运行8s,而thread1只需要运行6s,thread1先于thread0执行完毕,但是为什么thread1没有先于thread0输出结果?
这是因为,blockingQueue集合输入的Future对象是有顺序的,在遍历blockingQueue集合时,将从第一个输入的Future对象开始遍历。而第一个Future对象需要使用get方法获取thread0异步执行的结果。因为thread0还没有执行完,主线程会阻塞在get方法上,即使别的线程已经执行完,主线程依然处于阻塞状态。所以,就算thread1先于thread0执行完,也要等到主线程获取thread0的结果之后,才会获取thead1的结果。
如果系统被设计成每条线程执行完后就能根据其结果继续做后面的事,处于集合后面但是先完成的线程就会增加额外的等待时间,导致系统效率变低。
在这种情况下,Future表现不佳,CompletionService是更好的解决方案。
2.CompletionService能不能完全取代Future?
并不是这么回事。
CompletionService在强调“线程执行完毕马上采取操作”的场景中特别适用。take方法将获取“第一个执行完成的任务结果”,如果所有任务都没有完成,当前线程就会阻塞在take方法处。一旦take成功获取了一个执行完毕的任务,线程执行器就会将其移除。
也就是说,take获取的任务对象都是已经执行完毕的任务,状态都是执行完毕,一旦take成功获取任务,线程执行器就会将其移除,无法再次从线程执行器中获取此任务。
如果想要获取任务执行的状态(比如说检查任务是否执行完毕,如果没有则记录日志/取消任务),那么还是需要使用Future。CompletionService不能完全取代Future,在下面的例子中会有体现。
我个人认为CompletionService还可以代替线程执行器中的invokeAny和invokeAll方法。invokeAny从任务集中获取第一个执行成功的结果,之后就可以取消其他的任务。而CompletionService只要获取第一个Futrue,获取结果,就可以关闭线程执行器(虽然实现的原理不一样,但是效果是一样的)。invokeAll是获取任务集中所有结果,CompletionService也可以做到这一点。具体实现就不在这里拓展了。
三、怎么做
1.简单例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
package testCompletionService; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class testCompletionService { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService); for (int i = 0; i < 5; i++) { taskB taskB = new taskB("thread" + i); completionService.submit(taskB); } for (int i = 0; i < 5; i++) { try { Future<String> future = completionService.take(); System.out.println(future.get()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } executorService.shutdown(); } } class taskB implements Callable<String> { private String name; public taskB(String name) { // TODO Auto-generated constructor stub this.name = name; } public String call() throws Exception { // TODO Auto-generated method stub System.out.println(name + " start"); int time = new Random().nextInt(10); System.out.println(name + " need " + time + "s"); Thread.sleep(time * 1000); System.out.println(name + " end"); return name + " result"; } } |
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
thread0 start thread1 start thread0 need 8s thread1 need 2s thread1 end thread2 start thread1 result thread2 need 9s thread0 end thread3 start thread3 need 5s thread0 result thread2 end thread4 start thread4 need 7s thread2 result thread3 end thread3 result thread4 end thread4 result |
线程运行结束后,第一时间就会输出结果。
2.复杂一点的例子
老师让三个学生做一道题,每隔一段时间看看学生做完没有,如果学生没做完,就提醒一下做题方法,如果学生做完了,就拿给老师检查。
我一开始以为CompletionService和Future的使用方式大同小异,就写成了这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
package testCompletionService; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class testTeacher { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3); CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService); for (int i = 0; i < 3; i++) { student student = new student("student" + i); completionService.submit(student); } for (int i = 0; i < 3; i++) { Future<String> future = completionService.take(); if (future.isDone()) { try { System.out.println(future.get()); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("teacher give advice"); } } for (int i = 0; i < 3; i++) { Future<String> future = completionService.take(); try { System.out.println(future.get()); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } executorService.shutdown(); } } class student implements Callable<String> { private String name; public student(String name) { // TODO Auto-generated constructor stub this.name = name; } public String call() throws Exception { // TODO Auto-generated method stub int time = new Random().nextInt(10); System.out.println(name + " is solving the problem , need " + time + "s"); Thread.sleep(time * 1000); return name + " give the result"; } } |
出现了奇怪的现象,运行结果永远类似:
1 2 3 4 5 6 |
student0 is solving the problem , need 4s student2 is solving the problem , need 3s student1 is solving the problem , need 0s student1 give the result student2 give the result student0 give the result |
主线程会被阻塞在第二个take方法处,程序无法退出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
for (int i = 0; i < 3; i++) { Future<String> future = completionService.take(); // 被阻塞在这里 if (future.isDone()) { try { System.out.println(future.get()); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("teacher give advice"); } } |
后来我仔细看了CompletionService的文档,才发现自己搞错了用法。take方法的描述为:
Retrieves and removes the Future representing the next completed task, waiting if none are yet present.
take方法需要获取第一个“执行完毕”的任务对象,如果所有任务都没有完成,当前线程就会阻塞在take方法处。一旦take成功获取了一个执行完毕的任务,线程执行器就会将其移除,无法再次从线程执行器中获取此任务。
所以,这里的else是没有意义的。take方法获取的都是已经执行完毕的任务,future.isDone()永远为true,else永远不会执行。:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
for (int i = 0; i < 3; i++) { Future<String> future = completionService.take(); if (future.isDone()) { try { System.out.println(future.get()); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { System.out.println("teacher give advice"); } } |
这个循环也是没有意义的。在执行这个循环之前,所有的任务已经被获取并移除了。因为线程执行器中不存在任务,所以当前线程会阻塞在take方法处,程序无法退出:
1 2 3 4 5 6 7 8 9 10 11 |
for (int i = 0; i < 3; i++) { Future<String> future = completionService.take(); try { System.out.println(future.get()); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } |
这个场景强调的是线程执行的状态(老师想知道学生是否已经完成),所以更适合使用Future。CompletionService更适合“线程执行完毕马上采取操作”的场景,在这里不适用。
至此,奇怪的现象得以解释,同时也证明了在很多情景中CompletionService无法代替Future。
四、总结
要清楚到CompletionService的使用场景,具体情况具体分析。