java 对生产者消费者问题、读者写者问题的简单研究

之前我写了一篇文章实现生产者消费者模式,谈到了读者写者问题(未解决)。最近看到一篇写得非常好的文章(比我总结的好多了),所以按照该文作者的思路重新做一次简单的总结。

参考:http://www.cnblogs.com/jiangyang/p/6007030.html

 

一、为什么使用生产者消费者模式?

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。

在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。

同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。

举个例子:假设我有一台打印机,有十份文件需要打印,“命令打印机打印一份文件”(生产者生产的速度)是很快的,但是“打印机实际打印一份文件”(消费者消费的速度)是需要一段时间的。如果不使用生产者消费者模式,我每次只能命令打印机打印一份文件,等待打印机打印完后,才能命令打印机打印下一份文件,效率非常低。

从常识出发,我们肯定会一次性设置一个打印队列来存放我们的命令(把命令生产到容器中),任务就结束了。打印机每打完一份文件,就拿出下一个命令(消费容器中的命令),自动打下一份文件。其实这就是生产者消费者模式,生产者不必受消费者的制约(反过来同理,消费者也不必受生产者的制约),起到了解耦的效果。

总结一下:

生产者消费者模式通过一个容器解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列(这个阻塞队列就是用来给生产者和消费者解耦的)来进行通讯。生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

二、生产者消费者问题的解决方法

生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。

解决生产者/消费者问题的方法可分为两类:(1)采用某种机制保护生产者和消费者之间的同步;(2)在生产者和消费者之间建立一个管道。

第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

同步问题核心在于:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。Java语言在多线程编程上实现了完全对象化,提供了对同步机制的良好支持。

在Java中一共有五种方法支持同步,其中前四个是同步方法,第五个是管道方法。

  • wait()、notify()方法。
  • await()、signal()方法(即使用Lock)。
  • BlockingQueue阻塞队列方法。
  • Semaphore方法。
  • PipedInputStream、PipedOutputStream方法。

前三章在上一篇文章已将讲全了,本文主要研究第四、第五种方法。

java 实现生产者消费者模式的三种方法

(1)Semaphore

关于Semaphore,可以参考:

http://www.xie4ever.com/2017/03/20/java-%E4%BD%BF%E7%94%A8semaphore%E6%8E%A7%E5%88%B6%E5%B9%B6%E5%8F%91%E8%AE%BF%E9%97%AE%E8%B5%84%E6%BA%90/

Semaphore是信号量,意思是设置一定数量的令牌,线程拿到令牌就可以执行,没有拿到令牌的线程则需要等待,等待执行完毕的线程归还令牌。具体实现为:

TestSemaphore.java

notFull.acquire()与mutex.acquire()的位置不能互换。如果线程A先拿到了互斥锁再进入等待(等待令牌),那么线程A无法释放互斥锁。如果此时线程B已经获取令牌但是无法获得互斥锁,那么线程B无法释放令牌。线程A和线程B进入环路相互等待,导致死锁。

个人认为比较难理解的是:

mutex信号量只有1,所以这里的Semaphore作用是synchronized,保证生产和消费的同步性。

但是notEmpty信号量为0我就很难理解了…如果信号量为0,那么notFull.acquire()将无法获取令牌,当前线程不就立刻进入阻塞状态了吗?

后来我发现,在执行acquire()前,notEmpty已经执行了notEmpty。个人认为,当信号量为0时,虽然无法执行acquire(),但是可以使用release()。只要等待某线程release之后,信号量 + 1,马上就可以acquire。

这里的notFull信号量设置为0,就是想让消费者线程马上进入阻塞状态,直到生产者加入了信号量为止。

(2)PipedInputStream、PipedOutputStream

这两个类位于java.io包中。一个线程将数据写入管道(生产者),另一个线程从管道读取数据(消费者),管道是缓冲区(容器),这就构成了生产者消费者模式。

TestPiped.java

三、读者写者问题的解决方法

读者—写者问题(Readers-Writers problem)是一个经典的并发程序设计问题,是经常出现的一种同步问题。

计算机系统中的数据(文件、记录)常被多个进程共享,但其中某些进程可能只要求读数据(称为读者Reader);另一些进程则要求修改数据(称为写者Writer)。就共享数据而言,Reader和Writer是两组并发进程共享一组数据区,要求:

  • 允许多个读者同时执行读操作。
  • 不允许读者、写者同时操作。
  • 不允许多个写者同时操作。

Reader和Writer的同步问题分为读者优先、写者优先、公平竞争三种情况,它们的处理方式不同。

个人认为,读者写者问题比生产者消费者问题要难。这两个问题的区别有点像普通的lock和读写锁之间的区别。

(1)读写锁

关于读写锁,可以参考:

http://www.xie4ever.com/2017/03/16/java-%E8%AF%BB%E5%86%99%E9%94%81/

看到读者写者问题,我们应该很快反应,应该使用lock中的读写锁。具体实现为:

TestReadWriteLock.java

个人认为java中的读写锁封装得非常好,可以非常简单地解决读者写者问题。

(2)Semaphore

单纯使用信号量不能解决读者写者问题,必须引入计数器count(可以用CountDownLatch代替)对读进程计数,实现“读写进程需要同步,读进程间不需要同步”。

(注:使用Semaphore解决读者写者问题比较复杂,我暂时还没有弄明白,暂时不把代码贴出来。以后有机会再来填这个坑)

四、总结

算是全面复习了一次生产者消费者模式和读写锁,解决了之前的一些疑惑。