image

限流算法(一)滑动时间窗口算法(Golang实现)

  • WORDS 4464

限流算法(一)滑动时间窗口算法(Golang实现)

滑动时间窗口算法是固定时间窗口算法的改良版本,用于弥补固定时间窗口无法平滑记录请求次数的缺陷。在固定时间窗口算法中,时间窗口只记录固定时间范围内的客户端请求次数,达到了指定的时间范围就会清空所有的请求记录并重新开始记录,如果一个客户端平滑的发送大量请求,在即将达到限流阈值时负责记录的时间窗口重置了,那么针对该客户端永远也达不到我们想要的限流效果。

算法介绍

滑动事件窗口通过将时间段内分为多个时间窗口,达到指定的更新时间后向前滑动时间窗口即可,其它的时间窗口里面依然记录着客户端的在某一时段内的请求记录。使用的时间窗口越多,那么请求记录的更新就会更平滑。

Description

到达指定的更新时间后(这里是 200ms),丢弃最前面的时间窗口,同时新建一个窗口用于记录当前的请求。

Description

这样就好像窗口一直在时间轴上滑动一样,可以更加平滑的记录请求。一般时间窗口都使用数组和链表来时间,但是在 go中有一个更适合的结构,那就是 ring.Ring环形链表。在滑动时间窗口的时候,相较于数组来说,不存在扩容和整体位移的操作,存在一定的性能优势。

准备工作

定义一个限流接口方便后面扩展,接口有两个方法,一个使用执行限流逻辑的 DoLimit,另一个是返回限流器名称的 Name方法。

// Limiter 限流器接口
type Limiter interface {
	// DoLimit 限流方法 返回该请求是否被限流
	DoLimit(key string) bool

	// Name 返回限流器的名称
	Name() string
}

时间窗口主要就是维护某个 key的请求次数,直接使用 sync.Map确保并发安全

type Window struct {
	sync.Map
}

// Count 统计当前窗口中该key的数据
func (b *Window) Count(key string) int {
	if value, ok := b.Load(key); ok {
		return value.(int)
	}
	return 0
}

// Add 将key添加到窗口 不存在就添加 存在就加1
func (b *Window) Add(key string) {
	var newValue int
	if value, ok := b.Load(key); ok {
		newValue = value.(int) + 1
	} else {
		newValue = 1
	}
	b.Store(key, newValue)
}

滑动时间窗口限流配置

// SlidingConfig 滑动时间窗口限流器配置
type SlidingConfig struct {
	Enable    bool          `json:"enable" yaml:"enable"`
	Name      string        `json:"name" yaml:"name"`            // 自定义限流器的名称
	WindowNum int           `json:"windowNum" yaml:"window-num"` // 时间窗口的数量 滑动窗口的统计时间 = 窗口数量 * 更新间隔
	Interval  time.Duration `json:"interval" yaml:"interval"`   // 时间窗口的更新间隔
	Threshold int64         `json:"threshold" yaml:"threshold"` // 客户端在时间端内的最大请求数量
}

实现

定义结构体

// SlidingWindow 基于ring环形列表的滑动时间窗口
// 用于统计一段时间内客户端的请求次数以达到限流的目的
type SlidingWindow struct {
	name      string        // 名称
	length    int           // 时间窗口的数量
	interval  time.Duration // 每次时间窗口滑动的间隔
	queue     *ring.Ring    // 环形列表
	mutex     sync.RWMutex  // 读写锁
	threshold int64         // 限流阈值
}

新建滑动先例窗口函数,需要传入配置信息

// NewSlidingWindow 创建滑动时间窗口
// config 配置
func NewSlidingWindow(config SlidingConfig) *SlidingWindow {
	config = slidingConfigDefault(config)
	window := &SlidingWindow{
		name:      config.Name,
		length:    config.WindowNum,
		interval:  config.Interval,
		threshold: config.Threshold,
	}
    // 初始化环形列表
	queue := ring.New(config.WindowNum)
	for i := 0; i < config.WindowNum; i++ {
		queue.Value = new(Window)
		queue = queue.Next()
	}
	window.queue = queue
	return window
}

添加记录请求和统计请求方法

// 记录请求
func (w *SlidingWindow) record(key string) {
	w.mutex.RLock()
	defer w.mutex.RUnlock()
	bucket, _ := w.queue.Value.(*Window)
	bucket.Add(key)
}

// 统计请求 获取当前所有窗口内该key的请求次数
func (w *SlidingWindow) stats(key string) int64 {
	w.mutex.RLock()
	defer w.mutex.RUnlock()
	var sum int64
    // 遍历环形链表 不存在数据竞态 无需使用atomic
	w.queue.Do(func(i any) {
		bucket, _ := i.(*Window)
		sum += int64(bucket.Count(key))
	})
	return sum
}

添加更新时间窗口的方法,可以传入 context.Context用于退出

// TimingSideWindow 定时更新时间窗口,使用 interval 参数做为间隔时间
// ctx 监听Done事件 停止定时器并退出任务
func (w *SlidingWindow) TimingSideWindow(ctx context.Context) {
	go func(ctx context.Context) {
        // 定时器
		ticker := time.NewTicker(w.interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				w.mutex.Lock()
                // 设置为链表的下一个元素
				w.queue = w.queue.Next()
                // 更新为新的时间窗口
				w.queue.Value = new(Window)
				w.mutex.Unlock()
			case <-ctx.Done():
				return
			}
		}
	}(ctx)
}

实现 Limiter接口

// DoLimit 限流函数 判断当前key的请求次数是不是达到了指定的请求阈值
// 如果没有达到那么就继续记录请求并返回false 否则返回true
func (w *SlidingWindow) DoLimit(key string) bool {
	stats := w.stats(key)
	if stats < w.threshold {
		w.record(key)
		return false
	}
	return true
}

func (w *SlidingWindow) Name() string {
	return w.name
}

测试

func TestSlidingWindow(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
    // 创建一个滑动时间窗口限流器
	window := NewSlidingWindow(SlidingConfig{
		Interval:  3 * time.Second,
		WindowNum: 5,
        interval:   1 * time.Second,
	})
    // 启动定时更新时间窗口
	window.TimingSideWindow(ctx)
	go func() {
        // 每30ms记录一次
		ticker := time.NewTicker(30 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				fmt.Printf("demo: %v\n", window.DoLimit("demo"))
			case <-ctx.Done():
				return
			}
		}
	}()
	go func() {
		ticker := time.NewTicker(80 * time.Millisecond)
        // 每80ms记录一次
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				fmt.Printf("test: %v\n", window.DoLimit("test"))
			case <-ctx.Done():
				return
			}
		}
	}()
	time.Sleep(30 * time.Second)
	cancel()
}

关联文章

0 条评论