1.模型介绍
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
1.生产者之间互斥(因为生产数据在仓库中保存是覆盖式的);消费者之间互斥(因为消费数据不能像之前抢票一样出现线程安全问题);消费者与生产者之间互斥同步(互斥:防止消费者消费一般生产者覆盖数据;同步:使得消费者与生产者之间不出现反复确认是否需要数据)
2.优点:
解耦(消费者与生产者之间有缓冲,不强相关联)
支持忙闲不均(生产者可以在消费者不消费时生产;同理消费者也可以)
提高效率(之后解释)
2.条件变量
1.线程的执行遵循: 加锁 -- 判断执行条件 -- 执行 -- 解锁
2.不过,光光是判断条件是不对的,因为某个执行流一定会出现不断加锁,判断条件,条件不满足,解锁,继续加锁如此循环往复的动作。那么我们是否存在解决的手段呢?答案是肯定的,只需要加入条件变量。条件变量就是一个类似于排队等待的数据结构,当线程进入执行时,一旦判断执行条件失败,此刻我们就可以通过所谓的条件变量使得线程解锁后进行阻塞等待,直到条件能使得线程继续进行执行。
初始化和销毁
等待
pthread_wait的第二个参数一定是被使用的锁,并且当等待时,该cond会将当前的锁释放,交给其他线程执行。一旦被重新唤起,线程将会重新在此处进行执行,并且同时得到锁
释放
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; int tickets = 1000; void *start_routine(void *args) { std::string name = static_cast<const char *>(args); while (true) { pthread_mutex_lock(&mutex); pthread_cond_wait(&cond, &mutex); std::cout << name << "->" << tickets << std::endl; tickets--; pthread_mutex_unlock(&mutex); } } int main() { pthread_t t[5]; for (int i = 0; i < 5; i++) { char *name = new char[]; snprintf(name, , "thread %d", i + 1); pthread_create(t + i, nullptr, start_routine, (void *)name); } while (true) { sleep(1); pthread_cond_signal(&cond); } for (int i = 0; i < 5; i++) { pthread_join(t[i], nullptr); } return 0; }
访问有明显的替换。
3.实现生产消费模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
const static int gmaxcap=500; template<class T> class BlockQueue { public: BlockQueue(const int &maxcap=gmaxcap) :_maxcap(gmaxcap) { pthread_mutex_init(&_mutex,nullptr); pthread_cond_init(&_pcond,nullptr); pthread_cond_init(&_ccond,nullptr); } void push(const T &in) //输入型参数 const& { pthread_mutex_lock(&_mutex); while(is_full()) //伪唤醒,使用while进行持续判断 { pthread_cond_wait(&_pcond,&_mutex); //生产条件不满足,生产者进行等待 } //在这里一定不满 _q.push(in); //此时一定有数据,所以唤醒消费者访问 pthread_cond_signal(&_ccond); pthread_mutex_unlock(&_mutex); sleep(1); } void pop(T* out) //输出型参数 * 输入输出型为 & { pthread_mutex_lock(&_mutex); while(is_empty()) { pthread_cond_wait(&_ccond,&_mutex); //生产条件不满足,生产者进行等待 } //在这里一定不为空 *out = _q.front(); _q.pop(); //此时一定有空间,所以唤醒生产者访问 pthread_cond_signal(&_pcond); pthread_mutex_unlock(&_mutex); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_pcond); pthread_cond_destroy(&_ccond); } private: bool is_empty() { return _q.empty(); } bool is_full() { return _q.size()==_maxcap; } private: std::queue<T> _q; int _maxcap; //队列元素上限 pthread_mutex_t _mutex; pthread_cond_t _pcond; //生产者对应的条件变量 pthread_cond_t _ccond; //消费者对应的条件变量 };
需要注意的就是判断条件变量时是需要循环判断的,以免由于多线程出现公共数据访问的问题。
提高效率问题
1.如果消费者和生产者之间互相制约,使得其效率并没有起到什么作用,依然是串行执行的。但是所谓的生产与消费并不是只有这个模型下的工作。
2.生产者线程构建任务需要花费时间,同理消费者线程完成任务需要花费时间。我们需要花费的任务可能十分复杂。那么其实当线程拿到数据后,其他线程也可以拿其他数据,相互不影响,并发的执行提高了效率。
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- huatuo9.cn 版权所有 赣ICP备2023008801号-1
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务