
本教程旨在解决java多生产者多消费者并发模型中一个常见问题:当生产者完成任务后,消费者线程仍无限期等待,导致程序无法正常终止。文章将深入分析问题根源,并提供一种通过为消费者设定明确的消费上限来优雅地结束所有线程,从而确保程序能够正确退出的解决方案,并附带详细的代码示例和注意事项。
1. 问题背景与分析
在并发编程中,生产者-消费者模式是一种常见的设计模式,用于解决不同线程之间数据生产和消费的同步问题。通常,生产者负责生成数据并将其放入共享缓冲区,而消费者则从缓冲区取出数据进行处理。为了确保线程安全和资源有效利用,我们常常使用wait()和notify()(或notifyAll())机制进行线程间的协调。
然而,一个常见的问题是,当所有生产者都完成了它们的数据生产任务后,消费者线程可能会因为共享缓冲区为空而持续调用wait(),进入无限期等待状态,导致整个程序无法终止。这通常发生在消费者线程被设计为无限循环(while(true))以等待新数据,而没有明确的退出条件时。
在提供的代码示例中,Producer 类通过一个有限的循环 (for (int i = 1; i <= productionSize; i++)) 来控制生产总量,一旦达到 productionSize,生产者线程就会自然结束。然而,Consumer 类的 run() 方法中包含一个无限循环 (while (true)),这意味着消费者会一直尝试从共享队列中取出数据。当生产者完成所有生产任务后,队列最终会变空,此时消费者会调用 sharedQueue.wait() 并无限期等待下去,因为没有任何生产者会再次 notify() 它。
2. 解决方案:为消费者设定明确的消费上限
解决消费者无限等待问题的核心在于,为消费者线程提供一个明确的终止条件,使其在完成预定任务后能够自行退出,而不是无限期地等待。最直接的方法是像生产者一样,为消费者设定一个预期的消费总量。
立即学习“Java免费学习笔记(深入)”;
2.1 修改 Consumer 类
我们需要在 Consumer 类中引入两个变量:
wants: 表示该消费者期望消费的总商品数量。gets: 记录该消费者已经消费的商品数量。
然后,修改 run() 方法的循环条件,使其在 gets 达到 wants 时终止。
稿定抠图
AI自动消除图片背景
76 查看详情
以下是修改后的 Consumer 类代码:
class Consumer implements Runnable { private List sharedQueue; // 设定每个消费者期望消费的商品数量 // 这个值应该根据总生产量和消费者数量来合理分配 private static int wantsPerConsumer = 5; private int gets = 0; // 记录当前消费者已消费的数量 public Consumer(List sharedQueue) { this.sharedQueue = sharedQueue; } @Override public void run() { // 当已消费数量达到期望值时,消费者线程终止 while (gets = wantsPerConsumer) { return; // 达到上限,直接返回,不再等待 } System.out.println(Thread.currentThread().getName() + ", 队列为空, consumerThread正在等待producerThread生产, sharedQueue's size= 0"); sharedQueue.wait(); } Thread.sleep((long) (Math.random() * 2000)); System.out.println(Thread.currentThread().getName() + ", CONSUMED : " + sharedQueue.remove(0)); // 唤醒等待的生产者或消费者。在多生产者多消费者场景下,notifyAll() 更安全。 // 但对于本例的特定终止逻辑,notify() 也能在大多数情况下工作。 sharedQueue.notify(); } }}
2.2 确定 wantsPerConsumer 的值
wantsPerConsumer 的值需要根据总的生产量和系统中消费者线程的数量来确定。在原代码中:
Producer.productionSize = 5 (每个生产者生产5个商品)。有两个生产者 (producer0, producer1)。总生产量 = 2 * productionSize = 2 * 5 = 10。有两个消费者 (consumer0, consumer1)。
因此,如果每个消费者平均分担消费任务,那么 wantsPerConsumer 应该设置为 总生产量 / 消费者数量 = 10 / 2 = 5。这与修改后的 Consumer 类中 wantsPerConsumer = 5 的设定相符。
3. 完整示例代码
下面是包含了修改后的 Consumer 类的完整代码示例:
import java.util.LinkedList;import java.util.List;// main classpublic class MULTIPLE_ProducerConsumerWaitNotify { public static void main(String args[]) throws InterruptedException { List sharedQueue = new LinkedList(); // Creating shared object // 设定每个生产者的生产数量 int productionSizePerProducer = 5; // 设定生产者数量 int numberOfProducers = 2; // 设定消费者数量 int numberOfConsumers = 2; // 计算总生产量 int totalProduction = productionSizePerProducer * numberOfProducers; // 计算每个消费者期望消费的数量 // 假设任务平均分配,或者总生产量是消费者数量的倍数 int wantsPerConsumer = totalProduction / numberOfConsumers; // 创建生产者线程 Producer producer0 = new Producer(sharedQueue, 0, productionSizePerProducer); Thread producerThread0 = new Thread(producer0, "ProducerThread0"); Producer producer1 = new Producer(sharedQueue, 1, productionSizePerProducer); Thread producerThread1 = new Thread(producer1, "ProducerThread1"); // 创建消费者线程 Consumer consumer0 = new Consumer(sharedQueue, wantsPerConsumer); Thread consumerThread0 = new Thread(consumer0, "ConsumerThread0"); Consumer consumer1 = new Consumer(sharedQueue, wantsPerConsumer); Thread consumerThread1 = new Thread(consumer1, "ConsumerThread1"); // 启动所有线程 producerThread0.start(); producerThread1.start(); consumerThread0.start(); consumerThread1.start(); // 等待所有生产者线程完成 producerThread0.join(); producerThread1.join(); System.out.println("All producers have finished their tasks."); // 在生产者完成任务后,可以考虑使用一个机制通知消费者 // 例如,一个“毒丸”对象,或者在队列为空时,消费者通过计数判断是否退出 // 当前的方案是消费者自行判断是否达到消费上限 // 等待所有消费者线程完成 consumerThread0.join(); consumerThread1.join(); System.out.println("All consumers have finished their tasks."); System.out.println("Program terminated successfully."); }}// Producer classclass Producer implements Runnable { private List sharedQueue; private int maxSize = 4; // maximum number of products which sharedQueue can hold at a time. private int productionSize; // Total no of items to be produced by THIS producer int producerNo; public Producer(List sharedQueue, int producerNo, int productionSize) { this.sharedQueue = sharedQueue; this.producerNo = producerNo; this.productionSize = productionSize; } @Override public void run() { for (int i = 1; i <= productionSize; i++) { // produce products. try { produce(i); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 恢复中断状态 System.err.println(Thread.currentThread().getName() + " was interrupted."); break; // 捕获中断异常后退出循环 } } System.out.println(Thread.currentThread().getName() + " has finished producing " + productionSize + " items and is terminating."); } private void produce(int i) throws InterruptedException { synchronized (sharedQueue) { // if sharedQuey is full wait until consumer consumes. while (sharedQueue.size() == maxSize) { System.out.println(Thread.currentThread().getName() + ", 队列已满, producerThread正在等待consumerThread消费, sharedQueue's size= " + maxSize); sharedQueue.wait(); } // Bcz each producer must produce unique product // Ex= producer0 will produce 1-5 and producer1 will produce 6-10 in random order int producedItem = (this.productionSize * producerNo) + i; System.out.println(Thread.currentThread().getName() + " Produced : " + producedItem); sharedQueue.add(producedItem); Thread.sleep((long) (Math.random() * 1000)); sharedQueue.notify(); // 唤醒等待的消费者 } }}// Consumer class (Modified)class Consumer implements Runnable { private List sharedQueue; private int wantsToConsume; // 这个消费者期望消费的总数量 private int gets = 0; // 记录当前消费者已消费的数量 public Consumer(List sharedQueue, int wantsToConsume) { this.sharedQueue = sharedQueue; this.wantsToConsume = wantsToConsume; } @Override public void run() { // 当已消费数量达到期望值时,消费者线程终止 while (gets = wantsToConsume) { return; // 达到上限,直接返回,不再等待 } System.out.println(Thread.currentThread().getName() + ", 队列为空, consumerThread正在等待producerThread生产, sharedQueue's size= 0"); sharedQueue.wait(); } // 成功从队列中移除元素,表示一次有效消费 Thread.sleep((long) (Math.random() * 2000)); System.out.println(Thread.currentThread().getName() + ", CONSUMED : " + sharedQueue.remove(0)); gets++; // 只有在实际消费后才增加计数 sharedQueue.notify(); // 唤醒等待的生产者 } }}
重要更新: 在上述 Consumer 类中,gets++ 的位置从 run() 方法中移到了 consume() 方法的 synchronized 块内部,紧跟在 sharedQueue.remove(0) 之后。这是为了确保只有在实际成功消费(即从队列中取出一个元素)之后,gets 计数器才会被递增,从而更准确地反映消费情况并避免潜在的计数错误。同时,在 while (sharedQueue.isEmpty()) 循环内部增加了对 gets >= wantsToConsume 的检查,这确保了即使在等待状态下,如果消费者已经满足了其消费数量,它也能及时退出等待,不再被无谓地唤醒。
4. 注意事项与总结
总消费量与总生产量匹配:确保所有消费者期望消费的总量与所有生产者生产的总量相匹配。如果不匹配,可能会导致部分商品未被消费,或者部分消费者因无商品可消费而继续等待(如果它们没有明确的退出条件)。notify() vs notifyAll():在多生产者多消费者的复杂场景中,通常推荐使用 notifyAll() 来唤醒所有等待的线程,让它们重新评估条件。notify() 只唤醒一个随机等待的线程,可能导致“惊群效应”或“死锁”风险(例如,唤醒了一个生产者,但队列已满,而真正需要被唤醒的消费者仍在等待)。在本例中,由于我们为消费者设定了明确的退出条件,notify() 也能工作,但作为最佳实践,notifyAll() 更为健壮。中断处理:在 try-catch 块中捕获 InterruptedException 时,最佳实践是调用 Thread.currentThread().interrupt() 来重新设置中断状态,并通常选择退出循环或线程,以便外部代码能够感知到中断请求。优雅退出:通过为消费者设定明确的消费上限,我们实现了程序的优雅退出。当所有生产者完成生产,所有消费者也完成了它们的消费任务后,所有线程都会自然终止,main 方法中的 join() 调用将确保程序在所有工作线程结束后才最终退出。“毒丸”机制(Poison Pill):除了设定消费上限,另一种常用的优雅退出机制是“毒丸”对象。当生产者完成所有任务后,可以向队列中放入一个特殊的“毒丸”对象。消费者在取出对象时,如果识别到是“毒丸”,就处理完队列中剩余的正常数据后,自己也终止,并可能将“毒丸”传递给下一个消费者(如果存在多个消费者)。这种方法在总生产量不确定或需要更灵活的终止策略时非常有用。
通过上述修改,我们成功解决了Java多生产者多消费者模型中消费者无限等待导致程序无法终止的问题,确保了并发程序的健壮性和正确性。
以上就是Java多生产者多消费者模型:解决消费者无限等待导致的程序终止问题的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1021728.html
微信扫一扫
支付宝扫一扫