首页 > 其他分享 >常见的并非模型

常见的并非模型

时间:2024-09-13 14:25:12浏览次数:9  
标签:wg goroutine 模型 常见 channel func go 并非 Wait

在并行编程中,处理循环迭代时常用的并发模型有几种:

  1. Worker Pool(工作池)

    • 描述:创建固定数量的工作 goroutine,这些 goroutine 从共享的任务队列中获取任务并执行。
    • 优点:控制并发量,避免过多 goroutine 导致资源耗尽。
    • 示例
      package main
      
      import (
          "fmt"
          "sync"
      )
      
      func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
          defer wg.Done()
          for job := range jobs {
              fmt.Printf("Worker %d processing job %d\n", id, job)
              results <- job * 2
          }
      }
      
      func main() {
          const numWorkers = 3
          jobs := make(chan int, 10)
          results := make(chan int, 10)
          var wg sync.WaitGroup
      
          // Start worker goroutines
          for i := 1; i <= numWorkers; i++ {
              wg.Add(1)
              go worker(i, jobs, results, &wg)
          }
      
          // Send jobs
          for j := 1; j <= 10; j++ {
              jobs <- j
          }
          close(jobs)
      
          // Wait for all workers to finish
          go func() {
              wg.Wait()
              close(results)
          }()
      
          // Collect results
          for result := range results {
              fmt.Println("Result:", result)
          }
      }
  2. Map/Reduce

    • 描述:将数据集合分成多个部分并行处理,然后将结果合并。
    • 优点:适合大规模数据处理任务。
    • 示例
      package main
      
      import (
          "fmt"
          "sync"
      )
      
      func mapWorker(data []int, results chan<- int, wg *sync.WaitGroup) {
          defer wg.Done()
          sum := 0
          for _, v := range data {
              sum += v
          }
          results <- sum
      }
      
      func main() {
          data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
          chunkSize := 2
          var wg sync.WaitGroup
          results := make(chan int, len(data)/chunkSize)
      
          // Divide data into chunks and process
          for i := 0; i < len(data); i += chunkSize {
              end := i + chunkSize
              if end > len(data) {
                  end = len(data)
              }
              wg.Add(1)
              go mapWorker(data[i:end], results, &wg)
          }
      
          // Wait for all map workers to finish
          go func() {
              wg.Wait()
              close(results)
          }()
      
          // Collect results
          total := 0
          for result := range results {
              total += result
          }
      
          fmt.Println("Total:", total)
      }
  3. Fan-Out/Fan-In

    • 描述:多个 goroutine 处理来自同一通道的数据(Fan-Out),然后将结果汇总到一个通道中(Fan-In)。
    • 优点:可以有效利用 CPU 核心,实现数据流的处理。
    • 示例
      package main
      
      import (
          "fmt"
          "sync"
      )
      
      func generateNumbers(n int, out chan<- int) {
          for i := 0; i < n; i++ {
              out <- i
          }
          close(out)
      }
      
      func squareNumbers(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
          defer wg.Done()
          for num := range in {
              out <- num * num
          }
      }
      
      func main() {
          numbers := make(chan int, 10)
          squared := make(chan int, 10)
          var wg sync.WaitGroup
      
          // Generate numbers
          go generateNumbers(10, numbers)
      
          // Start worker goroutines
          for i := 0; i < 3; i++ {
              wg.Add(1)
              go squareNumbers(numbers, squared, &wg)
          }
      
          // Close squared channel once all workers are done
          go func() {
              wg.Wait()
              close(squared)
          }()
      
          // Print squared numbers
          for result := range squared {
              fmt.Println(result)
          }
      }

这些模型可以根据任务的性质和要求选择合适的实现,来优化并发处理的性能和效率。

   
经典案例:生成图像缩略图并计算这些缩略图的总大小
func main() {
    images := []string{
        "/Users/xxx/Pictures/avatar/1.jpeg",
        "/Users/xxx/Pictures/avatar/2.jpeg",
        "/Users/xxx/Pictures/avatar/3.jpeg",
    }
    
    ch := make(chan string)
    go func() {
        defer close(ch)
        for _, filename := range images {
            ch <- filename
            log.Println(filename + " sent .")
        }
    }()
    result := makeThumbnails6(ch)
    log.Printf("Total size of thumbnails: %d bytes\n", result)

}

func makeThumbnails6(filenames <-chan string) int64 {
    sizes := make(chan int64)
    var wg sync.WaitGroup
    for f := range filenames {

        log.Printf("%v receive .", f)
        wg.Add(1)
        go func(f string) {
            defer wg.Done()
            thumb, err := thumbnail.ImageFile(f)
            if err != nil {
                log.Println(err)
                return
            }
            info, _ := os.Stat(thumb)
            sizes <- info.Size()
        }(f)
    }
    go func() {
        wg.Wait()
        close(sizes)
    }()
    var total int64
    for size := range sizes {
        total += size
    }
    return total
}

 

这里面有几点需要注意的: 1: 闭包的变量: 这里的闭包函数把f 传递进去了,防止改变外部的f
 go func(f string) {
            defer wg.Done()
            thumb, err := thumbnail.ImageFile(f)
            if err != nil {
                log.Println(err)
                return
            }
            info, _ := os.Stat(thumb)
            sizes <- info.Size()
        }(f)

 

2:为什么使用阻塞的size channel?( 外部启动第2个goroutine的时候, makeThumbnails6里面的goroutine执行比较快就结束返回了,关闭了 size channel 怎么办

有一个点很好奇:在makeThumbnails6中如果两次for循环间隔太大,是否导致该函数提前退出。
比如说filenames chan每隔1分钟发送一个消息。但 for f := range filenames 中goroutine只需要半分钟就执行完,看起来会在第二次循环前导致wg.Wait()解除,sizes chan关闭,从而makeThumbnails6未等待filenames chan关闭就返回了)

   answer:不会;for f := range filenames 中启动的 goroutine 向 sizes 中发送消息会阻塞(无缓存channel),这样只有循环结束才会开始接收size信息(主goroutine),wg.done之后才会执行,

   即wg等待的goroutine数量只会一直增加到最大,然后开始减少,直到wg.wait.

   
   makeThumbnails6
函数中,sizes channel 用于接收处理后的文件大小。由于 sizes 是一个阻塞的 channel,发送操作(sizes <- info.Size())会阻塞直到有 goroutine 从 channel 中接收数据,

     这样可以避免在未处理完数据的情况下继续添加更多数据。

 

3: 这里为什么都是go func() { wg.Wait() close(squared) }() 不能直接在主进程中执行吗?

  go func() {
        wg.Wait()
        close(sizes)
    }()
    var total int64
    for size := range sizes {
        total += size
    }
 在并发编程中,使用 go func() { wg.Wait(); close(squared) }() 的主要目的是在主 goroutine 外部执行一些操作,以避免阻塞主进程。以下是具体原因:
  1. 避免阻塞主 goroutine

    • 在主 goroutine 中直接执行 wg.Wait(); close(squared) 会阻塞主进程,直到所有工作 goroutines 完成。如果这些工作 goroutines 处理的是耗时操作,主进程会因此停滞。
    • 使用 go func() 可以异步执行这个操作,让主 goroutine 继续执行其他操作,例如接收结果或处理其他逻辑,而不会被 wg.Wait() 阻塞。
  2. 确保关闭通道时所有 goroutines 已完成

    • wg.Wait() 确保在关闭通道之前,所有的工作 goroutines 都已完成任务。关闭通道操作需要在所有数据处理完成后执行,以避免数据丢失或读取错误。
    • 通过将关闭通道的操作放在一个新的 goroutine 中,确保在所有工作 goroutines 完成后才关闭通道,这样做可以避免主 goroutine 等待过程中的复杂性。
  3. 分析代码结构

    • 使用 go func() 可以将通道关闭的逻辑与主程序的其他逻辑分开,使代码结构更清晰和简洁。主 goroutine 可以专注于接收和处理结果,而关闭通道的逻辑在一个独立的 goroutine 中完成。
    • wg.Wait(): 这个调用会阻塞当前 goroutine,直到 WaitGroup 的计数器减到 0,表示所有添加到 WaitGroup 中的 goroutines 都已完成。(相当于一个常驻进场一直判断 WaitGroup 的计数器是否减到 0,到0 才会关闭channel)

    • channel关闭了, 下面的for range 循环才会退出.( for range channel是 Go 程序中的一个循环结构,它在当前的 goroutine 中运行 ,但是这个循环退出的条件要等channel关闭才会退出)

 

 

标签:wg,goroutine,模型,常见,channel,func,go,并非,Wait
From: https://www.cnblogs.com/-cyh/p/18412119

相关文章

  • LLM小白的成长之路—零基础怎么转行大模型?
    如何转大模型这块,分享一下目前我的经验。这篇文章是我之前几个月学习LLM知识的总结,我把看过的比较好的资料罗列下来,方便大家阅读。只要照着我写的路线按顺序学习,然后自己动手多做一些项目,或者参加比赛就可以0经验跨入大模型领域。毕竟我就是个例子,嘿嘿~想学习大语言模型,......
  • 用Python实现时间序列模型实战——Day 19: 时间序列中的异常检测与处理
    一、学习内容1.时间序列中的异常检测方法在时间序列分析中,异常检测是识别时间序列中不同于正常行为的点。这些异常点可能是由于数据记录错误、极端事件或系统故障引起的,常见的异常检测方法包括:基于统计的方法:Z-score:计算每个数据点与其均值的标准差距离,判断其是否为异常......
  • MySQL中常见的存储引擎有什么?
    MySQL中常见的存储引擎有什么?MySQL中有三种常见的引擎:InnoDB(默认),MyISAM,Memory。InnoDB存储引擎作为MySQL的默认存储引擎有很多特点:B+树作为索引结构,叶子节点上存放表中的数据,非叶子节点存放索引。支持事务ACID---->原子性,一致性,隔离性,持久性。事务隔离级别。(读未提交,读......
  • AI新时代揭幕 会“思考解题逻辑”的OpenAI推理大模型登场
    北京时间周五凌晨1时许,AI时代迎来崭新的起点——能够进行通用复杂推理的大模型终于走到台前。OpenAI在官网发布公告称,开始向全体订阅用户开始推送OpenAIo1预览模型——也就是此前被广泛期待的“草莓”大模型。OpenAI表示,对于复杂推理任务而言,新模型代表着人工智能能力的崭......
  • 突发:ChatGPT最新模型【 o1 】草莓终于上线了!
    OpenAI-o1的首次总结在阅读了OpenAI的出版物后,我对其本质特点进行了总结,并得出了以下结论:1.复杂问题的推理能力显著提升:OpenAI-o1在处理复杂问题时表现出色,尤其在逻辑任务方面。2.定期更新和改进:通过不断的训练,模型学会完善自己的思维过程,尝试不同的策略,并识别和纠正......
  • 大龄焦虑?老码农逆袭之路:拥抱大模型时代,焕发职业生涯新活力!
    其实我很早就对大龄程序员这个话题感到焦虑,担心自己35岁之后会面临失业,有时和亲戚朋友聊天时,也会经常拿这个出来调侃。现在身边已经有很多35岁左右的同事,自己过两年也会步入35岁的行列,反倒多了一份淡定和从容。如何看待35岁年龄危机35岁年龄特点强调一下,35岁并不是真正......
  • Java常见报错
    NoSuchElementException:一般都是数组或者集合的索引越界ConCurrentCheck(并发修改异常):因为集合中有自己的修改次数记录的变量,还有另一个记录地变量,一般这2个变量不一致,则会报错!mapkeyisrequired怎么解决:说明:MyBatis查询一些记录,数据涉及到两个表里的数据,需要连表查......
  • 火山引擎VeDI核心产品DataTester再进化,A/B大模型应用评测功能上线
    更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群近日,火山引擎A/B测试产品DataTester上线了A/B大模型应用评测功能。此次升级不仅强化了模型上线前的基础能力评测,还新增了针对线上使用场景的全面、系统检测与评估机制,为企业在模型应用的全周期管......
  • 适合初学者的[JAVA]:Redis(2:I/O多路复用模型与事件派发)
    目录说明前言I/O多路复用模型备注:用户空间和内核空间:备注:阻塞IO:(了解)非阻塞IO:(了解)IO多路复用:(重点)常见的方式有:差异:事件派发说明:Redis网络模型总结: 说明本文适合刚刚学习Java的初学者,也可以当成阿岩~的随手笔记.接下来就请道友们和我一起来......
  • OpenAI 推出专门用于解决复杂问题的模型 OpenAI o1
    2024年9月12日(当地时间),北京时间9月13日凌晨,OpenAI推出了OpenAIo1,这是一系列致力于解决复杂问题的新型AI模型。据说,这些模型在科学、编码和数学等领域的表现比以前的模型更好。本文将详细介绍OpenAIo1的功能、价格和使用方法。OpenAIo1是什么?OpenAIo......