当多个协程同时访问和修改同一个共享资源(如切片)时,如果没有适当的同步机制,可能会导致数据竞争和不一致的结果。
package main import ( "fmt" "sync" ) func processChunk(chunk []int64, wg *sync.WaitGroup, failedList []int64) { defer wg.Done() fmt.Println("failedList======open1======", failedList) for _, uid := range chunk { // 将失败的UIDs添加到failedList failedList = append(failedList, uid) } fmt.Println("failedList======open2======", failedList) } func main() { // 假设UID数组 uidArr := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} chunkSize := 500 var ( wg sync.WaitGroup failedList []int64 // 发送失败的全局uids ) fmt.Println("failedList======start=======", failedList) // 每500个UID启动一个协程进行数据更新 for i := 0; i < len(uidArr); i += chunkSize { end := i + chunkSize if end > len(uidArr) { end = len(uidArr) } wg.Add(1) go processChunk(uidArr[i:end], &wg, failedList) } fmt.Println("测试前") wg.Wait() // 等待所有协程执行完毕 fmt.Println("测试后,长度为:", len(failedList)) if len(failedList) != 0 { fmt.Println("failedList======end======", failedList) } }
原因:
在 Go 语言中,切片(slice)是引用类型,当将一个切片作为参数传递给一个函数时,实际上传递的是该切片的副本,包括它的底层数组、长度和容量。但是,这个副本的底层数组指针仍然指向原始的底层数组。
在提供代码中,failedList 切片被传递给 processMailChunk 函数。在 processMailChunk 函数内部,试图通过 append 函数修改 failedList。然而,这个修改仅影响了函数内部的 failedList 副本,并没有影响到函数外部的原始 failedList 切片。
这是因为 append 函数可能会返回一个新的切片,如果原切片没有足够的容量来容纳新的元素。在代码中,每次 append 都会导致 failedList 副本指向一个新的底层数组(或者至少是修改长度后的同一个底层数组的不同部分)。
要修复这个问题,需要在 processMailChunk 函数中返回一个修改后的 failedList,然后在主函数中更新全局的 failedList。但是,由于使用了多个协程,并且每个协程都试图修改同一个切片,这会导致竞态条件。
一个更好的方法是使用 sync.Map 或者 sync.Mutex 来保护对共享资源的访问。由于 failedList 是一个切片,使用 sync.Map 可能不太方便,所以我们可以使用 sync.Mutex 来保护对 failedList 的访问。
【processMailChunk 函数接收一个指向 failedList 的指针(*[]int64),以及一个指向 sync.Mutex 的指针】----通过传递地址实现共享资源的线程安全访问。
正解:
package main import ( "fmt" "sync" ) func processChunk(chunk []int64, wg *sync.WaitGroup, failedList *[]int64) { defer wg.Done() fmt.Println("failedList======open1======", failedList) for _, uid := range chunk { // 将失败的UIDs添加到failedList *failedList = append(*failedList, uid) // 关键位置 } fmt.Println("failedList======open2======", failedList) } func main() { // 假设UID数组 uidArr := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} chunkSize := 500 var ( wg sync.WaitGroup failedList []int64 // 发送失败的全局uids ) fmt.Println("failedList======start=======", failedList) // 每500个UID启动一个协程进行数据更新 for i := 0; i < len(uidArr); i += chunkSize { end := i + chunkSize if end > len(uidArr) { end = len(uidArr) } wg.Add(1) go processChunk(uidArr[i:end], &wg, &failedList) } fmt.Println("测试前") wg.Wait() // 等待所有协程执行完毕 fmt.Println("测试后") if len(failedList) != 0 { fmt.Println("failedList======end======", failedList) } }
总结:
这个问题与在Go中使用sync.WaitGroup时传递其地址是类似的。在Go中,当使用sync.WaitGroup来等待一组协程完成时,需要传递*sync.WaitGroup(即WaitGroup的地址)给这些协程,以便它们能够调用Add和Done方法来增加或减少等待计数。这是因为Add和Done方法都是修改WaitGroup的内部状态,而这个状态是共享的。如果直接传递WaitGroup的值而不是它的地址,那么每个协程将会操作它自己的副本,而不是全局的WaitGroup实例,这将导致主函数无法正确等待所有协程完成。同样地,当需要在多个协程之间共享和修改一个切片时,也需要传递这个切片的地址(即*[]int64),以便所有协程都能操作同一个切片实例。否则,每个协程可能会操作它自己的切片副本,从而导致主函数无法看到其他协程所做的修改。
为了解决这个问题并确保线程安全,需要做两件事:
1) 传递共享资源的地址(无论是*sync.WaitGroup还是*[]int64)给协程,以便它们能够操作同一个实例。
2) 使用某种同步机制(如sync.Mutex)来保护对共享资源的访问,以避免数据竞争。