go-zero源码浅析2:core\syncx\cond.go

deer332025-07-25技术文章35

核心作用

Cond是go-zero扩展的一个关于条件变量的并发原语,核心在于扩展实现了一个带超时时间的Wait函数:WaitWithTimeout。其特点是极其简洁精炼,仅使用一个无缓存的通道实现条件变量的基本功能。但是舍弃了广播接口。

使用示例

直接以源码自带的单元测试代码为例,展示带超时的等待函数的功能:

func TestTimeoutCondWaitTimeoutRemain(t *testing.T) {
	var wait sync.WaitGroup
	cond := NewCond()
	wait.Add(2)
	ch := make(chan time.Duration, 1) // 容量为1,用于协程间传递剩余超时时间(符合典型准则:通过通信共享内存)
	defer close(ch)
	timeout := time.Duration(2000) * time.Millisecond // 2秒
	go func() { // 执行等待的协程
		remainTimeout, _ := cond.WaitWithTimeout(timeout) // 最多等待2秒
		ch <- remainTimeout                               // 输出剩余时间到ch
		wait.Done()
	}()
	sleep(200) // 睡眠200ms
	go func() { // 执行唤醒的协程
		cond.Signal() // 200ms后由第二个协程唤醒了第一个协程
		wait.Done()
	}()
	wait.Wait()
	remainTimeout := <-ch                        // 读取剩余时间(实际时间在1.8秒左右)
	t.Logf("remainTimeout: %v\n", remainTimeout) // 打印剩余时间(自己加的代码)
	assert.True(t, remainTimeout < timeout, "expect remainTimeout %v < %v", remainTimeout, timeout)
	assert.True(t, remainTimeout >= time.Duration(200)*time.Millisecond,
		"expect remainTimeout %v >= 200 millisecond", remainTimeout)
}

执行结果:

PS D:\read\go-zero-master\core\syncx> go test -run TestTimeoutCondWaitTimeoutRemain -v
=== RUN   TestTimeoutCondWaitTimeoutRemain
    cond_test.go:57: remainTimeout: 1.7998663s【剩余时间接近1.8秒,符合预期】
--- PASS: TestTimeoutCondWaitTimeoutRemain (0.20s)
PASS
ok      github.com/zeromicro/go-zero/core/syncx 0.223s
PS D:\read\go-zero-master\core\syncx>

源码分析

package syncx

import (
	"time"

	"github.com/zeromicro/go-zero/core/lang"
	"github.com/zeromicro/go-zero/core/timex"
)

// A Cond is used to wait for conditions.
type Cond struct {
	signal chan lang.PlaceholderType // == chan struct{} 无缓冲通道
}

// NewCond returns a Cond.
func NewCond() *Cond {
	return &Cond{
		signal: make(chan lang.PlaceholderType), // 创建chan
	}
}

// WaitWithTimeout wait for signal return remain wait time or timed out.
func (cond *Cond) WaitWithTimeout(timeout time.Duration) (time.Duration, bool) { // 带超时的等待
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	begin := timex.Now()
	select {
	case <-cond.signal: // 有其他协程调用了Signal()函数,Wait成功
		elapsed := timex.Since(begin) // 本次Wait的耗时
		remainTimeout := timeout - elapsed
		return remainTimeout, true // 返回剩余的超时时间,和true(表示Wait成功)
	case <-timer.C: // 超时了
		return 0, false // 返回剩余的超时时间0,和false(表示Wait超时了)
	}
}

// Wait waits for signals.
func (cond *Cond) Wait() { // 等待:无限期阻塞,直到通道可读时返回(说明有其他协程调用了Signal()函数尝试唤醒)
	<-cond.signal
}

// Signal wakes one goroutine waiting on c, if there is any.
// 如果存在多个正在执行Wait()的协程,只唤醒一个(最先抢占读取到通道的协程,其余协程继续阻塞);
// 如果存在一个正在执行Wait()的协程,那么唤醒它;否者什么也不干(0个协程在Wait)
func (cond *Cond) Signal() { // 唤醒一个协程
	select {
	case cond.signal <- lang.Placeholder: // 如果存在一个正在执行Wait()的协程,那么唤醒它
	default: // 缺省分支,不存在正在执行Wait()的协程,什么都不做,直接返回
	}
}

核心流程

可能存在的问题

1.缺少广播唤醒接口Broadcast

可能的方案是采用close(cond.signal),然后重建cond.signal的方式。但是需要增加互斥锁进行保护(同时对已有接口增加互斥锁代码,防止对已经closed的通道进行发送等panic)。

func (cond *Cond) Broadcast() {
	cond.mu.Lock()
	defer cond.mu.Unlock()

	close(cond.signal)                            // 关闭通道,广播唤醒所有的等待协程(close函数的广播特性)
	cond.signal = make(chan lang.PlaceholderType) // 重建通道
}

另外一个方法是对调用Wait()或WaitWithTimeout()函数的等待协程进行计数,在广播接口中循环该计数次Signal()函数,达到广播唤醒所有的效果。这个方法看上去更好。

2.可能还存在其他问题

该方案在执行Wait系列函数时,对通道cond.signal的读取是抢占式的,谁最先读取到通道谁wait返回,即谁被唤醒是随机的。