同步(Synchronization)涉及到在多线程或多进程环境中协调多个执行线程的执行顺序,以确保数据的一致性和完整性。同步的目的是防止多个线程同时访问同一资源(如内存、文件、数据库等)时发生的冲突和数据不一致问题。
实现方法:
-
互斥锁(Mutex):一种同步机制,用于保护共享资源不被多个线程同时访问。当一个线程获取了互斥锁,其他线程必须等待直到互斥锁被释放。
-
信号量(Semaphore):一种计数器,用于控制对共享资源的访问数量。信号量可以用于控制同时访问特定资源的线程数量。
-
条件变量(Condition Variable):与互斥锁一起使用的同步机制,允许线程在某些条件不满足时挂起(等待),并在条件满足时被唤醒。
一、互斥锁和条件变量
一个简单的现打印A再打印B的同步程序:
package main
import (
"fmt"
"sync"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
var wg sync.WaitGroup
wg.Add(1)
flag := 0
go func() {
mu.Lock()
defer mu.Unlock()
if flag == 0 {
cond.Wait()
}
fmt.Println("B")
wg.Done()
}()
mu.Lock()
fmt.Println("A")
flag = 1
cond.Signal()
mu.Unlock()
wg.Wait()
}
生产者消费者问题:
package main
import (
"fmt"
"math/rand/v2"
"sync"
"time"
)
type ShareRes struct {
mu sync.Mutex
cond sync.Cond
queue []int //假设上限为10
}
func NewShareRes() *ShareRes {
ret := &ShareRes{
queue: make([]int, 0),
}
ret.cond = *sync.NewCond(&ret.mu)
return ret
}
func product(res *ShareRes) {
for {
res.mu.Lock()
for len(res.queue) > 10 {
res.cond.Wait()
}
randInt := rand.IntN(100)
res.queue = append(res.queue, randInt)
fmt.Println("i am producer,i push ", randInt, " to queue")
res.cond.Broadcast()
res.mu.Unlock()
time.Sleep(1 * time.Second)
}
}
func consume(id int, res *ShareRes) {
for {
res.mu.Lock()
for len(res.queue) == 0 {
res.cond.Wait()
}
num := res.queue[0]
res.queue = res.queue[1:]
fmt.Println("i am comsumer,i get ", num, " from queue,id :", id)
res.cond.Broadcast()
res.mu.Unlock()
}
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
res := NewShareRes()
go product(res)
for i := 0; i < 3; i++ {
go consume(i, res)
}
wg.Wait()
}
二、信号量
Wait操作:这个操作会把信号量减去 1,相减后如果信号量 < 0,则表明资源已被占⽤,进程需阻塞等
待;相减后如果信号量 >= 0,则表明还有资源可使⽤,进程可正常继续执⾏。
Post操作:这个操作会把信号量加上 1,相加后如果信号量 <= 0,则表明当前有阻塞中的进程,于是
会将该进程唤醒运⾏;相加后如果信号量 > 0,则表明当前没有阻塞中的进程;
#include<semaphore.h>
#include<thread>
#include<iostream>
void ThreadFunc(void *arg){
sem_t *sem=(sem_t *)arg;
sem_wait(sem);//-1
std::cout<<"B"<<"\n";
}
int main(){
sem_t sem;
sem_init(&sem,0,0);
std::thread th(ThreadFunc,&sem);
std::cout<<"A"<<"\n";
sem_post(&sem);//+1
th.join();
sem_destroy(&sem);
return 0;
}
具体过程:
如果进程 B ⽐进程 A 先执⾏了,那么执⾏到 P 操作时,由于信号量初始值为 0,故信号量会变为 -1,表示进
程 A 还没⽣产数据,于是进程 B 就阻塞等待;
接着,当进程 A ⽣产完数据后,执⾏了 V 操作,就会使得信号量变为 0,于是就会唤醒阻塞在 P 操作的进程
B;
最后,进程 B 被唤醒后,意味着进程 A 已经⽣产了数据,于是进程 B 就可以正常读取数据了。
可以发现,信号初始化为 0 ,就代表着是同步信号量,它可以保证进程 A 应在进程 B 之前执⾏。
相关函数:
1. sem_init
用于初始化一个匿名信号量。
int sem_init(sem_t *sem, int pshared, unsigned int value);
- 参数:
sem
:指向信号量结构的指针。pshared
:共享标志位,0表示不共享(线程间同步),非0表示共享(进程间同步)。value
:信号量的初始值。
- 返回值:
- 成功:返回0。
- 失败:返回-1,并设置
errno
。
- 说明:此函数用于初始化一个无名信号量,通常用于线程间的同步。
2. sem_destroy
用于销毁一个匿名信号量。
- 函数原型:
-
int sem_destroy(sem_t *sem);
- 参数:
sem
:指向要销毁的信号量的指针。
- 返回值:
- 成功:返回0。
- 失败:返回-1,并设置
errno
。
- 说明:此函数用于销毁一个已经初始化的匿名信号量。
3. sem_post
用于释放(增加)信号量的值。
- 函数原型:
-
int sem_post(sem_t *sem);
- 参数:
sem
:指向具体信号量的指针。
- 返回值:
- 成功:返回0。
- 失败:返回-1,并设置
errno
。
- 说明:此函数用于增加信号量的值,如果有进程或线程因等待信号量的值大于0而被阻塞,此函数会唤醒它们。
4. sem_wait
用于等待(减少)信号量的值。
- 函数原型:
-
int sem_wait(sem_t *sem);
- 参数:
sem
:指向信号量的指针。
- 返回值:
- 成功:返回0。
- 失败:返回-1,并设置
errno
。
- 说明:此函数用于减少信号量的值,如果信号量的值大于0,则立即返回;如果信号量的值为0,则阻塞等待,直到信号量的值变为大于0。
5. sem_trywait
用于尝试等待信号量的值,非阻塞。
- 函数原型:
-
int sem_trywait(sem_t *sem);
- 参数:
sem
:指向信号量的指针。
- 返回值:
- 成功:返回0。
- 失败:返回-1,并设置
errno
(例如EAGAIN
表示信号量的值为0,无法减少)。
- 说明:此函数用于尝试减少信号量的值,如果信号量的值大于0,则减少并返回成功;如果信号量的值为0,则返回失败,不会阻塞。
6. sem_getvalue
用于获取信号量的当前值。
-
int sem_getvalue(sem_t *sem, int *valp);
- 参数:
sem
:指向信号量的指针。valp
:指向整数的指针,用于存储信号量的当前值。
- 返回值:
- 成功:返回0。
- 失败:返回-1,并设置
errno
。
- 说明:此函数用于获取信号量的当前值,并将其存储在
valp
指向的整数中。