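Java: Running Tasks in a Thread Executor and Processing the Results
What is it? Why use it? How is it done?
I. What it is
A Java thread executor (ExecutorService) can run a list of tasks, return the results of those tasks, and let the caller process them.
II. Why
So far we have only handed the executor a single task at a time, and always a task that completes successfully.
In real-world development, the following scenario is common:
The main thread waits for many child threads to finish. Those that succeed return a result, and those that fail report an error. The main thread then has to handle each outcome accordingly.
So we need the executor to help implement this.
III. How
1. Run multiple tasks and process the first result
(1) Both tasks succeed; the result of whichever finishes first is returned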
    package testMultiTask;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class test {

        public static void main(String[] args) {
            // Two tasks that both complete successfully
            List<task> tasks = new ArrayList<task>();
            task task1 = new task();
            task task2 = new task();
            tasks.add(task1);
            tasks.add(task2);

            ExecutorService executorService = Executors.newCachedThreadPool();
            String result = null;
            try {
                // Blocks until one task succeeds, then cancels the rest
                result = executorService.invokeAny(tasks);
            } catch (InterruptedException e) {
                // The calling thread was interrupted while waiting
                e.printStackTrace();
            } catch (ExecutionException e) {
                // No task completed successfully
                e.printStackTrace();
            }
            executorService.shutdown();
            System.out.println(result);
        }
    }

    class task implements Callable<String> {

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " start");
            // Simulate work that takes a random 0 to 9 seconds
            int time = new Random().nextInt(10);
            System.out.println(Thread.currentThread().getName() + " need " + time + "s");
            Thread.sleep(time * 1000);
            System.out.println(Thread.currentThread().getName() + " end");
            return Thread.currentThread().getName() + " finish";
        }
    }
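The call to note is executorService.invokeAny. The executor runs the task list, and as soon as any one task completes successfully, invokeAny returns that task's result and cancels the tasks that have not finished yet.
Until a result is available, the calling thread stays blocked inside invokeAny.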
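The random sleep times in the example make each run different. As a minimal sketch of the same behavior with fixed delays, the following uses a hypothetical helper `sleeper` and the made-up labels "slow" and "fast" (none of these names come from the original example). It is only meant to show that the caller blocks in invokeAny and gets back the result of the task that finishes first.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InvokeAnyFirstResult {

    // Hypothetical helper: a task that sleeps for a fixed time, then returns its label.
    static Callable<String> sleeper(String label, long millis) {
        return () -> {
            Thread.sleep(millis);
            return label;
        };
    }

    // Blocks in invokeAny until one task has succeeded, then returns its result.
    static String firstResult() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Callable<String>> tasks = Arrays.asList(
                    sleeper("slow", 2000),
                    sleeper("fast", 100));
            return pool.invokeAny(tasks);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Expected to print "fast", since that task finishes long before the other.
        System.out.println(firstResult());
    }
}
```

The delays are far enough apart that the outcome should not depend on thread scheduling.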
Output:
    pool-1-thread-1 start
    pool-1-thread-2 start
    pool-1-thread-2 need 2s
    pool-1-thread-1 need 8s
    pool-1-thread-2 end
    pool-1-thread-2 finish
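Here thread2 needed only 2 seconds. At that point thread1 was still sleeping, so it was cancelled early (there is no "pool-1-thread-1 end" line), and the printed result is thread2's.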
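The missing "end" line suggests that the losing task was interrupted while it slept. A small sketch can make that visible, assuming the slower task has already started by the time the faster one returns. The class name, the flag and the latch below are illustrative and not part of the original example.

```java
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class InvokeAnyCancelsLosers {

    // Returns true if the slow task saw an InterruptedException after the fast one won.
    static boolean slowTaskWasInterrupted() throws InterruptedException, ExecutionException {
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch slowDone = new CountDownLatch(1);

        Callable<String> slow = () -> {
            try {
                Thread.sleep(5000);
                return "slow";
            } catch (InterruptedException e) {
                // Record the cancellation, then let the exception propagate
                interrupted.set(true);
                throw e;
            } finally {
                slowDone.countDown();
            }
        };
        Callable<String> fast = () -> {
            Thread.sleep(100);
            return "fast";
        };

        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            pool.invokeAny(Arrays.asList(slow, fast));
        } finally {
            pool.shutdown();
        }
        // Give the slow task a moment to finish its catch/finally blocks
        slowDone.await(2, TimeUnit.SECONDS);
        return interrupted.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("slow task interrupted: " + slowTaskWasInterrupted());
    }
}
```

A task that ignores interruption (for example a busy loop that never checks the interrupt flag) would keep running, so cancellation here relies on the task cooperating.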
(2) Two tasks, one succeeds and one throws; the successful result is returned
Modify the example so that the second task (thread2) throws deliberately:
    package testMultiTask;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class test {

        public static void main(String[] args) {
            // One task that succeeds and one that always throws
            List<task> tasks = new ArrayList<task>();
            task task1 = new task();
            taskWithError task2 = new taskWithError();
            tasks.add(task1);
            tasks.add(task2);

            ExecutorService executorService = Executors.newCachedThreadPool();
            String result = null;
            try {
                // Returns the result of a task that succeeded; failed tasks are skipped
                result = executorService.invokeAny(tasks);
            } catch (InterruptedException e) {
                // The calling thread was interrupted while waiting
                e.printStackTrace();
            } catch (ExecutionException e) {
                // No task completed successfully
                e.printStackTrace();
            }
            executorService.shutdown();
            System.out.println(result);
        }
    }

    class task implements Callable<String> {

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " start");
            // Simulate work that takes a random 0 to 9 seconds
            int time = new Random().nextInt(10);
            System.out.println(Thread.currentThread().getName() + " need " + time + "s");
            Thread.sleep(time * 1000);
            System.out.println(Thread.currentThread().getName() + " end");
            return Thread.currentThread().getName() + " finish";
        }
    }

    class taskWithError extends task {

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " start");
            int time = new Random().nextInt(10);
            System.out.println(Thread.currentThread().getName() + " need " + time + "s");
            Thread.sleep(time * 1000);
            System.out.println(Thread.currentThread().getName() + " end");
            // Fail on purpose after the simulated work
            throw new Exception("test exception");
        }
    }
If the failing task finishes first:
    pool-1-thread-1 start
    pool-1-thread-2 start
    pool-1-thread-2 need 6s
    pool-1-thread-1 need 9s
    pool-1-thread-2 end
    pool-1-thread-1 end
    pool-1-thread-1 finish
Here thread2 finished before thread1. But because thread2 threw an exception, the executor waited for thread1 to finish and only then returned thread1's result.
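The same situation can be reproduced without random timing. The sketch below assumes two illustrative tasks, one that fails after 100 ms and one that succeeds after 500 ms; the class name, task names and the "ok" result are made up for this example.

```java
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InvokeAnySkipsFailures {

    // The failing task finishes first, yet invokeAny still returns the later success.
    static String firstSuccess() throws InterruptedException, ExecutionException {
        Callable<String> failsFast = () -> {
            Thread.sleep(100);
            throw new IllegalStateException("test exception");
        };
        Callable<String> succeedsLater = () -> {
            Thread.sleep(500);
            return "ok";
        };

        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            return pool.invokeAny(Arrays.asList(failsFast, succeedsLater));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Expected to print "ok": the early failure is not reported to the caller.
        System.out.println(firstSuccess());
    }
}
```

Note that the early exception is silently dropped in this case, so a task that needs its failure recorded has to log it itself.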
If the task that can succeed finishes first:
    pool-1-thread-1 start
    pool-1-thread-2 start
    pool-1-thread-2 need 1s
    pool-1-thread-1 need 2s
    pool-1-thread-2 end
    pool-1-thread-1 end
    pool-1-thread-1 finish
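In this case thread1 finishes before thread2. Because thread1 returns a result, the executor does not need to wait for thread2 and cancels it early, so no "pool-1-thread-2 end" line should appear. The log shown above does not actually match this case: in it thread2 ends first (after 1s) and thread1 ends afterwards, which is the previous case again.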
(3) Both tasks throw; invokeAny throws an exception
Modify the example so that both tasks throw deliberately:
    package testMultiTask;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class test {

        public static void main(String[] args) {
            // Two tasks that both throw
            List<taskWithError> tasks = new ArrayList<taskWithError>();
            taskWithError task1 = new taskWithError();
            taskWithError task2 = new taskWithError();
            tasks.add(task1);
            tasks.add(task2);

            ExecutorService executorService = Executors.newCachedThreadPool();
            String result = null;
            try {
                // Throws ExecutionException because no task succeeds
                result = executorService.invokeAny(tasks);
            } catch (InterruptedException e) {
                // The calling thread was interrupted while waiting
                e.printStackTrace();
            } catch (ExecutionException e) {
                // No task completed successfully
                e.printStackTrace();
            }
            executorService.shutdown();
            // Still null, since invokeAny never returned a value
            System.out.println(result);
        }
    }

    class taskWithError implements Callable<String> {

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " start");
            // Simulate work that takes a random 0 to 9 seconds
            int time = new Random().nextInt(10);
            System.out.println(Thread.currentThread().getName() + " need " + time + "s");
            Thread.sleep(time * 1000);
            System.out.println(Thread.currentThread().getName() + " end");
            // Fail on purpose after the simulated work
            throw new Exception("test exception");
        }
    }
Running it, invokeAny fails:
    pool-1-thread-2 start
    pool-1-thread-1 start
    pool-1-thread-2 need 0s
    pool-1-thread-1 need 4s
    pool-1-thread-2 end
    pool-1-thread-1 end
    java.util.concurrent.ExecutionException: java.lang.Exception: test exception
    null
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at java.util.concurrent.AbstractExecutorService.doInvokeAny(AbstractExecutorService.java:193)
        at java.util.concurrent.AbstractExecutorService.invokeAny(AbstractExecutorService.java:215)
        at testMultiTask.test.main(test.java:28)
    Caused by: java.lang.Exception: test exception
        at testMultiTask.taskWithError.call(test.java:58)
        at testMultiTask.taskWithError.call(test.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
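Both tasks threw, so there is no result to return and invokeAny throws an ExecutionException. The lone "null" in the middle of the trace is the output of System.out.println(result), interleaved with the stack trace that is printed on standard error.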
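In practice the caller usually wants the underlying exception rather than the wrapper. As a hedged sketch (the class and method names are illustrative), the cause can be read from the ExecutionException with getCause():

```java
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InvokeAnyAllFail {

    // Runs two failing tasks and returns the message of the exception that invokeAny reports.
    static String causeMessage() throws InterruptedException {
        Callable<String> failing = () -> {
            throw new Exception("test exception");
        };

        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            pool.invokeAny(Arrays.asList(failing, failing));
            return null; // not reached when every task fails
        } catch (ExecutionException e) {
            // The task's own exception is wrapped as the cause
            return e.getCause().getMessage();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(causeMessage()); // prints "test exception"
    }
}
```

Only one of the task exceptions is surfaced this way; the others are not available to the caller through invokeAny.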
2. Run multiple tasks and process all results
(1) Both tasks succeed; the results of all tasks are returned
    package testMultiTask;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class test {

        public static void main(String[] args) {
            // Two tasks that both complete successfully
            List<task> tasks = new ArrayList<task>();
            task task1 = new task();
            task task2 = new task();
            tasks.add(task1);
            tasks.add(task2);

            ExecutorService executorService = Executors.newCachedThreadPool();
            List<Future<String>> futures = null;
            try {
                // Blocks until every task has completed, normally or with an exception
                futures = executorService.invokeAll(tasks);
            } catch (InterruptedException e) {
                // The calling thread was interrupted while waiting
                e.printStackTrace();
            }
            executorService.shutdown();

            if (futures != null) {
                for (Future<String> future : futures) {
                    try {
                        // Returns the task's result, or throws if the task failed
                        System.out.println(future.get());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        // The task threw; the cause is the task's exception
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    class task implements Callable<String> {

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " start");
            // Simulate work that takes a random 0 to 9 seconds
            int time = new Random().nextInt(10);
            System.out.println(Thread.currentThread().getName() + " need " + time + "s");
            Thread.sleep(time * 1000);
            System.out.println(Thread.currentThread().getName() + " end");
            return Thread.currentThread().getName() + " finish";
        }
    }
Output:
    pool-1-thread-1 start
    pool-1-thread-2 start
    pool-1-thread-2 need 1s
    pool-1-thread-1 need 9s
    pool-1-thread-2 end
    pool-1-thread-1 end
    pool-1-thread-1 finish
    pool-1-thread-2 finish
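In the output above, thread-1's result is printed before thread-2's even though thread-2 finished first. invokeAll waits for every task and returns the futures in the same order as the task list, not in completion order. A minimal sketch with fixed delays, using the hypothetical helper `sleeper` and made-up labels, illustrates this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllKeepsOrder {

    // Hypothetical helper: a task that sleeps for a fixed time, then returns its label.
    static Callable<String> sleeper(String label, long millis) {
        return () -> {
            Thread.sleep(millis);
            return label;
        };
    }

    // Collects the results in the order of the submitted task list.
    static List<String> results() throws InterruptedException, ExecutionException {
        List<Callable<String>> tasks = Arrays.asList(
                sleeper("first-slow", 500),
                sleeper("second-fast", 50));

        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<String> out = new ArrayList<>();
            // invokeAll returns only after both tasks are done
            for (Future<String> future : pool.invokeAll(tasks)) {
                out.add(future.get());
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(results()); // prints [first-slow, second-fast]
    }
}
```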
(2) Two tasks, one succeeds and one throws; each task's own outcome is reported
    package testMultiTask;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class test {

        public static void main(String[] args) {
            // One task that succeeds and one that always throws
            List<task> tasks = new ArrayList<task>();
            task task1 = new task();
            taskWithError task2 = new taskWithError();
            tasks.add(task1);
            tasks.add(task2);

            ExecutorService executorService = Executors.newCachedThreadPool();
            List<Future<String>> futures = null;
            try {
                // Blocks until every task has completed, normally or with an exception
                futures = executorService.invokeAll(tasks);
            } catch (InterruptedException e) {
                // The calling thread was interrupted while waiting
                e.printStackTrace();
            }
            executorService.shutdown();

            if (futures != null) {
                for (Future<String> future : futures) {
                    try {
                        // Returns the task's result, or throws if the task failed
                        System.out.println(future.get());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        // The task threw; the cause is the task's exception
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    class task implements Callable<String> {

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " start");
            // Simulate work that takes a random 0 to 9 seconds
            int time = new Random().nextInt(10);
            System.out.println(Thread.currentThread().getName() + " need " + time + "s");
            Thread.sleep(time * 1000);
            System.out.println(Thread.currentThread().getName() + " end");
            return Thread.currentThread().getName() + " finish";
        }
    }

    class taskWithError extends task {

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " start");
            int time = new Random().nextInt(10);
            System.out.println(Thread.currentThread().getName() + " need " + time + "s");
            Thread.sleep(time * 1000);
            System.out.println(Thread.currentThread().getName() + " end");
            // Fail on purpose after the simulated work
            throw new Exception("test exception");
        }
    }
Output:
    pool-1-thread-1 start
    pool-1-thread-2 start
    pool-1-thread-1 need 6s
    pool-1-thread-2 need 9s
    pool-1-thread-1 end
    pool-1-thread-2 end
    pool-1-thread-1 finish
    java.util.concurrent.ExecutionException: java.lang.Exception: test exception
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at testMultiTask.test.main(test.java:46)
    Caused by: java.lang.Exception: test exception
        at testMultiTask.taskWithError.call(test.java:94)
        at testMultiTask.task.call(test.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
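Printing a stack trace is rarely what the main thread should do with a failed task. One possible way to process each outcome, sketched here with illustrative names and the made-up prefixes "OK" and "FAILED", is to handle success and failure per future and collect a short description of each:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllMixedOutcomes {

    // Runs one succeeding and one failing task and describes each outcome.
    static List<String> describe() throws InterruptedException {
        Callable<String> ok = () -> "finish";
        Callable<String> bad = () -> {
            throw new Exception("test exception");
        };

        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<String> out = new ArrayList<>();
            for (Future<String> future : pool.invokeAll(Arrays.asList(ok, bad))) {
                try {
                    out.add("OK: " + future.get());
                } catch (ExecutionException e) {
                    // A failure of one task does not affect the other futures
                    out.add("FAILED: " + e.getCause().getMessage());
                }
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe()); // prints [OK: finish, FAILED: test exception]
    }
}
```

Catching the ExecutionException inside the loop is the design choice that matters: it keeps one failed task from hiding the results of the others.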
(3) Both tasks throw; invokeAll itself does not throw, but each Future.get() call throws an ExecutionException, so two exceptions are printed
    package testMultiTask;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class test {

        public static void main(String[] args) {
            // Two tasks that both throw
            List<taskWithError> tasks = new ArrayList<taskWithError>();
            taskWithError task1 = new taskWithError();
            taskWithError task2 = new taskWithError();
            tasks.add(task1);
            tasks.add(task2);

            ExecutorService executorService = Executors.newCachedThreadPool();
            List<Future<String>> futures = null;
            try {
                // Completes normally even though both tasks fail
                futures = executorService.invokeAll(tasks);
            } catch (InterruptedException e) {
                // The calling thread was interrupted while waiting
                e.printStackTrace();
            }
            executorService.shutdown();

            if (futures != null) {
                for (Future<String> future : futures) {
                    try {
                        // Throws ExecutionException for each failed task
                        System.out.println(future.get());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        // The task threw; the cause is the task's exception
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    class taskWithError implements Callable<String> {

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " start");
            // Simulate work that takes a random 0 to 9 seconds
            int time = new Random().nextInt(10);
            System.out.println(Thread.currentThread().getName() + " need " + time + "s");
            Thread.sleep(time * 1000);
            System.out.println(Thread.currentThread().getName() + " end");
            // Fail on purpose after the simulated work
            throw new Exception("test exception");
        }
    }
Output:
    pool-1-thread-1 start
    pool-1-thread-2 start
    pool-1-thread-2 need 7s
    pool-1-thread-1 need 6s
    pool-1-thread-1 end
    pool-1-thread-2 end
    java.util.concurrent.ExecutionException: java.lang.Exception: test exception
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at testMultiTask.test.main(test.java:46)
    Caused by: java.lang.Exception: test exception
        at testMultiTask.taskWithError.call(test.java:75)
        at testMultiTask.taskWithError.call(test.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    java.util.concurrent.ExecutionException: java.lang.Exception: test exception
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at testMultiTask.test.main(test.java:46)
    Caused by: java.lang.Exception: test exception
        at testMultiTask.taskWithError.call(test.java:75)
        at testMultiTask.taskWithError.call(test.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
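IV. Summary
The thing to take away is the idea behind thread executors: hand a batch of tasks to the executor, then process either the first successful result (invokeAny) or every result (invokeAll) in one place.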