本文共 4184 字,大约阅读时间需要 13 分钟。
BlockingQueue在生产者-消费者模式下的应用是一个常见的操作,但如果在调用队列的上层代码中使用了同步块,可能会导致线程死锁。这种情况下,如何合理地使用BlockingQueue而避免死锁问题,是一个值得深入探讨的课题。
当我们在生产者和消费者的代码中使用同步块时,可能会出现以下情况:假设生产者线程在执行queue.put("1")操作时,持有了锁定器。在此期间,如果消费者线程也持有了同样的锁定器,并试图执行queue.take()操作,那么这两个线程将会互相等待对方释放锁定器,从而导致死锁。
为了解决这个问题,我们可以采取以下优化策略:
非阻塞式操作:尽量使用非阻塞式操作,如queue.offer()和queue.poll()。这样可以减少持有锁定器的时间,降低死锁风险。
合理使用等待机制:在队列为空或满的情况下,使用等待机制等待适当的状态。这可以通过在队列为空时使用lock.wait()来实现。
减少锁定器的持有时间:避免在一个线程持有锁定器的同时进行长时间的操作,尤其是涉及到I/O或其他可能阻塞的操作。
以下是一个改进后的代码示例:
import com.alibaba.fastjson.JSON;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;public class MultiQueueSynTest { private static final BlockingQueue queue1 = new LinkedBlockingQueue<>(); private static final BlockingQueue queue2 = new LinkedBlockingQueue<>(); private static int seq = 1; private static final Object lock = new Object(); public static void commit(String msg) { synchronized (lock) { Packet packet = new Packet(); packet.setSeq(seq++); packet.setMsg(msg); try { // 非阻塞式添加元素 while (queue1.size() == Integer.MAX_VALUE) { lock.wait(); } queue1.offer(packet); System.out.println("commit msg: " + JSON.toJSONString(packet)); lock.notifyAll(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void send() { while (true) { synchronized (lock) { try { // 非阻塞式取元素 while (queue1.isEmpty()) { lock.wait(); } Packet packet = queue1.poll(); System.out.println("send msg: " + JSON.toJSONString(packet)); lock.notifyAll(); // 非阻塞式添加元素 while (queue2.size() == Integer.MAX_VALUE) { lock.wait(); } queue2.offer(packet); System.out.println("msg->queue2: " + JSON.toJSONString(packet)); lock.notifyAll(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { // 生产者1 new Thread(() -> { while (true) { commit("hello1"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 生产者2 new Thread(() -> { while (true) { commit("hello2"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 消费者 new Thread(send::run).start(); } private static class Packet { int seq; String msg; public int getSeq() { return seq; } public void setSeq(int seq) { this.seq = seq; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } }} 运行结果如下:
commit msg: {"msg":"hello1","seq":1}send msg: {"msg":"hello1","seq":1}msg->queue2: {"msg":"hello1","seq":1}commit msg: {"msg":"hello2","seq":2}send msg: {"msg":"hello2","seq":2}msg->queue2: {"msg":"hello2","seq":2}commit msg: {"msg":"hello1","seq":3}send msg: {"msg":"hello1","seq":3}msg->queue2: {"msg":"hello1","seq":3}commit msg: {"msg":"hello2","seq":4}send msg: {"msg":"hello2","seq":4}msg->queue2: {"msg":"hello2","seq":4}commit msg: {"msg":"hello1","seq":5}send msg: {"msg":"hello1","seq":5}msg->queue2: {"msg":"hello1","seq":5}commit msg: {"msg":"hello2","seq":6}send msg: {"msg":"hello2","seq":6}msg->queue2: {"msg":"hello2","seq":6} 通过上述优化,我们可以看到生产者和消费者能够正常交替工作,避免了死锁问题。生产者线程负责提交消息到队列中,消费者线程负责从队列中取出消息并将其发送到另一个队列中。通过合理使用同步锁和非阻塞式操作,我们成功避免了死锁问题,确保了系统的稳定运行。
转载地址:http://fptoz.baihongyu.com/