【muduo】base篇---BlockingQueue和BounderBlockingQueue
文章目录
一、经典的生产者和消费者模型
二、BlockingQueue简介(普通队列)
BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
无界阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。
三、BlockingQueue源码
#ifndef MUDUO_BASE_BLOCKINGQUEUE_H
#define MUDUO_BASE_BLOCKINGQUEUE_H
#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>
#include <deque>
#include <assert.h>
namespace muduo
{
template<typename T>
class BlockingQueue : noncopyable
{
public:
BlockingQueue()
: mutex_(),
notEmpty_(mutex_),
queue_()
{
}
// 向缓冲区中添加任务
void put(const T& x)
{
MutexLockGuard lock(mutex_);
queue_.push_back(x);
notEmpty_.notify(); // 通知其他线程当前缓冲区不为空(可以来取任务了)
}
// 移动拷贝函数
void put(T&& x)
{
MutexLockGuard lock(mutex_);
queue_.push_back(std::move(x));
notEmpty_.notify();
}
// 向缓冲区取任务
T take()
{
MutexLockGuard lock(mutex_);
while (queue_.empty()) // 避免虚假唤醒
{
notEmpty_.wait(); // 等待缓冲区不为空的条件(缓冲区如果空则无法取任务)
}
assert(!queue_.empty());
T front(std::move(queue_.front())); // 取队头元素
queue_.pop_front();
return std::move(front);
}
// 返回缓冲区的大小
size_t size() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
private:
mutable MutexLock mutex_;
Condition notEmpty_ GUARDED_BY(mutex_);
std::deque<T> queue_ GUARDED_BY(mutex_);
};
} // namespace muduo
#endif // MUDUO_BASE_BLOCKINGQUEUE_H
四、BounderBlockingQueue简介(环形队列)
【有界阻塞队列的两个常见阻塞场景】:
当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
五、BounderBlockingQueue源码
#ifndef MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
#define MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>
#include <boost/circular_buffer.hpp>
#include <assert.h>
namespace muduo
{
template<typename T>
class BoundedBlockingQueue : noncopyable
{
public:
explicit BoundedBlockingQueue(int maxSize)
: mutex_(),
notEmpty_(mutex_),
notFull_(mutex_),
queue_(maxSize)
{
}
void put(const T& x)
{
MutexLockGuard lock(mutex_);
while (queue_.full())
{
notFull_.wait(); // 等待缓冲区不为满的条件(缓冲区如果满则无法添加任务)
}
assert(!queue_.full());
queue_.push_back(x); // 如果缓冲区不为满,则可以向缓冲区中添加任务
notEmpty_.notify(); // 通知其他线程当前缓冲区不为空(可以来取任务了)
}
T take()
{
MutexLockGuard lock(mutex_);
while (queue_.empty())
{
notEmpty_.wait(); // 等待缓冲区不为空的条件(缓冲区如果空则无法取任务)
}
assert(!queue_.empty());
T front(queue_.front()); // 取队头任务
queue_.pop_front();
notFull_.notify(); // 通知其他线程当前缓冲区不为满(可以添加任务了)
return front;
}
bool empty() const
{
MutexLockGuard lock(mutex_);
return queue_.empty(); // 队列为空
}
bool full() const
{
MutexLockGuard lock(mutex_);
return queue_.full(); // 队列满
}
size_t size() const
{
MutexLockGuard lock(mutex_);
return queue_.size(); // 队列当前有多少元素
}
size_t capacity() const
{
MutexLockGuard lock(mutex_);
return queue_.capacity(); // 队列的容量
}
private:
mutable MutexLock mutex_;
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_);
// 使用了boost库的环形缓冲区
boost::circular_buffer<T> queue_ GUARDED_BY(mutex_);
};
} // namespace muduo
#endif // MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H