首页 > 其他分享 >Go组件库总结之协程睡眠唤醒

Go组件库总结之协程睡眠唤醒

时间:2023-03-03 16:46:33浏览次数:54  
标签:之协程 waker Waker 组件 atomic func sleeper Go 唤醒

本篇文章我们用Go封装一个利用gopark和goready实现协程睡眠唤醒的库。文章参考自:https://github.com/brewlin/net-protocol

1.gopark和goready的声明

//go:linkname gopark runtime.gopark
func gopark(unlockf func(uintptr, *uintptr) bool, wg *uintptr, reason string, traceEv byte, traceskip int)

//go:linkname goready runtime.goready
func goready(g uintptr, traceskip int)

2.sleeper的封装

type Sleeper struct {
    sharedList unsafe.Pointer  // wakers共享的唤醒队列
    localList *Waker           // 只供sleeper访问的唤醒队列
    allWakers *Waker           // 所有wakers
    waitingG  uintptr          // 用于持有正在睡眠的G
}

3.waker的封装

type Waker struct {
    // s 是waker能唤醒的sleeper,有三种取值:
    // nil -- waker未和sleeper绑定
    // &assertedSleeper -- waker处于唤醒状态
    // otherwise -- 和sleeper绑定,但未被唤醒
    s unsafe.Pointer
    next *Waker           // 用于链接sharedList的wakers
    allWakersNext *Waker  // 用于链接所有wakers
    id int                // waker唤醒sleeper后返回给sleeper的标识
}

4.辅助变量和函数

const (
    preparingG = 1  // 用于表明sleeper准备睡眠
)

var (
    assertedSleeper Sleeper // 哨兵sleeper,其指针被存储在asserted waker中,即w.s
)

func usleeper(s *Sleeper) unsafe.Pointer {
    return unsafe.Pointer(s)
}

func uwaker(w *Waker) unsafe.Pointer {
    return unsafe.Pointer(w)
}

5.sleeper添加waker

func (s *Sleeper) AddWaker(w *Waker, id int) {
    // 添加waker到allwakers队列
    w.allWakersNext = s.allWakers
    s.allWakers = w
    w.id = id

    for {
        p := (*Sleeper)(atomic.LoadPointer(&w.s))
        // 如果waker处于唤醒状态, 则准备唤醒sleeper
        if p == &assertedSleeper {
            s.enqueueAssertedWaker(w)
            return
        }
        // 关联waker到sleeper
        if atomic.CompareAndSwapPointer(&w.s, usleeper(p), usleeper(s)) {
            return
        }
    }
}

6.sleeper获取唤醒状态的waker

// Fetch 取下一个唤醒状态的waker, 返回其关联的id
func (s *Sleeper) Fetch(block bool) (id int, ok bool) {
    for {
        w := s.nextWaker(block)
        if w == nil {
            return -1, false
        }

        // 判断waker是否处于唤醒状态(可能被Clear清除唤醒状态)
        old := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(s)))
        if old == &assertedSleeper {
            return w.id, true
        }
    }
}

// nextWaker 返回唤醒队列的下一个waker,block决定是否阻塞
func (s *Sleeper) nextWaker(block bool) *Waker {
    // 如果locallist为空,尝试填充
    if s.localList == nil {
        // 如果sharedList为空
        for atomic.LoadPointer(&s.sharedList) == nil {
            // 非阻塞直接返回
            if !block {
                return nil
            }

            // 告知wakers准备睡眠
            atomic.StoreUintptr(&s.waitingG, preparingG)

            // 睡眠前检查是否有新的waker
            if atomic.LoadPointer(&s.sharedList) != nil {
                atomic.StoreUintptr(&s.waitingG, 0)
                break
            }

            // Try to commit the sleep and report it to the
            // tracer as a select.
            //
            // gopark puts the caller to sleep and calls
            // commitSleep to decide whether to immediately
            // wake the caller up or to leave it sleeping.
            const traceEvGoBlockSelect = 24
            gopark(commitSleep, &s.waitingG, "sleeper", traceEvGoBlockSelect, 0)
        }

        // 将shared list转移到local list. (注意两次都是头插法,所以最早的waker在队列最前)
        v := (*Waker)(atomic.SwapPointer(&s.sharedList, nil))
        for v != nil {
            cur := v
            v = v.next

            cur.next = s.localList
            s.localList = cur
        }
    }

    // 移除localList队首的waker并返回
    w := s.localList
    s.localList = w.next

    return w
}

7.waker唤醒

func (w *Waker) Assert() {
    if atomic.LoadPointer(&w.s) == usleeper(&assertedSleeper) {
        return
    }

    // 标记waker为唤醒状态
    switch s := (*Sleeper)(atomic.SwapPointer(&w.s, usleeper(&assertedSleeper))); s {
    case nil:
    case &assertedSleeper:
    default:
        s.enqueueAssertedWaker(w)
    }
}

func (s *Sleeper) enqueueAssertedWaker(w *Waker) {
    // 将一个唤醒的waker添加到sharedList
    for {
        v := (*Waker)(atomic.LoadPointer(&s.sharedList))
        w.next = v
        if atomic.CompareAndSwapPointer(&s.sharedList, uwaker(v), uwaker(w)) {
            break
        }
    }

    for {
        // 如果G不在等待状态,不用唤醒
        g := atomic.LoadUintptr(&s.waitingG)
        if g == 0 {
            return
        }

        // 唤醒处于等待状态(非准备睡眠状态)的G
        if atomic.CompareAndSwapUintptr(&s.waitingG, g, 0) {
            if g != preparingG {
                goready(g, 0)
            }
        }
    }
}

8.其他功能方法

// Done 停用sleeper,并释放其wakers以遍其他sleeper复用
func (s *Sleeper) Done() {
    // 移除所有w.s指向sleeper的waker, 其他的移入pending队列
    var pending *Waker
    w := s.allWakers
    for w != nil {
        next := w.allWakersNext
        for {
            t := atomic.LoadPointer(&w.s)
            if t != usleeper(s) {
                w.allWakersNext = pending
                pending = w
                break
            }

            if atomic.CompareAndSwapPointer(&w.s, t, nil) {
                break
            }
        }
        w = next
    }

    // 等待其他的waker唤醒sleeper
    for pending != nil {
        pulled := s.nextWaker(true)

        prev := &pending
        for w := *prev; w != nil; w = *prev {
            if pulled == w {
                *prev = w.allWakersNext
                break
            }
            prev = &w.allWakersNext
        }
    }
    s.allWakers = nil
}

// Clear 清除waker的唤醒状态
func (w *Waker) Clear() bool {
    if atomic.LoadPointer(&w.s) != usleeper(&assertedSleeper) {
        return false
    }
    return atomic.CompareAndSwapPointer(&w.s, usleeper(&assertedSleeper), nil)
}

// IsAsserted 返回waker是否处于唤醒状态
func (w *Waker) IsAsserted() bool {
    return (*Sleeper)(atomic.LoadPointer(&w.s)) == &assertedSleeper
}

9.commitSleep的定义

commit_asm.go

// +build amd64

package sleep

// See commit_noasm.go for a description of commitSleep.
func commitSleep(g uintptr, waitingG *uintptr) bool

10.commitSleep的实现

commit_amd64.s

#include "textflag.h"

#define preparingG 1

// See commit_noasm.go for a description of commitSleep.
//
// func commitSleep(g uintptr, waitingG *uintptr) bool
TEXT ·commitSleep(SB),NOSPLIT,$0-24
    MOVQ waitingG+8(FP), CX
    MOVQ g+0(FP), DX

    // Store the G in waitingG if it's still preparingG. If it's anything
    // else it means a waker has aborted the sleep.
    MOVQ $preparingG, AX
    LOCK
    CMPXCHGQ DX, 0(CX)

    SETEQ AX
    MOVB AX, ret+16(FP)

    RET

11.使用示例

const (
    wakerForA = iota
    wakerForB
)

func main() {
    s := sleep.Sleeper{}
    wakerA := &sleep.Waker{}
    wakerB := &sleep.Waker{}
    s.AddWaker(wakerA, wakerForA)
    s.AddWaker(wakerB, wakerForB)
    defer s.Done()

    go func() {
        for {
            wakerA.Assert()
            time.Sleep(time.Second)
        }
    }()

    go func() {
        for {
            wakerB.Assert()
            time.Sleep(time.Second * 2)
        }
    }()


    for {
        index, _ := s.Fetch(true)

        switch index {
        case wakerForA:
            fmt.Println("wakeA")
        case wakerForB:
            fmt.Println("wakeB")
        }
    }
}

标签:之协程,waker,Waker,组件,atomic,func,sleeper,Go,唤醒
From: https://www.cnblogs.com/qxcheng/p/17176181.html

相关文章

  • 【前端开发】一个滑动滑块校验登录的组件思路(用vue写的)
    <template><el-dialog:visible.sync="dialogVisible"custom-class="slideVerifyDialog":close-on-click-modal="false"title="身份验证"widt......
  • SpringMVC_核心组件
    基础的四个组件。  一、DisapatcherServlet前端控制器,接受所有的请求。(配置为/则为所有不包括jsp的请求。/*则为所有请求)配置:在web.xml中配置一个前端控......
  • protobuf golang&&python序列化反序列化测试
    1.概要最近考虑采用protobuf来实现kafka消息传递,所以先测试一下golang和python之前序列化互通问题。由于go和python对于二进制的表示在ide层面是无法统一的,直接把python......
  • Django+vue 上传execl文件并解析
    Django+vue上传execl文件并解析VUE<template><el-buttontype="primary"class="but_list_but1"><inputtype="file"name="avatar"id="avatar"style="display......
  • Study for Go! Chapter one - Type
    StudyforGo!-Type1.Variable关键字为"var"自动初始化为二进制零值编译器推测数据类型变量类型放在变量名之后可以一次性定义多个变量且可以是不同......
  • 适用于 .NET 的开源文本差异对比组件
    适用于.NET的开源文本差异对比组件DotNet大王源码资料,微信zhaoxi965,有问必复​关注他 1人赞同了该文章对于开发人员来说,Git是我们经常使用......
  • 从0搭建Vue3组件库(三): 组件库的环境配置
    本篇文章将在项目中引入typescript,以及手动搭建一个用于测试组件库组件Vue3项目因为我们是使用Vite+Ts开发的是Vue3组件库,所以我们需要安装typescript、vue3,同......
  • React+ReactToolKit中,如何在组件外部访问Redux的store,Uncaught ReferenceError: Cann
    之前在项目开发中,遇到一个问题,需要在axios的二次封装文件中,进行拦截,需要使用到redux的dispath派发但是在axios封装的文件中,直接引入store,出现了如下报错:UncaughtReferen......
  • vue3组件透传
    <template><divclass="empty-box"><el-emptydescription="暂无数据"v-bind="$attrs"><templatev-for="(item,key)in$slots"#[key]><slot......
  • Gossip
    共识性算法GossipGossip也叫EpidemicProtocol(流行病协议),这个协议基于最终一致性以及去中心化设计思想。主要用于分布式节点之间进行信息交换和数据同步,这种场景的一......