Go实现后台任务调度系统
一、背景
平常我们在开发API的时候,前端传递过来的大批数据需要经过后端处理,如果后端处理的速度快,前端响应就快,反之则很慢,影响用户体验。针对这种场景我们一般都是后台异步处理,不需要前端等待所有的都执行完才返回。为了解决这一问题,需要我们自己实现后台任务调度系统。
二、任务调度器实现
poll.go
package poller
import (
"context"
"fmt"
"log"
"sync"
"time"
)
type Poller struct {
routineGroup *goroutineGroup // 并发控制
workerNum int // 记录同时在运行的最大goroutine数
sync.Mutex
ready chan struct{} // 某个goroutine已经准备好了
metric *metric // 统计当前在运行中的goroutine数量
}
func NewPoller(workerNum int) *Poller {
return &Poller{
routineGroup: newRoutineGroup(),
workerNum: workerNum,
ready: make(chan struct{}, 1),
metric: newMetric(),
}
}
// 调度器
func (p *Poller) schedule() {
p.Lock()
defer p.Unlock()
if int(p.metric.BusyWorkers()) >= p.workerNum {
return
}
select {
case p.ready <- struct{}{}: // 只要满足当前goroutine数量小于最大goroutine数量 那么就通知poll去调度goroutine执行任务
default:
}
}
func (p *Poller) Poll(ctx context.Context) error {
for {
// step01
p.schedule() // 调度
select {
case <-p.ready: // goroutine准备好之后 这里就会有消息
case <-ctx.Done():
return nil
}
LOOP:
for {
select {
case <-ctx.Done():
break LOOP
default:
// step02
task, err := p.fetch(ctx) // 获取任务
if err != nil {
log.Println("fetch task error:", err.Error())
break
}
fmt.Println(task)
p.metric.IncBusyWorker() // 当前正在运行的goroutine+1
// step03
p.routineGroup.Run(func() { // 执行任务
if err := p.execute(ctx, task); err != nil {
log.Println("execute task error:", err.Error())
}
})
break LOOP
}
}
}
}
func (p *Poller) fetch(ctx context.Context) (string, error) {
time.Sleep(1000 * time.Millisecond)
return "task", nil
}
func (p *Poller) execute(ctx context.Context, task string) error {
defer func() {
p.metric.DecBusyWorker() // 执行完成之后 goroutine数量-1
p.schedule() // 重新调度下一个goroutine去执行任务 这一步是必须的
}()
return nil
}
metric.go
package poller
import "sync/atomic"
type metric struct {
busyWorkers uint64
}
func newMetric() *metric {
return &metric{}
}
func (m *metric) IncBusyWorker() uint64 {
return atomic.AddUint64(&m.busyWorkers, 1)
}
func (m *metric) DecBusyWorker() uint64 {
return atomic.AddUint64(&m.busyWorkers, ^uint64(0))
}
func (m *metric) BusyWorkers() uint64 {
return atomic.LoadUint64(&m.busyWorkers)
}
goroutine_group.go
package poller
import "sync"
type goroutineGroup struct {
waitGroup sync.WaitGroup
}
func newRoutineGroup() *goroutineGroup {
return new(goroutineGroup)
}
func (g *goroutineGroup) Run(fn func()) {
g.waitGroup.Add(1)
go func() {
defer g.waitGroup.Done()
fn()
}()
}
func (g *goroutineGroup) Wait() {
g.waitGroup.Wait()
}
三、测试
package main
import (
"context"
"fmt"
"ta/poller"
"go.uber.org/goleak"
"testing"
)
func TestMain(m *testing.M) {
fmt.Println("start")
goleak.VerifyTestMain(m)
}
func TestPoller(t *testing.T) {
producer := poller.NewPoller(5)
producer.Poll(context.Background())
}
结果:
四、总结
大家用别的方式也可以实现,核心要点就是控制并发节奏,防止大量请求打到task service
,在这里起到核心作用的就是schedule
,它控制着整个任务系统的调度。同时还封装了WaitGroup
,这在大多数开源代码中都比较常见,大家可以去尝试。另外就是test case
一定得跟上,防止goroutine
泄漏。
文章转载自堆栈future,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。