首页
登录 | 注册

生产者消费者模式与实际应用

1. 前言

目前的项目需要在用户提交订单后,将订单放入队列,并由专门处理订单的线程对队列中的订单做处理,由此便引入了生产者与消费者模式。

2. 生产者与消费者模式概述

生产者与消费者模式是通过一个容器来解决生产者与消费者的强耦合关系,生产者与消费者之间不直接进行通讯,而是利用阻塞队列来进行通讯,生产者生成数据后直接丢给阻塞队列,消费者需要数据则从阻塞队列获取,实际应用中,生产者与消费者模式则主要解决生产者与消费者生产与消费的速率不一致的问题,达到平衡生产者与消费者的处理能力,而阻塞队列则相当于缓冲区。

这就好比去学校食堂吃饭,食堂大妈并不会等你点了餐之后才立马去给你做,而是先把饭菜做好了放在食堂窗口等你来自己取,而这个窗口就相当于阻塞队列,食堂大妈就是生产者,而学生就是消费者。

整个生产者与消费者模型大致如下图所示。

生产者消费者模式与实际应用

当然,实际情况可能需要在消费者处理完之后又作为生产者把数据放进新的队列由其他消费者处理。

3. 应用与实现

回归原项目,原项目的应用场景大致为:用户提交订单,订单进入引擎的阻塞队列中,由专门的线程从阻塞队列中获取数据并处理,接下来将使用此场景模拟生产者与消费者模式的实现。

本模拟代码主要包括生产者线程、消费者线程、引擎(包含阻塞队列)、订单类等,详细代码如下。

/**
 * 生产者
 */
public class ProducerThread implements Runnable  {
    private Engine engine;

    public ProducerThread(Engine engine) {
        this.engine = engine;
    }

    @Override
    public void run() {
        // 模拟像引擎中加入100条订单
        for (int i = 0; i < 100; i ++) {
            engine.addOrder(produceOrder());

            try {
                // 模拟生成订单时间
                Thread.sleep(100 * (number % 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }

    /** 产生订单 */
    private Order produceOrder() {
        Order order = new Order();
        order.setCommodityId("id_" + number);
        order.setCommodityName("name_" + number);
        order.setPrice(new Double(number));
        order.setQuantity(number);

        number ++;

        return order;
    }

    private int number = 0;
}
/**
 * 消费者
 */
public class ConsumerThread implements Runnable {
    private Engine engine;

    public ConsumerThread(Engine engine) {
        this.engine = engine;
    }

    @Override
    public void run() {
        // 循环处理订单
        while (true) {
            Order order = engine.getOrder();
            disposeOrder(order);
        }
    }

    private void disposeOrder(Order order) {
        System.out.println("处理订单信息[商品ID:" + order.getCommodityId() + ", 商品名称:" + order.getCommodityName() + 
                ", 商品价格:" + order.getPrice() + ", 商品数量:" + order.getQuantity());
    }

}
/**
 * 引擎
 */
public class Engine {
    private BlockingQueue<Order> qrderQueue = new LinkedBlockingQueue<Order>();

    /** 
     * 获取订单<br>
     * - 如果订单队列为空将一直等待,直到有新的订单或者被中断 
     */
    public Order getOrder() {
        Order order = null;

        try {
            order = qrderQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return order;
    }

    /**
     * 添加订单<br>
     * - 如果订单队列已满将一直等待,直到队列有可用空间或者被中断
     */
    public void addOrder(Order order) {
        try {
            qrderQueue.put(order);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
/**
 * 订单
 */
public class Order implements Serializable {

    /** 商品ID */
    private String commodityId;

    /** 商品名称 */
    private String commodityName;

    /** 价格 */
    private Double price;

    /** 数量 */
    private Integer quantity;

    public String getCommodityId() {
        return commodityId;
    }

    public void setCommodityId(String commodityId) {
        this.commodityId = commodityId;
    }

    public String getCommodityName() {
        return commodityName;
    }

    public void setCommodityName(String commodityName) {
        this.commodityName = commodityName;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    public Integer getQuantity() {
        return quantity;
    }

    public void setQuantity(Integer quantity) {
        this.quantity = quantity;
    }

    private static final long serialVersionUID = 4826685511830052034L;

}

接下便是测试类了,这里只开了一个线程,主要模拟生产者与消费者速率不一致的问题。

public class Test {
    public static void main(String[] args) {
        Engine engine = new Engine();
        ProducerThread pt = new ProducerThread(engine);
        ConsumerThread ct = new ConsumerThread(engine);

        Thread pth = new Thread(pt);
        pth.setName("生产者线程");

        Thread cth = new Thread(ct);
        pth.setName("消费者线程");

        pth.start();
        cth.start();
    }
}

4. 总结

针对原先使用轮询的方式处理队列中的订单方式,无疑生产者与消费者模式将更加节约CPU资源,这在多个消费者同时处理队列中的数据时会体现得更加明显。

而在实际应用中,可能会面临更负责的问题,选择何种设计模式也需要更具具体情形去考虑。



2020 jeepxie.net webmaster#jeepxie.net
10 q. 0.009 s.
京ICP备10005923号