redis限流方法

发布 : 2021-03-04

1. 利用之前的redis锁来处理队列请求

利用set nx 原子操作强行排队

2. 使用zset有序集合限定某一时间内的流量

原理

zset为有序集合, 记录下某一key值,相同分数, 不同访问时间
zadd user:10 0 1617070146897, 请求到来时, 获取到(限流总数-1)的有序数据量, 即当前限制key的次数访问记录, 判断第一个值的时间, 是否为在限制区间内

相关命令操作

记限制单个用户发送验证码的访问频率为1分钟1次, 5分钟2次,1小时5次, 一天10次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 请求进入, 获取已存入的
zcard user:10:$url => $count
# 获取前一条数据(每分钟)
zrange user:10:$url -1 -1
# 对比时间 $time -$result < 1000
# 获取前一条数据(5分钟)
zrange user:10:$url -2 -2
# 对比时间 $time -$result < 5000
# 获取前一条数据(60分钟)
zrange user:10:$url -5 -5
# 对比时间 $time -$result < 3600000

# 获取前一条数据(一天)
zrange user:10:$url -10 -10
# 对比时间 $time -$result < 24 * 3600 * 000

# 全部通过, 记下本次
zadd user:10:$url $time $time

# 同时删除,超过24小时+1ms的数据
zremrangebyscore user:10:$url 0 ($time - 24 * 3600 * 1000 + 1)
# 可以先查询再删除,减少数据变动
zcount user:10:$url 0 ($time - 24 * 3600 * 1000 + 1)


# 注意点: 每条数据的最大保存时间为,限制幅度的最大时间, 此处为 24 * 3600 + 1

相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class RateLimiter {
/**
*
* @param {String} type 限制类型, user|uri|ip, 可组合, user,uri|uri,ip|user,ip|user,uri,ip
*/
constructor(type, msg) {
// 用于判断生成 zset 的 key
// 空,则仅对user进行判断
this.types = (type || 'user').split(',');
this.msg = msg;
this.rules = [];
}

getKey(req) {
let keys = [];
for(let item of this.types) {
switch(item) {
case 'user':
key.push(`user:${req.userIfo.id}`)
break;
case 'uri':
key.push(`uri:${req.url}`)
break;
case 'ip':
key.push(`ip:${req.get('X-Real-IP') || req.ip}`)
break;
}
}
return keys.join(':');
}

rate(rules) {
if (!rules) throw new Error("ReteLimiter.rate is not rules");
this.rules = (rules || []).sort((a,b) => a.time > b.time);
return this.action
}

action(req, res, next) {
let key = this.getKey(req);
// 组织定义对象
let redis = req.redis;
for(var item of this.rules) {
let limit = await this.query(redis, key, item);
if (limit) {
res.statusCoce(429);
res.send({"msg": this.msg || 'Many too request'})
return res.end();
}
}

await next();
// 记录
this.mark(redis, key, this.rules[this.rules.length-1].time);
}

async mark(redis, key, maxRule) {
let time = new Date().getTime();
redis.zadd(key, time, time);
// 移除过期
let remTime = time - maxRule * 1000 - 1;
let count = await redis.zcount(key, 0, remTime);
if (count>0) {
await redis.zremrangebysore(key, 0, remTime);
}
}

/**
*
* @param {RedisConection} redis
* @param {Object} ops 查询单位
* @param {Number} ops.time 单位时长, 单位:s, 1s, 300s, 3600s等
* @param {Number} ops.rate 单位时间限制次数
* @returns {Boolean} 是否被限制
*/
async query(redis, key, {time, rate}) {
let now = new Date().getTime();
let result = await redis.zrange(key, -1 * rate, -1 * rate);
if (now - result[0] < time * 1000) return true;
return false
}
}

3. 使用令牌桶算法限流

  1. 漏桶

    以一定速度生成令牌, 放入桶内, 请求来临时, 桶内以固定速率释放令牌, 请求获取到令牌,则继续进行, 无峰值访问, 实际请求强制排队等候, 类似setnx, 可以用 lpush/blpop模拟实现, 假定生成与释放令牌速度相同, 即定时生成即消费

漏桶

1
2
3
4
# 生成
lpush user:10:$url $token
# 获取
blpop user:10:$url
  1. 令牌桶

    以一定速度生成令牌, 放入桶内, 请求来临时,从桶内获取令牌, 如果桶内为空则拒绝请求, 当请求速度大于令牌生成速度时, 桶内在一定时间内为空, 产生拒绝服务, 可以产生较小峰值

共同缺点, 需要额外进程/线程以固定速率生成令牌放入队列, 利用的是 lpush/blpop/llen适合通用接口流量削峰填谷

令牌桶

1
2
3
4
5
6
7
8
# 额外进程生成令牌
llen user:10:$url => 如果大于设定值如100, 则不再继续生成
lpush user:10:$url $token

# 请求进入(redis blpop先到先得,不会超发)
blpop user:10:$url
# 拿到令牌继续操作

本文作者 : 萧逸雨
原文链接 : http://qiubo.ink/2021/03/04/redis限流方法/
版权声明 : 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!