限流算法
限流,是一种自我保护的机制,服务端通过一些限流手段控制并发访问请求,目的是为了防止系统因突发流量导致服务不可用或者崩溃。
常用的限流方法:
- 计数器限流
- 滑动窗口限流
- 漏桶限流
- 令牌桶限流
计数器限流
在一段时间间隔内,对请求进行计数,并与阈值进行比较,判断是否需要限流。时间间隔结束后重置计数器,因此计数器限流也叫固定时间限流。

如图所示,假设每秒钟限流5次,那么接下来的第6次、第7次...会被拒绝,但下一秒钟又可以继续访问,直到达到限流5次后,又被拒绝。
单机版
type CounterLimiter struct {
	mux     sync.Mutex
	ctime   time.Time
	cycle   time.Duration
	counter int
	limit   int
}
// NewCounterLimiter returns a CounterLimiter whose first window starts at the
// time of the call. limit is the per-window threshold and cycle is the window
// length.
func NewCounterLimiter(limit int, cycle time.Duration) *CounterLimiter {
	// mux and counter keep their zero values: an unlocked mutex and a count of 0.
	limiter := new(CounterLimiter)
	limiter.limit = limit
	limiter.cycle = cycle
	limiter.ctime = time.Now()
	return limiter
}
// Reset opens a new window starting now and clears the request count.
// It does not take c.mux itself; Allow calls it while already holding the lock.
func (c *CounterLimiter) Reset() {
	c.counter, c.ctime = 0, time.Now()
}
func (c *CounterLimiter) Allow() bool {
	c.mux.Lock()
	defer c.mux.Unlock()
	now := time.Now()
	if now.Sub(c.ctime) >= c.cycle {
		c.Reset()
		return true
	}
	if c.counter >= c.limit {
		return false
	}
	c.counter++
	return true
}测试
func TestCounterLimiter_Allow(t *testing.T) {
	c := NewCounterLimiter(5, time.Second)
	var wg sync.WaitGroup
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			if allow := c.Allow(); allow {
				t.Log("allow")
				time.Sleep(time.Millisecond * 200) //模拟处理耗时
			} else {
				t.Log("unallow")
			}
		}()
		time.Sleep(time.Millisecond * 100)
	}
	wg.Wait()
}测试结果,由于我们限制了每秒种只能处理5个请求,前5个运行通过,后5个由于限流,不允许通过
=== RUN   TestCounterLimiter_Allow
    counter_test.go:17: allow
    counter_test.go:17: allow
    counter_test.go:17: allow
    counter_test.go:17: allow
    counter_test.go:17: allow
    counter_test.go:20: unallow
    counter_test.go:20: unallow
    counter_test.go:20: unallow
    counter_test.go:20: unallow
    counter_test.go:20: unallow
但是计数器限流存在一个问题,如下图所示:

假设限制每秒钟限流5个请求。虽然在00:00:30-00:00:31之间有5个请求,但没有超过5个;00:00:31-00:00:32之间有5个请求,也没有超过5个。但是在00:00:31秒附近却有10次请求。我们模拟这种场景测试如下:
func TestCounterLimiter_Allow2(t *testing.T) {
	c := NewCounterLimiter(5, time.Second)
	var wg sync.WaitGroup
	wg.Add(10)
	worker := func() {
		defer wg.Done()
		if allow := c.Allow(); allow {
			t.Log("allow")
			time.Sleep(time.Millisecond * 200) //模拟处理耗时
		} else {
			t.Log("unallow")
		}
	}
	time.Sleep(time.Millisecond * 900)
	for i := 0; i < 5; i++ {
		go worker()
	}
	time.Sleep(time.Millisecond * 100)
	for i:=0;i<5;i++{
		go worker()
	}
	time.Sleep(time.Millisecond*900)
	wg.Wait()
}测试结果:
=== RUN   TestCounterLimiter_Allow2
    counter_test.go:35: allow
    counter_test.go:35: allow
    counter_test.go:35: allow
    counter_test.go:35: allow
    counter_test.go:35: allow
    counter_test.go:35: allow
    counter_test.go:35: allow
    counter_test.go:35: allow
    counter_test.go:35: allow
    counter_test.go:35: allow
很显然计数器限流存在临界值问题,限流不够均衡。假设极端情况下,在临界值附近有2倍的流量,很显然我们的系统会处理不过来。
分布式
分布式是当前的主流模式,在分布式环境下我们可以使用redis+lua脚本实现计数器限流
-- Fixed-window counter rate limiter (Redis EVAL script).
-- KEYS[1] target key (hash holding the state of the current window)
-- KEYS[2] max count key from redis config (overrides ARGV[2] when it is set)
-- ARGV[1] current timestamp (milliseconds, same unit as the duration)
-- ARGV[2] max count, used when KEYS[2] is not set
-- ARGV[3] duration of one window (milliseconds)
-- HASH: KEYS[1]
--   field:count     requests still allowed in this window (-1 once exhausted)
--   field:limit     max count of this window
--   field:duration  window duration
--   field:reset     ARGV[1] + duration at the moment the window was opened
-- Returns { remaining, limit, duration, reset }; remaining is -1 when the
-- request must be rejected.
local res = {}
local limit = redis.call('hmget', KEYS[1], 'count', 'limit', 'duration', 'reset')
if limit[1] then
  -- A window is already open: take one slot from it.
  res[1] = tonumber(limit[1]) - 1
  res[2] = tonumber(limit[2])
  -- Convert the fallback as well so that every element of the reply is a number.
  res[3] = tonumber(limit[3]) or tonumber(ARGV[3])
  res[4] = tonumber(limit[4])
  if res[1] >= -1 then
    redis.call('hincrby', KEYS[1], 'count', -1)
  else
    -- Already exhausted: leave the stored count at -1 and report a rejection.
    res[1] = -1
  end
else
  -- No window yet: open one and take its first slot.
  local total = tonumber(redis.call('get', KEYS[2]) or ARGV[2])
  res[1] = total - 1
  res[2] = total
  res[3] = tonumber(ARGV[3])
  res[4] = tonumber(ARGV[1]) + res[3]
  redis.call('hmset', KEYS[1], 'count', res[1], 'limit', res[2], 'duration', res[3], 'reset', res[4])
  -- The hash expires together with the window, which resets the counter.
  redis.call('pexpire', KEYS[1], res[3])
end
return res
测试
package main
import (
	"fmt"
	"html"
	"log"
	"net/http"
	"strconv"
	"time"
	"github.com/garyburd/redigo/redis"
)
var conn redis.Conn
// lua is the fixed-window counter script run through EVAL. Its reply is
// {remaining, limit, duration, reset}, with remaining == -1 on rejection.
const lua string = `
-- KEYS[1] target key (hash holding the state of the current window)
-- KEYS[2] max count key from redis config (overrides ARGV[2] when it is set)
-- ARGV[1] current timestamp (milliseconds, same unit as the duration)
-- ARGV[2] max count, used when KEYS[2] is not set
-- ARGV[3] duration of one window (milliseconds)
-- HASH: KEYS[1]
--   field:count
--   field:limit
--   field:duration
--   field:reset
local res = {}
local limit = redis.call('hmget', KEYS[1], 'count', 'limit', 'duration', 'reset')
if limit[1] then
  res[1] = tonumber(limit[1]) - 1
  res[2] = tonumber(limit[2])
  -- Convert the fallback as well so that every element of the reply is a number.
  res[3] = tonumber(limit[3]) or tonumber(ARGV[3])
  res[4] = tonumber(limit[4])
  if res[1] >= -1 then
    redis.call('hincrby', KEYS[1], 'count', -1)
  else
    res[1] = -1
  end
else
  local total = tonumber(redis.call('get', KEYS[2]) or ARGV[2])
  res[1] = total - 1
  res[2] = total
  res[3] = tonumber(ARGV[3])
  res[4] = tonumber(ARGV[1]) + res[3]
  redis.call('hmset', KEYS[1], 'count', res[1], 'limit', res[2], 'duration', res[3], 'reset', res[4])
  redis.call('pexpire', KEYS[1], res[3])
end
return res
`
// init dials the Redis server at localhost:6379 and stores the connection in
// conn; it panics when the dial fails.
func init() {
	c, err := redis.Dial("tcp", "localhost:6379")
	if err != nil {
		panic(err)
	}
	conn = c
}
func main() {
	defer conn.Close()
	total := "5"
	s := redis.NewScript(2, lua)
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/" {
			now := strconv.FormatInt(time.Now().UnixNano()/1e6, 10)
			duration := strconv.FormatInt(10*time.Second.Milliseconds(), 10)
			log.Println(total, now, duration)
			v, err := redis.Ints(s.Do(conn, "user-1001", "total-user", now, total, duration))
			if err != nil {
				http.Error(w, err.Error(), 500)
				return
			}
			if v[0] >= 0 {
				w.WriteHeader(200)
				fmt.Fprintf(w, "Path: %q\n", html.EscapeString(r.URL.Path))
				fmt.Fprintf(w, "Remaining: %d\n", v[0])
				fmt.Fprintf(w, "Total: %d\n", v[1])
				fmt.Fprintf(w, "Duration: %v\n", v[2])
				fmt.Fprintf(w, "Reset: %v\n", v[3])
			} else {
				w.WriteHeader(429)
				fmt.Fprintf(w, "Rate limit exceeded, retry seconds.\n")
			}
		}
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}滑动窗口限流
滑动窗口(Sliding Window)限流在每次检查时,查看最近窗口(例如10秒钟)的请求次数,如果超过最大次数(例如5次)就拒绝,反之则允许。由于时间窗口是滑动的,没有像计数器限流那样固定的临界点,因此解决了计数器的临界问题。

分布式
-- Sliding-window rate limiter (Redis EVAL script).
-- KEYS[1] target key (sorted set with one member per admitted request)
-- KEYS[2] max count key from redis config (overrides ARGV[2] when it is set)
-- ARGV[1] current timestamp (milliseconds)
-- ARGV[2] max count, used when KEYS[2] is not set
-- ARGV[3] window length (milliseconds)
-- Returns { remaining, limit, duration, reset }; remaining is -1 when the
-- request must be rejected.
local res = {}
local key = KEYS[1]
local score = tonumber(ARGV[1])
local limit = tonumber(redis.call('get', KEYS[2]) or ARGV[2])
local duration = tonumber(ARGV[3])
-- Drop the requests that have left the window. Timestamp and window length
-- are both in milliseconds, so they are subtracted without any scaling.
redis.call("zremrangebyscore", key, 0, score - duration)
local count = redis.call("zcard", key)
if count >= limit then
	res[1] = -1
else
	res[1] = limit - count - 1
	-- The member must be unique per request. Using the bare timestamp would
	-- merge requests arriving in the same millisecond into one entry, so the
	-- current cardinality is appended to it.
	redis.call("zadd", key, score, ARGV[1] .. '-' .. count)
	-- Once no request has been admitted for a whole window the set is empty
	-- anyway, so let the key expire.
	redis.call("pexpire", key, duration)
end
res[2] = limit
res[3] = duration
-- Time at which the window that starts now ends (milliseconds).
res[4] = score + duration
return res
测试
package main
import (
	"fmt"
	"html"
	"log"
	"net/http"
	"strconv"
	"time"
	"github.com/garyburd/redigo/redis"
)
// limitWindow is the sliding-window script run through EVAL. Timestamp
// (ARGV[1]) and window length (ARGV[3]) are both in milliseconds. Its reply is
// {remaining, limit, duration, reset}, with remaining == -1 on rejection.
const limitWindow = `
local res = {}
local key = KEYS[1]
local score = tonumber(ARGV[1])
local limit = tonumber(redis.call('get', KEYS[2]) or ARGV[2])
local duration = tonumber(ARGV[3])
-- Timestamp and window length are both in milliseconds: no scaling needed.
redis.call("zremrangebyscore", key, 0, score - duration)
local count = redis.call("zcard", key)
if count >= limit then
	res[1] = -1
else
	res[1] = limit - count - 1
	-- Unique member per request, so that requests arriving in the same
	-- millisecond are not merged into one entry.
	redis.call("zadd", key, score, ARGV[1] .. '-' .. count)
	redis.call("pexpire", key, duration)
end
res[2] = limit
res[3] = duration
res[4] = score + duration
return res
`
var connWindow redis.Conn
// init dials the Redis server at localhost:6379 and stores the connection in
// connWindow; it panics when the dial fails.
func init() {
	c, err := redis.Dial("tcp", "localhost:6379")
	if err != nil {
		panic(err)
	}
	connWindow = c
}
func main() {
	defer connWindow.Close()
	total := "5"
	s := redis.NewScript(2, limitWindow)
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/" {
			now := strconv.FormatInt(time.Now().UnixNano()/1e6, 10)
			duration := strconv.FormatInt(10*time.Second.Milliseconds(), 10)
			log.Println(total, now, duration)
			v, err := redis.Ints(s.Do(connWindow, "user-1001", "total-user", now, total, duration))
			if err != nil {
				http.Error(w, err.Error(), 500)
				return
			}
			if v[0] >= 0 {
				w.WriteHeader(200)
				fmt.Fprintf(w, "Path: %q\n", html.EscapeString(r.URL.Path))
				fmt.Fprintf(w, "Remaining: %d\n", v[0])
				fmt.Fprintf(w, "Total: %d\n", v[1])
				fmt.Fprintf(w, "Duration: %v\n", v[2])
				fmt.Fprintf(w, "Reset: %v\n", v[3])
			} else {
				w.WriteHeader(429)
				fmt.Fprintf(w, "Rate limit exceeded, retry seconds.\n")
			}
		}
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}漏桶限流
漏桶算法(Leaky Bucket)主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。漏桶可以看作是一个带有常量服务时间的单服务器队列,如果漏桶(包缓存)溢出,那么数据包会被丢弃。如图所示,把请求比作是水,水来了都先放进桶里,并以限定的速度出水,当水来得过猛而出水不够快时就会导致水直接溢出。

单机版
uber团队开源的漏桶,使用比较简单。
package main
import (
	"fmt"
	"time"
	"go.uber.org/ratelimit"
)
func main() {
	rl := ratelimit.New(100) // per second
	prev := time.Now()
	for i := 0; i < 10; i++ {
		now := rl.Take()
		fmt.Println(i, now.Sub(prev))
		prev = now
	}
}令牌桶限流
令牌桶算法包含三个过程:
- 生产令牌:以固定的速率向令牌桶中增加令牌,如果桶中令牌个数达到上限,则丢弃多余令牌;
- 消费令牌:每来一次请求,就会消耗桶中的令牌,可以对耗时高的请求增加每次消费的令牌个数;
- 判断是否通过:请求到来,根据消费的令牌个数是否小于等于桶中令牌个数,如果满足则允许通过,否则不允许通过;

分布式
-- Token-bucket rate limiter (Redis EVAL script).
-- KEYS[1] target key
-- ARGV[n = 3] current timestamp, max count, interval
--   (interval is the time it takes to add one token, in the unit of the timestamp)
-- HASH: KEYS[1]
--   field: count      tokens currently in the bucket
--   field: limit      capacity of the bucket
--   field: interval   time needed to add one token
--   field: timestamp  point in time up to which refills have been accounted for
-- Returns { remaining, limit, interval, timestamp }; remaining is -1 when the
-- bucket is empty and the request must be rejected.
-- NOTE(review): the hash is never expired, so idle keys stay in Redis forever.
local res = {}
local curr_timestamp = tonumber(ARGV[1])
local limit = redis.call('hmget', KEYS[1], 'count', 'limit', 'interval', 'timestamp')
if limit[1] then
  local count = tonumber(limit[1])
  local total = tonumber(limit[2])
  local interval = tonumber(limit[3])
  local last_timestamp = tonumber(limit[4])
  -- Number of tokens that should have been added since the last refill.
  local increase_token = math.max(0, math.floor((curr_timestamp - last_timestamp) / interval))
  local token = math.min(total, count + increase_token)
  if token > 0 then
    -- Take one token.
    token = token - 1
    res[1] = token
  else
    -- Nothing to take: report -1 so that the caller can tell "rejected" apart
    -- from "allowed, bucket is now empty" (which reports 0).
    res[1] = -1
  end
  -- Store the number of tokens left in the bucket.
  redis.call('hset', KEYS[1], 'count', token)
  res[2] = total
  res[3] = interval
  res[4] = last_timestamp
  if increase_token > 0 then
    -- Advance the refill time by whole intervals only. Jumping to the current
    -- time would discard the partially elapsed interval and make the bucket
    -- refill more slowly than configured.
    res[4] = last_timestamp + increase_token * interval
    redis.call('hset', KEYS[1], 'timestamp', res[4])
  end
else
  -- First request for this key: create a full bucket and take one token.
  local total = tonumber(ARGV[2])
  res[1] = total - 1
  res[2] = total
  res[3] = tonumber(ARGV[3])
  res[4] = curr_timestamp
  redis.call('hmset', KEYS[1], 'count', res[1], 'limit', res[2], 'interval', res[3], 'timestamp', res[4])
end
return res