首页 > 编程语言 >C++小练习:字符串分割的高性能实现

C++小练习:字符串分割的高性能实现

时间:2023-10-29 22:13:28浏览次数:32  
标签:std zero C++ tokens start token 高性能 字符串 input

字符串分割是日常工作中比较常见的基础函数,通常大家会使用现成的基础库,基础库的性能是否是最佳的?本文基于一个周末小练习,研究如何最大限度的提升字符串分割的性能。   

1、背景

字符串按照分隔符拆成多个子串在日常工作中很常见,譬如:后台服务对配置的解析、模型服务对输入特征的拆分、磁盘格式索引文件转内存格式等等,通常最简单的实现是使用 boost 库:
std::vector<std::string> output_tokens;
boost::split(output_tokens, input_sentence, boost::is_any_of(" "), boost::token_compress_off); 

这是最简单也是性能最差的写法,在一些频繁做字符串拆分的场景下,部分开发者会发现用 string 会触发字符串拷贝的代价,于是改为自己使用 string_view,这会显著提高性能,在使用 string_view 的基础上仍然有两个点需要特别注意:

  1. 尽可能的减少分割时的计算量
  2. 使用 SIMD 指令

本文将对此做细致分析。

2、目标

针对单字符分割和任意字符分割两类场景,本文以代码案例为主,讲解如何写出高性能的字符串分割函数。在考虑高性能的情况下,最基本的要求是字符串分割不应触发拷贝,因此本文实现的多个字符串分割函数,都基于 string_view 消除了拷贝,在此基础上再进行性能优化分析。本文两类场景的函数签名如下:

std::vector<std::string_view> SplitString(std::string_view input, char delimiter);
std::vector<std::string_view> SplitString(std::string_view input, std::string_view delimiters);

3、高性能实现

单字符分割和任意字符分割这两类场景分开介绍。

3.1、单字符分割字符串的5个版本

下面提供 5 种单字符分割字符串的实现,并对其性能做总结分析。

(1)版本1-简单版本

单字符分割是最常见的场景,譬如用于日志解析、特征解析等,首先我们考虑基于遍历自行实现,实现如下:
std::vector<std::string_view> SplitStringV1(std::string_view input, char delimiter) {
  std::vector<std::string_view> tokens;
  int start_pos = 0;
  int size = 0;
  for (size_t i = 0; i < input.size(); ++i) {
    if (input[i] == delimiter) {
      if (size != 0) {
        tokens.emplace_back(input.data() + start_pos, size);
        size = 0;
      }
      start_pos = i + 1;
    } else {
      ++size;
    }
  }
  if (size > 0) {
    tokens.emplace_back(input.data() + start_pos, size);
  }
  return tokens;
}

    这样的实现很好理解:遍历 input 字符串,发现有分隔符 delimiter 时,考虑生成结果子字符串,生成子字符串需要知道起点和长度,因此定义了 token_start 和 size 两个临时变量,并在遍历过程中维护这两临时变量。但仔细观察这个函数的实现,可以发现有三个变量:token_start、size、i,但通常来说,定位一个子字符串只需要两个变量(或者叫游标):start、end 即可,这里存在性能优化空间。

(2)版本2-优化版本

    基于两个游标的实现:

std::vector<std::string_view> SplitStringV2(std::string_view input, char delimiter) {
  std::vector<std::string_view> tokens;
  const char* token_start = input.data();
  const char* p = token_start;
  const char* end_pos = input.data() + input.size();
  for (; p != end_pos; ++p) {
    if (*p == delimiter) {
      if (p > token_start) {
        tokens.emplace_back(token_start, p - token_start);
      }
      token_start = p + 1;
      continue;
    }
  }
  if (p > token_start) {
    tokens.emplace_back(token_start, p - token_start);
  }
  return tokens;
}

    这里 token_start 作为子串的起始位置,p 作为递增游标,上一份代码的 size 可以通过 p - token_start 获得。这个实现少维护 size 变量,因此性能更好。

(3)版本3-STL 版本

有时候,我们也会考虑使用标准库的 find_first_of 函数实现,代码量更少,代码如下:
std::vector<std::string_view> SplitStringV3(std::string_view input, char delimiter) {
  std::vector<std::string_view> tokens;
  size_t token_start = 0;
  while (token_start < input.size()) {
    auto token_end = input.find_first_of(delimiter, token_start);
    if (token_end > token_start) {
      tokens.emplace_back(input.substr(token_start, token_end - token_start));
    }
    if (token_end == std::string_view::npos) {
      break;
    }
    token_start = token_end + 1;
  }
  return tokens;
}

    但上述实现,性能比我们自己实现遍历的 SplitStringV2 版本性能要差,毕竟 find_first_of 的每次查找都要重新初始化其实位置,相比自己实现遍历有性能浪费。

(4)版本4-SIMD 最佳版本

字符串分割很重要的一步是逐个字符对字符串做比较,是否可以并行比较呢?是可以的,SIMD 指令可以加速这里的比较,当前大多数机器都已支持 AVX2,但还未普遍支持 AVX512,下面以 AVX2 为例。代码如下:

// 编译:g++ split_string_by_char.cc -mavx2 -o split_string_by_char
std::vector<std::string_view> SplitStrintV4(std::string_view input, char delimiter) { if (input.size() < 32) { return SplitStringV2(input, delimiter); } std::vector<std::string_view> tokens; uint32_t end_pos = input.size() >> 5 << 5; __m256i cmp_a = _mm256_set1_epi8(delimiter); // 8bit的分隔重复32次扩充到256bit const char* p = input.data(); const char* end = p + end_pos; uint32_t last_lead_zero = 0; // 上一轮256bit(32个字符)处理后剩下的未拷贝进结果集的字符串个数 while (p < end) { __m256i cmp_b = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(p)); // 32个字符加载进内存 __m256i cmp = _mm256_cmpeq_epi8(cmp_a, cmp_b); // 256 bit 一次比较 uint32_t mask = _mm256_movemask_epi8(cmp); if (mask == 0) { last_lead_zero += 32; p += 32; continue; } // 记录本次的头部0个数,注:mask的序和字符串序是相反的,所以这里头部的0对应字符串尾部的不匹配字符 uint32_t lead_zero = __builtin_clz(mask); // 补上一次未拷贝的字符串 uint32_t tail_zero = __builtin_ctz(mask); if (last_lead_zero != 0 || tail_zero != 0) { tokens.emplace_back(p - last_lead_zero, last_lead_zero + tail_zero); } mask >>= (tail_zero + 1); p += tail_zero + 1; // 补完,继续处理 while (mask != 0) { uint32_t tail_zero = __builtin_ctz(mask); if (tail_zero != 0) { tokens.emplace_back(p, tail_zero); } mask >>= (tail_zero + 1); p += tail_zero + 1; } last_lead_zero = lead_zero; p += lead_zero; } // 256 bit(32字节) 对齐之后剩下的部分 const char* token_start = input.data() + end_pos - last_lead_zero; const char* pp = token_start; const char* sentence_end = input.data() + input.size(); for (; pp != sentence_end; ++pp) { if (*pp == delimiter) { if (pp > token_start) { tokens.emplace_back(token_start, pp - token_start); } token_start = pp + 1; continue; } } if (pp > token_start) { tokens.emplace_back(token_start, pp - token_start); } return tokens; }

 这里使用了 5 个关键的函数:

  1. _mm256_loadu_si256,用于加载 256 位(32 字节)数据
  2. _mm256_cmpeq_epi8,用于比较 256 位数据
  3. _mm256_movemask_epi8,用于将比较的结果压缩到 32 位中,一位代表一个字节
  4. __builtin_clz,获取头部的 0 bit 个数
  5. __builtin_ctz,获取尾部的 0  bit 个数

同时在代码实现上需要考虑多个细节点:

  1. 待比较字符串小于 32 字节,此时不需要用 SIMD 指令,直接逐个字符比较
  2. 在比较过程中,
  3. 在每一次比较结果的处理时,除了逐个判断 32 个字符中的分隔符之外,还要考虑上一轮 32 字节比较的尾部有部分字符没有生成结果子字符串,要和本轮次的头部字符串拼成一个子字符串
  4. 多轮的 SIMD 指令比较都完成后,考虑:(1)末尾轮有部分字符没有进入结果子字符串;(2)部分字符没有对齐 32 字节,有尾巴部分的数据需要逐个字符比较垂类

要考虑好这些细节点,代码很复杂,考虑到 SIMD 指令是单条指令,而我们代码中对 SIMD 比较完成后的 32 bit(4字节) 的逐个比较是由多个单条指令祖传,因此尝试让 SIMD 指令多算一些,而减少对 32 bit 的轮询判断。

(5)版本5- SIMD 较差版本

减少 SIMD 比较指令执行完后的 32 bit 遍历处理,改成每次 SIMD 比较指令完成后,只取头一个分隔符的结果。代码如下:

// 编译:g++ split_string_by_char.cc -mavx2 -o split_string_by_char
std::vector<std::string_view> SplitStrintV5(std::string_view input, char delimiter) {
  if (input.size() < 32) {
    return SplitStringV2(input, delimiter);
  }

  std::vector<std::string_view> tokens;
  __m256i cmp_a = _mm256_set1_epi8(delimiter);  // 8bit的分隔重复32次扩充到256bit
  const char* p = input.data();
  uint32_t last_lead_zero = 0;  // 上一轮256bit(32个字符)处理后剩下的未拷贝进结果集的字符串个数
  while (p + 32 < input.data() + input.size()) {
    __m256i cmp_b = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(p));  // 32个字符加载进内存
    __m256i cmp = _mm256_cmpeq_epi8(cmp_a, cmp_b);  // 256 bit 一次比较
    uint32_t mask = _mm256_movemask_epi8(cmp);
    if (mask == 0) {
      last_lead_zero += 32;
      p += 32;
      continue;
    }

    // 补上一次未拷贝的字符串
    uint32_t tail_zero = __builtin_ctz(mask);
    if (last_lead_zero != 0 || tail_zero != 0) {
      tokens.emplace_back(p - last_lead_zero, last_lead_zero + tail_zero);
      last_lead_zero = 0;
    }
    p += tail_zero + 1;
  }

  // 不足 256 bit(32字节)部分
  const char* token_start = p - last_lead_zero;
  const char* sentence_end = input.data() + input.size();
  for (; p != sentence_end; ++p) {
    if (*p == delimiter) {
      if (p > token_start) {
        tokens.emplace_back(token_start, p - token_start);
      }
      token_start = p + 1;
      continue;
    }
  }
  if (p > token_start) {
    tokens.emplace_back(token_start, p - token_start);
  }
  return tokens;
}
在长文本下评测发现本代码性能较差。

(6)性能对比

以上 5 个版本的代码,再加上 google 开源的 absl 基础库 absl::StrSplit 函数,对 2K+ 的长文本循环 10000 次做压测,性能对比如下:

性能对比 V1-简单版 V2-优化版 V3-stl版本 V4-SIMD最佳版本 V5-SIMD较差版本 absl
耗时(ms) 552 426 508 413 501 709
可以看到,absl::StrSplit 性能最差,可能是因为其会返回空字符串导致。性能最佳的是需要复杂处理的 SIMD 版本,性能次之的是采用两个游标的非 SIMD 版本,两者的差异非常小。考虑到 SIMD 实现的复杂性,且性能收益较小,在实际业务场景中,可以在复杂性和性能收益之间做个权衡。

3.2、任意字符分割字符串

    任意字符分割字符串也很常见,譬如有些日志支持空格、tab、逗号任意一个作为分隔符。和 3.1 章的单字符分割类似,提供三种实现,并对其性能做总结。

(1)STL 版

// 不使用 SIMD,使用标准库的查找,是性能最差的方式
std::vector<std::string_view> SplitString(std::string_view input, std::string_view delimiters) {
  if (delimiters.empty()) {
    return {input};
  }
  std::vector<std::string_view> tokens;
  std::string_view::size_type token_start = input.find_first_not_of(delimiters, 0);
  std::string_view::size_type token_end = input.find_first_of(delimiters, token_start);
  while (token_start != std::string_view::npos || token_end != std::string_view::npos) {
    tokens.emplace_back(input.substr(token_start, token_end - token_start));
    token_start = input.find_first_not_of(delimiters, token_end);
    token_end = input.find_first_of(delimiters, token_start);
  }
  return tokens;
}

这个实现和 3.1 里的 stl 版本类似,代码量比较少,性能比较差。

(2)自行遍历版

// 不使用 SIMD,使用遍历,是非 SIMD 模式下性能最好的方式
std::vector<std::string_view> SplitStringV2(std::string_view input, std::string_view delimiters) {
  if (delimiters.empty()) {
    return {input};
  }
  std::vector<std::string_view> tokens;
  const char* token_start = input.data();
  const char* p = token_start;
  const char* end_pos = input.data() + input.size();
  for (; p != end_pos; ++p) {
    bool match_delimiter = false;
    for (auto delimiter : delimiters) {
      if (*p == delimiter) {
        match_delimiter = true;
        break;
      }
    }
    if (match_delimiter) {
      if (p > token_start) {
        tokens.emplace_back(token_start, p - token_start);
      }
      token_start = p + 1;
      continue;
    }
  }
  if (p > token_start) {
    tokens.emplace_back(token_start, p - token_start);
  }
  return tokens;
}

自行遍历版本,和 3.1 里的版本 2 代码很像,性能也是非 SIMD 模式下最好的。

(3)SIMD 版本

std::vector<std::string_view> SplitStringWithSimd256(std::string_view input, std::string_view delimiters) {
  if (delimiters.empty()) {
    return {input};
  }

  if (input.size() < 32) {
    return SplitStringV2(input, delimiters);
  }

  std::vector<std::string_view> tokens;
  uint32_t end_pos = input.size() >> 5 << 5;
  const char* p = input.data();
  const char* end = p + end_pos;
  uint32_t last_lead_zero = 0;  // 上一轮256bit(32个字符)处理后剩下的未拷贝进结果集的字符串个数
  while (p < end) {
    __m256i cmp_a = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(p));  // 32个字符加载进内存
    __m256i cmp_result_a = _mm256_cmpeq_epi8(cmp_a, _mm256_set1_epi8(delimiters[0]));

    for (int i = 1; i != delimiters.size(); ++i) {
      __m256i cmp_result_b = _mm256_cmpeq_epi8(cmp_a, _mm256_set1_epi8(delimiters[i]));
      cmp_result_a = _mm256_or_si256(cmp_result_a, cmp_result_b);
    }
    uint32_t mask = _mm256_movemask_epi8(cmp_result_a);
    if (mask == 0) {
      last_lead_zero += 32;
      p += 32;
      continue;
    }

    // 记录本次的头部0个数,注:mask的序和字符串序是相反的,所以这里头部的0对应字符串尾部的不匹配字符
    uint32_t lead_zero = __builtin_clz(mask);

    // 补上一次未拷贝的字符串
    uint32_t tail_zero = __builtin_ctz(mask);
    if (last_lead_zero != 0 || tail_zero != 0) {
      tokens.emplace_back(p - last_lead_zero, last_lead_zero + tail_zero);
    }
    mask >>= (tail_zero + 1);
    p += tail_zero + 1;

    // 补完,继续处理
    while (mask != 0) {
      uint32_t tail_zero = __builtin_ctz(mask);
      if (tail_zero != 0) {
        tokens.emplace_back(p, tail_zero);
      }
      mask >>= (tail_zero + 1);
      p += tail_zero + 1;
    }

    last_lead_zero = lead_zero;
    p += lead_zero;
  }

  // 256 bit(32字节) 对齐之后剩下的部分
  const char* token_start = input.data() + end_pos - last_lead_zero;
  const char* pp = token_start;
  const char* sentence_end = input.data() + input.size();
  for (; pp != sentence_end; ++pp) {
    bool match_delimiter = false;
    for (auto delimiter : delimiters) {
      if (*pp == delimiter) {
        match_delimiter = true;
        break;
      }
    }
    if (match_delimiter) {
      if (pp > token_start) {
        tokens.emplace_back(token_start, pp - token_start);
      }
      token_start = pp + 1;
      continue;
    }
  }
  if (pp > token_start) {
    tokens.emplace_back(token_start, pp - token_start);
  }
  return tokens;
}

处理任意分隔符和处理单分隔符大部分代码类似,只有两部分有变化:

  1. 一个待比较字符串和多个分隔符比较得到的多个 mask 码,使用 _mm256_or_si256 做合并
  2. 非 SIMD 部分使用 for 循环遍历判断多个分隔符

(4)性能对比

以上 3 个版本的代码,,再加上 google 开源的 absl 基础库 absl::StrSplit 函数,对 2K+ 的长文本循环 10000 次做压测,性能对比如下:

性能对比 stl版 自行遍历版 SIMD版 absl
耗时(ms) 768 658 480 1054
absl::StrSplit 性能最差,SIMD 版本有多个分隔符的场景下,性能提升非常明显,SIMD 应用代码的高复杂度付出是值得的。

4、思考

字符串分割是很常见的功能,通常其实现代码也很简洁,这就使得开发者容易忽略其性能,写出非最佳性能的代码,譬如:没有使用现代 C++ 中的 string_view、对遍历过程没有精细考虑。通过精细的控制计算量以及应用 SIMD 指令可以获得比较好的收益,特别是 SIMD 指令在任意多分隔符场景下性能优化效果非常明显。

本文地址:https://www.cnblogs.com/cswuyg/p/17794816.html

标签:std,zero,C++,tokens,start,token,高性能,字符串,input
From: https://www.cnblogs.com/cswuyg/p/17794816.html

相关文章

  • 字符串、线性表、队列、栈、哈希表、dfs、bfs
    题目列表:1.字符串无重复字符的最长子串(中等难度)给定一个字符串s,请你找出其中不含有重复字符的最长子串的长度。AC代码,展开查看classSolution{public:intlengthOfLongestSubstring(strings){intres=0;unordered_map<char,int>heap......
  • SQL Server数据库连接字符串的几种写法整理
     SQLServer数据库连接字符串的几种写法整理一、远程连接SQLServer数据库1.sqlserver身份验证连接字符串:privatestringConnstrSqlServer="server=数据库地址及实例;uid=数据库账号;pwd=数据库密码;database=数据库名";2.windows身份验证连接字符串:privatestr......
  • C++基础(2)
    将数组传入函数禁止修改数组的值函数的地址与函数的指针函数的指针数组函数的static与inline引用左值和引用传参C++11的数组for循环64位Linux操作系统中C++中常见基本类型所占字节数C++11类成员变量的初始化默认成员初始化器成员变量初始化列表委......
  • C++基础(3)
    类的继承基类与派生类之间的构造行为在派生类中使用基类方法protected的访问权限多态公有继承关键字virtual示例抽象基类(ABC)私有继承和保护继承多重继承类的继承基类与派生类之间的构造行为派生类可以调用基类的公共成员,但无法调用基类的私有成员。所......
  • 对于字符串,例如d(d)jjhd{f},判断括号是否符合规范
    publicbooleanisValid(Strings){Stack<Character>stack=newStack<>();Map<Character,Character>map=newHashMap<>();char[]chars=s.toCharArray();map.put(')','(');......
  • MinIO 高性能分布式存储最新版单机与分布式部署
    目录一、概述二、单机部署(单主机,多硬盘模式)1)磁盘初始化2)创建服务启动用户并设置磁盘属主3)下载minio安装包4)修改配置5)配置systemctl启动6)客户端工具mc三、分布式集群部署(多主机、多硬盘模式)1)磁盘初始化2)创建服务启动用户并设置磁盘属主3)下载minio安装包4)修改配置5)配置syste......
  • 一个字符串 AbAbcBaB 这种 消除驼峰字段 AbA aBa 这种 只留下非驼峰比如刚才这个字符
    publicclassSolution{char[]c=s.toCharArray();intlen=c.length;if(len<=2){System.out.println(c[len]);}intj=-1;for(inti=0;i<len-2;i++){if(c[i]==c[i+2]&&c[i]!=c[i+1]){j=i+2;......
  • 简单c++构建第一人称
    本文内容为UE4.27的文档教程GameMode确定新建的项目会自动生成GameMode,如果有更改,而不是使用默认的GameMode类,就需要在引擎的设置中更改角色的实现前后左右移动//前后MoveForwardvoidAFPSCharacter::MoveForward(floatvalue){ //利用控制器方向寻找前进的方向就是角色......
  • C++多线程编程——线程的基本概念和使用方法
    什么是线程?在计算机科学中,线程是进程中的一个执行控制单元,也被称为执行路径。每个进程可以包含多个线程,每条线程并行执行不同的任务。线程是操作系统可识别的最小执行和调度单位。进程和线程的区别进程是程序在某个数据集合上的一次运行活动,也是操作系统进行资源分配和保护的......
  • 【C++】继承 ⑬ ( 虚继承原理 | 虚继承解决继承二义性问题 | 二义性产生的原因分析 )
    文章目录一、虚继承原理1、虚继承解决继承二义性问题2、二义性产生的原因分析3、虚继承原理二、代码示例-虚继承原理1、完整代码示例2、执行结果一、虚继承原理1、虚继承解决继承二义性问题继承的二义性:如果一个子类(派生类)继承多个父类(基类),这些父类都继......