|第4章|
并发编程
并行计算导论
基于分治原则(如二叉树查找和快速排序等)的算法经常表现出高度的行性,可通过使用并行或并发执行来提高计算速度。并行计算是一种计算方案,它尝试使用多个执行并行算法的处理器更快速地解决问题。
-
顺序算法与并行算法
用一个begin-end代码块列出代码
-
并行性与并发性
并行算法只识别可并行执行的任务。在理想情况下,并行算法中的所有任务都应该同时实施执行。
线程
-
线程的原理
一个操作系统(OS)包含许多并发进程。在进程模型中,进程是独立的执行单元。所有进程均在内核模式或用户模式下执行。在内核模式下,各进程在唯一地址空间上执行,与其他进程是分开的。虽然每个进程都是一个独立的单元,但是它只有一个执行路径。当某进程必须等待某事件时,例如I/完成事件,它就会暂停,整个进程会停止执行。线程是某进程同一地址空间上的独立执行单元。创建某个进程就是在一个唯一地址空间创建一个主线程。当某进程开始时,就会执行该进程的主线程。如果只有一个主线程,那么进程和线程实际上并没有区别。但是,主线程可能会创建其他线程。每个线程又可以创建更多的线程等某进程的所有线程都在该进程的相同地址空间中执行。但每个线程都是一个独立的执行单元。在线程模型中,如果一个线程被挂起,其他线程可以继续执行。除了共享共同的地址空间之外,线程还共享进程的许多其他资源,如用户id、打开的文件描述符和信号等。 -
线程的优点
- 线程创建和切换速度更快
- 线程的响应速度更快
- 线程更适合并行运算
- 线程的缺点
- 由于地址空间共享,线程需要来自用户的明确同步。
- 许多函数可能对线程不安全,例如传统strtok0)函数将一个字符分成一连串令牌。通常,任何使用全局变量或依赖于静态内存内容的函数,线程都不安全。为了使库函数适应线程环境,还需要做大量的工作。
- 在单CPU系统上,使用线程解决问题实际上要比使用顺序程序慢,这是由在运行时创建线程和切换上下文的系统开销造成的。
线程操作
线程的执行轨迹与进程美似。线程可在内核误式或用E市S程在进程的相同地址空间中执行,但每个线程都有自己的执行堆栈。线程是独立的执行单元,可根据操作系统内核的调度策略,对内核进行系统调用,变为挂起、激活以继续执行等。为了利用线程的共享地址空间,操作系统内核的调度策略可能会优先选择同一进程中的线程,而不是不同进程中的线程。
线程管理函数
- 创建线程
使用pthread_create()创建线程,成功返回0,失败返回错误代码。
int pthread_create(pthread_t *pthread_id,pthread_attr_t *attr,void *(*func)(void*),void *arg)
pthreadid是指向pthreadt类型变量的指针。它会被操作系统内核分配的唯一线程ID填充。在POSIX中,pthread t是一种不透明的类型。程序员应该不知道不透明对象的内容,因为它可能取决于实现情况。线程可通过pthread selfO函数获得自己的ID。在Linux中pthread t类型被定义为无符号长整型,因此线程ID可以打印为%lu。
- attr 是指向另一种不透明数据类型的指针,它指定线程属性。
- func是要执行的新线程函数的入口地址。
- arg是指向线程函数参数的指针,可写为:
void *func(void *arg)
其中,attr参数最复杂。下面给出了atr参数的使用步骤
(1)定义一个pthread属性变量pthreadattr tattr。
(2)用pthread attrinit (&attr)初始化属性变量。
(3)设置属性变量并在pthread create0调用中使用。
(4)必要时,通过pthread attr destroy (&attr)释放attr 资源。
-
线程ID
线程ID是一种不透明的数据类型,取决于实现情况。因此,不应该直接比较线程ID。如果需要,可以使用pthread equal0函数对它们进行比较。
int pthread_equal(pthread_t t1,pthread_t t2);
如果是不同的线程,则返回0,否则返回非0。 -
线程终止
线程函数结束后,线程即终止。或者,线程可以调用函数
int pthread_exit (void *status);
进行显式终止,其中状态是线程的退出状态。通常,0退出值表示正常终止,非0值表示异常终止。 -
线程连接
一个线程可以等待另一个线程的终止,通过:
int pthread_join (pthread _t thread, void **status ptr);
终止线程的退出状态以status_ptr返回。
线程示例程序
- 线程计算矩阵的和
执行为gcc p451.c -pthread
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define N 4
int A[N][N], sum[N];
void *func(void *arg)
{
int j, row;
pthread_t tid = pthread_self(); // get thread ID number
row = (int)arg; // get row number from arg
printf("Thread %d [%lu] computes sum of row %d\n", row, tid, row);
for (j=0; j<N; j++)
{
sum[row] += A[row][j];
}
printf("Thread %d [%lu] done sum[%d] = %d\n",row, tid, row, sum[row]);
pthread_exit((void*)0); // thread exit: 0=normal termination
}
int main (int argc, char *argv[])
{
pthread_t thread[N]; // thread IDs
int i, j, r, total = 0;
void *status;
printf("Main: initialize A matrix\n");
for (i=0; i<N; i++)
{
sum[i] = 0;
for (j=0; j<N; j++)
{
A[i][j] = i*N + j + 1;
printf("%4d" ,A[i][j]);
}
printf("\n");
}
printf("Main: create %d threads\n", N);
for(i=0; i<N; i++)
{
pthread_create(&thread[i], NULL, func, (void *)i);
}
printf("Main: try to join with threads\n");
for(i=0; i<N; i++)
{
pthread_join(thread[i], &status);
printf("Main: joined with %d [%lu]: status=%d\n",i, thread[i], (int)status);
}
printf("Main: compute and print total sum:");
for (i=0; i<N; i++)
{
total += sum[i];
}
printf("tatal = %d\n", total);
pthread_exit(NULL);
}
- 用线程快速排序
通过线程实现一个并行快速排序程序。当程序启动时,它作为进程的主线程运行。主线程调用qsort(&arg),其中arg =[lowerbound=0,upperbound=N-1]。qsort()函数实现一个N个整数数组的快速排序。在qsort()中线程会选择一个基准元素将数组分成两部分,这样左边部分的所有元素都小于基准元素,右边部分的所有元素都大于基准元素。然后,它会创建两个子线程来对这两部分进行排序,并等待子线程完成。每个子线程通过相同的递归算法对自己的部分进行排序。当所有的子线程工作完成后,主线程继续工作。它会打印排序后的数组并终止。众所周知,快速排序的排序步骤数取决于未排序数据的量级,这将影响qsort程序所需的线程数。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define N 10
typedef struct{
int upperbound;
int lowerbound;
}PARM;
int A[N]={5,1,6,4,7,2,9,8,0,3};
int print() // print current a[] contents
{
int i;
printf("[ ");
for (i=0; i<N; i++)
{
printf("%d ", A[i]);
}
printf("]\n");
}
void *qsort_1(void *aptr)
{
PARM *ap, aleft, aright;
int pivot, pivotIndex, left, right, temp;
int upperbound, lowerbound;
pthread_t me, leftThread, rightThread;
me = pthread_self();
ap = (PARM *)aptr;
upperbound = ap->upperbound;
lowerbound = ap->lowerbound;
pivot = A[upperbound];
left = lowerbound - 1;
right = upperbound;
if (lowerbound >= upperbound)
pthread_exit(NULL);
while (left < right)
{
do { left++;} while (A[left] < pivot);
do { right--;}while (A[right] > pivot);
if (left < right )
{
temp = A[left];
A[left] = A[right];
A[right] = temp;
}
}
print();
pivotIndex = left;
temp = A[pivotIndex];
A[pivotIndex] = pivot;
A[upperbound] = temp; // start the "recursive threads"
aleft.upperbound = pivotIndex - 1;
aleft.lowerbound = lowerbound;
aright.upperbound = upperbound;
aright.lowerbound = pivotIndex + 1;
printf("%lu: create left and right threads\n", me);
pthread_create(&leftThread, NULL, qsort_1, (void *)&aleft);
pthread_create(&rightThread, NULL, qsort_1, (void *)&aright);// wait for left and right threads
pthread_join(leftThread, NULL);
pthread_join(rightThread, NULL);
printf("%lu: joined with left & right threads\n", me);
}
int main(int argc, char *argv[])
{
PARM arg;
int i, *array;
pthread_t me, thread;
me = pthread_self();
printf("main %lu: unsorted array =" ,me);
print();
arg.upperbound = N-1;
arg.lowerbound = 0;
printf("main %lu create a thread to do QS\n", me);
pthread_create(&thread, NULL, qsort_1, (void *)&arg); // wait for QS thread to finish
pthread_join(thread, NULL);
printf("main %lu sorted array = ", me);
print();
}
线程同步
由于线程在进程的同一地址空间中执行,它们共享同一地址空间中的所有全局变量和数据结构。当多个线程试图修改同一共享变量或数据结构时,如果修改结果取决于线程的执行顺序,则称之为竞态条件。在并发程序中,绝不能有竞态条件。否则,结果可能不一致。除了连接操作之外,并发执行的线程通常需要相互协作。为了防止出现竞态条件并且支持线程协作,线程需要同步。通常,同步是一种机制和规则,用于确保共享数据对象的完整性和并发执行实体的协调性。它可以应用于内核模式下的进程,也可以应用于用户模式下的线程。
-
互斥量
最简单的同步工具是锁,它允许执行实体仅在有锁的情况下才能继续执行。在Pthread中,锁被称为互斥量,意思是相互排斥。互斥变量是用pthread mutex t类型声明的,在使用之前必须对它们进行初始化。有两种方法可以初始化互斥量。
(1)一种是静态方法,如:
pthread_mutex_t m =PTHREAD_MUTEX_INITIALIZER;
定义互斥量m,并使用默认属性对其进行初始化。
(2)另一种是动态方法,使用pthread mutex init0函数可通过attr参数设置互斥属性。 -
死锁防御
死锁是一个状态,在这种状态下,许多执行实体相互等待,无法继续进行下去。
解决死锁的方法:包括死锁预防、死锁规避、死锁检测和恢复等。
在实际系统中,唯一可行的是死锁预防。 -
条件变量
和互斥量一样,条件变量可以通过两种方法进行初始化。
(1)一种是静态方法,在声明时,如:
pthread_cond_t con = PTHREAD_COND_INITIALIZER;
定义一个条件变量con,并使用默认属性对其进行初始化。
(2)另一种是动态方法,使用pthread_cond_init()函数,可通过attr参数设置条件变量。为简便起见,我们总是使用NULL attr参数作为默认属性. -
生产者-消费者问题
使用线程和条件变量来实现一个简化版的生产者-消费者问题,也称有限缓冲问题。生产者-消费者问题通常将进程定义为执行实体,可看作当前上下文中的线程。下面是该问题的定义。一系列生产者和消费者进程共享数量有限的缓冲区。每个缓冲区每次有一个特定的项目。最开始,所有缓冲区都是空的。当一个生产者将一个项目放人一个空缓冲区时,该缓冲区就会变满。当一个消费者从一个满的缓冲区中获取一个项目时,该缓冲区就会变空。如果没有空缓冲区,生产者必须等待。同样,如果没有满缓冲区,则消费者必须等待。此外,当等待事件发生时,必须允许等待进程继续。
共享全局变量
int buf[NBUF];
int head,tail;
int data;
一个互斥量和两个条件变量
pthread_mutex_t mutex;
pthread_cond_t empty, full;
代码
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NBUF 5
#define N 10
// shared global variables
int buf[NBUF]; // circular buffers
int head, tail; // indices
int data; // number of full buffers
pthread_mutex_t mutex;
pthread_cond_t empty, full;
int init()
{
head = tail = data = 0;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&full, NULL);
pthread_cond_init(&empty, NULL);
}
void *producer()
{
int i;
pthread_t me = pthread_self();
for (i=0; i<N; i++) // try to put N items into buf[]
{
pthread_mutex_lock(&mutex); // lock mutex
if (data == NBUF)
{
printf ("producer %lu: all bufs FULL: wait\n", me);
pthread_cond_wait(&empty, &mutex); // wait
}
buf[head++] = i+1;
head %= NBUF;
data++;
printf("producer %lu: data=%d value=%d\n", me, data, i+1);
pthread_mutex_unlock (&mutex);
pthread_cond_signal(&full);
}
printf("producer %lu: exit\n", me);
}
void *consumer()
{
int i, c;
pthread_t me = pthread_self();
for (i=0; i<N; i++)
{
pthread_mutex_lock(&mutex); // lock mutex
if (data == 0)
{
printf ("consumer %lu: all bufs EMPTY: wait\n", me);
pthread_cond_wait(&full, &mutex); // wait
}
c = buf[tail++]; // get an item
tail %= NBUF;
data--; // dec data by 1
printf("consumer %lu: value=%d\n", me, c);
pthread_mutex_unlock(&mutex); // unlock mutex
pthread_cond_signal(&empty); // unblock a producer, if any
}
printf("consumer %lu: exit\n", me);
}
int main ()
{
pthread_t pro, con;
init();
printf("main: create producer and consumer threads\n");
pthread_create(&pro, NULL, producer, NULL);
pthread_create(&con, NULL, consumer, NULL);
printf("main: join with threads\n");
pthread_join(pro, NULL);
pthread_join(con, NULL);
printf("main: exit\n");
}
-
信号量
信号量是进程同步的一般机制。计数信号量是一种数据结构。 -
屏障
线程连接操作允许某线程(通常是主线程)等待其他线程终止。在等待的所有线程都终止后,主线程可创建新线程来继续执行并行程序的其余部分。创建新线程需要系统开销。在某些情况下,保持线程活动会更好,但应要求它们在所有线程都达到指定同步点之前不能继续活动。在 Pthreads 中,可以采用的机制是屏障以及一系列屏障函数。首先,主线程创建一个屏障对象
pthread_barrier_t barrier;
并且调用
pthread_barrier_init(&barrier NULL, nthreads);
用屏障中同步的线程数字对它进行初始化。然后,主线程创建工作线程来执行任务。工作线
程使用
pthread_barrier_wait( &barrier)
在屏障中等待指定数量的线程到达屏障。当最后一个线程到达屏障时,所有线程重新开始执行在这种情况下,屏障是线程的集合点,而不是它们的坟墓。我们用一个例子来说明屏障的使用。 -
linux中的线程
与许多其他操作系统不同,Linux不区分进程和线程。对于Linux内核,线程只是一个与其他进程共享某些资源的进程。在 Linux 中,进程和线程都是由 clone()系统调用创建的
具有以下原型:
int clone(int (*fn)(void *), void *child stack, int flags, void *arg)
可以看出,clone()更像是一个线程创建函数。它创建一个子进程来执行带有child_stack的函数fn(arg)。
GPT苏格拉底挑战
-
并行
-
线程
遇到的问题
在实现线程计算矩阵的和时,在执行程序时,遇到如下报错
bash: ./p451:无法执行二进制文件: 可执行文件格式错误
在仔细查阅书籍时,书上指明要使用
gcc p451.c -pthread