Channel使用实践
文章目录
channel
channels 是一种类型安全的消息队列,充当两个 goroutine 之间的管道,将通过它同步的进行任意资源的交换。chan 控制 goroutines 交互的能力从而创建了 Go 同步机制。当创建的 chan 没有容量时,称为无缓冲通道。反过来,使用容量创建的 chan 称为缓冲通道。
要了解通过 chan 交互的 goroutine 的同步行为是什么,我们需要知道通道的类型和状态。根据我们使用的是无缓冲通道还是缓冲通道,场景会有所不同,所以让我们单独讨论每个场景。
Unbuffered Channels
|
|
无缓冲 chan 没有容量,因此进行任何交换前需要两个 goroutine 同时准备好。当 goroutine 试图将一个资源发送到一个无缓冲的通道并且没有goroutine 等待接收该资源时,该通道将锁住发送 goroutine 并使其等待。当 goroutine 尝试从无缓冲通道接收,并且没有 goroutine 等待发送资源时,该通道将锁住接收 goroutine 并使其等待。
无缓冲信道的本质是保证同步。
第一个 goroutine 在发送消息 foo 之后被阻塞,因为还没有接收者准备好。规范中对这种行为进行了很好的解释:https://golang.org/ref/spec#Channel_types
“If the capacity is zero or absent, the channel is unbuffered and communication succeeds only when both a sender and receiver are ready.”
https://golang.org/doc/effective_go.html#channels
“If the channel is unbuffered, the sender blocks until the receiver has received the value”
- Receive 先于 Send 发生。
- 好处: 100% 保证能收到。
- 代价: 延迟时间未知。
Buffered Channels
buffered channel 具有容量,因此其行为可能有点不同。当 goroutine 试图将资源发送到缓冲通道,而该通道已满时,该通道将锁住 goroutine并使其等待缓冲区可用。如果通道中有空间,发送可以立即进行,goroutine 可以继续。当goroutine 试图从缓冲通道接收数据,而缓冲通道为空时,该通道将锁住 goroutine 并使其等待资源被发送。
Latencies due to under-sized buffer
我们在 chan 创建过程中定义的缓冲区大小可能会极大地影响性能。我将使用密集使用 chan 的扇出模式来查看不同缓冲区大小的影响。在我们的基准测试中,一个 producer 将在通道中注入百万个整数元素,而5个 worker 将读取并将它们追加到一个名为 total 的结果变量中。
- Send 先于 Receive 发生。
- 好处: 延迟更小。
- 代价: 不保证数据到达,越大的 buffer,越小的保障到达。buffer = 1 时,给你延迟一个消息的保障。
Go Concurrency Patterns
- Timing out
- Moving on
- Pipeline
- Fan-out, Fan-in
- Cancellation
- Close 先于 Receive 发生(类似 Buffered)。
- 不需要传递数据,或者传递 nil。
- 非常适合取消和超时控制。
- Contex
https://blog.golang.org/concurrency-timeouts https://blog.golang.org/pipelines https://talks.golang.org/2013/advconc.slide#1 https://github.com/go-kratos/kratos/tree/master/pkg/sync
Design Philosophy
- If any given Send on a channel CAN cause the sending goroutine to block:
- Not allowed to use a Buffered channel larger than 1.
- Buffers larger than 1 must have reason/measurements.
- Must know what happens when the sending goroutine blocks.
- Not allowed to use a Buffered channel larger than 1.
- If any given Send on a channel WON’T cause the sending goroutine to block:
- You have the exact number of buffers for each send.
- Fan Out pattern
- You have the buffer measured for max capacity.
- Drop pattern
- 丢弃数据
- Drop pattern
- You have the exact number of buffers for each send.
- Less is more with buffers.
- Don’t think about performance when thinking about buffers.
- 不要把channel buffer 的size大小当做性能提升,它只是缓冲的变多了,阻塞延迟变小了.吞吐量依靠多个goroutine来消费
- Buffers can help to reduce blocking latency between signaling.
- buffer只能减少阻塞延迟.
- Reducing blocking latency towards zero does not necessarily mean better throughput.
- 阻塞延迟变少不意味吞吐提升
- If a buffer of one is giving you good enough throughput then keep it.
- channel的size,最好依据压测来得出.
- Question buffers that are larger than one and measure for size.
- Find the smallest buffer possible that provides good enough throughput.
- Don’t think about performance when thinking about buffers.
应用场景
Channel 的应用场景分为五种类型。
- 数据交流: 当作并发的 buffer 或者 queue,解决生产者 - 消费者问题。多个 goroutine 可以并发当作生产者(Producer)和消费者(Consumer)。
- 数据传递:一个 goroutine 将数据交给另一个 goroutine,相当于把数据的拥有权 (引用) 托付出去。
- 信号通知:一个 goroutine 可以将信号 (closing、closed、data ready 等) 传递给另一个或者另一组 goroutine 。
- 任务编排:可以让一组 goroutine 按照一定的顺序并发或者串行的执行,这就是编排的功能。
- 锁:利用 Channel 也可以实现互斥锁的机制。
Go 的开发者极力推荐使用 Channel,不过,这两年,大家意识到,Channel 并不是处理并发问题的“银弹”,有时候使用并发原语更简单,而且不容易出错。所以,我给你提供一套选择的方法:
- 共享资源的并发访问使用传统并发原语;
- 复杂的任务编排和消息传递使用 Channel;
- 消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;
- 简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
- 需要和 Select 语句结合,使用 Channel;
- 需要和超时配合时,使用 Channel 和 Context。
操作结果
chan 的值和状态有多种情况,而不同的操作(send、recv、close)又可能得到不同的结果,这是使用 chan 类型时经常让人困惑的地方。
为了帮助你快速地了解不同状态下各种操作的结果,我总结了一个表格,你一定要特别关注下那些 panic 的情况,另外还要掌握那些会 block 的场景,它们是导致死锁或者 goroutine 泄露的罪魁祸首。
还有一个值得注意的点是,只要一个 chan 还有未读的数据,即使把它 close 掉,你还是可以继续把这些未读的数据消费完,之后才是读取零值数据。
使用反射操作 Channel
通过反射的方式执行 select 语句,在处理很多的 case clause,尤其是不定长的 case clause 的时候,非常有用。
select 语句可以处理 chan 的 send 和 recv,send 和 recv 都可以作为 case clause。如果我们同时处理两个 chan,就可以写成下面的样子:
|
|
如果需要处理三个 chan,你就可以再添加一个 case clause,用它来处理第三个 chan。可是,如果要处理 100 个 chan 呢?一万个 chan 呢? 或者是,chan 的数量在编译的时候是不定的,在运行的时候需要处理一个 slice of chan, 这个时候,也没有办法在编译前写成字面意义的 select。那该怎么办? 这个时候,就要“祭”出我们的反射大法了。
通过 reflect.Select 函数,你可以将一组运行时的 case clause 传入,当作参数执行。Go 的 select 是伪随机的,它可以在执行的 case 中随机选择一个 case,并把选择的这个 case 的索引(chosen)返回,如果没有可用的 case 返回,会返回一个 bool 类型的返回值, 这个返回值用来表示是否有 case 成功被选择。如果是 recv case,还会返回接收的元素。
Select 的方法签名如下:
|
|
下面,我来借助一个例子,来演示一下,动态处理两个 chan 的情形。因为这样的方式可以动态处理 case 数据,所以,你可以传入几百几千几万的 chan,这就解决了不能动态处理 n 个 chan 的问题。
首先,createCases 函数分别为每个 chan 生成了 recv case 和 send case,并返回一个 reflect.SelectCase 数组。
然后,通过一个循环 10 次的 for 循环执行 reflect.Select,这个方法会从 cases 中选择一个 case 执行。第一次肯定是 send case,因为此时 chan 还没有元素,recv 还不可用。等 chan 中有了数据以后,recv case 就可以被选择了。这样,你就可以处理不定数量的 chan 了。
|
|
消息交流
从 chan 的内部实现看,它是以一个循环队列的方式存放数据,所以,它有时候也会被当成线程安全的队列和 buffer 使用。一个 goroutine 可以安全地往 Channel 中塞数据,另外一个 goroutine 可以安全地从 Channel 中读取数据,goroutine 就可以安全地实现信息交流了。
我们来看几个例子。
第一个例子是 worker 池的例子。Marcio Castilho 在 使用 Go 每分钟处理百万请求 这篇文章中,就介绍了他们应对大并发请求的设计。他们将用户的请求放在一个 chan Job 中,这个 chan Job 就相当于一个待处理任务队列。除此之外,还有一个 chan chan Job 队列,用来存放可以处理任务的 worker 的缓存队列。
dispatcher 会把待处理任务队列中的任务放到一个可用的缓存队列中,worker 会一直处理它的缓存队列。通过使用 Channel,实现了一个 worker 池的任务处理中心,并且解耦了前端 HTTP 请求处理和后端任务处理的逻辑。 我在讲 Pool 的时候,提到了一些第三方实现的 worker 池,它们全部都是通过 Channel 实现的,这是 Channel 的一个常见的应用场景。worker 池的生产者和消费者的消息交流都是通过 Channel 实现的。
第二个例子是 etcd 中的 node 节点的实现,包含大量的 chan 字段,比如 recvc 是消息处理的 chan,待处理的 protobuf 消息都扔到这个 chan 中,node 有一个专门的 run goroutine 处理这些消息。
数据传递
“击鼓传花”的游戏很多人都玩过,花从一个人手中传给另外一个人,就有点类似流水线的操作。这个花就是数据,花在游戏者之间流转,这就类似编程中的数据传递。
下面是一道任务编排的题吗,其实它就可以用数据传递的方式实现。
有 4 个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出它自己的编号,要求你编写程序,让输出的编号总是按照 1、2、3、4、1、2、3、4……这个顺序打印出来。
为了实现顺序的数据传递,我们可以定义一个令牌的变量,谁得到令牌,谁就可以打印一次自己的编号,同时将令牌传递给下一个 goroutine,我们尝试使用 chan 来实现,可以看下下面的代码。
|
|
我来给你具体解释下这个实现方式。
首先,我们定义一个令牌类型(Token),接着定义一个创建 worker 的方法,这个方法会从它自己的 chan 中读取令牌。哪个 goroutine 取得了令牌,就可以打印出自己编号, 因为需要每秒打印一次数据,所以,我们让它休眠 1 秒后,再把令牌交给它的下家。
接着,在第 16 行启动每个 worker 的 goroutine,并在第 20 行将令牌先交给第一个 worker。
如果你运行这个程序,就会在命令行中看到每一秒就会输出一个编号,而且编号是以 1、2、3、4 这样的顺序输出的。
这类场景有一个特点,就是当前持有数据的 goroutine 都有一个信箱,信箱使用 chan 实现,goroutine 只需要关注自己的信箱中的数据,处理完毕后,就把结果发送到下一家的信箱中。
信号通知
chan 类型有这样一个特点:chan 如果为空,那么,receiver 接收数据的时候就会阻塞等待,直到 chan 被关闭或者有新的数据到来。利用这个机制,我们可以实现 wait/notify 的设计模式。
传统的并发原语 Cond 也能实现这个功能,但是,Cond 使用起来比较复杂,容易出错, 而使用 chan 实现 wait/notify 模式就方便很多了。
除了正常的业务处理时的 wait/notify,我们经常碰到的一个场景,就是程序关闭的时候, 我们需要在退出之前做一些清理(doCleanup 方法)的动作。这个时候,我们经常要使用 chan。
比如,使用 chan 实现程序的 graceful shutdown,在退出之前执行一些连接关闭、文件 close、缓存落盘等一些动作。
|
|
有时候,doCleanup 可能是一个很耗时的操作,比如十几分钟才能完成,如果程序退出需要等待这么长时间,用户是不能接受的,所以,在实践中,我们需要设置一个最长的等待时间。只要超过了这个时间,程序就不再等待,可以直接退出。所以,退出的时候分为两个阶段:
- closing,代表程序退出,但是清理工作还没做;
- closed,代表清理工作已经做完。
所以,上面的例子可以改写如下:
|
|
锁
使用 chan 也可以实现互斥锁。
在 chan 的内部实现中,就有一把互斥锁保护着它的所有字段。从外在表现上,chan 的发送和接收之间也存在着 happens-before 的关系,保证元素放进去之后,receiver 才能读取到(关于 happends-before 的关系,是指事件发生的先后顺序关系)。
要想使用 chan 实现互斥锁,至少有两种方式。一种方式是先初始化一个 capacity 等于 1 的 Channel,然后再放入一个元素。这个元素就代表锁,谁取得了这个元素,就相当于获取了这把锁。另一种方式是,先初始化一个 capacity 等于 1 的 Channel,它的“空槽”代表锁,谁能成功地把元素发送到这个 Channel,谁就获取了这把锁。
这是使用 Channel 实现锁的两种不同实现方式,我重点介绍下第一种。理解了这种实现方式,第二种方式也就很容易掌握了,我就不多说了。
|
|
你可以用 buffer 等于 1 的 chan 实现互斥锁,在初始化这个锁的时候往 Channel 中先塞入一个元素,谁把这个元素取走,谁就获取了这把锁,把元素放回去,就是释放了锁。元素在放回到 chan 之前,不会有 goroutine 能从 chan 中取出元素的,这就保证了互斥性。
在这段代码中,还有一点需要我们注意下:利用 select+chan 的方式,很容易实现 TryLock、Timeout 的功能。具体来说就是,在 select 语句中,我们可以使用 default 实现 TryLock,使用一个 Timer 来实现 Timeout 的功能。
文章作者 Forz
上次更新 2021-05-21