第四章 并发编程
并行计算
是一种计算方案,它尝试使用多个执行并行算法的处理器更快速的解决问题
顺序算法与并行算法
并行性与并发性
并行算法只识别可并行执行的任务。CPU系统中,并发性是通过多任务处理来实现的
线程
线程的原理
某进程同一地址空间上的独立执行单元
线程的优点
- 线程创建和切换速度更快
- 线程的响应速度更快
- 线程更适合并行运算
线程的缺点
- 线程需要来自用户的明确同步
- 库函数不安全
- 单CPU系统中,线程解决问题实际上要比使用顺序程序慢
线程操作
线程可在内核模式或用户模式下执行
其中涉及Linux下的pthread并发编程
线程管理函数
创建线程
使用pthread_create()
int pthread_create(pthread_t *pthread_id,pthread_attr_t *attr,void *(*func)(void*),void *arg)
注意:
1、pthread_id是指向pthread_t类型变量的指针
2、attr如果是NULL,将使用默认属性创建线程
线程ID
int pthread_equal(pthread_t t1,pthread_t t2);
不同的线程,返回0,否则返回非0
线程终止
int pthread_exit(void *status);
线程连接
int pthread_join(pthread_t thread,void **status_ptr)
线程示例程序
用线程计算矩阵的和
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define N 4
int A[N][N], sum[N];
void *func(void *arg)
{
int j, row;
pthread_t tid = pthread_self(); // get thread ID number
row = (int)arg; // get row number from arg
printf("Thread %d [%lu] computes sum of row %d\n", row, tid, row);
for (j=0; j<N; j++)
{
sum[row] += A[row][j];
}
printf("Thread %d [%lu] done sum[%d] = %d\n",row, tid, row, sum[row]);
pthread_exit((void*)0); // thread exit: 0=normal termination
}
int main (int argc, char *argv[])
{
pthread_t thread[N]; // thread IDs
int i, j, r, total = 0;
void *status;
printf("Main: initialize A matrix\n");
for (i=0; i<N; i++)
{
sum[i] = 0;
for (j=0; j<N; j++)
{
A[i][j] = i*N + j + 1;
printf("%4d" ,A[i][j]);
}
printf("\n");
}
printf("Main: create %d threads\n", N);
for(i=0; i<N; i++)
{
pthread_create(&thread[i], NULL, func, (void *)i);
}
printf("Main: try to join with threads\n");
for(i=0; i<N; i++)
{
pthread_join(thread[i], &status);
printf("Main: joined with %d [%lu]: status=%d\n",i, thread[i], (int)status);
}
printf("Main: compute and print total sum:");
for (i=0; i<N; i++)
{
total += sum[i];
}
printf("tatal = %d\n", total);
pthread_exit(NULL);
}
用线程快速排序
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define N 10
typedef struct{
int upperbound;
int lowerbound;
}PARM;
int A[N]={5,1,6,4,7,2,9,8,0,3};
int print() // print current a[] contents
{
int i;
printf("[ ");
for (i=0; i<N; i++)
{
printf("%d ", A[i]);
}
printf("]\n");
}
void *qsort_1(void *aptr)
{
PARM *ap, aleft, aright;
int pivot, pivotIndex, left, right, temp;
int upperbound, lowerbound;
pthread_t me, leftThread, rightThread;
me = pthread_self();
ap = (PARM *)aptr;
upperbound = ap->upperbound;
lowerbound = ap->lowerbound;
pivot = A[upperbound];
left = lowerbound - 1;
right = upperbound;
if (lowerbound >= upperbound)
pthread_exit(NULL);
while (left < right)
{
do { left++;} while (A[left] < pivot);
do { right--;}while (A[right] > pivot);
if (left < right )
{
temp = A[left];
A[left] = A[right];
A[right] = temp;
}
}
print();
pivotIndex = left;
temp = A[pivotIndex];
A[pivotIndex] = pivot;
A[upperbound] = temp; // start the "recursive threads"
aleft.upperbound = pivotIndex - 1;
aleft.lowerbound = lowerbound;
aright.upperbound = upperbound;
aright.lowerbound = pivotIndex + 1;
printf("%lu: create left and right threads\n", me);
pthread_create(&leftThread, NULL, qsort_1, (void *)&aleft);
pthread_create(&rightThread, NULL, qsort_1, (void *)&aright);// wait for left and right threads
pthread_join(leftThread, NULL);
pthread_join(rightThread, NULL);
printf("%lu: joined with left & right threads\n", me);
}
int main(int argc, char *argv[])
{
PARM arg;
int i, *array;
pthread_t me, thread;
me = pthread_self();
printf("main %lu: unsorted array =" ,me);
print();
arg.upperbound = N-1;
arg.lowerbound = 0;
printf("main %lu create a thread to do QS\n", me);
pthread_create(&thread, NULL, qsort_1, (void *)&arg); // wait for QS thread to finish
pthread_join(thread, NULL);
printf("main %lu sorted array = ", me);
print();
}
线程同步
竞态条件:修改结果取决于线程执行顺序
互斥量
在pthread中,锁被称为互斥量
pthread_mutex_lock(&m);
access shared data object;
pthread_mutex_unlock(&m);
示例
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define N 4
int A[N][N];
int total = 0;
pthread_mutex_t *m;
void *func(void *arg)
{
int i, row, sum = 0;
pthread_t tid = pthread_self(); // get thread ID number
row = (int)arg; // get row number from arg
printf("Thread %d [%lu] computes sum of row %d\n", row, tid, row);
for (i=0; i<N; i++)
sum += A[row][i];
printf("Thread %d [%lu] update total with %d : Thread %d : ", row, tid, sum,row);
//pthread_mutx_lock(m);
pthread_mutex_lock(m);
total += sum;
pthread_mutex_unlock(m);
printf ("total = %d\n", total);
}
int main (int argc, char *argv[])
{
pthread_t thread[N];
int i, j, r;
void *status;
printf("Main: initialize A matrix\n");
for (i=0; i<N; i++)
{
//sum[i] = 0;
for (j=0; j<N; j++)
{
A[i][j] = i*N + j + 1;
printf("%4d ", A[i][j]);
}
printf("\n");
}
// create a mutex m
m = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t));
pthread_mutex_init(m, NULL); // initialize mutex m
printf("Main: create %d threads\n", N);
for(i=0; i<N; i++)
{
pthread_create(&thread[i], NULL, func, (void *)i);
}
printf("Main: try to join with threads\n");
for(i=0; i<N; i++)
{
pthread_join(thread[1], &status);
printf("Main: joined with %d [%lu]: status=%d\n",i, thread[i]/ (int)status);
}
printf("Main: tatal = %d\n", total);
pthread_mutex_destroy (m); // destroy mutex m
pthread_exit(NULL);
}
死锁预防
死锁是一个状态,在这种状态下,许多执行实体相互等待,无法继续进行下去
条件变量
条件变量可以通过两种方法进行初始化
- 静态方法
- 动态方法
生产者-消费者问题
共享全局变量
int buf[NBUF];
int head,tail;
int data;
示例
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NBUF 5
#define N 10
// shared global variables
int buf[NBUF]; // circular buffers
int head, tail; // indices
int data; // number of full buffers
pthread_mutex_t mutex;
pthread_cond_t empty, full;
int init()
{
head = tail = data = 0;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&full, NULL);
pthread_cond_init(&empty, NULL);
}
void *producer()
{
int i;
pthread_t me = pthread_self();
for (i=0; i<N; i++) // try to put N items into buf[]
{
pthread_mutex_lock(&mutex); // lock mutex
if (data == NBUF)
{
printf ("producer %lu: all bufs FULL: wait\n", me);
pthread_cond_wait(&empty, &mutex); // wait
}
buf[head++] = i+1;
head %= NBUF;
data++;
printf("producer %lu: data=%d value=%d\n", me, data, i+1);
pthread_mutex_unlock (&mutex);
pthread_cond_signal(&full);
}
printf("producer %lu: exit\n", me);
}
void *consumer()
{
int i, c;
pthread_t me = pthread_self();
for (i=0; i<N; i++)
{
pthread_mutex_lock(&mutex); // lock mutex
if (data == 0)
{
printf ("consumer %lu: all bufs EMPTY: wait\n", me);
pthread_cond_wait(&full, &mutex); // wait
}
c = buf[tail++]; // get an item
tail %= NBUF;
data--; // dec data by 1
printf("consumer %lu: value=%d\n", me, c);
pthread_mutex_unlock(&mutex); // unlock mutex
pthread_cond_signal(&empty); // unblock a producer, if any
}
printf("consumer %lu: exit\n", me);
}
int main ()
{
pthread_t pro, con;
init();
printf("main: create producer and consumer threads\n");
pthread_create(&pro, NULL, producer, NULL);
pthread_create(&con, NULL, consumer, NULL);
printf("main: join with threads\n");
pthread_join(pro, NULL);
pthread_join(con, NULL);
printf("main: exit\n");
}
信号量
信号量是进程同步的一般机制
信号量和条件变量
屏障
线程连接操作允许某线程等待其他线程终止
在pthread中可以采用的机制是屏障以及一系列屏障函数
用并发线程解线性方程组
示例
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <math.h>
#define N 4
double A[N][N+1];
pthread_barrier_t barrier;
int print_matrix()
{
int i, j;
printf("----------------------------\n");
for(i=0; i<N; i++)
{
for(j=0; j < N+1; j++)
printf("%6.2f ", A[i][j]);
printf("\n");
}
}
void *ge(void *arg) // threads function: Gauss elimination
{
int i, j, prow;
int myid = (int)arg;
double temp, factor;
for(i=0; i<N-1; i++)
{
if (i == myid)
{
printf("partial pivoting by thread %d on row %d: ", myid, i);
temp = 0.0; prow = i;
for (j=i; j<=N; j++)
{
if (fabs(A[j][i]) > temp)
{
temp = fabs(A[j][i]);
prow = j;
}
}
printf("pivot_row=%d pivot=%6.2f\n", prow, A[prow][i]);
if (prow != i) // swap rows
{
for (j=i; j<N+1; j++)
{
temp = A[i][j];
A[i][j] = A[prow][j];
A[prow][j] = temp;
}
}
}
// wait for partial pivoting done
pthread_barrier_wait(&barrier);
for(j=i+1; j<N; j++)
{
if (j == myid)
{
printf("thread %d do row %d\n", myid, j);
factor = A[j][i]/A[i][i];
for (int k=i+1; k<=N; k++)
A[j][k] -= A[i][k]*factor;
A[j][i] = 0.0;
}
}
// wait for current row reductions to finish
pthread_barrier_wait(&barrier);
if (i == myid)
print_matrix();
}
}
int main(int argc, char *argv[])
{
int i, j;
double sum;
pthread_t threads[N];
printf("main: initialize matrix A[N][N+l] as [A|B]\n");
for (i=0; i<N; i++)
for (j=0; j<N; j++)
A[i][j] = 1.0;
for (i=0; i<N; i++)
A[i][N-i-1] = 1.0*N;
for (i=0; i<N; i++)
{
A[i][N] = 2.0*N - 1;
}
print_matrix(); // show initial matrix [A|B]
pthread_barrier_init(&barrier, NULL, N); // set up barrier
printf("main: create N=%d working threads\n", N);
for (i=0; i<N; i++)
{
pthread_create(&threads[i], NULL, ge, (void *)i);
}
printf("main: wait for all %d working threads to join\n", N);
for (i=0; i<N; i++)
{
pthread_join(threads[i], NULL);
}
printf("main: back substitution :");
for (i=N-1; i>=0; i--)
{
sum = 0.0;
for (j=i+1; j<N; j++)
sum += A[i][N];
A[i][N] = (A[i][N]- sum)/A[i][i];
}
// print solution
printf("The solution is :\n");
for(i=0; i<N; i++)
{
printf("%6.2f ",A[i][N]);
}
printf("\n");
}
Linux中的线程
进程和线程都是由clone()系统调用创建的具有以下原型
int clone(int (*fn)(void*),void *child_stack,int flags,void *arg)
问题及解决方法
信号量与条件变量的区别:
条件变量、互斥锁——主要用于线程间通信
pthread_cond_wait()
pthread_cond_wait(&m_cond,&m_mutex); 指的是等待条件变量,总和一个互斥锁结合使用。
pthread_cond_wait() 函数执行时先自动释放指定的互斥锁,然后等待条件变量的变化;在函数调用返回之前(即wait成功获得cond条件的时候),会自动将指定的互斥量重新锁住(即在“等待的条件变量满足条件时,会重新锁住指定的锁”)。
苏格拉底挑战
-
线程
-
互斥量