java ThreadPoolExecutor处理线程执行器中被拒绝的任务
是什么?为什么?怎么做?
一、是什么
1.线程执行器的结束
想要结束线程执行器的时候,可以调用shutdown方法让线程执行器结束。
但是,如果存在正在运行的任务或者等待执行的任务,线程执行器不会马上结束。要等到所有任务都执行完毕,线程执行器才会结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
package testReject; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class test { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); ExecutorService executorService = Executors.newFixedThreadPool(2); task task = new task(); executorService.submit(task); scheduledExecutorService.schedule(task, 3, TimeUnit.SECONDS); executorService.shutdown(); scheduledExecutorService.shutdown(); } } class task implements Runnable { public void run() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName() + " start"); try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " end"); } } |
运行结果:
1 2 3 4 |
pool-2-thread-1 start pool-1-thread-1 start pool-2-thread-1 end pool-1-thread-1 end |
2.任务被拒绝
如果在调用shutdown方法到执行器结束这个时间段中发送一个任务给线程执行器,这个任务会被拒绝(线程执行器即将关闭,不再接受任务)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
package testReject; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class test { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); ExecutorService executorService = Executors.newFixedThreadPool(2); task task = new task(); executorService.submit(task); scheduledExecutorService.schedule(task, 3, TimeUnit.SECONDS); executorService.shutdown(); scheduledExecutorService.shutdown(); task newTask = new task(); executorService.submit(newTask); scheduledExecutorService.schedule(newTask, 3, TimeUnit.SECONDS); } } class task implements Runnable { public void run() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName() + " start"); try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " end"); } } |
运行结果:
1 2 3 4 5 6 7 8 9 10 |
pool-2-thread-1 start Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@5c647e05 rejected from java.util.concurrent.ThreadPoolExecutor@33909752[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at testReject.test.main(test.java:26) pool-1-thread-1 start pool-2-thread-1 end pool-1-thread-1 end |
executorService直接报出拒绝任务的错误,scheduledExecutorService虽然没有报出错误,但是也拒绝执行延时任务(scheduledExecutorService没有拒绝服务的处理机制)。
那么被拒绝的任务应该如何处理呢?对此,ThreadPoolExecutor类提供了一套机制,当任务被拒绝时,就会调用这套机制对其进行处理。
二、为什么
任务不能提前预知线程执行器的结束。
如果任务非常重要,却又被线程执行器拒绝从而不能进行任何处理,可能会导致工作不能完成/出现不可预知的错误。
因此,有必要建立一套机制处理被线程执行器拒绝的任务。
三、怎么做
1.简单例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
package testReject; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class test { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); rejectTaskHandler rejectTaskHandler = new rejectTaskHandler(); threadPoolExecutor.setRejectedExecutionHandler(rejectTaskHandler); task task = new task(); threadPoolExecutor.submit(task); threadPoolExecutor.shutdown(); task newTask = new task(); threadPoolExecutor.submit(newTask); } } class task implements Runnable { public void run() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName() + " start"); try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " end"); } } class rejectTaskHandler implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // TODO Auto-generated method stub System.out.println(r.toString() + " has been rejected"); System.out.println("ThreadPoolExecutor:" + executor.toString()); System.out.println("Terminating:" + executor.isTerminating()); System.out.println("Terminated:" + executor.isTerminated()); } } |
这里的重点就是setRejectedExecutionHandler方法,这是ThreadPoolExecutor类提供的方法,为了使用此方法,必须把线程执行器强转为ThreadPoolExecutor:
1 |
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); |
之后,我们自己实现一个拒绝任务处理类,实现RejectedExecutionHandler接口:
1 2 3 4 5 6 7 8 9 10 11 |
class rejectTaskHandler implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // TODO Auto-generated method stub System.out.println(r.toString() + " has been rejected"); System.out.println("ThreadPoolExecutor:" + executor.toString()); System.out.println("Terminating:" + executor.isTerminating()); System.out.println("Terminated:" + executor.isTerminated()); } } |
这样就可以对线程执行器拒绝的服务进行处理了。
运行结果:
1 2 3 4 5 6 |
java.util.concurrent.FutureTask@70dea4e has been rejected pool-1-thread-1 start ThreadPoolExecutor:java.util.concurrent.ThreadPoolExecutor@5c647e05[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0] Terminating:true Terminated:false pool-1-thread-1 end |
2.复杂一点的例子
修改一下拒绝任务处理类,执行被拒绝的任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
class rejectTaskHandler implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // TODO Auto-generated method stub System.out.println(r.toString() + " has been rejected"); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.submit(r); executorService.shutdown(); } } |
运行结果:
1 2 3 4 5 |
java.util.concurrent.FutureTask@70dea4e has been rejected pool-1-thread-1 start pool-2-thread-1 start pool-1-thread-1 end pool-2-thread-1 end |
这就保证了所有任务都能被执行。
四、总结
平时似乎很少有使用的机会…