塞完数据之后close(chan)会怎样
一、close(chan)之后程序继续读取会怎样
看代码
type taskFunc func(ctx context.Context) error
func Run(ctx context.Context, tasks ...taskFunc) (e error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// 开辟tasks长度的buf chan
queue := make(chan taskFunc, len(tasks))
for _, task := range tasks {
queue <- task
}
// 关闭chan ???这里关闭 对后面读取有影响吗
close(queue)
numCPU := runtime.NumCPU()
wg := sync.WaitGroup{}
wg.Add(numCPU)
for i:=0; i<numCPU; i++ {
go func() {
defer wg.Done()
for {
select {
case task, ok := <- queue: // 读取
if !ok {
fmt.Println("chan closed!")
return
}
if ctx.Err() != nil {
fmt.Println("context error")
return
}
if err := task(ctx); err != nil {
e = err
fmt.Println("task run error: ", err)
return
}
case <-ctx.Done():
e = ctx.Err()
fmt.Println("Done !")
return
}
}
}()
}
wg.Wait()
fmt.Println("end")
return nil
}
func T1(ctx context.Context) error {
return nil
}
func T2(ctx context.Context) error {
return nil
}
func T3(ctx context.Context) error {
time.Sleep(10*time.Second)
return errors.New("T3 超时了")
}
func T4(ctx context.Context) error {
return errors.New("T4 error")
}
func main() {
Run(context.Background(), T1, T2,T3, T4)
}复制
测试:
import (
"context"
"fmt"
"go.uber.org/goleak"
"testing"
)
func TestMain(m *testing.M) {
fmt.Println("start")
goleak.VerifyTestMain(m)
}
func TestRun(t *testing.T) {
Run(context.Background(), T1, T2,T3, T4)
}复制
输出:
start
=== RUN TestRun
chan closed!
chan closed!
chan closed!
chan closed!
task run error: T4 error
chan closed!
chan closed!
task run error: T3 超时了
done
--- PASS: TestRun (10.00s)复制
发现程序正常运行输出结果,这是为什么呢?
二、原因在select的接受处理上
看select源码发现,selectgo在循环处理case上有独特的顺序,且看:
loop:
for i := 0; i < ncases; i++ {
// 根据 `pollorder` 记录的随机 scases 索引来获取 cas
casi = int(pollorder[i])
cas = &scases[casi]
c = cas.c
switch case.kind {
case caseNil:
continue
case caseRecv:
// ...
case caseSend:
// ...
case caseDefault:
// ...
}
}
// ...
}复制
根据pollorder
记录的随机scases
索引来遍历处理case
,然后根据case.kind
来查看channel
是否准备好,然后goto
跳转到相应逻辑。
case.kind
为caseNil
,说明channel
为nil
,那么continue
,不进行任何处理。
caseRecv
: channel
接收操作:
switch case.kind {
case caseRecv:
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
...
}复制
看第一个if:如果 channel
中有待发送的goroutine
, 跳转到recv
,调用recv
完成接收操作。看第二个if:如果 channel
中有缓冲数据,那么跳转到bufrecv
,从缓冲区中获取数据。看第三个if:如果 channel
已关闭,跳转到rclose
, 将接收值置为空值,recvOK
置为false
。
看到了吧,在recv的时候,判断接受操作优先于判断关闭操作,所以我们上述程序在塞完task之后就直接关闭chan
三、小结
掌握channel
的for select
用法至关重要,稍不留意就会造成程序bug,所以平时多看看源码,想一想为什么。
文章转载自堆栈future,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
golang实现mcp的sse传输方式
golang算法架构leetcode技术php
28次阅读
2025-04-21 10:35:07
golang langchan mcp adapter
golang算法架构leetcode技术php
22次阅读
2025-04-17 10:41:53
golang实现deepseek 聊天功能
golang算法架构leetcode技术php
16次阅读
2025-04-07 09:44:53
golang实现langchain graph
golang算法架构leetcode技术php
14次阅读
2025-04-16 10:33:44
golang实现mcp client(2)
golang算法架构leetcode技术php
12次阅读
2025-04-14 09:40:19