暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Go实现后台任务调度系统

堆栈future 2022-12-13
403

Go实现后台任务调度系统

一、背景

平常我们在开发API的时候,前端传递过来的大批数据需要经过后端处理,如果后端处理的速度快,前端响应就快,反之则很慢,影响用户体验。针对这种场景我们一般都是后台异步处理,不需要前端等待所有的都执行完才返回。为了解决这一问题,需要我们自己实现后台任务调度系统。

二、任务调度器实现

poll.go

package poller

import (
 "context"
 "fmt"
 "log"
 "sync"
 "time"
)

type Poller struct {
 routineGroup *goroutineGroup // 并发控制
 workerNum    int // 记录同时在运行的最大goroutine数

 sync.Mutex
 ready  chan struct{} // 某个goroutine已经准备好了
 metric *metric // 统计当前在运行中的goroutine数量
}

func NewPoller(workerNum int) *Poller {
 return &Poller{
  routineGroup: newRoutineGroup(),
  workerNum:    workerNum,
  ready:        make(chan struct{}, 1),
  metric:       newMetric(),
 }
}

// 调度器
func (p *Poller) schedule() {
 p.Lock()
 defer p.Unlock()
 if int(p.metric.BusyWorkers()) >= p.workerNum {
  return
 }

 select {
 case p.ready <- struct{}{}: // 只要满足当前goroutine数量小于最大goroutine数量 那么就通知poll去调度goroutine执行任务
 default:
 }
}

func (p *Poller) Poll(ctx context.Context) error {
 for {
  // step01
  p.schedule() // 调度

  select {
  case <-p.ready: // goroutine准备好之后 这里就会有消息
  case <-ctx.Done():
   return nil
  }

 LOOP:
  for {
   select {
   case <-ctx.Done():
    break LOOP
   default:
    // step02
    task, err := p.fetch(ctx) // 获取任务
    if err != nil {
     log.Println("fetch task error:", err.Error())
     break
    }
    fmt.Println(task)
    p.metric.IncBusyWorker() // 当前正在运行的goroutine+1
    // step03
    p.routineGroup.Run(func() { // 执行任务
     if err := p.execute(ctx, task); err != nil {
      log.Println("execute task error:", err.Error())
     }
    })
    break LOOP
   }
  }
 }
}

func (p *Poller) fetch(ctx context.Context) (string, error) {
 time.Sleep(1000 * time.Millisecond)
 return "task", nil
}

func (p *Poller) execute(ctx context.Context, task string) error {
 defer func() {
  p.metric.DecBusyWorker() // 执行完成之后 goroutine数量-1
  p.schedule() // 重新调度下一个goroutine去执行任务 这一步是必须的
 }()
 return nil
}

metric.go

package poller

import "sync/atomic"

type metric struct {
 busyWorkers uint64
}

func newMetric() *metric {
 return &metric{}
}

func (m *metric) IncBusyWorker() uint64 {
 return atomic.AddUint64(&m.busyWorkers, 1)
}

func (m *metric) DecBusyWorker() uint64 {
 return atomic.AddUint64(&m.busyWorkers, ^uint64(0))
}

func (m *metric) BusyWorkers() uint64 {
 return atomic.LoadUint64(&m.busyWorkers)
}

goroutine_group.go

package poller

import "sync"

type goroutineGroup struct {
 waitGroup sync.WaitGroup
}

func newRoutineGroup() *goroutineGroup {
 return new(goroutineGroup)
}

func (g *goroutineGroup) Run(fn func()) {
 g.waitGroup.Add(1)

 go func() {
  defer g.waitGroup.Done()
  fn()
 }()
}

func (g *goroutineGroup) Wait() {
 g.waitGroup.Wait()
}

三、测试

package main

import (
 "context"
 "fmt"
 "ta/poller"
 "go.uber.org/goleak"
 "testing"
)

func TestMain(m *testing.M)  {
 fmt.Println("start")
 goleak.VerifyTestMain(m)
}

func TestPoller(t *testing.T) {
 producer := poller.NewPoller(5)
 producer.Poll(context.Background())
}

结果:

四、总结

大家用别的方式也可以实现,核心要点就是控制并发节奏,防止大量请求打到task service
,在这里起到核心作用的就是schedule
,它控制着整个任务系统的调度。同时还封装了WaitGroup
,这在大多数开源代码中都比较常见,大家可以去尝试。另外就是test case
一定得跟上,防止goroutine
泄漏。


文章转载自堆栈future,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论