1. 生产者消费者模式

我们先来看看什么是生产者消费者模式,生产者消费者模式是程序设计中非常常见的一种设计模式,被广泛运用在解耦、消息队列等场景。

在现实世界中,我们把生产商品的一方称为生产者,把消费商品的一方称为消费者,有时生产者的生产速度特别快,但消费者的消费速度跟不上,俗称“产能过剩”,又或是多个生产者对应多个消费者时,大家可能会手忙脚乱。

如何才能让大家更好地配合呢?

这时在生产者和消费者之间就需要一个中介来进行调度,于是便诞生了生产者消费者模式。

使用生产者消费者模式通常需要在两者之间增加一个阻塞队列作为媒介,有了媒介之后就相当于有了一个缓冲,平衡了两者的能力,整体的设计如图所示,最上面是阻塞队列,右侧的 1 是生产者线程,生产者在生产数据后将数据存放在阻塞队列中,左侧的 2 是消费者线程,消费者获取阻塞队列中的数据。而中间的 3 和 4 分别代表生产者消费者之间互相通信的过程,因为无论阻塞队列是满还是空都可能会产生阻塞,阻塞之后就需要在合适的时机去唤醒被阻塞的线程。

那么什么时候阻塞线程需要被唤醒呢?

有两种情况。

  • 第一种情况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。
  • 另一种情况是如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产,这便是对生产者消费者模式的简单介绍。

2. Java实现生产者消费者模式

2.1 解决方案

思路

  • 1.采用某种机制保护生产者和消费者之间的同步。有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。
  • 2.在生产者和消费者之间建立一个管道。管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

解决问题的核心

保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。

Java能实现的几种方法

  • 1.wait() / notify()方法
  • 2.await() / signal()方法
  • 3.BlockingQueue阻塞队列方法
  • 4.信号量
  • 5.管道

2.2 代码实现

先把几个方法都会用到的公共代码贴上。

生产者代码如下:

public class Producer implements Runnable {

private Storage storage;

public Producer() {
}

public Producer(Storage storage) {
this.storage = storage;
}

@Override
public void run() {
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
storage.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

消费者代码如下:

public class Consumer implements Runnable {

private Storage storage;

public Consumer() {}

public Consumer(Storage storage) {
this.storage = storage;
}

@Override
public void run() {
while (true) {
try {
TimeUnit.SECONDS.sleep(3);
storage.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

测试代码:

public class ProdConsumerTest {  

public static void main(String[] args) {
Storage storage = new Storage(10);

Thread p1 = new Thread(new Producer(storage));
Thread p2 = new Thread(new Producer(storage));
Thread p3 = new Thread(new Producer(storage));

Thread c1 = new Thread(new Consumer(storage));
Thread c2 = new Thread(new Consumer(storage));
Thread c3 = new Thread(new Consumer(storage));

p1.start();
p2.start();
p3.start();
c1.start();
c2.start();
c3.start();
}

}

1. BlockingQueue阻塞队列方法

先从最简单的 BlockingQueue 开始讲起:

BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

  • put()方法:容量达到最大时,自动阻塞。

  • take()方法:容量为0时,自动阻塞。

public class Storage {

// 仓库存储的载体
private BlockingQueue<Object> queue;

// 使用LinkedBlockingDeque(双端队列)也可以
//private BlockingQueue<Object> queue = new LinkedBlockingDeque<>(10);

public Storage (int size) {
this.queue = new ArrayBlockingQueue<>(size);
}

public void produce() {
try {
queue.put(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void consume() {
try {
queue.take();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费了一个产品,现库存" + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

测试结果如下:

可能会出现put()或take()和System.out.println()输出不匹配的情况,是由于它们之间没有同步造成的。BlockingQueue可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。

2. wait() / notify()方法

当缓冲区已满时,生产者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行;当缓冲区已空时,消费者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行。

当生产者向缓冲区放入一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;当消费者从缓冲区取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

代码如下:

public class Storage {

// 仓库容量
private int maxSize = 10;
// 仓库存储的载体
private LinkedList<Object> queue;

public Storage (int size) {
this.maxSize = size;
this.queue = new LinkedList<>();
}

public void produce() {
synchronized (queue) {
while (queue.size() + 1 > maxSize) {
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】仓库已满");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + queue.size());
queue.notifyAll();
}
}

public void consume() {
synchronized (queue) {
while (queue.size() == 0) {
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】仓库为空");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.remove();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + queue.size());
queue.notifyAll();
}
}

}

运行结果如下:

一个生产者线程运行produce方法,睡眠1s;一个消费者运行一次consume方法,睡眠3s。此次实验过程中,有3个生产者和3个消费者,也就是我们说的多对多的情况。仓库的容量为10,可以看出消费的速度明显慢于生产的速度,符合设定。

注意

notifyAll()方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。即最终也只有一个线程能被运行,上述线程优先级都相同,每次运行的线程都不确定是哪个,后来给线程设置优先级后也跟预期不一样,还是要看JVM的具体实现吧。

3. await() / signal()方法

在JDK5中,用ReentrantLock和Condition可以实现等待/通知模型,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

在这里只需改动Storage类:

public class Storage {

// 仓库最大存储量
private int maxSize = 10;
// 仓库存储的载体
private LinkedList<Object> queue;
// 锁
private ReentrantLock lock;
// 仓库满的条件变量
private Condition full;
// 仓库空的条件变量
private Condition empty;

public Storage(int size) {
this.maxSize = size;
queue = new LinkedList<>();
lock = new ReentrantLock();
full = lock.newCondition();
empty = lock.newCondition();
}

public void produce() {
// 获得锁
lock.lock();
try {
while (queue.size() + 1 > maxSize) {
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】仓库已满");
full.await();
}
queue.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + queue.size());

empty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void consume() {
// 获得锁
try {
while (queue.isEmpty()) {
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】仓库为空");
empty.await();
}
queue.remove();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + queue.size());

full.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

}

运行结果与wait()/notify()类似。

4. 信号量

Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为0的Semaphore是可以release的,然后就可以acquire(即一开始使线程阻塞从而完成其他执行)。

public class Storage {

// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
// 仓库的最大容量
Semaphore notFull;
// 将线程挂起,等待其他来触发
final Semaphore notEmpty = new Semaphore(0);
// 互斥锁
final Semaphore mutex = new Semaphore(1);

public Storage(int size) {
notFull = new Semaphore(size);
}

public void produce()
{
try {
notFull.acquire();
mutex.acquire();
list.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());
}
catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
notEmpty.release();
}
}

public void consume()
{
try {
notEmpty.acquire();
mutex.acquire();
list.remove();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + list.size());
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
notFull.release();
}
}
}

3. Go实现生产者消费者

单向channel最典型的应用是“生产者消费者模型”。channel又分为有缓冲和无缓冲channel。channel中参数传递的时候,是作为引用传递。

3.1 无缓冲channel

示例代码一实现如下:

package main

import (
"fmt"
"time"
)

func producer(name string, out chan<- int) {
for i := 0; i < 10; i++ {
data := i * i

fmt.Println(name+"-生产数据: ", data)
out <- data // 缓冲区写入数据
}
}

func consumer(name string, in <-chan int) {
// 无需同步机制,先做后做
// 没有数据就阻塞等
for data := range in {
fmt.Println(name+"-得到数据:", data)
}
}

func main() {

// 传参的时候显式类型像隐式类型转换,双向管道向单向管道转换
ch := make(chan int) //无缓冲channel
go producer("生产者1", ch)
//go producer("生产者2", ch)
go consumer("消费者1", ch)
//go consumer("消费者2", ch)

time.Sleep(10 * time.Second)
close(ch) //关闭管道
}

这里使用无缓冲channel,生产者生产一次数据放入channel,然后消费者从channel读取数据,如果没有只能等待,也就是阻塞,直到管道被关闭。所以宏观是生产者消费者同步执行。

生产者每一次生产,消费者也只能拿到一次数据,缓冲区作用不大。结果如下:

3.2 有缓冲channel

示例代码二如下:

package main

import (
"fmt"
"time"
)

func producer(name string, out chan<- int) {
for i := 0; i < 10; i++ {
data := i * i

fmt.Println(name+"-生产数据: ", data)
out <- data // 缓冲区写入数据
}
}

func consumer(name string, in <-chan int) {
// 无需同步机制,先做后做
// 没有数据就阻塞等
for data := range in {
fmt.Println(name+"-得到数据:", data)
}
}

func main() {

// 传参的时候显式类型像隐式类型转换,双向管道向单向管道转换
ch := make(chan int, 5) //无缓冲channel
go producer("生产者1", ch)
//go producer("生产者2", ch)
go consumer("消费者1", ch)
//go consumer("消费者2", ch)

time.Sleep(30 * time.Second)
close(ch) //关闭管道
}

有缓冲channel,只修改ch := make(chan int, 5) // 添加缓冲一句,只要缓冲区不满,生产者可以持续向缓冲区channel放入数据,只要缓冲区不为空,消费者可以持续从channel读取数据。

就有了异步,并发的特性。结果如下:

这里之所以终端生产者连续打印了大于缓冲区容量的数据,是因为终端打印属于系统调用也是有延迟的,IO操作的时候,生产者同时向管道写入,请求打印,管道的写入读取与终端输出打印速度不匹配。

3.3 多生产者和多消费者

场景描述

  • 生产者:多个生产者,生产速度高于消费者消费速度
  • 消费者:多个消费者
  • 数据同步:程序中止信号发出,生产者暂停生产并退出线程,消费者继续消费,直到缓存数据被消费完。

代码实现

package main

import (
"fmt"
"strconv"
"sync"
"time"
)

// 暂停标志
var bStop = false

// 模拟异常/超时等使程序停止
func makeStop() {
time.Sleep(4 * time.Second)
bStop = true
}

// 生产者
func producer(id int, wg *sync.WaitGroup, out chan<- int) {

count := 0

// 标志位为false,不断写入数据
for !bStop {
// 模拟生产数据的耗时
time.Sleep(2 * time.Second)
count++
data := count
fmt.Println("producer-"+strconv.Itoa(id)+"<-->生产:", data)
out <- data
}

wg.Done()

}

// 消费者
func consumer(id int, wg *sync.WaitGroup, in <-chan int) {

// 不断读取,直到通道关闭
for data := range in {
time.Sleep(1 * time.Second)
fmt.Println("consumer-"+strconv.Itoa(id)+"<-->消费:", data)
}

wg.Done()

}

func main() {

// 缓存:模拟生产者完成生产,消费者未完成消费
ch := make(chan int, 30)

// 生产者和消费者计数器
wgPd := new(sync.WaitGroup)
wgCs := new(sync.WaitGroup)

// producer
for i := 0; i < 5; i++ {
wgPd.Add(1)
go producer(i, wgPd, ch)
}

// consumer
for i := 0; i < 3; i++ {
wgCs.Add(1)
go consumer(i, wgCs, ch)
}

go makeStop()

wgPd.Wait()
// 生产完成,关闭通道
close(ch)
wgCs.Wait()

}

运行结果如下:

该程序可以保证生产者数据全部被消费者消费。

重点解析:

消费者通过range读取数据

 golang 通道读取方式有select和range,本场景更适合range读取数据,因为for循环中,如果通道未被关闭,线程会堵塞读取,即使数据为空,除非通道被关闭,才会退出循环。

生产者完成之后,需要关闭通道,消费者才能正常退出

wgPd.Wait()  
// 生产完成,关闭通道
close(ch)
wgCs.Wait()

当生产完成之后,关闭通道,消费者发现通道关闭,对应线程才会退出,即生产者完成生产,通过关闭通道告诉消费者,我已完成生产,你消费完剩余数据就退出吧。

makeStop的作用

模拟程序收到暂停信号,或者超时信号,一般情况是通过通道来进行标志,本文通过bool来进行标志,因为通道适用于单线程select模型,本文中生产者为多线程,如果通过放多个数据(数据个数=生产者个数)到通道,也可解决,但是多维护一份数据的一致性没有必要。

生产者数量>消费者数量以及通道加缓存

生产者数量>消费者数量:模拟生产速度大于消费速度的情况,同时生产者和消费者加休眠时间模拟数据处理的耗时;通道加缓存是为了模拟,生产者生产了一定数量数据到通道,并停止生产,验证消费者是否将缓存数据消费。

参考感谢
golang多线程生产者消费者模型 - Memset - 博客园 (cnblogs.com)
生产者消费者模型及 Golang 实现 (qq.com)
生产者消费者模型及 Golang 实现 (qq.com)
Java实现生产者消费者的三种方式 (qq.com)
goroutine生产者消费者问题 (qq.com)