文章目录
0. 引言
我们知道:
- 多线程:适用于处理计算密集型任务或IO操作较少的场景,但会因为线程切换和创建销毁的开销而影响性能。
- 协程:适用于处理需要大量并发且具有等待或阻塞的任务。在阻塞型任务中,协程通过上下文切换以提高资源利用率,减少开销。
本文将探讨如何通过 多线程 和 协程 来优化 阻塞型任务(例如矩阵乘法)调度性能,使用的协程库来自:state-threads
更多请看:state-threads-for-internet-applications
基本使用方法如下:
#include <stdio.h>
#include "st.h"
void* do_calc(void* arg){
int sleep_ms = (int)(long int)(char*)arg * 10;
for(;;){
printf("in sthread #%dms\n", sleep_ms);
st_usleep(sleep_ms * 1000);
}
return NULL;
}
int main(int argc, char** argv){
if(argc <= 1){
printf("Test the concurrence of state-threads!\n");
printf("Usage: %s <sthread_count>\n");
printf("eg. %s 10000\n", argv[0], argv[0]);
return -1;
}
if(st_init() < 0){
printf("st_init error!");
return -1;
}
int i;
int count = atoi(argv[1]);
for(i = 1; i <= count; i++){
if(st_thread_create(do_calc, (void*)i, 0, 0) == NULL){
printf("st_thread_create error!");
return -1;
}
}
st_thread_exit(NULL);
return 0;
}
1. 多线程 VS 多线程+协程
来看一个简单的 矩阵乘法 的实现。我们将矩阵分割成多个部分,每个线程负责计算矩阵的一部分。为模拟阻塞行为,我们在计算过程中加入了 std::this_thread::sleep_for(std::chrono::microseconds(1))
延迟1微秒,在协程中使用st_usleep(1);让出CPU 1微秒。
1.1 示例 1:使用传统的多线程进行矩阵乘法
#include <iostream>
#include <vector>
#include <thread>
#include <random>
#include <chrono>
// 生成随机矩阵
std::vector<std::vector<int>> generate_matrix(int rows, int cols) {
std::vector<std::vector<int>> matrix(rows, std::vector<int>(cols));
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 100);
for (int i = 0; i < rows; ++i) {
for (int j = 0; j < cols; ++j) {
matrix[i][j] = dis(gen);
}
}
return matrix;
}
// 线程函数:矩阵乘法的一部分
void thread_matrix_multiply(const std::vector<std::vector<int>>& A, const std::vector<std::vector<int>>& B,
std::vector<std::vector<int>>& C, int start_row, int end_row) {
int m = A.size();
int n = A[0].size();
int p = B[0].size();
for (int i = start_row; i < end_row; ++i) {
for (int j = 0; j < p; ++j) {
C[i][j] = 0;
for (int k = 0; k < n; ++k) {
C[i][j] += A[i][k] * B[k][j];
}
std::this_thread::sleep_for(std::chrono::microseconds(1)); // 模拟阻塞
}
}
}
void multi_thread_matrix_multiply(const std::vector<std::vector<int>>& A, const std::vector<std::vector<int>>& B,
std::vector<std::vector<int>>& C, int num_threads) {
int m = A.size();
int n = A[0].size();
int p = B[0].size();
std::vector<std::thread> threads;
int rows_per_thread = m / num_threads;
for (int i = 0; i < num_threads; ++i) {
int start_row = i * rows_per_thread;
int end_row = (i == num_threads - 1) ? m : start_row + rows_per_thread;
threads.emplace_back(thread_matrix_multiply, std::ref(A), std::ref(B), std::ref(C), start_row, end_row);
}
for (auto& t : threads) {
t.join();
}
}
int main() {
int N = 1000;
int num_threads = 4;
auto A = generate_matrix(N, N);
auto B = generate_matrix(N, N);
std::vector<std::vector<int>> C(N, std::vector<int>(N));
// 记录开始时间
auto start = std::chrono::high_resolution_clock::now();
multi_thread_matrix_multiply(A, B, C, num_threads);
// 记录结束时间并计算耗时
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> time_spent = end - start;
std::cout << "Time spent: " << time_spent.count() << " seconds\n";
return 0;
}
执行结果:
$ g++ -o mutil_thread mutil_thread.cpp -O2 -lpthread
$ ./mutil_thread
Start multi-thread matrix multiplication with 4 threads...
Start 4 threads...
All threads finished.
Time spent: 14.91s
1.2. 示例 2:使用协程优化阻塞型任务
接下来,我们使用 协程 来优化阻塞型任务的调度。在协程中,我们不再为每个任务创建独立的线程,而是在同一个线程中切换执行上下文。
#include <pthread.h>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <random>
#include <vector>
#include "st.h"
#define THREAD_COUNT 4
#define STACK_SIZE (4* 1024) // 4k stack size
// 定义协程参数结构体
struct CoroutineParams {
const std::vector<std::vector<int>>* A;
const std::vector<std::vector<int>>* B;
std::vector<std::vector<int>>* C;
int start_row;
int end_row;
int* active_coroutines; // 指向活跃协程计数器的指针
};
// 协程函数:矩阵乘法的一部分
void* coroutine_matrix_multiply(void* arg) {
CoroutineParams* params = static_cast<CoroutineParams*>(arg);
int m = params->A->size();
int n = params->A->at(0).size();
int p = params->B->at(0).size();
for (int i = params->start_row; i < params->end_row; ++i) {
for (int j = 0; j < p; ++j) {
(*params->C)[i][j] = 0;
for (int k = 0; k < n; ++k) {
(*params->C)[i][j] += (*params->A)[i][k] * (*params->B)[k][j];
}
st_usleep(1); // 模拟阻塞
}
}
// 协程完成,减少活跃协程计数器
__sync_fetch_and_add(params->active_coroutines, -1);
delete params;
return NULL;
}
// 每个线程执行的函数
void* thread_matrix_multiply(void* arg) {
ThreadParams* params = static_cast<ThreadParams*>(arg);
// 在每个线程中初始化State Threads库
if (st_init() < 0) {
fprintf(stderr, "st_init error in thread %ld!\n", pthread_self());
pthread_exit(NULL);
}
int rows_per_coroutine = (params->end_row - params->start_row) / params->coroutine_count;
// 初始化活跃协程计数器
*params->active_coroutines = params->coroutine_count;
for (int i = 0; i < params->coroutine_count; ++i) {
int start_row = params->start_row + i * rows_per_coroutine;
int end_row = (i == params->coroutine_count - 1) ? params->end_row : start_row + rows_per_coroutine;
CoroutineParams* coroutine_params = new CoroutineParams{params->A, params->B, params->C, start_row, end_row};
coroutine_params->active_coroutines = params->active_coroutines;
if (st_thread_create(coroutine_matrix_multiply, coroutine_params, STACK_SIZE, 0) == NULL) {
fprintf(stderr, "st_thread_create error! i=%d in thread %ld\n", i, pthread_self());
pthread_exit(NULL);
}
}
// 等待所有协程完成
while (*params->active_coroutines > 0) {
st_thread_yield();
}
delete params;
return NULL;
}
int main() {
int N = 1000;
int num_threads = THREAD_COUNT;
auto A = generate_matrix(N, N);
auto B = generate_matrix(N, N);
std::vector<std::vector<int>> C(N, std::vector<int>(N));
// 记录开始时间
clock_t start = clock();
multi_thread_matrix_multiply(A, B, C, num_threads);
// 记录结束时间并计算耗时
clock_t end = clock();
double time_spent = (double)(end - start) / CLOCKS_PER_SEC;
std::cout << "Time spent:
" << time_spent << " seconds\n";
return 0;
}
执行结果:
$ g++ -o coroutine2 coroutine2.cpp -O2 -lst -lpthread -L.
$ ./coroutine2
Start multi-thread matrix multiplication with 4 threads and 10 coroutines per thread...
Start 4 threads...
All threads finished.
Time spent: 3.32s
3. 分析与对比
实验是假设计算任务中存在阻塞操作,如std::this_thread::sleep_for(std::chrono::microseconds(1))
。从结果可以看出,与传统的多线程方法相比,协程使用st_usleep(1)
让出CPU吗,可以减少线程创建和上下文切换的开销,提升性能。
多线程(14.91秒):
每个线程都运行在独立的执行单元上,导致线程之间的上下文切换开销较大,尤其是在大量小任务(例如矩阵乘法中的每次加法)并发执行时。
协程(3.06秒):
协程通过在单线程中高效切换执行上下文,减少了线程调度开销。尽管任务本身仍然存在延迟,但由于多个协程共享同一个线程的CPU资源,整体执行效率大幅提升。