首页 > 其他分享 >一种有界队列(Bounded Buffer)的实现

一种有界队列(Bounded Buffer)的实现

时间:2022-12-06 02:33:07浏览次数:40  
标签:std 线程 buffer Buffer 队列 Bounded position buffers size

一、概述

在有 CPUGPU 参与的一种运算中,比如深度学习推理,CPU 需要预处理数据,然后交给 GPU 处理,最后 CPU 对 GPU 的运算结果进行后处理
在整个过程中都是 FIFO,即数据 ABC 按顺序输入,也需要按 A'B'C' 顺序输出。
如果采用同步阻塞的方式,在 CPU 预处理时 GPU 处于空闲状态,GPU 运算时 CPU 后处理处于空闲状态并且也不能进行后续数据的预处理。这样影响整体的吞吐。
期望是 GPU 运算时,CPU 可以同时进行数据预处理和后处理。这是典型的单生产者单消费者模式。

在两个线程之间传递数据时,为确保线程安全,可以在一个线程每次 mallocnew 申请内存,在另一个线程 freedelete。为了避免频繁的内存分配和释放,需要使用到内存池。

本文描述采用有界队列实现内存池,适用场景和限制:

  1. 需要把内存使用控制在一定范围内;
  2. 整个过程不允许丢弃数据;
  3. 生产和消费之间线程安全;
  4. 不会(也不允许)同时生产,不会(也不允许)同时消费。如果确实要多线程生产或多线程消费,调用代码自行确保线程安全。

二、实现

// File: bounded_buffer.h
#pragma once
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

/*
 * @Description: BoundedBuffer。Produce 和 Consume 方法不是线程安全的。使用不同线程或确保线程安全地调用 Produce 和 Consume 方法。
 */
class BoundedBuffer
{
public:
  BoundedBuffer(const std::string& name, size_t buffers_capacity_, size_t buffer_size_max);
  ~BoundedBuffer();
  BoundedBuffer(const BoundedBuffer& rhs)            = delete;
  BoundedBuffer& operator=(const BoundedBuffer& rhs) = delete;

public:
  /**
   * @description: 生产。非线程安全,两个及以上线程调用 Produce 可能会导致脏写。
   * @param {function<void(void*)>} func
   * @return {void}
   */
  void Produce(std::function<void(void*)> func);

  /**
   * @description: 消费。非线程安全,两个及以上线程调用 Consume 可能会导致读取到同一份数据。
   * @param {function<void(void*)>} func
   * @return {void}
   */
  void Consume(std::function<void(void*)> func);

private:
  std::string _name;

  // 内存池
  void** _buffers;
  // 内存池容量
  size_t _buffers_capacity;
  // 内存块最大长度
  size_t _buffer_size_max;
  // 保护内存池
  std::mutex _buffers_mtx;
  // 内存池是否有可用的 slot (非满则可以写数据)
  std::condition_variable _buffers_not_full_cond;
  // 内存池是否非空 (非空则可以读数据)
  std::condition_variable _buffers_not_empty_cond;
  // 内存池将会读取的位置
  size_t _buffers_read_position;
  // 内存池当前可写入的位置
  size_t _buffers_write_position;
};
// File: bounded_buffer.cpp
#include "bounded_buffer.h"
#include <assert.h>

BoundedBuffer::BoundedBuffer(const std::string& name, size_t buffers_capacity, size_t buffer_size_max)
  : _name(name), _buffers_capacity(buffers_capacity), _buffer_size_max(buffer_size_max), _buffers_read_position(0), _buffers_write_position(0)
{
  assert(buffers_capacity > 1);
  assert(buffer_size_max > 0);
  _buffers = static_cast<void**>(std::malloc(sizeof(void*) * buffers_capacity));
  std::memset(_buffers, 0, sizeof(void*) * buffers_capacity);
}

BoundedBuffer::~BoundedBuffer()
{
  for (auto i = 0; i < _buffers_capacity; i++)
  {
    if (_buffers[i])
    {
      std::free(_buffers[i]);
     _buffers[i] = nullptr;
    }
  }
  std::free(_buffers);
  _buffers = nullptr;
}

void BoundedBuffer::Produce(std::function<void(void*)> func)
{
  std::unique_lock<std::mutex> buffers_lock(_buffers_mtx);
  // 等待可写 slot。要确保本次写入后,下次有写入位置,所以 +1。
  _buffers_not_full_cond.wait(buffers_lock, [&] { return ((_buffers_write_position + 1) % _buffers_capacity) != _buffers_read_position; });
  // 有可写 slot 马上释放。因为 func 可能是耗时操作,防止过久阻塞 Consume 造成有可读 slot 而无法读。
  buffers_lock.unlock();
  if (!_buffers[_buffers_write_position])
  {
    _buffers[_buffers_write_position] = std::malloc(_buffer_size_max);
  }
  auto buffer = _buffers[_buffers_write_position];
  func(buffer);
  // 更改写 slot
  _buffers_write_position = (_buffers_write_position + 1) % _buffers_capacity;
  _buffers_not_empty_cond.notify_one();
}

void BoundedBuffer::Consume(std::function<void(void*)> func)
{
  std::unique_lock<std::mutex> buffers_lock(_buffers_mtx);
  // 等待读 slot
  _buffers_not_empty_cond.wait(buffers_lock, [&] { return _buffers_write_position != _buffers_read_position; });
  // 有可读 slot 马上释放。因为 func 可能是耗时操作,防止过久阻塞 Produce 造成有可写 slot 而无法写。
  buffers_lock.unlock();
  auto buffer = _buffers[_buffers_read_position];
  func(buffer);
  // 更改读 slot
  _buffers_read_position = (_buffers_read_position + 1) % _buffers_capacity;
  _buffers_not_full_cond.notify_one();
}

三、测试

// File: test_bounded_queue.cpp
#include "bounded_buffer.h"
#include <iostream>
#include <thread>

int main(int argc, const char** argv)
{
  // Buffer 中每块数据最大长度 sizeof(size_t)。实际应用中,更大长度的内存才有意义。
  std::unique_ptr<BoundedBuffer> boundedBuffer = std::make_unique<BoundedBuffer>("Test", 4, sizeof(size_t));

  std::thread producer_thread(
    [&]
    {
      for (size_t i = 0; i < 1000; i++)
      {
        boundedBuffer->Produce(
        [=](void* buffer)
        {
          // 假设生产耗时 20ms 左右。
          std::this_thread::sleep_for(std::chrono::milliseconds(20));
          *(size_t*)buffer = i;
          // std::cout << "Produce: " << i << std::endl;
        });
      }
   });

  std::thread consumer_thread(
    [&]
    {
      for (size_t i = 0; i < 1000; i++)
      {
        boundedBuffer->Consume(
        [=](void* buffer)
        {
          auto value = *(size_t*)buffer;
          // 假设消费耗时 20ms 左右。
          std::this_thread::sleep_for(std::chrono::milliseconds(20));
          // std::cout << "Consume: " << value << std::endl;
        });
      }
   });

  producer_thread.join();
  consumer_thread.join();

  return 0;
}

运行:

$ time ./test_bounded_queue
./test_bounded_queue  0.05s user 0.05s system 0% cpu 24.314 total

理所应当地,粗略测试耗时 24s 左右比串行 40s 左右快——这不是重点,重点是达到了内存复用的目的。

四、说明

1、的确是需要 mutex 和 condition_variable 吗?

是的。比如在生产时,发现“无法获取到”可写的 slot,又不允许丢弃数据,为了不让生产者线程轮询则只能等待。

2、为什么 Produce 和 Consume 里 wait 返回后马上解锁?

比如生产时,生产的过程可能耗时。确保“能获取到”生产 slot 后立即解锁,以便消费者线程调用 Consume 时如果阻塞在 std::unique_lock<std::mutex> buffers_lock(_buffers_mtx); 能够取得锁,从而得以消费在本次生产之前已经生产好的 slot ——如果队列完全没有可读数据当然就“转为”阻塞在 _buffers_not_empty_cond.wait(buffers_lock, [&] { return _buffers_write_position != _buffers_read_position; });。如果是阻塞在 wait,则会在本次生产好后通过 _buffers_not_empty_cond.notify_one(); 唤醒消费者线程。

标签:std,线程,buffer,Buffer,队列,Bounded,position,buffers,size
From: https://www.cnblogs.com/alby/p/16954102.html

相关文章

  • leetcode 1687. 从仓库到码头运输箱子 动态规划 + 单调队列
    你有一辆货运卡车,你需要用这一辆车把一些箱子从仓库运送到码头。这辆卡车每次运输有 箱子数目的限制 和总重量的限制 。给你一个箱子数组 boxes 和三个整数portsCo......
  • StringBuffer类和StringBuilder类
    1.StringBuffer类基本介绍java.lang.StringBuffer代表可变的字符序列,可以对字符串内容进行增删。很多方法与String相同,但StringBuffer是可变长度的。StringBuffer是一......
  • Java基础-String、StringBuffer、StringBuilder类
    String类String的特性:String类代表字符串。Java程序中的所有字符串字面值(如"abc")都作为此类的实例实现。String是一个final类,代表不可变的字符序列。......
  • 单调队列学习笔记
    用处:滑动窗口维护区间最值核心思想:双端队列,队首放最大值/最小值的下标,1.清除不是更优的(队尾弹出)。2.清除过期的(队首弹出)。例题:https://www.luogu.com.cn/problem/P3088......
  • node js中的buffer
    Node中Buffer的深度解析Node中Buffer的深度解析在Node中,应用需要处理网络协议、操作数据库、处理图片、接收上传文件等,在网络流和文件的操作中,还要处理大量二进制数据,Jav......
  • 三阶段:第13周 分布式消息队列-Kafka 没用
                   ......
  • 算法--数组、链表、栈、队列
    一、数组1、删除有序数组中的重复项(简单)题目地址:https://leetcode.cn/problems/remove-duplicates-from-sorted-array/给你一个升序排列的数组nums,请你原地删除重......
  • java常见的延迟队列实现方式
    参考如下:https://www.jianshu.com/p/977466020144/redission延迟队列的实现https://www.cnblogs.com/better-farther-world2099/articles/15216447.html......
  • [DPDK] 多队列同时从网卡接收数据
    [DPDK]多队列同时从网卡接收数据在DPDK中,如何让多个核/多个接收队列同时从一个网卡接收数据呢?其实很简单,在port_conf里开启一个mq_mode选项就行了。structrte_eth_c......
  • 基于Redis的Stream结构作为消息队列,实现异步秒杀下单
    需求:创建一个Stream类型的消息队列,名为stream.orders修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、order......