Openstack rate limit

配置

1
2
3
4
5
6
7
8
9
10
11
12
文件路径:/etc/nova/api-deploy.ini
[composite:openstack_volume_api_v1]
use = call:nova.api.auth:pipeline_factory
noauth = faultwrap sizelimit noauth ratelimit osapi_volume_app_v1
keystone = faultwrap sizelimit authtoken keystonecontext ratelimit osapi_volume_app_v1
keystone_nolimit = faultwrap sizelimit authtoken keystonecontext osapi_volume_app_v1
[filter:ratelimit]
paste.filter_factory = nova.api.openstack.compute.limits:RateLimitingMiddleware.factory
limiter = nova.api.openstack.compute.limits.WsgiLimiter
limits = (POST, "*", .*, 100, MINUTE);(POST, "*/servers", ^/servers, 50, DAY);(PUT, "*", .*, 100, MINUTE);(GET, "*changes-since*", .*changes-since.*, 3, MINUTE);(DELETE, "*", .*, 100, MINUTE)
处理逻辑

流控处理逻辑入口为 RateLimitingMiddleware 类的 call() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
"""
Represents a single call through this middleware. We should record the
request if we have a limit relevant to it. If no limit is relevant to
the request, ignore it.
If the request should be rate limited, return a fault telling the user
they are over the limit and need to retry later.
"""
verb = req.method
url = req.url
context = req.environ.get("nova.context")
if context:
username = context.user_id
else:
username = None
delay, error = self._limiter.check_for_delay(verb, url, username)
if delay:
msg = _("This request was rate-limited.")
retry = time.time() + delay
return wsgi.OverLimitFault(msg, error, retry)
req.environ["nova.limits"] = self._limiter.get_limits(username)
return self.application

主要看这句 delay, error = self._limiter.check_for_delay(verb, url, username),其中的 self._limiter 在 init 方法中初始化为配置文件中的 nova.api.openstack.compute.limits.WsgiLimiter 实例,查看该类的 check_for_delay 方法(该类没有实现这个方法,直接看其父类):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Limiter(object):
def check_for_delay(self, verb, url, username=None):
"""
Check the given verb/user/user triplet for limit.
@return: Tuple of delay (in seconds) and error message (or None, None)
"""
delays = []
for limit in self.levels[username]:
delay = limit(verb, url)
if delay:
delays.append((delay, limit.error_message))
if delays:
delays.sort()
return delays[0]
return None, None

重点看 check_for_delay 方法中的循环处理,self.levels 为配置文件中 limits 转换的 dict。循环读取 limits 配置内容,调用 Limit 类的 call 方法做流量控制检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
self.levels = collections.defaultdict(lambda: copy.deepcopy(limits))
for limit in self.levels[username]:
delay = limit(verb, url)
if delay:
delays.append((delay, limit.error_message))
class Limit(object):
def __call__(self, verb, url):
"""
Represents a call to this limit from a relevant request.
@param verb: string http verb (POST, GET, etc.)
@param url: string URL
"""
if self.verb != verb or not re.match(self.regex, url):
return
now = self._get_time()
if self.last_request is None:
self.last_request = now
leak_value = now - self.last_request
self.water_level -= leak_value
self.water_level = max(self.water_level, 0)
self.water_level += self.request_value
difference = self.water_level - self.capacity
self.last_request = now
if difference > 0:
self.water_level -= self.request_value
self.next_request = now + difference
return difference
cap = self.capacity
water = self.water_level
val = self.value
self.remaining = math.floor(((cap - water) / cap) * val)
self.next_request = now

类 Limit 的 call 方法,请求的方法和 uri 如果符合 limits 配置项中的某一项,则按照该项配置要求做相应的检验。比如下面配置中的这一项 (POST, ““, ., 40, SECOND),call 方法则会校验每分钟的 POST 请求是否超过了100次。如果没有超过,则继续执行;如果超过,则返回异常提示信息给用户,请求超过流量限制,请过一段时间再试。

1
limits =(POST, "*", .*, 100, MINUTE);(POST, "*/servers", ^/servers, 50, DAY);(PUT, "*", .*, 100, MINUTE);(GET, "*changes-since*", .*changes-since.*, 3, MINUTE);(DELETE, "*", .*, 100, MINUTE)