首页 > 其他分享 >Golang常见的并发模式

Golang常见的并发模式

时间:2024-07-26 11:30:43浏览次数:18  
标签:订阅 sub 常见 chan Golang 并发 线程 func

Golang常见的并发模式


Go语言最吸引人的地方是它内建的并发支持

首先要明确一个概念:并发不是并行

  • 并发 更关注的是程序的设计层面,并发的程序完全是可以顺序执行的,只有在真正的多核CPU上才可能真正地同时运行。
  • 并行 更关注的是程序的运行层面,并行一般是简单的大量重复,例如GPU中对图像处理都会有大量的并行运算。

为更好的编写并发程序,从设计之初Go语言就注重如何在编程语言层级上设计一个简洁安全高效的抽象模型,让程序员专注于分解问题和组合方案,而且不用被线程管理信号互斥这些繁琐的操作分散精力

在并发编程中,对共享资源的正确访问需要精确的控制,在目前的绝大多数语言中,都是通过加锁等线程同步方案来解决这一困难问题,而Go语言却另辟蹊径,它将共享的值通过Channel传递(实际上多个独立执行的线程很少主动共享资源)。在任意给定的时刻,最好只有一个Goroutine能够拥有该资源。数据竞争从设计层面上就被杜绝了。为了提倡这种思考方式,Go语言将其并发编程哲学化为一句口号:

Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而应通过通信来共享内存。

这是更高层次的并发编程哲学(通过管道来传值是Go语言推荐的做法)。虽然像引用计数这类简单的并发问题通过原子操作或互斥锁就能很好地实现,但是通过Channel来控制访问能够让你写出更简洁正确的程序。

1.6.1 并发版本的Hello world

我们先以在一个新的Goroutine中输出“Hello world”,main等待后台线程输出工作完成之后退出,这样一个简单的并发程序作为热身

func main() {
    var mu sync.Mutex

    mu.Lock()
    go func(){
        fmt.Println("你好, 世界")
        mu.Unlock()
    }()

    mu.Lock()
}

使用sync.Mutex互斥锁同步是比较低级的做法。我们现在改用无缓存的管道来实现同步:

func main() {
    done := make(chan int)

    go func(){
        fmt.Println("你好, 世界")
        <-done
    }()

    done <- 1
}

基于带缓存的管道,我们可以很容易将打印线程扩展到N个。下面的例子是开启10个后台线程分别打印:

func main() {
    var wg sync.WaitGroup

    // 开N个后台打印线程
    for i := 0; i < 10; i++ {
        wg.Add(1)

        go func() {
            fmt.Println("你好, 世界")
            wg.Done()
        }()
    }

    // 等待N个后台线程完成
    wg.Wait()
}

1.6.2 生产者消费者模型

并发编程中最常见的例子就是生产者消费者模式,该模式主要通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。简单地说,就是生产者生产一些数据,然后放到成果队列中,同时消费者从成果队列中来取这些数据。这样就让生产消费变成了异步的两个过程。当成果队列中没有数据时,消费者就进入饥饿的等待中;而当成果队列中数据已满时,生产者则面临因产品挤压导致CPU被剥夺的下岗问题

// 生产者: 生成 factor 整数倍的序列
func Producer(factor int, out chan<- int) {
    for i := 0; ; i++ {
        out <- i*factor
    }
}

// 消费者
func Consumer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}
func main() {
    ch := make(chan int, 64) // 成果队列

    go Producer(3, ch) // 生成 3 的倍数的序列
    go Producer(5, ch) // 生成 5 的倍数的序列
    go Consumer(ch)    // 消费 生成的队列

    // 运行一定时间后退出
    time.Sleep(5 * time.Second)
}

我们可以让main函数保存阻塞状态不退出,只有当用户输入Ctrl-C时才真正退出程序:

func main() {
    ch := make(chan int, 64) // 成果队列

    go Producer(3, ch) // 生成 3 的倍数的序列
    go Producer(5, ch) // 生成 5 的倍数的序列
    go Consumer(ch)    // 消费 生成的队列

    // Ctrl+C 退出
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    fmt.Printf("quit (%v)\n", <-sig)
}

1.6.3 发布订阅模型

发布订阅(publish-and-subscribe)模型通常被简写为pub/sub模型。在这个模型中,消息生产者成为发布者(publisher),而消息消费者则成为订阅者(subscriber),生产者和消费者是M:N的关系。在传统生产者和消费者模型中,是将消息发送到一个队列中,而发布订阅模型则是将消息发布给一个主题。

为此,我们构建了一个名为pubsub的发布订阅模型支持包:

// Package pubsub implements a simple multi-topic pub-sub library.
package pubsub

import (
    "sync"
    "time"
)

type (
    subscriber chan interface{}         // 订阅者为一个管道
    topicFunc  func(v interface{}) bool // 主题为一个过滤器
)

// 发布者对象
type Publisher struct {
    m           sync.RWMutex             // 读写锁
    buffer      int                      // 订阅队列的缓存大小
    timeout     time.Duration            // 发布超时时间
    subscribers map[subscriber]topicFunc // 订阅者信息
}

// 构建一个发布者对象, 可以设置发布超时时间和缓存队列的长度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
    return &Publisher{
        buffer:      buffer,
        timeout:     publishTimeout,
        subscribers: make(map[subscriber]topicFunc),
    }
}

// 添加一个新的订阅者,订阅全部主题
func (p *Publisher) Subscribe() chan interface{} {
    return p.SubscribeTopic(nil)
}

// 添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
    ch := make(chan interface{}, p.buffer)
    p.m.Lock()
    p.subscribers[ch] = topic
    p.m.Unlock()
    return ch
}

// 退出订阅
func (p *Publisher) Evict(sub chan interface{}) {
    p.m.Lock()
    defer p.m.Unlock()

    delete(p.subscribers, sub)
    close(sub)
}

// 发布一个主题
func (p *Publisher) Publish(v interface{}) {
    p.m.RLock()
    defer p.m.RUnlock()

    var wg sync.WaitGroup
    for sub, topic := range p.subscribers {
        wg.Add(1)
        go p.sendTopic(sub, topic, v, &wg)
    }
    wg.Wait()
}

// 关闭发布者对象,同时关闭所有的订阅者管道。
func (p *Publisher) Close() {
    p.m.Lock()
    defer p.m.Unlock()

    for sub := range p.subscribers {
        delete(p.subscribers, sub)
        close(sub)
    }
}

// 发送主题,可以容忍一定的超时
func (p *Publisher) sendTopic(
    sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
    defer wg.Done()
    if topic != nil && !topic(v) {
        return
    }

    select {
    case sub <- v:
    case <-time.After(p.timeout):
    }
}

下面的例子中,有两个订阅者分别订阅了全部主题和含有"golang"的主题:

import "path/to/pubsub"

func main() {
    p := pubsub.NewPublisher(100*time.Millisecond, 10)
    defer p.Close()

    all := p.Subscribe()
    golang := p.SubscribeTopic(func(v interface{}) bool {
        if s, ok := v.(string); ok {
            return strings.Contains(s, "golang")
        }
        return false
    })

    p.Publish("hello,  world!")
    p.Publish("hello, golang!")

    go func() {
        for  msg := range all {
            fmt.Println("all:", msg)
        }
    } ()

    go func() {
        for  msg := range golang {
            fmt.Println("golang:", msg)
        }
    } ()

    // 运行一定时间后退出
    time.Sleep(3 * time.Second)
}

在发布订阅模型中,每条消息都会传送给多个订阅者。发布者通常不会知道、也不关心哪一个订阅者正在接收主题消息。订阅者和发布者可以在运行时动态添加,是一种松散的耦合关系,这使得系统的复杂性可以随时间的推移而增长。在现实生活中,像天气预报之类的应用就可以应用这个并发模式。

1.6.4 控制并发数

很多用户在适应了Go语言强大的并发特性之后,都倾向于编写最大并发的程序,因为这样似乎可以提供最大的性能。在现实中我们行色匆匆,但有时却需要我们放慢脚步享受生活,并发的程序也是一样:有时候我们需要适当地控制并发的程度,因为这样不仅仅可给其它的应用/任务让出/预留一定的CPU资源,也可以适当降低功耗缓解电池的压力。

在Go语言自带的godoc程序实现中有一个vfs的包对应虚拟的文件系统,在vfs包下面有一个gatefs的子包,gatefs子包的目的就是为了控制访问该虚拟文件系统的最大并发数gatefs包的应用很简单:

import (
    "golang.org/x/tools/godoc/vfs"
    "golang.org/x/tools/godoc/vfs/gatefs"
)

func main() {
    fs := gatefs.New(vfs.OS("/path"), make(chan bool, 8))
    // ...
}

其中vfs.OS("/path")基于本地文件系统构造一个虚拟的文件系统,然后gatefs.New基于现有的虚拟文件系统构造一个并发受控的虚拟文件系统。并发数控制的原理在前面一节已经讲过,就是通过带缓存管道的发送和接收规则来实现最大并发阻塞:

var limit = make(chan int, 3)

func main() {
    for _, w := range work {
        go func() {
            limit <- 1
            w()
            <-limit
        }()
    }
    select{}
}

不过gatefs对此做一个抽象类型gate,增加了enterleave方法分别对应并发代码的进入和离开。当超出并发数目限制的时候,enter方法会阻塞直到并发数降下来为止。

type gate chan bool

func (g gate) enter() { g <- true }
func (g gate) leave() { <-g }

gatefs包装的新的虚拟文件系统就是将需要控制并发的方法增加了enterleave调用而已:

type gatefs struct {
    fs vfs.FileSystem
    gate
}

func (fs gatefs) Lstat(p string) (os.FileInfo, error) {
    fs.enter()
    defer fs.leave()
    return fs.fs.Lstat(p)
}

1.6.5 赢者为王

采用并发编程的动机有很多:

  • 并发编程可以简化问题,比如一类问题对应一个处理线程会更简单;
  • 并发编程还可以提升性能,在一个多核CPU上开2个线程一般会比开1个线程快一些。
  • 其实对于提升性能而言,程序并不是简单地运行速度快就表示用户体验好的;很多时候程序能快速响应用户请求才是最重要的,当没有用户请求需要处理的时候才合适处理一些低优先级的后台任务。

假设我们想快速地搜索“golang”相关的主题,我们可能会同时打开Bing、Google或百度等多个检索引擎。当某个搜索最先返回结果后,就可以关闭其它搜索页面了,我们可以采用类似的策略来编写这个程序:

func main() {
    ch := make(chan string, 32)

    go func() {
        ch <- searchByBing("golang")
    }()
    go func() {
        ch <- searchByGoogle("golang")
    }()
    go func() {
        ch <- searchByBaidu("golang")
    }()

    fmt.Println(<-ch)
}

最终我们只从管道取第一个结果,也就是最先返回的结果。通过适当开启一些冗余的线程,尝试用不同途径去解决同样的问题,最终以赢者为王的方式提升了程序的相应性能。

标签:订阅,sub,常见,chan,Golang,并发,线程,func
From: https://www.cnblogs.com/zakun/p/18324789

相关文章

  • conda常见命令
    以下是一些常见的conda命令及其用途,它们可以帮助你管理Conda环境和包:环境管理创建新环境:condacreate--nameenv_namepython=3.8创建一个名为env_name的环境,并指定Python版本(如3.8)。激活环境:condaactivateenv_name激活名为env_name的环境。停用当前环境......
  • 常见Linux命令
    重要(1)top:查看内存/显示系统当前进程信息(2)df-h:查看磁盘存储状况(3)iotop:查看IO读写(4)iotop-o:直接查看比较高的磁盘读写(5)netstat-tunlp|grep端口好:查看端口占用情况(6)netstat-a:列出所有端口(7)lsof-i:端口号:查看端口号占用情况(8)uptime: 查看报告系统运行时长及平均......
  • Java并发编程(一)
    Java并发编程(一)1、在java中守护线程和本地线程区别java中的线程分为两种:守护线程(Daemon)和用户线程(User)任何线程都可以设置为守护线程和用户线程,通过方法Thread.setDaemon(boolon);true则把该线程设置为守护线程,反之则为用户线程。Thread.setDaemon()必须在Thread.st......
  • 上、下拉电阻(定义、强弱上拉、常见作用、吸电流、拉电流、灌电流)
    上、下拉电阻(定义、强弱上拉、常见作用、吸电流、拉电流、灌电流)上、下拉电阻定义上拉电阻是把一个信号通过一个电阻接到电源(Vcc),下拉电阻是一个信号通过一个电阻接到地(GND)。强上拉、弱上拉强上拉、弱上拉的强弱只是上拉电阻的阻值不同,没有什么严格区分。例如:50Ω上拉,一般......
  • 常见日志输出目标(Logback | Log4j2 | Java Util Logging)
    常见日志输出目标控制台:日志可以被输出到控制台(终端),通常用于开发和调试阶段。在日志框架中,控制台输出通常由ConsoleAppender(例如Log4j、Logback)配置。日志文件:日志也可以被写入到日志文件中,以便于长期存储和分析。在日志框架中,文件输出通常由FileAppender(例如Log4j、......
  • C语言常见操作符(补充)
    前言:在这篇文章中,我们将会认识到更多的C语言操作符,并通过简单的介绍和相应的代码,来更好的了解它们。位操作符:位操作符在计算机中以二进制的形式,进行运算。&按位与                                         |按位或   ......
  • 首页高并发案例遇到的问题以及解决办法
    要通过openresty,ngin,MySQL和redis来部署一个项目首先需要安装以上工具安装openrestyyumintsallyum-utilsyum-config-manager--add-repohttps://openresty.org/package/centos/openresty.repo首先执行这两行代码然后就可以执行安装命令yuminstallopenresty-y在安......
  • 【golang设计模式】—— 简单工厂模式
    模式定义简单工厂模式(SimpleFactoryPattern):又称为静态工厂方法(StaticFactoryMethod)模式,它属于类创建型模式。在简单工厂模式中,可以根据参数的不同返回不同类的实例。简单工厂模式专门定义一个类来负责创建其他类的实例,被创建的实例通常都具有共同的父类。模式结构工......
  • Kotlin协程:现代并发编程的艺术
    引言随着软件系统变得越来越复杂,处理异步操作的需求也随之增加。传统的多线程模型虽然有效,但在某些情况下会导致过多的资源消耗和复杂的控制流。Kotlin协程提供了一种优雅的方式来管理这些异步任务,并且极大地简化了代码结构。协程简介协程是一种轻量级的线程,允许程序在执行过......
  • 高并发场景下,布隆过滤器+缓存基本步骤
    在高并发场景下,布隆过滤器与缓存的结合使用可以显著提升系统的响应速度和降低后端数据库的负载。以下是布隆过滤器与缓存(如Redis)结合使用的一般配置步骤和原理:1.原理布隆过滤器作为预检查:在查询缓存或数据库之前,先使用布隆过滤器检查请求的键是否可能存在于缓存或数据库中......