Versions Compared
Key
- This line was added.
- This line was removed.
- Formatting was changed.
...
基本介绍
...
Go
语言中的goroutine
虽然相对于系统线程来说比较轻量级(初始栈大小仅2KB
...
),(并且支持动态扩容),而正常采用Java
、C++
等语言启用的线程一般都是内核态的占用的内存资源一般在4m
左右,而假设我们的服务器CPU
内存为4G
,那么很明显内核态线程的并发总数量也就1024
个,相反Go
语言的协程则可以达到4*1024*1024/2=200w
,这么一看就明白了为什么Go语言天生支持高并发。
痛点描述
协程执行的资源消耗大
但是在高并发量下的goroutine
频繁创建和销毁对于性能损耗以及GC
来说压力也不小。充分将goroutine
复用,减少goroutine
的创建/销毁的性能损耗,这便是grpool
对goroutine
进行池化封装的目的。例如,针对于100W
个执行任务,使用goroutine
的话需要不停创建并销毁100W
个goroutine
,而使用grpool
也许底层只需要几万个goroutine
便能充分复用地执行完成所有任务。
经测试,goroutine
池对于业务逻辑的执行效率(降低执行时间/CPU使用率)提升不大,甚至没有原生的goroutine
执行快速(池化goroutine
...
执行调度并没有底层Go
调度器高效,因为池化goroutine
的执行调度也是基于底层Go
调度器),但是由于采用了复用的设计,池化后对内存的使用率得到极大的降低。
大量协程影响全局协程调度
某些业务模块需要动态创建协程来执行,例如异步采集任务、指标计算任务等等。这些业务逻辑不是服务的核心逻辑,并且会产生协程。在极端情况下可能会引起协程大暴涨,影响底层Go
引擎全局的写成调度,造成服务整体执行延迟较大。
拿异步采集任务来举个例子,每隔5
秒执行一次,每次创建100
个协程来采集不同的目标端。当采集出现网络延迟时,上一步的任务并未执行完,下一次的任务又新创建协程开始执行。当积累的任务越来越多,会造成协程的暴涨,影响全局的服务执行。针对这一类场景,我们可以通过池化的技术来将任务进行定量执行,当池中的任务堆积到达一定量时,后续的任务应当阻塞。例如,我们设定池中任务的最大数量为10000
个,后续不停将任务丢到池中执行,当超过池的最大数量时,任务执行将会阻塞,但并不会影响全局的服务执行。
概念介绍
Pool
goroutine
池,用于管理若干可复用的goroutine
协程资源。
Job
添加到池对象的任务队列中等待执行的任务,是一个Func
的方法,一个Job
同时只能被一个Worker
获取并执行。Func
的定义如下:
Code Block | ||
---|---|---|
| ||
type Func func(ctx context.Context) |
Worker
池对象中参与任务执行的goroutine
,一个Worker
可以执行若干个Job
,直到队列中再无等待的Job
...
。
使用介绍
使用方式:
import "github.com/gogf/gf/v2/os/grpool"
使用场景:
管理大量异步任务的场景、需要异步协程复用的场景、需要降低内存使用率的场景。
接口文档:
...
Code Block | ||
---|---|---|
| ||
func Add(ctx context.Context, f Func) error func AddWithRecover( |
...
ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error func Jobs() int func Size() int |
...
|
...
func New(limit ...int) *Pool func (p *Pool) Add(ctx context.Context, f Func) error func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error func (p *Pool) Cap() int func (p *Pool) Close() func (p *Pool) IsClosed() bool func (p *Pool) Jobs() int func (p *Pool) Size() int |
...
通过grpool.New
方法创建一个goroutine池
对象,参数limit
为非必需参数,用于限定池中的工作goroutine
数量,默认为不限制。需要注意的是,任务可以不停地往池中添加,没有限制,但是工作的goroutine
是可以做限制的。我们可以通过Size()
方法查询当前的工作goroutine
数量,使用Jobs()
方法查询当前池中待处理的任务数量。
同时,为便于使用,grpool
包提供了默认的goroutine
池,默认的池对象不限制goroutine
数量,直接通过grpool.Add
即可往默认的池中添加任务,任务参数必须是一个 func()
类型的函数/方法。
Note |
---|
这个模块大家问得最多的是外部如何给 |
使用示例
...
1、使用默认的goroutine池,限制100个工作goroutine执行1000个任务
...
使用默认的goroutine
池,限制100
个goroutine
执行1000
个任务
Code Block | ||
---|---|---|
| ||
package main import ( " |
...
context" "fmt" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/os/gtimer" "time" ) var ( ctx = gctx.New() ) func job(ctx context.Context) { time.Sleep(1*time.Second) } func main() { pool := grpool.New(100) for i := 0; i < 1000; i++ { |
...
pool.Add(ctx,job) } fmt.Println("worker:", pool.Size()) fmt.Println(" |
...
jobs:", pool.Jobs()) gtimer.SetInterval(ctx,time.Second, func(ctx context.Context) { |
...
fmt.Println("worker:", pool.Size())
|
...
fmt.Println(" |
...
jobs:", pool.Jobs()) |
...
fmt.Println() }) select {} } |
...
这段程序中的任务函数的功能是sleep 1秒钟
,这样便能充分展示出goroutine数量限制功能。其中,我们使用了gtime.SetInterval
...
定时器每隔1秒钟打印出当前默认池中的工作goroutine
数量以及待处理的任务数量。
2、我们再来看一个新手经常容易出错的例子
...
异步传参:来个新手容易出错的例子
这个例子在go版本≥1.22时不再生效,即go 1.22以后不再有循环变量陷阱。
Code Block | ||
---|---|---|
| ||
package main
import (
|
...
"context" "fmt" |
...
"github.com/gogf/gf/v2/os/gctx" |
...
"github.com/gogf/gf/v2/os/grpool" "sync" ) var ( ctx = gctx.New() ) func main() { |
...
wg := sync.WaitGroup{} |
...
for i := 0; i < 10; i++ { |
...
wg.Add(1) |
...
grpool.Add(ctx,func(ctx context.Context) { |
...
fmt.Println(i) |
...
wg.Done() |
...
}) |
...
} |
...
wg.Wait() } |
...
|
我们这段代码的目的是要顺序地打印出0-9,然而运行后却输出:
Code Block |
---|
10
10
10
10
10
10
10
10
10
10 |
...
为什么呢?这里的执行结果无论是采用go
关键字来执行还是grpool
来执行都是如此。原因是,对于异步线程/协程来讲,函数进行异步执行注册时,该函数并未真正开始执行(
...
注册时只在goroutine
的栈中保存了变量i
的内存地址),而一旦开始执行时函数才会去读取变量i
的值,而这个时候变量i
的值已经自增到了10
。 清楚原因之后,改进方案也很简单了,就是在注册异步执行函数的时候,把当时变量i
的值也一并传递获取;或者把当前变量i的值赋值给一个不会改变的临时变量,在函数中使用该临时变量而不是直接使用变量i
。
改进后的示例代码如下:
1)、使用go关键字
...
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(v int){
fmt.Println(v)
wg.Done()
}(i)
}
wg.Wait()
}
执行后,输出结果为:
Code Block |
---|
...
0
|
...
9 |
...
3 4 5 6 7 8 1 2 |
注意,异步执行时并不会保证按照函数注册时的顺序执行,以下同理。
2)、使用临时变量
...
Code Block | ||
---|---|---|
| ||
package main
import (
|
...
"context" "fmt" |
...
"github.com/gogf/gf/v2/os/gctx" |
...
"github.com/gogf/gf/v2/os/grpool" "sync" ) var ( ctx = gctx.New() ) func main() { |
...
wg := sync.WaitGroup{} |
...
for i := 0; i < 10; i++ { |
...
wg.Add(1) |
...
v := i |
...
grpool.Add(ctx, func(ctx context.Context) { |
...
fmt.Println(v) |
...
wg.Done() |
...
}) |
...
} |
...
wg.Wait() } |
...
|
执行后,输出结果为:
Code Block |
---|
9
0
1
2
3
4
5
6
7
8 |
...
这里可以看到,使用grpool进行任务注册时,只能使用func()类型的参数,因此无法在任务注册时把变量i的值注册进去,因此只能采用临时变量的形式来传递当前变量i的值。
3、最后我们使用程序测试一下grpool和原生的goroutine之间的性能
...
这里可以看到,使用grpool
进行任务注册时,注册方法为func(ctx context.Context)
,因此无法在任务注册时把变量i
的值注册进去(请尽量不要通过ctx
传递业务参数),因此只能采用临时变量的形式来传递当前变量i
的值。
自动捕获goroutine
错误:AddWithRecover
AddWithRecover
将新作业推送到具有指定恢复功能的池中。当userFunc
执行过程中出现panic
时,会调用可选的Recovery Func
。如果没有传入Recovery Func
或赋空,则忽略userFunc
引发的panic
。该作业将异步执行。
Code Block | ||
---|---|---|
| ||
package main import ( "context" "fmt" "github.com/gogf/gf/v2/container/garray" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/grpool" "time" ) var ( ctx = gctx.New() ) func |
...
main() {
array := garray.NewArray(true)
grpool.AddWithRecover(ctx, func(ctx context.Context) {
array.Append(1)
array.Append(2)
panic(1)
}, func(err error) {
array.Append(1)
})
grpool.AddWithRecover(ctx, func(ctx context.Context) {
panic(1)
array.Append(1)
})
time.Sleep(500 * time.Millisecond)
fmt.Print(array.Len())
} |
测试一下grpool
和原生的goroutine
之间的性能
1)、grpool
Code Block | ||
---|---|---|
| ||
package main import ( "context" "fmt" "github.com/gogf/gf/v2/os/ |
...
gctx" |
...
"github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/os/ |
...
gtime" "sync" "time" ) var ( ctx = gctx.New() ) func main() { |
...
start := gtime. |
...
TimestampMilli() |
...
wg |
...
:= sync.WaitGroup{}
|
...
for i := 0; i < 10000000; i++ { |
...
wg.Add(1) |
...
grpool.Add(ctx,func(ctx context.Context) { |
...
time.Sleep(time.Millisecond) |
...
wg.Done() |
...
}) |
...
} |
...
wg.Wait() |
...
fmt.Println(grpool.Size()) |
...
fmt.Println("time spent:", gtime. |
...
TimestampMilli() - start) } |
...
|
2)
...
、goroutine
Code Block | ||
---|---|---|
| ||
package main
import (
|
...
"fmt" |
...
"github.com/gogf/gf/v2/os/gtime" "sync" "time" ) func main() { |
...
start := gtime. |
...
TimestampMilli() |
...
wg |
...
:= sync.WaitGroup{}
|
...
for i := 0; i < 10000000; i++ { |
...
wg.Add(1) |
...
go func() { |
...
time.Sleep(time.Millisecond) |
...
wg.Done() |
...
}() |
...
} |
...
wg.Wait() |
...
fmt.Println("time spent:", gtime. |
...
TimestampMilli() - start) } |
...
|
3)、运行结果比较
...
测试结果为两个程序各运行3
次取平均值。
grpool:
goroutine count: 847313
memory spent: ~2.1 G
time spent: 37792 ms
goroutine:
goroutine count: 1000W
memory spent: ~4.8 GB
time spent: 27085 ms
可以看到池化过后,执行相同数量的任务,goroutine
数量减少很多,相对的内存也降低了一倍以上,CPU时间耗时也勉强可以接受。
Panel | ||
---|---|---|
| ||
|