【muduo】base篇---BlockingQueue和BounderBlockingQueue

一、经典的生产者和消费者模型

【muduo】base篇---BlockingQueue和BounderBlockingQueue


二、BlockingQueue简介(普通队列)

  BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

  无界阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:
【muduo】base篇---BlockingQueue和BounderBlockingQueue

  多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。

【muduo】base篇---BlockingQueue和BounderBlockingQueue


三、BlockingQueue源码

#ifndef MUDUO_BASE_BLOCKINGQUEUE_H
#define MUDUO_BASE_BLOCKINGQUEUE_H

#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>

#include <deque>
#include <assert.h>

namespace muduo
{

template<typename T>
class BlockingQueue : noncopyable
{
 public:
  BlockingQueue()
    : mutex_(),
      notEmpty_(mutex_),
      queue_()
  {
  }

  // 向缓冲区中添加任务
  void put(const T& x)
  {
    MutexLockGuard lock(mutex_);
    queue_.push_back(x);
    notEmpty_.notify(); // 通知其他线程当前缓冲区不为空(可以来取任务了)
  }

  // 移动拷贝函数
  void put(T&& x)
  {
    MutexLockGuard lock(mutex_);
    queue_.push_back(std::move(x));
    notEmpty_.notify();
  }

  // 向缓冲区取任务
  T take()
  {
    MutexLockGuard lock(mutex_);
    while (queue_.empty()) // 避免虚假唤醒
    {
      notEmpty_.wait(); // 等待缓冲区不为空的条件(缓冲区如果空则无法取任务)
    }
    assert(!queue_.empty());
    T front(std::move(queue_.front())); // 取队头元素
    queue_.pop_front();
    return std::move(front);
  }

  // 返回缓冲区的大小
  size_t size() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.size();
  }

 private:
  mutable MutexLock mutex_;
  Condition         notEmpty_ GUARDED_BY(mutex_);
  std::deque<T>     queue_ GUARDED_BY(mutex_);
};

}  // namespace muduo

#endif  // MUDUO_BASE_BLOCKINGQUEUE_H

四、BounderBlockingQueue简介(环形队列)

【有界阻塞队列的两个常见阻塞场景】:

【muduo】base篇---BlockingQueue和BounderBlockingQueue
  当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
【muduo】base篇---BlockingQueue和BounderBlockingQueue
  当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒
【muduo】base篇---BlockingQueue和BounderBlockingQueue


五、BounderBlockingQueue源码

#ifndef MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
#define MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H

#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>

#include <boost/circular_buffer.hpp>
#include <assert.h>

namespace muduo
{

template<typename T>
class BoundedBlockingQueue : noncopyable
{
 public:
  explicit BoundedBlockingQueue(int maxSize)
    : mutex_(),
      notEmpty_(mutex_),
      notFull_(mutex_),
      queue_(maxSize)
  {
  }

  void put(const T& x)
  {
    MutexLockGuard lock(mutex_);
    while (queue_.full())
    {
      notFull_.wait(); // 等待缓冲区不为满的条件(缓冲区如果满则无法添加任务)
    }
    assert(!queue_.full());
    queue_.push_back(x); // 如果缓冲区不为满,则可以向缓冲区中添加任务
    notEmpty_.notify(); // 通知其他线程当前缓冲区不为空(可以来取任务了)
  }

  T take()
  {
    MutexLockGuard lock(mutex_);
    while (queue_.empty())
    {
      notEmpty_.wait(); // 等待缓冲区不为空的条件(缓冲区如果空则无法取任务)
    }
    assert(!queue_.empty());
    T front(queue_.front()); // 取队头任务
    queue_.pop_front();
    notFull_.notify(); // 通知其他线程当前缓冲区不为满(可以添加任务了)
    return front;
  }

  bool empty() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.empty(); // 队列为空
  }

  bool full() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.full(); // 队列满
  }

  size_t size() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.size(); // 队列当前有多少元素
  }

  size_t capacity() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.capacity(); // 队列的容量
  }

 private:
  mutable MutexLock          mutex_;
  Condition                  notEmpty_ GUARDED_BY(mutex_);
  Condition                  notFull_ GUARDED_BY(mutex_);
  // 使用了boost库的环形缓冲区
  boost::circular_buffer<T>  queue_ GUARDED_BY(mutex_); 
};

}  // namespace muduo

#endif  // MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H