java Phaser的特点及使用
分析了一下Phaser的几个特点,试着写了几个例子。
一、动态调整注册任务的数量
Phaser可以通过register和bulkRegister方法动态增加注册任务的数量,此外也支持通过其构造函数进行指定初始数量。相对的,Phaser也可以通过arriveAndDeregister方法减少注册任务的数量。
之前的CyclicBarrier和CountDownLatch,注册的任务数量都必须确定,不能改变。
主要体现为以下几个方法:
1.register
将当前线程注册到Phaser中,注册的线程为未运行状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
class moretask implements Runnable { private Phaser phaser; public moretask(Phaser phaser) { // TODO Auto-generated constructor stub this.phaser = phaser; } public void run() { // TODO Auto-generated method stub phaser.register(); System.out.println(Thread.currentThread().getName() + " start"); int time = new Random().nextInt(10) * 1000; try { Thread.sleep(time); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " end"); phaser.arrive(); } } |
2.bulkRegister
为phaser注册5条未运行的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
public class moretest { public static void main(String[] args) { Phaser phaser = new Phaser(); phaser.bulkRegister(5); moretask task = new moretask(phaser); for (int i = 0; i < 5; i++) { Thread thread = new Thread(task); thread.start(); } phaser.awaitAdvance(phaser.getPhase()); System.out.println("main thread end"); } } |
3.arriveAndDeregister
在到达时间点后,可以使用arriveAndDeregister方法取消当前线程的注册。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
package testPhaser; import java.util.Random; import java.util.concurrent.Phaser; public class moretest { public static void main(String[] args) { Phaser phaser = new Phaser(); phaser.bulkRegister(5); moretask task = new moretask(phaser); for (int i = 0; i < 5; i++) { Thread thread = new Thread(task); thread.start(); } System.out.println(phaser.getRegisteredParties()); // 输出为5 phaser.awaitAdvance(phaser.getPhase()); System.out.println(phaser.getRegisteredParties()); // 输出为0 } } class moretask implements Runnable { private Phaser phaser; public moretask(Phaser phaser) { // TODO Auto-generated constructor stub this.phaser = phaser; } public void run() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName() + " start"); int time = new Random().nextInt(10) * 1000; try { Thread.sleep(time); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " end"); phaser.arriveAndDeregister(); // 取消当前线程的注册 } } |
二、“到达”
Phaser的部分功能和CyclicBarrier和CountDownLatch重合(这也说明Phaser可以代替这两者)。
具体实现可以查看:
http://www.xie4ever.com/2017/03/26/java-%E4%BD%BF%E7%94%A8phaser%E5%8F%96%E4%BB%A3cyclicbarrier%E5%92%8Ccyclicbarrier/
三、终止
Phaser可以主动进行终止。被终止之后,Phaser无法调整注册任务的数量。正在等待中的任务会立刻执行。
CyclicBarrier和CountDownLatch没有终止的功能。
1.重写onAdvance方法
在phaser中,有一个onAdvance方法,该方法在参与者数量为0的时候,返回true,来表示该phaser状态为终止状态。
它在phaser阶段改变的时候会自动执行,需要两个参数,当前阶段数和注册的参与者数量。继承并覆盖phaser的onAdvance方法,就可以实现阶段切换的功能。
如果使用这种方法,只有在phaser切换阶段时,才可以进行终止。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
package testPhaser; import java.util.Random; import java.util.concurrent.Phaser; public class moretest { public static void main(String[] args) { Phaser phaser = new MyPhaser(); phaser.bulkRegister(5); moretask task = new moretask(phaser); for (int i = 0; i < 5; i++) { Thread thread = new Thread(task); thread.start(); } phaser.awaitAdvance(phaser.getPhase()); } } class moretask implements Runnable { private Phaser phaser; public moretask(Phaser phaser) { // TODO Auto-generated constructor stub this.phaser = phaser; } public void run() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName() + " start"); int time = new Random().nextInt(10) * 1000; try { Thread.sleep(time); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " end"); // phaser.arriveAndDeregister(); // 取消当前线程的注册,phaser会被终止 phaser.arrive(); // 不取消线程的注册,phaser不会被终止 } } class MyPhaser extends Phaser { @Override protected boolean onAdvance(int phase, int registeredParties) { // TODO Auto-generated method stub if (registeredParties == 0) { System.out.println("no registeredParties!"); return true; } else { System.out.println("phaser go on!"); } return super.onAdvance(phase, registeredParties); } } |
在这里,在phaser进入下一阶段时,将执行onAdvance方法。如果此时注册线程数为0,就输出“no registeredParties!”并且返回false,终止phaser。否则,就输出“phaser go on!”,返回true,phaser没有被终止。
2.forceTermination
重写onAdvance方法,只有在phaser切换阶段时才能终止phaser,有一定的局限性(但是能保证当前阶段线程执行顺序的安全)。如果使用forceTermination方法,phaser将立即终止,所有被阻塞的线程将会继续执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
package testPhaser; import java.util.Random; import java.util.concurrent.Phaser; public class moretest { public static void main(String[] args) { Phaser phaser = new Phaser(); phaser.bulkRegister(5); moretask task = new moretask(phaser); for (int i = 0; i < 5; i++) { Thread thread = new Thread(task); thread.start(); } phaser.awaitAdvance(phaser.getPhase()); System.out.println(phaser.isTerminated()); } } class moretask implements Runnable { private Phaser phaser; public moretask(Phaser phaser) { // TODO Auto-generated constructor stub this.phaser = phaser; } public void run() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName() + " start"); int time = new Random().nextInt(10) * 1000; try { Thread.sleep(time); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " end"); phaser.arrive(); phaser.forceTermination(); } } |
isTerminated方法可以检测phaser是否被终止,如果被终止,就返回true。在这里,因为phaser被中断,所以被阻塞的主线程继续执行,中途输出true。
1 2 3 4 5 6 7 8 9 10 11 |
Thread-2 start Thread-3 start Thread-0 start Thread-4 start Thread-1 start Thread-1 end true Thread-3 end Thread-0 end Thread-2 end Thread-4 end |
四、层次结构
Phaser中有很多方法,为了线程安全需要加锁。所以这些方法将彼此竞争锁资源。如果在同一Phaser注册的线程过多,竞争的开销将会很大。
如果对Phaser进行分层,将有效减少竞争的开销,提高吞吐量。但是相应的,单个操作的开销将会变高(如果存在Phaser分层结构,那么父子Phaser之间就需要相互协调,所以单个操作的开销变高)。
简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
package testPhaser; import java.util.Random; import java.util.concurrent.Phaser; public class moretest { public static void main(String[] args) { Phaser phaser = new Phaser(); Phaser childPhaser = new Phaser(phaser); childPhaser.bulkRegister(5); moretask task = new moretask(childPhaser); for (int i = 0; i < 5; i++) { Thread thread = new Thread(task); thread.start(); } childPhaser.awaitAdvance(childPhaser.getPhase()); System.out.println(childPhaser); System.out.println(childPhaser.getParent()); } } class moretask implements Runnable { private Phaser phaser; public moretask(Phaser phaser) { // TODO Auto-generated constructor stub this.phaser = phaser; } public void run() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName() + " start"); int time = new Random().nextInt(10) * 1000; try { Thread.sleep(time); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " end"); phaser.arrive(); } } |
在创建phaser时,使用构造方法,可以形成一个分层结构。
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 |
Thread-1 start Thread-4 start Thread-3 start Thread-0 start Thread-2 start Thread-1 end Thread-0 end Thread-3 end Thread-2 end Thread-4 end java.util.concurrent.Phaser@5c647e05[phase = 1 parties = 5 arrived = 0] java.util.concurrent.Phaser@33909752[phase = 1 parties = 1 arrived = 0] |
我的个人理解:
父节点phaser本身没有注册的线程,只有一个子节点childPhaser,线程数(parties)为子节点本身 + 自身注册的线程 = 1 + 0 = 1。因为子节点执行了一个阶段,所以父节点的phase也跟随+1。
关于分层结构有很多细节值得研究,暂时不深入讨论。
五、总结
phaser有更多灵活的特性,值得深入学习。