暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

塞完数据之后close(chan)会怎样

堆栈future 2022-12-18
268

塞完数据之后close(chan)会怎样

一、close(chan)之后程序继续读取会怎样

看代码

type taskFunc func(ctx context.Context) error

func Run(ctx context.Context, tasks ...taskFunc) (e error) {
 ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 defer cancel()

  // 开辟tasks长度的buf chan
 queue := make(chan taskFunc, len(tasks)) 
 for _, task := range tasks {
  queue <- task
 }
  
  // 关闭chan ???这里关闭 对后面读取有影响吗
 close(queue)

 numCPU := runtime.NumCPU()
 wg := sync.WaitGroup{}
 wg.Add(numCPU)
 for i:=0; i<numCPU; i++ {
  go func() {
   defer wg.Done()
   for {
    select {
    case task, ok := <- queue: // 读取
     if !ok {
      fmt.Println("chan closed!")
      return
     }
     if ctx.Err() != nil {
      fmt.Println("context error")
      return
     }
     if err := task(ctx); err != nil {
      e = err
      fmt.Println("task run error: ", err)
      return
     }
    case <-ctx.Done():
     e = ctx.Err()
     fmt.Println("Done !")
     return
    }
   }
  }()
 }
 wg.Wait()
 fmt.Println("end")
 return nil
}

func T1(ctx context.Context) error {
 return nil
}

func T2(ctx context.Context) error {
 return nil
}

func T3(ctx context.Context) error {
 time.Sleep(10*time.Second)
 return errors.New("T3 超时了")
}

func T4(ctx context.Context) error {
 return errors.New("T4 error")
}

func main() {
   Run(context.Background(), T1, T2,T3, T4)
}

复制

测试:


import (
 "context"
 "fmt"
 "go.uber.org/goleak"
 "testing"
)

func TestMain(m *testing.M)  {
 fmt.Println("start")
 goleak.VerifyTestMain(m)
}

func TestRun(t *testing.T) {
 Run(context.Background(), T1, T2,T3, T4)
}

复制

输出:

start
=== RUN   TestRun
chan closed!
chan closed!
chan closed!
chan closed!
task run error:  T4 error
chan closed!
chan closed!
task run error:  T3 超时了
done
--- PASS: TestRun (10.00s)

复制

发现程序正常运行输出结果,这是为什么呢?

二、原因在select的接受处理上

看select源码发现,selectgo在循环处理case上有独特的顺序,且看:

loop:
    for i := 0; i < ncases; i++ {
        // 根据 `pollorder` 记录的随机 scases 索引来获取 cas
        casi = int(pollorder[i])
        cas = &scases[casi]

        c = cas.c
        switch case.kind {
        case caseNil:
            continue
        case caseRecv:
            // ...
        case caseSend:
            // ...
        case caseDefault:
            // ...
        }
    }
    // ...
}

复制

根据pollorder
记录的随机scases
索引来遍历处理case
,然后根据case.kind
来查看channel
是否准备好,然后goto
跳转到相应逻辑。

case.kind
caseNil
,说明channel
nil
,那么continue
,不进行任何处理。

caseRecv
: channel
接收操作:

switch case.kind {
        case caseRecv:
            sg = c.sendq.dequeue()
            if sg != nil {
                goto recv
            }
            if c.qcount > 0 {
                goto bufrecv
            }
            if c.closed != 0 {
                goto rclose
            }
        ...
        }

复制
  • 看第一个if:如果channel
    中有待发送的goroutine
    , 跳转到recv
    ,调用recv
    完成接收操作。
  • 看第二个if:如果channel
    中有缓冲数据,那么跳转到bufrecv
    ,从缓冲区中获取数据。
  • 看第三个if:如果channel
    已关闭,跳转到rclose
    , 将接收值置为空值,recvOK
    置为false

看到了吧,在recv的时候,判断接受操作优先于判断关闭操作,所以我们上述程序在塞完task之后就直接关闭chan

三、小结

掌握channel
for select
用法至关重要,稍不留意就会造成程序bug,所以平时多看看源码,想一想为什么。

文章转载自堆栈future,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论