Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

基本介绍

Go语言中的Go语言中的goroutine虽然相对于系统线程来说比较轻量级(初始栈大小仅2KB),(并且支持动态扩容),而正常采用java,c++等语言启用的线程一般都是内核态的占用的内存资源一般在4m左右,而假设我们的服务器CPU内存为4G,那么很明显才用的内核态线程的并发总数量也就是1024个,相反查看一下Go语言的协程则可以达到4),(并且支持动态扩容),而正常采用JavaC++等语言启用的线程一般都是内核态的占用的内存资源一般在4m左右,而假设我们的服务器CPU内存为4G,那么很明显内核态线程的并发总数量也就1024个,相反Go语言的协程则可以达到4*1024*1024/2=200w,这么一看就明白了为什么Go语言天生支持高并发。

痛点描述

协程执行的资源消耗大

但是在高并发量下的.这么一看就明白了为什么Go语言天生支持高并发。但是在高并发量下的goroutine频繁创建和销毁对于性能损耗以及GC来说压力也不小。充分将goroutine复用,减少goroutine的创建/销毁的性能损耗,这便是grpoolgoroutine进行池化封装的目的。例如,针对于100W个执行任务,使用goroutine的话需要不停创建并销毁100Wgoroutine,而使用grpool也许底层只需要几万个goroutine便能充分复用地执行完成所有任务。

经测试,goroutine池对于业务逻辑的执行效率(降低执行时间/CPU使用率)提升不大,甚至没有原生的goroutine执行快速(池化goroutine执行调度并没有底层go调度器高效,因为池化goroutine的执行调度也是基于底层go调度器),但是由于采用了复用的设计,池化后对内存的使用率得到极大的降低。在v2版本中grpool也加入了贯穿全局的链路追踪。

概念:

  • Poolgoroutine池,用于管理若干可复用的goroutine协程资源;
  • 执行调度并没有底层Go调度器高效,因为池化goroutine的执行调度也是基于底层Go调度器),但是由于采用了复用的设计,池化后对内存的使用率得到极大的降低。

    大量协程影响全局协程调度

    某些业务模块需要动态创建协程来执行,例如异步采集任务、指标计算任务等等。这些业务逻辑不是服务的核心逻辑,并且会产生协程。在极端情况下可能会引起协程大暴涨,影响底层Go引擎全局的写成调度,造成服务整体执行延迟较大。

    拿异步采集任务来举个例子,每隔5秒执行一次,每次创建100个协程来采集不同的目标端。当采集出现网络延迟时,上一步的任务并未执行完,下一次的任务又新创建协程开始执行。当积累的任务越来越多,会造成协程的暴涨,影响全局的服务执行。针对这一类场景,我们可以通过池化的技术来将任务进行定量执行,当池中的任务堆积到达一定量时,后续的任务应当阻塞。例如,我们设定池中任务的最大数量为10000个,后续不停将任务丢到池中执行,当超过池的最大数量时,任务执行将会阻塞,但并不会影响全局的服务执行。

    概念介绍

    Pool

    goroutine池,用于管理若干可复用的goroutine协程资源。

    Job

    添加到池对象的任务队列中等待执行的任务,是一个Func的方法,一个Job同时只能被一个Worker获取并执行。Func的定义如下:

    Code Block
    languagego
    type Func func(ctx context.Context)

    Worker

    Worker:池对象中参与任务执行的

    池对象中参与任务执行的

    goroutine,一个Worker可以执行若干个Job,直到队列中再无等待的Job

    Job:添加到池对象的任务队列中等待执行的任务,是一个func()的方法,一个Job同时只能被一个Worker获取并执行;

    使用介绍

    使用方式

    import "github.com/gogf/gf/v2/os/grpool"
    

    使用场景

    管理大量异步任务的场景、需要异步协程复用的场景、需要降低内存使用率的场景。

    接口文档https://godoc.org/github.com/gogf/gf/os/grpool

    Code Block
    languagego
    func Add(ctx context.Context, f Func) error
    func() AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error
    func Jobs() int
    func Size() int
    type Pool
        func New(limit ...int) *Pool     
        func (p *Pool) Add(ctx context.Context, f Func) error
        func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...func(err error)) error     RecoverFunc) error
        func (p *Pool) Cap() int
        func (p *Pool) Close()
        func (p *Pool) IsClosed() bool
        func (p *Pool) Jobs() int
        func (p *Pool) Size() int


    通过grpool.New方法创建一个goroutine池对象,参数limit为非必需参数,用于限定池中的工作goroutine数量,默认为不限制。需要注意的是,任务可以不停地往池中添加,没有限制,但是工作的goroutine是可以做限制的。我们可以通过Size()方法查询当前的工作goroutine数量,使用Jobs()方法查询当前池中待处理的任务数量。

    同时,为便于使用,grpool包提供了默认的goroutine池,默认的池对象不限制goroutine数量,直接通过grpool.Add即可往默认的池中添加任务,任务参数必须是一个 func()类型的函数/方法。

    Note

    这个模块大家问得最多的是外部如何给grpool里面的任务传递参数,具体请看示例2.里面的任务传递参数,具体请看示例2。

    使用示例

    示例1. 使用默认的goroutine池,限制100个工作goroutine执行1000个任务

    https://github.com/gogf/gf/blob/master/os/gfpool/gfpool_z_unit_test.go

    使用默认的goroutine池,限制100goroutine执行1000个任务

    Code Block
    languagego
    package main
    
    import (
     	"context"
     	"fmt"
     	"github.com/gogf/gf/v2/os/gctx"
     	"github.com/gogf/gf/v2/os/grpool"
     	"github.com/gogf/gf/v2/os/gtimer"
     	"time"
    )
    
    var (
        ctx = gctx.New()
    )
    
    func job(ctx context.Context) {
     	time.Sleep(1*time.Second)
    }
    
    func main() {
     	pool := grpool.New(100)
     	for i := 0; i < 1000; i++ {
         	pool.Add(ctx,job)
     	}
     	fmt.Println("worker:", pool.Size())
     	fmt.Println(" jobs:", pool.Jobs())
     	gtimer.SetInterval(ctx,time.Second, func(ctx context.Context) {
         	fmt.Println("worker:", pool.Size())
         	fmt.Println(" jobs:", pool.Jobs())
         	fmt.Println()
     	})
    
     	select {}
    }

    这段程序中的任务函数的功能是sleep 1秒钟,这样便能充分展示出goroutine数量限制功能。其中,我们使用了gtime.SetInterval定时器每隔1秒钟打印出当前默认池中的工作goroutine数量以及待处理的任务数量。

    示例2. 我们再来看一个新手经常容易出错的例子

    https://github.com/gogf/gf/blob/master/os/gfpool/gfpool_z_unit_test.go

    定时器每隔1秒钟打印出当前默认池中的工作goroutine数量以及待处理的任务数量。

    异步传参:来个新手容易出错的例子

    这个例子在go版本≥1.22时不再生效,即go 1.22以后不再有循环变量陷阱。

    Code Block
    languagego
    package main
    
    import (
     	"context"
     	"fmt"
     	"github.com/gogf/gf/v2/os/gctx"
     	"github.com/gogf/gf/v2/os/grpool"
     	"sync"
    )
    
    var (
        ctx = gctx.New()
    )
    
    func main() {
     	wg := sync.WaitGroup{}
     	for i := 0; i < 10; i++ {
        	wg.Add(1)
        	grpool.Add(ctx,func(ctx context.Context) {
           		fmt.Println(i)
           		wg.Done()
        	})
     	}
     	wg.Wait()
    } 

    我们这段代码的目的是要顺序地打印出0-9,然而运行后却输出:

    Code Block
    10
    10
    10
    10
    10
    10
    10
    10
    10
    10

    为什么呢?这里的执行结果无论是采用go关键字来执行还是grpool来执行都是如此。原因是,对于异步线程/协程来讲,函数进行异步执行注册时,该函数并未真正开始执行(注册时只在goroutine的栈中保存了变量i的内存地址),而一旦开始执行时函数才会去读取变量i的值,而这个时候变量i的值已经自增到了10。 清楚原因之后,改进方案也很简单了,就是在注册异步执行函数的时候,把当时变量i的值也一并传递获取;或者把当前变量i的值赋值给一个不会改变的临时变量,在函数中使用该临时变量而不是直接使用变量i。注册时只在goroutine的栈中保存了变量i的内存地址),而一旦开始执行时函数才会去读取变量i的值,而这个时候变量i的值已经自增到了10。 清楚原因之后,改进方案也很简单了,就是在注册异步执行函数的时候,把当时变量i的值也一并传递获取;或者把当前变量i的值赋值给一个不会改变的临时变量,在函数中使用该临时变量而不是直接使用变量i

    改进后的示例代码如下:

    1)、使用go关键字https://github.com/gogf/gf/blob/master/os/gfpool/gfpool_z_unit_test.go

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func main() {
        wg := sync.WaitGroup{}
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(v int){
                fmt.Println(v)
                wg.Done()
            }(i)
        }
        wg.Wait()
    }
    

    执行后,输出结果为:

    Code Block
    0
    9
    3
    4
    5
    6
    7
    8
    1
    2 

    注意,异步执行时并不会保证按照函数注册时的顺序执行,以下同理。

    2)、使用临时变量https://github.com/gogf/gf/blob/master/os/gfpool/gfpool_z_unit_test.go

    Code Block
    languagego
    package main
    
    import (
     	"context"
     	"fmt"
     	"github.com/gogf/gf/v2/os/gctx"
     	"github.com/gogf/gf/v2/os/grpool"
     	"sync"
    )
    
    var (
       ctx = gctx.New()
    )
    
    func main() {
     	wg := sync.WaitGroup{}
     	for i := 0; i < 10; i++ {
        	wg.Add(1)
        	v := i
        	grpool.Add(ctx, func(ctx context.Context) {
           		fmt.Println(v)
           		wg.Done()
        	})
     	}
     	wg.Wait()
    }   

    执行后,输出结果为:

    Code Block
    9
    0
    1
    2
    3
    4
    5
    6
    7
    8

    这里可以看到,使用grpool进行任务注册时,只能使用func()类型的参数,因此无法在任务注册时把变量i的值注册进去,因此只能采用临时变量的形式来传递当前变量i的值。

    示例3. 最后我们使用程序测试一下grpool和原生的goroutine之间的性能

    这里可以看到,使用grpool进行任务注册时,注册方法为func(ctx context.Context),因此无法在任务注册时把变量i的值注册进去(请尽量不要通过ctx传递业务参数),因此只能采用临时变量的形式来传递当前变量i的值。

    自动捕获goroutine错误:AddWithRecover

    AddWithRecover将新作业推送到具有指定恢复功能的池中。当userFunc执行过程中出现panic时,会调用可选的Recovery Func。如果没有传入Recovery Func或赋空,则忽略userFunc引发的panic。该作业将异步执行。AddWithRecover将新作业推送到具有指定恢复功能的池中。当userFunc执行过程中出现死机时,会调用可选的Recovery Func。如果没有传入`Recovery Func`或赋空,则忽略`userFunc`引发的死机。该作业将异步执行。

    Code Block
    languagego
    package main
    
    import (
    	"context"
    	"fmt"
    	"github.com/gogf/gf/v2/container/garray"
    	"github.com/gogf/gf/v2/os/gctx"
    	"github.com/gogf/gf/v2/os/grpool"
    	"time"
    )
    
    var (
    	ctx = gctx.New()
    )
    func main() {
    	array := garray.NewArray(true)
    	grpool.AddWithRecover(ctx, func(ctx context.Context) {
    		array.Append(1)
    		array.Append(2)
    		panic(1)
    	}, func(err error) {
    		array.Append(1)
    	})
    	grpool.AddWithRecover(ctx, func(ctx context.Context) {
    		panic(1)
    		array.Append(1)
    	})
    	time.Sleep(500 * time.Millisecond)
    	fmt.Print(array.Len())
    }
    示例3. 最后我们使用程序测试一下grpool和原生的goroutine之间的性能

    测试一下grpool和原生的goroutine之间的性能

    1)、grpoolgrpool

    Code Block
    languagego
    package main
    
    import (
     	"context"
     	"fmt"
     	"github.com/gogf/gf/v2/os/gctx"
     	"github.com/gogf/gf/v2/os/grpool"
     	"github.com/gogf/gf/v2/os/gtime"
     	"sync"
     	"time"
    )
    
    var (
       ctx = gctx.New()
    )
    
    func main() {
     	start := gtime.TimestampMilli()
     	wg := sync.WaitGroup{}
     	for i := 0; i < 10000000; i++ {
        	wg.Add(1)
        	grpool.Add(ctx,func(ctx context.Context) {
           		time.Sleep(time.Millisecond)
           		wg.Done()
        	})
     	}
     	wg.Wait()
     	fmt.Println(grpool.Size())
     	fmt.Println("time spent:", gtime.TimestampMilli() - start)
    } 

    2)、goroutinegoroutine

    Code Block
    languagego
    package main
    
    import (
     	"fmt"
     	"github.com/gogf/gf/v2/os/gtime"
     	"sync"
     	"time"
    )
    
    
    func main() {
     	start := gtime.TimestampMilli()
     	wg := sync.WaitGroup{}
     	for i := 0; i < 10000000; i++ {
        	wg.Add(1)
        	go func() {
           		time.Sleep(time.Millisecond)
           		wg.Done()
        	}()
     	}
     	wg.Wait()
     	fmt.Println("time spent:", gtime.TimestampMilli() - start)
    } 

    3)、运行结果比较

    测试结果为两个程序各运行3次取平均值。测试结果为两个程序各运行3次取平均值。

    grpool:
        goroutine count: 847313
         memory   spent: ~2.1 G
         time     spent: 37792 ms
    
    goroutine:
        goroutine count: 1000W
        memory    spent: ~4.8 GB
        time      spent: 27085 ms
    

    可以看到池化过后,执行相同数量的任务,goroutine数量减少很多,相对的内存也降低了一倍以上,CPU时间耗时也勉强可以接受。






    Panel
    titleContent Menu

    Table of Contents