限流算法(一)滑动时间窗口算法(Golang实现)
滑动时间窗口算法是固定时间窗口算法的改良版本,用于弥补固定时间窗口无法平滑记录请求次数的缺陷。在固定时间窗口算法中,时间窗口只记录固定时间范围内的客户端请求次数,达到了指定的时间范围就会清空所有的请求记录并重新开始记录,如果一个客户端平滑的发送大量请求,在即将达到限流阈值时负责记录的时间窗口重置了,那么针对该客户端永远也达不到我们想要的限流效果。
算法介绍
滑动事件窗口通过将时间段内分为多个时间窗口,达到指定的更新时间后向前滑动时间窗口即可,其它的时间窗口里面依然记录着客户端的在某一时段内的请求记录。使用的时间窗口越多,那么请求记录的更新就会更平滑。

到达指定的更新时间后(这里是 200ms),丢弃最前面的时间窗口,同时新建一个窗口用于记录当前的请求。

这样就好像窗口一直在时间轴上滑动一样,可以更加平滑的记录请求。一般时间窗口都使用数组和链表来时间,但是在 go中有一个更适合的结构,那就是 ring.Ring环形链表。在滑动时间窗口的时候,相较于数组来说,不存在扩容和整体位移的操作,存在一定的性能优势。
准备工作
定义一个限流接口方便后面扩展,接口有两个方法,一个使用执行限流逻辑的 DoLimit,另一个是返回限流器名称的 Name方法。
// Limiter 限流器接口
type Limiter interface {
// DoLimit 限流方法 返回该请求是否被限流
DoLimit(key string) bool
// Name 返回限流器的名称
Name() string
}
时间窗口主要就是维护某个 key的请求次数,直接使用 sync.Map确保并发安全
type Window struct {
sync.Map
}
// Count 统计当前窗口中该key的数据
func (b *Window) Count(key string) int {
if value, ok := b.Load(key); ok {
return value.(int)
}
return 0
}
// Add 将key添加到窗口 不存在就添加 存在就加1
func (b *Window) Add(key string) {
var newValue int
if value, ok := b.Load(key); ok {
newValue = value.(int) + 1
} else {
newValue = 1
}
b.Store(key, newValue)
}
滑动时间窗口限流配置
// SlidingConfig 滑动时间窗口限流器配置
type SlidingConfig struct {
Enable bool `json:"enable" yaml:"enable"`
Name string `json:"name" yaml:"name"` // 自定义限流器的名称
WindowNum int `json:"windowNum" yaml:"window-num"` // 时间窗口的数量 滑动窗口的统计时间 = 窗口数量 * 更新间隔
Interval time.Duration `json:"interval" yaml:"interval"` // 时间窗口的更新间隔
Threshold int64 `json:"threshold" yaml:"threshold"` // 客户端在时间端内的最大请求数量
}
实现
定义结构体
// SlidingWindow 基于ring环形列表的滑动时间窗口
// 用于统计一段时间内客户端的请求次数以达到限流的目的
type SlidingWindow struct {
name string // 名称
length int // 时间窗口的数量
interval time.Duration // 每次时间窗口滑动的间隔
queue *ring.Ring // 环形列表
mutex sync.RWMutex // 读写锁
threshold int64 // 限流阈值
}
新建滑动先例窗口函数,需要传入配置信息
// NewSlidingWindow 创建滑动时间窗口
// config 配置
func NewSlidingWindow(config SlidingConfig) *SlidingWindow {
config = slidingConfigDefault(config)
window := &SlidingWindow{
name: config.Name,
length: config.WindowNum,
interval: config.Interval,
threshold: config.Threshold,
}
// 初始化环形列表
queue := ring.New(config.WindowNum)
for i := 0; i < config.WindowNum; i++ {
queue.Value = new(Window)
queue = queue.Next()
}
window.queue = queue
return window
}
添加记录请求和统计请求方法
// 记录请求
func (w *SlidingWindow) record(key string) {
w.mutex.RLock()
defer w.mutex.RUnlock()
bucket, _ := w.queue.Value.(*Window)
bucket.Add(key)
}
// 统计请求 获取当前所有窗口内该key的请求次数
func (w *SlidingWindow) stats(key string) int64 {
w.mutex.RLock()
defer w.mutex.RUnlock()
var sum int64
// 遍历环形链表 不存在数据竞态 无需使用atomic
w.queue.Do(func(i any) {
bucket, _ := i.(*Window)
sum += int64(bucket.Count(key))
})
return sum
}
添加更新时间窗口的方法,可以传入 context.Context用于退出
// TimingSideWindow 定时更新时间窗口,使用 interval 参数做为间隔时间
// ctx 监听Done事件 停止定时器并退出任务
func (w *SlidingWindow) TimingSideWindow(ctx context.Context) {
go func(ctx context.Context) {
// 定时器
ticker := time.NewTicker(w.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
w.mutex.Lock()
// 设置为链表的下一个元素
w.queue = w.queue.Next()
// 更新为新的时间窗口
w.queue.Value = new(Window)
w.mutex.Unlock()
case <-ctx.Done():
return
}
}
}(ctx)
}
实现 Limiter接口
// DoLimit 限流函数 判断当前key的请求次数是不是达到了指定的请求阈值
// 如果没有达到那么就继续记录请求并返回false 否则返回true
func (w *SlidingWindow) DoLimit(key string) bool {
stats := w.stats(key)
if stats < w.threshold {
w.record(key)
return false
}
return true
}
func (w *SlidingWindow) Name() string {
return w.name
}
测试
func TestSlidingWindow(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// 创建一个滑动时间窗口限流器
window := NewSlidingWindow(SlidingConfig{
Interval: 3 * time.Second,
WindowNum: 5,
interval: 1 * time.Second,
})
// 启动定时更新时间窗口
window.TimingSideWindow(ctx)
go func() {
// 每30ms记录一次
ticker := time.NewTicker(30 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Printf("demo: %v\n", window.DoLimit("demo"))
case <-ctx.Done():
return
}
}
}()
go func() {
ticker := time.NewTicker(80 * time.Millisecond)
// 每80ms记录一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Printf("test: %v\n", window.DoLimit("test"))
case <-ctx.Done():
return
}
}
}()
time.Sleep(30 * time.Second)
cancel()
}






0 条评论