生产者-消费者问题

2025年3月17日 | 阅读 8 分钟

生产者-消费者问题是一个经典的**多进程同步问题**,即我们试图在多个进程之间实现同步。

在生产者-消费者问题中,只有一个生产者,它在生产一些物品,而只有一个消费者,它在消耗生产者生产的物品。生产者和消费者共享同一个固定大小的内存缓冲区。

生产者的任务是生产物品,将其放入内存缓冲区,然后再次开始生产物品。而消费者的任务是从内存缓冲区中消耗物品。

让我们来理解一下这是什么问题?

以下几点被认为是生产者-消费者问题中出现的潜在问题:

  • 生产者应该仅在缓冲区未满时生产数据。如果发现缓冲区已满,则不允许生产者将任何数据存储到内存缓冲区中。
  • 消费者只有在内存缓冲区非空时才能消耗数据。如果发现缓冲区为空,则不允许消费者使用内存缓冲区中的任何数据。
  • 生产者和消费者不应同时访问内存缓冲区。

让我们看看上述问题的代码。

生产者代码

Producer-Consumer problem

生产者代码

Producer-Consumer problem

让我们来理解上面的生产者和消费者代码。

在开始解释代码之前,先理解代码中使用的一些术语。

  1. 生产者代码中的"in"表示下一个空缓冲区
  2. 消费者代码中的"out"表示第一个已填满的缓冲区
  3. count 记录缓冲区中元素的数量。
  4. count 在生产者和消费者代码的块中被进一步细分为三行代码。

如果先谈论生产者代码:

--Rp 是一个寄存器,用于保存 m[count] 的值。

--Rp 被递增(因为元素已添加到缓冲区)。

--Rp 的递增值被存回 m[count]。

同样,如果接下来谈论消费者代码:

--Rc 是一个寄存器,用于保存 m[count] 的值。

--Rc 被递减(因为元素已从缓冲区中移除)。

--Rc 的递减值被存回 m[count]。

缓冲区

Producer-Consumer problem
Producer-Consumer problem

从图示中可以看出:缓冲区总共有 8 个空间,其中前 5 个已填满,in = 5(指向下一个空位置),out = 0(指向第一个已填满的位置)。

我们开始讲生产者,它想生产一个元素“F”。根据代码,它将进入 producer() 函数,while(1) 始终为真,将尝试插入 itemP = F 到缓冲区中,在此之前,while(count == n) 将求值为 False。

注意:while 循环后的分号将阻止代码继续执行,如果条件为 True(即无限循环/缓冲区已满)。

Buffer[in] = itemP → Buffer[5] = F。(现在已插入 F)

in = (in + 1) mod n → (5 + 1) mod 8 → 6,因此 in = 6;(下一个空缓冲区)。

插入 F 后,缓冲区看起来像这样:

此时 out = 0,但 in = 6。

Producer-Consumer problem

由于 count = count + 1; 被分成三个部分:

Load Rp, m[count] → 将 count 值(即 5)复制到寄存器 Rp。

Increment Rp → 将 Rp 递增到 6。

假设在递增之后,并且在执行第三行(store m[count], Rp)之前,发生了上下文切换,代码跳转到消费者代码。。。

消费者代码

现在开始讲消费者,它想消耗第一个元素“A”。根据代码,它将进入 consumer() 函数,while(1) 始终为真,while(count == 0) 将求值为 False(因为 count 仍然是 5,不等于 0)。

注意:while 循环后的分号将阻止代码继续执行,如果条件为 True(即无限循环/缓冲区中没有元素)。

itemC = Buffer[out] → itemC = A(因为 out 是 0)。

out = (out + 1) mod n → (0 + 1) mod 8 → 1,因此 out = 1(第一个已填满的位置)。

A 已被移除。

移除 A 后,缓冲区看起来像这样:

此时 out = 1,in = 6。

Producer-Consumer problem

由于 count = count - 1; 被分成三个部分:

Load Rc, m[count] → 将 count 值(即 5)复制到寄存器 Rp。

Decrement Rc → 将 Rc 递减到 4。

store m[count], Rc → count = 4。

现在 count 的当前值为 4。

假设在这之后发生了上下文切换,回到生产者代码剩余的部分。。。

由于在生产者代码中的上下文切换发生在递增之后,并且在执行第三行(store m[count], Rp)之前。

所以我们从这里恢复,因为 Rp 持有已递增的值 6。

因此,store m[count], Rp → count = 6。

现在 count 的当前值为 6,这是错误的,因为缓冲区只有 5 个元素。这种情况称为竞态条件,问题是生产者-消费者问题。

使用信号量解决生产者-消费者问题

生产者和消费者出现的上述问题,由于上下文切换和产生不一致的结果,可以通过信号量来解决。

为了解决上面出现的竞态条件问题,我们将使用二元信号量和计数信号量

二元信号量(Binary Semaphore):在二元信号量中,任何时候最多只有两个进程可以竞争进入其临界区,除此之外,还保持了互斥条件。

计数信号量(Counting Semaphore):在计数信号量中,任何时候可以有多个进程竞争进入其临界区,除此之外,还保持了互斥条件。

信号量(Semaphore):信号量是 S 中的一个整数变量,除了初始化外,它只能通过两个标准的原子操作——wait 和 signal 来访问,其定义如下:


从上面 wait 的定义可以看出,如果 S 的值 <= 0,则它将进入无限循环(因为 while 循环后有分号;)。而 signal 的作用是增加 S 的值。

让我们看看使用信号量(二元和计数信号量)解决生产者和消费者问题的代码。

生产者代码-解决方案

消费者代码-解决方案

让我们来理解上面的生产者和消费者代码的解决方案。

在开始解释代码之前,先理解代码中使用的一些术语。

  1. 生产者代码中的"in"表示下一个空缓冲区
  2. 消费者代码中的"out"表示第一个已填满的缓冲区
  3. "empty" 是计数信号量,它记录空缓冲区的数量。
  4. "full" 是计数信号量,它记录已填满缓冲区的数量。
  5. "S" 是一个二元信号量,代表缓冲区

如果我们看看缓冲区当前的状况:

Producer-Consumer problem

S = 1(二元信号量的初始值)

in = 5(下一个空缓冲区)

out = 0(第一个已填满的缓冲区)

Producer-Consumer problem

从图示中可以看出:缓冲区总共有 8 个空间,其中前 5 个已填满,in = 5(指向下一个空位置),out = 0(指向第一个已填满的位置)。

生产者代码中使用的信号量:

6. wait(empty) 将计数信号量变量 empty 的值减 1。也就是说,当生产者生产一个元素时,缓冲区中的可用空间数量会自动减少 1。如果缓冲区已满,即计数信号量变量 "empty" 的值为 0,则 wait(empty); 将根据 wait 的定义阻止进程,不允许其继续前进。

7. wait(S) 将二元信号量变量 S 减至 0,以便任何其他试图进入其临界区的进程都不能进入。

8. signal(s) 将二元信号量变量 S 增至 1,以便其他试图进入其临界区的进程现在可以进入。

9. signal(full) 将计数信号量变量 full 增 1。因为在缓冲区中添加一个项目后,缓冲区中的一个空间被占用,所以 full 变量必须更新。

生产者代码中使用的信号量:

10. wait(full) 将计数信号量变量 full 的值减 1。也就是说,当消费者消耗一个元素时,缓冲区中已填满的空间数量会自动减少 1。如果缓冲区为空,即计数信号量变量 full 的值为 0,则 wait(full); 将根据 wait 的定义阻止进程,不允许其继续前进。

11. wait(S) 将二元信号量变量 S 减至 0,以便任何其他试图进入其临界区的进程都不能进入。

12. signal(S) 将二元信号量变量 S 增至 1,以便其他试图进入其临界区的进程现在可以进入。

13. signal(empty) 将计数信号量变量 empty 增 1。因为从缓冲区移除一个项目后,缓冲区中出现一个空闲空间,所以 empty 变量必须相应地更新。

生产者代码

让我们开始讲 producer(),它想生产一个元素 "F",根据代码,它将进入 producer() 函数。

wait(empty); 将 empty 的值减 1,即 empty = 2。

假设在此之后发生上下文切换,并跳转到 consumer 代码。

消费者代码

现在开始讲 consumer,它想消耗第一个元素 "A"。根据代码,它将进入 consumer() 函数。

wait(full); 将 full 的值减 1,即 full = 4。

wait (S); 将 S 的值减至 0。

itemC = Buffer[out]; → itemC = A(因为 out 是 0)。

A 已被移除。

out = (out + 1) mod n → (0 + 1) mod 8 → 1,因此 out = 1(第一个已填满的位置)。

Producer-Consumer problem

S = 0(二元信号量的值)。

in = 5(下一个空缓冲区)

out = 1(第一个已填满的缓冲区)。

假设在此之后发生上下文切换,回到 producer 代码。

由于 producer() 的下一条指令是 wait(S);,这将阻止 producer 进程,因为 S 的当前值为 0,而 wait(0); 是一个无限循环:根据 wait 的定义,因此 producer 无法继续前进。

因此,我们接下来回到 consumer 进程的下一条指令。

signal(S); 现在将 S 的值增至 1。

signal(empty); 将 empty 增 1,即 empty = 3。

Producer-Consumer problem

现在回到 producer() 代码;

由于 producer() 的下一条指令是 wait(S);,它将成功执行,因为 S 现在是 1,它会将 S 的值减 1,即 S = 0。

Buffer[in] = itemP; → Buffer[5] = F。(现在已插入 F)。

in = (in + 1) mod n → (5 + 1) mod 8 → 6,因此 in = 6;(下一个空缓冲区)。

signal(S); 将 S 增 1。

signal(full); 将 full 增 1,即 full = 5。

Producer-Consumer problem

现在将 full 和 empty 的当前值相加,即 full + empty = 5 + 3 = 8(这绝对没问题)。即使经过多次上下文切换,也没有产生不一致的结果。但在之前没有信号量的生产者和消费者条件下,我们在上下文切换时会看到不一致的结果。

这就是生产者消费者问题的解决方案。