go-zero源码浅析2:core\syncx\cond.go
核心作用
Cond是go-zero扩展的一个关于条件变量的并发原语,核心在于扩展实现了一个带超时时间的Wait函数:WaitWithTimeout。其特点是极其简洁精炼,仅使用一个无缓存的通道实现条件变量的基本功能。但是舍弃了广播接口。
使用示例
直接以源码自带的单元测试代码为例,展示带超时的等待函数的功能:
func TestTimeoutCondWaitTimeoutRemain(t *testing.T) {
var wait sync.WaitGroup
cond := NewCond()
wait.Add(2)
ch := make(chan time.Duration, 1) // 容量为1,用于协程间传递剩余超时时间(符合典型准则:通过通信共享内存)
defer close(ch)
timeout := time.Duration(2000) * time.Millisecond // 2秒
go func() { // 执行等待的协程
remainTimeout, _ := cond.WaitWithTimeout(timeout) // 最多等待2秒
ch <- remainTimeout // 输出剩余时间到ch
wait.Done()
}()
sleep(200) // 睡眠200ms
go func() { // 执行唤醒的协程
cond.Signal() // 200ms后由第二个协程唤醒了第一个协程
wait.Done()
}()
wait.Wait()
remainTimeout := <-ch // 读取剩余时间(实际时间在1.8秒左右)
t.Logf("remainTimeout: %v\n", remainTimeout) // 打印剩余时间(自己加的代码)
assert.True(t, remainTimeout < timeout, "expect remainTimeout %v < %v", remainTimeout, timeout)
assert.True(t, remainTimeout >= time.Duration(200)*time.Millisecond,
"expect remainTimeout %v >= 200 millisecond", remainTimeout)
}执行结果:
PS D:\read\go-zero-master\core\syncx> go test -run TestTimeoutCondWaitTimeoutRemain -v
=== RUN TestTimeoutCondWaitTimeoutRemain
cond_test.go:57: remainTimeout: 1.7998663s【剩余时间接近1.8秒,符合预期】
--- PASS: TestTimeoutCondWaitTimeoutRemain (0.20s)
PASS
ok github.com/zeromicro/go-zero/core/syncx 0.223s
PS D:\read\go-zero-master\core\syncx>源码分析
package syncx
import (
"time"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/timex"
)
// A Cond is used to wait for conditions.
type Cond struct {
signal chan lang.PlaceholderType // == chan struct{} 无缓冲通道
}
// NewCond returns a Cond.
func NewCond() *Cond {
return &Cond{
signal: make(chan lang.PlaceholderType), // 创建chan
}
}
// WaitWithTimeout wait for signal return remain wait time or timed out.
func (cond *Cond) WaitWithTimeout(timeout time.Duration) (time.Duration, bool) { // 带超时的等待
timer := time.NewTimer(timeout)
defer timer.Stop()
begin := timex.Now()
select {
case <-cond.signal: // 有其他协程调用了Signal()函数,Wait成功
elapsed := timex.Since(begin) // 本次Wait的耗时
remainTimeout := timeout - elapsed
return remainTimeout, true // 返回剩余的超时时间,和true(表示Wait成功)
case <-timer.C: // 超时了
return 0, false // 返回剩余的超时时间0,和false(表示Wait超时了)
}
}
// Wait waits for signals.
func (cond *Cond) Wait() { // 等待:无限期阻塞,直到通道可读时返回(说明有其他协程调用了Signal()函数尝试唤醒)
<-cond.signal
}
// Signal wakes one goroutine waiting on c, if there is any.
// 如果存在多个正在执行Wait()的协程,只唤醒一个(最先抢占读取到通道的协程,其余协程继续阻塞);
// 如果存在一个正在执行Wait()的协程,那么唤醒它;否者什么也不干(0个协程在Wait)
func (cond *Cond) Signal() { // 唤醒一个协程
select {
case cond.signal <- lang.Placeholder: // 如果存在一个正在执行Wait()的协程,那么唤醒它
default: // 缺省分支,不存在正在执行Wait()的协程,什么都不做,直接返回
}
}核心流程
可能存在的问题
1.缺少广播唤醒接口Broadcast
可能的方案是采用close(cond.signal),然后重建cond.signal的方式。但是需要增加互斥锁进行保护(同时对已有接口增加互斥锁代码,防止对已经closed的通道进行发送等panic)。
func (cond *Cond) Broadcast() {
cond.mu.Lock()
defer cond.mu.Unlock()
close(cond.signal) // 关闭通道,广播唤醒所有的等待协程(close函数的广播特性)
cond.signal = make(chan lang.PlaceholderType) // 重建通道
}另外一个方法是对调用Wait()或WaitWithTimeout()函数的等待协程进行计数,在广播接口中循环该计数次Signal()函数,达到广播唤醒所有的效果。这个方法看上去更好。
2.可能还存在其他问题
该方案在执行Wait系列函数时,对通道cond.signal的读取是抢占式的,谁最先读取到通道谁wait返回,即谁被唤醒是随机的。