1、线程与协程的关系
- 创建时默认的栈空间大小,JDK5以后 Java Thread Stack 默认大小为1M;Goroutine 的 Stack 初始化大小为 2K;
- Java Thread 和 内核系统线程是 1 : 1 关系;Goroutine 和 内核系统线程关系是 M : N关系;
Java操作的用户态线程实际是映射到了内核态的系统线程,当发生线程之间的切换时会引起内核系统线程之间的切换,就会涉及到运行状态(寄存器地址、程序计数器地址、栈空间变量地址等)的保存和切换;
而多个Goroutine对应一个或者多个系统线程,映射到同一个系统线程的协程间的切换消耗就很少了。
2、协程机制
Go 语言实现的线程处理器来控制多个协程G 在 系统线程 M 上的运行,多个协程任务存储在队列中按序执行。
-
那么如果某一个协程任务占用时间过长占住整个Process,其它协程任务就不能执行?
当Process开始处理任务时 Go会生成一个守护线程去统计每个Process处理完的任务数,如果某个Process统计的值长时间不变,那么就会给该协程任务栈中添加一个标记,当协程运行时遇到非内联函数就会遇到该标记,它会把自己中断下来放入到任务队尾等待执行。
-
另外当任务栈中某个协程需要中断等待系统调用(比如IO),为了提高并发率,Process会把自己移动到另外一个可执行队列中去处理任务,而当被中断的协程被唤醒时,它会将自己加入到某个可执行任务队列中等待执行。协程被中断时的一些数据状态会保存到协程数据栈中,等待再次被调用时恢复到寄存器中。
3、共享内存并发机制
3.1 锁控制访问数据的安全性
// 通过互斥锁 sync.Mutex + sync.WaitGroup
func TestCounterWaitGroup(t *testing.T) {
var mux sync.Mutex
var wg sync.WaitGroup
count := 0
for i := 0; i < 5000; i++ {
wg.Add(1)
go func() {
defer func() {
mux.Unlock()
}()
mux.Lock()
count++
wg.Done()
}()
}
wg.Wait()
t.Log(count)
}
3.2 异步返回结果的方式
func service() string {
time.Sleep(time.Millisecond * 50)
return "Done"
}
func otherTask() {
fmt.Println("working on something else")
time.Sleep(time.Millisecond * 100)
fmt.Println("Task is done.")
}
func AsyncService() chan string { //执行完后数据写入通道并返回通道,需要结果时从通道中获取,实现了异步请求
retCh := make(chan string, 1) //这里不指定通道大小就会一直阻塞等待数据从通道中拿走;指定后就是缓冲通道发送完立即结束
go func() {
ret := service()
fmt.Println("returned result.")
retCh <- ret
fmt.Println("service exited.")
}()
return retCh
}
func TestAsyncTask(t *testing.T) {
retCh := AsyncService()
otherTask()
fmt.Println(<-retCh) //从通道中获取数据
}
3.3 多路选择和超时机制
func service() string {
time.Sleep(time.Millisecond * 500) //这里设置为睡眠500
return "Done"
}
func AsyncService() chan string {
retCh := make(chan string, 1)
go func() {
ret := service()
fmt.Println("returned result.")
retCh <- ret
fmt.Println("service exited.")
}()
return retCh
}
func TestAsyncTask(t *testing.T) {
select {
case ret := <-AsyncService():
t.Log(ret)
case <-time.After(time.Millisecond * 100): //超时就输出time out
t.Error("time out")
}
}
//output:
=== RUN TestAsyncTask
SyncTest_test.go:53: time out
--- FAIL: TestAsyncTask (0.11s)
FAIL
三种方式实现超时退出:
方法一: ctx.Done() 搭配 time.After()
原理:
1、通过context的WithTimeout设置一个有效时间为800毫秒的context。
2、该context会在耗尽800毫秒后或者方法执行完成后结束,结束的时候会向通道ctx.Done发送信号。
3、有人可能要问,你这里已经设置了context的有效时间,为什么还要加上这个time.After呢?
这是因为该方法内的context是自己申明的,可以手动设置对应的超时时间,但是在大多数场景,这里的ctx是从上游一直传递过来的,对于上游传递过来的context还剩多少时间,我们是不知道的,所以这时候通过time.After设置一个自己预期的超时时间就很有必要了。
4、注意,这里要记得调用cancel(),不然即使提前执行完了,还要傻傻等到800毫秒后context才会被释放。
func AsyncCall() {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*800))
defer cancel()
go func(ctx context.Context) {
// 发送HTTP请求
}()
select {
case <-ctx.Done():
fmt.Println("call successfully!!!")
return
case <-time.After(time.Duration(time.Millisecond * 900)):
fmt.Println("timeout!!!")
return
}
}
方法二:time.NewTimer()
原理:
这里的主要区别是将time.After换成了time.NewTimer,也是同样的思路如果接口调用提前完成,则监听到Done信号,然后关闭定时器。
否则的话,会在指定的timer即900毫秒后执行超时后的业务逻辑。
func AsyncCall() {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond * 800))
defer cancel()
timer := time.NewTimer(time.Duration(time.Millisecond * 900))
go func(ctx context.Context) {
// 发送HTTP请求
}()
select {
case <-ctx.Done():
timer.Stop()
timer.Reset(time.Second)
fmt.Println("call successfully!!!")
return
case <-timer.C:
fmt.Println("timeout!!!")
return
}
}
方法三:使用通道
func AsyncCall() {
ctx := context.Background()
done := make(chan struct{}, 1)
go func(ctx context.Context) {
// 发送HTTP请求
done <- struct{}{}
}()
select {
case <-done:
fmt.Println("call successfully!!!")
return
case <-time.After(time.Duration(800 * time.Millisecond)):
fmt.Println("timeout!!!")
return
}
}
3.4 Channel 的关闭和广播
一个生产者对应多个消费者,生产者往通道中发送数据,消费者从通道中接收数据。那么如何保证在数据发送完毕时通知消费者呢?
- 可以通过关闭 Channel,向关闭的 Channel 发送数据会导致 panic;
v,ok := <-ch
,ok 为 bool 值,true 表示正常接收,false 表示通道关闭;- 所有 channel 接收者都会在 channel 关闭时,立刻从阻塞等待中返回且上述 ok 值为 false。这个广播机制常被用来向多个订阅者发送信号,如退出信号。
//生产者-消费者-通道
func procedureMsg(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch) //注意这里发送完数据需要关闭channel
wg.Done()
}()
}
func consumerMsg(name string, ch chan int, wg *sync.WaitGroup) {
go func() {
for {
if num, ok := <-ch; ok { //ok为true表示正常接收; ok为false表示发送方关闭了通道
fmt.Printf("%s : %d\n", name, num)
} else {
break
}
}
wg.Done()
}()
}
func TestChannel(t *testing.T) {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
procedureMsg(ch, &wg)
wg.Add(1)
consumerMsg("consumer1", ch, &wg)
wg.Add(1)
consumerMsg("consumer2", ch, &wg)
wg.Wait()
}
3.5 任务的取消
方式一:关闭通道
取消任务可以考虑单独向通道 cancelledChannel
中发送数据来通知正在进行任务的取消;但是有多少个消费者就需要发送多少份数据,这样会在代码中耦合消费者数量。如果采用关闭通道,那么每个消费者从通道中获取的都为数据的零值,加以逻辑处理就能够实现广播通知任务取消了。
//任务的取消
func TestTaskCancel(t *testing.T) {
cancelChannel := make(chan struct{}, 2)
for i := 0; i < 5; i++ {
go func(i int, ch chan struct{}) {
for {
if isCancelled(ch) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, " Cancelled")
}(i, cancelChannel)
}
sendCancelled2(cancelChannel) //方式二关闭
//sendCancelled1(cancelChannel) //方式一关闭
time.Sleep(time.Second * 1)
}
func isCancelled(ch chan struct{}) bool { //接收到关闭通知就关闭通道
select {
case ret := <-ch:
fmt.Println(ret)
return true
default:
return false
}
}
func sendCancelled1(ch chan struct{}) {
ch <- struct{}{}
}
func sendCancelled2(ch chan struct{}) {
close(ch) //通道关闭后,从通道中获取的是数据的默认零值
}
方式二:context上下文
如果存在这样的情况,某个goroutine执行时又开启了新的 goroutine 执行,那么结束上层 goroutine 时是否应该先结束底层 goroutine,显然这一点仅仅关闭通道是做不到的。由此 Go 实现了 context调度树来追踪各个执行中的 goroutine,实现如下:
//Context任务的取消
func TestTaskCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) //返回上下文、当前goroutine结束的方法
for i := 0; i < 5; i++ {
go func(i int, context context.Context) {
for {
if isCancelled(ctx) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, " Cancelled")
}(i, ctx)
}
cancel()
time.Sleep(time.Second * 1)
}
func isCancelled(ctx context.Context) bool {
select {
case <-ctx.Done(): //能够获取数据说明 goroutine结束了
return true
default:
return false
}
}
3.6 单例模式只执行一次
public class Singleton {
private static Singleton INSTANCE=null;
private Singleton() {}
public static Singleton getInstance() {
if (INSTANCE==null) {
synchronized(Singleton.class) {
if (INSTANCE==null) {
INSTANCE=new Singleton();
}
}
}
return INSTANCE;
}
}
上述是 Java 方式的单例模式,通过懒汉式 double check 来生成单例对象。Go 语言中通过 sync.Once 对象的 Do() 方法来执行一次方法。
type Singleton struct {
}
var instance *Singleton
var once sync.Once
func GetSingletonObj() *Singleton {
once.Do(func() {
fmt.Println("Create Obj")
instance = new(Singleton)
})
return instance
}
func TestSingletonObj(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
obj := GetSingletonObj()
fmt.Printf("%x\n", unsafe.Pointer(&obj))
wg.Done()
}()
}
}
可以发现对象只创建了一次,实现了 Go 语言的单例模式。
3.7 仅需任意任务完成
有时候我们需要这样的场景,多个任务并发执行,我们仅需任意一个任务完毕时返回结果就结束。比如我们采用谷歌、百度等搜索引擎查询某信息,任意一个搜索引擎得到结果时就返回结束。
Go 中通过通道实现,多个 goroutine 并发执行,任意任务执行结束时就往通道中传入数据,通道中一旦有数据就消费返回。
func runTask(i int) string {
var id int
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("The result is from %d", i)
}
//多个任务执行,当其中任意一个任务执行完毕就返回
func FirstResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner)
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
return <-ch
}
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine())
t.Log(FirstResponse())
time.Sleep(1 * time.Second)
t.Log("After:", runtime.NumGoroutine())
}
注意通道大小必须设置,这样采用缓冲通道保证在函数结束时不会有协程阻塞住,如果将代码中的通道换成非缓冲通道,结果如下:
3.8 等待所有任务完成后一起返回
有两种实现方式,一种是采用 sync.WaitGroup 等待所有 goroutine 处理完毕后返回;另一种是采用 CSP机制,通过从通道中获取指定数量的消息后才返回。
//方式一:CSP机制
func runTask(i int) string {
time.Sleep(3 * time.Second)
return fmt.Sprintf("The result is from %d", i)
}
func FirstResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner)
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
finalRet := ""
for j := 0; j < numOfRunner; j++ { //从通道中循环获取到numOfRunner个对象后才返回
finalRet += <-ch + "\n"
}
return finalRet
}
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine())
t.Log(FirstResponse())
t.Log("After:", runtime.NumGoroutine())
}
//方式二:sync.WaitGroup
func runTask(i int) string {
time.Sleep(3 * time.Second)
return fmt.Sprintf("The result is from %d", i)
}
func FirstResponse(outWg *sync.WaitGroup) string {
defer outWg.Done()
res := make([]string, 0, 20)
count := 0
var innerWg sync.WaitGroup
for i := 0; i < 10; i++ {
innerWg.Add(1)
go func(i int) {
tmp := runTask(i)
res = append(res, tmp)
count++
defer innerWg.Done()
}(i)
}
innerWg.Wait()
for _, item := range res {
fmt.Println(item)
}
return ""
}
func TestFirstResponse(t *testing.T) {
var outWg sync.WaitGroup
outWg.Add(1)
go FirstResponse(&outWg) //注意外部传入的是 WaitGroup的引用
outWg.Wait()
fmt.Println("全部任务执行完毕.")
}
3.9 创建对象池
对于一些难以创建的对象,为了避免多次重复创建对象的性能消耗,可以将创建好的对象存储到对象池,使用时取出,使用完再放回对象池中。
//对象池中的存储对象
type ReusableObj struct {
}
type ObjPool struct {
bufChan chan *ReusableObj //利用通道存储
}
//创建对象池
func NewObjPool(numOfObj int) *ObjPool {
objPool := ObjPool{}
objPool.bufChan = make(chan *ReusableObj, numOfObj)
for i := 0; i < numOfObj; i++ {
objPool.bufChan <- &ReusableObj{}
}
return &objPool
}
//取对象
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
select {
case ret := <-p.bufChan:
return ret, nil
case <-time.After(timeout): //注意需要添加超时操作,防止取不到对象引起的超时
return nil, errors.New("time out")
}
}
//还对象
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
select {
case p.bufChan <- obj:
return nil
default: //放错对象或者对象池已满
return errors.New("overflow")
}
}
func TestObjPool(t *testing.T) {
pool := NewObjPool(10)
for i := 0; i < 11; i++ {
if obj, err := pool.GetObj(time.Second * 1); err != nil {
t.Error(err)
} else {
fmt.Println(obj)
if err := pool.ReleaseObj(obj); err != nil {
t.Error(err)
}
}
}
fmt.Println("Done")
}
在 Go 并发包中有一个 sync.Pool,它能否用作 Go 实现了的对象池呢?先来了解下 sync.Pool 的结构:
sync.Pool 对象获取:
每个 Processor 实际上包含了一个私有对象、一个共享对象池;在从 Processor 中获取对象时,如果私有对象不存在,那么就会尝试从当前 Processor 的共享池中获取;如果当前 Processor 共享池是空的,那么就尝试从其它 Processor 共享池中获取;如果所有 Processor 的共享池都是空的,就用用户指定的 New 函数产生一个新的对象返回。
私有对象是协程安全的,共享池是协程不安全的,从共享池中获取对象需要结合锁来实现。
sync.Pool 对象返回:
如果私有对象不存在,就保存为私有对象;
如果私有对象存在,则放入当前 Processor 子池的共享池中;
sync.Pool 对象的生命周期:
每一次 GC 都会清理掉 sync.Pool 缓存的对象,对象缓存的有效期为下一次 GC 之前。GC由系统自动触发,我们无法控制GC的触发时间,因此 sync.Pool 不适合作为对象池来使用。
func TestSyncPool(t *testing.T) {
pool := &sync.Pool{
New: func() interface{} { //定义好创建对象的New方法
fmt.Println("Create new Object.")
return 100
},
}
v := pool.Get().(int)
fmt.Println(v)
pool.Put(3)
v1, _ := pool.Get().(int) //调用默认New方法创建好的对象并不会放在对象池中
fmt.Println(v1)
v2, _ := pool.Get().(int)
fmt.Println(v2)
}
总结:
sync.Pool 适合于通过复用,降低复杂对象的创建和GC代价;但是 sync.Pool 共享池为非协程安全的,使用时需要结合锁操作实现,会有锁方面的开销。sync.Pool 生命周期受 GC 影响,不适合用作连接池。
参考
标签:10,协程,context,fmt,sync,管道,func,time From: https://www.cnblogs.com/istitches/p/17748646.html