当前位置:网站首页>golang source code analysis: uber-go/ratelimit
golang source code analysis: uber-go/ratelimit
2022-08-02 23:05:00 【User 9710217】
https://github.com/uber-go/ratelimit Is a bucket of current limiter to realize,
rl := ratelimit.New(100) // per second
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}
在这个例子中,我们给定限流器每秒可以通过 100 个请求,也就是The average request interval 10ms.因此,最终会每 10ms 打印一行数据.输出结果如下:
// Output:
// 0 0
// 1 10ms
// 2 10ms
The whole package in the source code is as follows:
example_test.go
limiter_atomic.go
limiter_mutexbased.go
ratelimit.go
ratelimit_bench_test.go
ratelimit_test.go
1,ratelimit.New
Look at the first initialization process:
// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
return newAtomicBased(rate, opts...)
}
传入的参数是1s内产生的token数量:
// newAtomicBased returns a new atomic based limiter.
func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
// TODO consider moving config building to the implementation
// independent code.
config := buildConfig(opts)
perRequest := config.per / time.Duration(rate)
l := &atomicLimiter{
perRequest: perRequest,
maxSlack: -1 * time.Duration(config.slack) * perRequest,
clock: config.clock,
}
initialState := state{
last: time.Time{},
sleepFor: 0,
}
atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
return l
}
1,通过options修改配置参数 config := buildConfig(opts)
func buildConfig(opts []Option) config {
c := config{
clock: clock.New(),
slack: 10,
per: time.Second,
}
for _, opt := range opts {
opt.apply(&c)
}
return c
}
可以看到,默认情况下per是1s
2,Computed a token money(时间间隔)
perRequest := config.per / time.Duration(rate)
3,初始化atomicLimiter,Tokens generated time interval,时钟
type atomicLimiter struct {
state unsafe.Pointer
//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
// of this rate limiter in case of collocation with other frequently accessed memory.
padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.
perRequest time.Duration
maxSlack time.Duration
clock Clock
}
4,Record the initialization state:The current time and sleep time
type state struct {
last time.Time
sleepFor time.Duration
}
After complete the initialization process,We have entered the token of the process.
2,rl.Take
Take是一个接口,返回当前时间
// Limiter is used to rate-limit some process, possibly across goroutines.
// The process is expected to call Take() before every iteration, which
// may block to throttle the goroutine.
type Limiter interface {
// Take should block to make sure that the RPS is met.
Take() time.Time
}
atomicLimiter 实现了这个接口
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *atomicLimiter) Take() time.Time {
var (
newState state
taken bool
interval time.Duration
)
for !taken {
now := t.clock.Now()
previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)
newState = state{
last: now,
sleepFor: oldState.sleepFor,
}
// If this is our first request, then we allow it.
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}
// sleepFor calculates how much time we should sleep based on
// the perRequest budget and how long the last request took.
// Since the request may take longer than the budget, this number
// can get negative, and is summed across requests.
newState.sleepFor += t.perRequest - now.Sub(oldState.last)
// We shouldn't allow sleepFor to get too negative, since it would mean that
// a service that slowed down a lot for a short period of time would get
// a much higher RPS following that.
if newState.sleepFor < t.maxSlack {
newState.sleepFor = t.maxSlack
}
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
interval, newState.sleepFor = newState.sleepFor, 0
}
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
t.clock.Sleep(interval)
return newState.last
}
1,获取当前时间
2,If it is initialized state,That is the last access time is0,Then set the last access time is the time,直接返回.
3,Calculation of the sleep time,睡眠时间=The previous record of sleep+Each token of interval-(当前时间-上一次访问时间),Is the access time interval
4,If sleep time is less thanmaxSlack,Description the request volume is small,From the last access time for a long time,Amend the sleep timemaxSlack,Otherwise unable to cope with a large number of sudden traffic.
5,If sleep time is greater than0,That request quantity is big,Need to wait for a while to return,调用 t.clock.Sleep(t.sleepFor),进入睡眠状态,At the same time modify the last access time and sleep time
6,如果小于等于0,That request is,可以立即返回,并记录当前时间
mutexLimiter Also implements the interface:
type mutexLimiter struct {
sync.Mutex
last time.Time
sleepFor time.Duration
perRequest time.Duration
maxSlack time.Duration
clock Clock
}
The difference is one is based on mutexes implement,One is based on atomic operation to realize
func (t *mutexLimiter) Take() time.Time {
t.Lock()
defer t.Unlock()
now := t.clock.Now()
// If this is our first request, then we allow it.
if t.last.IsZero() {
t.last = now
return t.last
}
// sleepFor calculates how much time we should sleep based on
// the perRequest budget and how long the last request took.
// Since the request may take longer than the budget, this number
// can get negative, and is summed across requests.
t.sleepFor += t.perRequest - now.Sub(t.last)
// We shouldn't allow sleepFor to get too negative, since it would mean that
// a service that slowed down a lot for a short period of time would get
// a much higher RPS following that.
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
// If sleepFor is positive, then we should sleep now.
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}
return t.last
}
Leaky Bucket,每个请求的间隔是固定的,然而,在实际上的互联网应用中,流量经常是突发性的.对于这种情况,uber-go 对 Leaky Bucket 做了一些改良,引入了最大松弛量 (maxSlack) 的概念.We understand the overall background: If we request per second limit 100 个请求,The average request interval 10ms.但是实际情况下,Some request interval is longer,Some request interval shorter.
(1)当 t.sleepFor > 0,On behalf of the previous request that extra time,Can't completely offset the required quantity,因此需要 sleep 相应时间, 同时将 t.sleepFor 置为 0.
(2)当 t.sleepFor < 0,Describe the request more than expected intervals,Will accumulate the extra time to t.sleepFor 即可.
但是,对于某种情况,请求 1 完成后,请求 2 After a long time to arrive (好几个小时都有可能),So for the request at this time 2 的请求间隔 now.Sub(t.last),会非常大.Behind that even if a large number of requests arrive instantaneous,Also can't offset all the time.It thus lost the meaning of the current limiting.
为了防止这种情况,ratelimit Introduced maximum relaxation (maxSlack) 的概念, 该值为负值,表示允许抵消的最长时间,防止以上情况的出现.
边栏推荐
- 技术分享 | Apache Linkis 快速集成网页IDE工具 Scriptis
- 【StoneDB性能相关工具】内存监控
- ShardingSphere-proxy +PostgreSQL implements read-write separation (static strategy)
- Leetcode刷题——字符串相加相关题目(415. 字符串相加、面试题 02.05. 链表求和、2. 两数相加)
- 云平台简介
- 4KMILES加入艾盛集团,以更强劲的数字商务能力,加速中国跨境电商的全域全效增长
- ALV报表学习总结
- 7月29-31 | APACHECON ASIA 2022
- 解析List接口中的常用的被实现子类重写的方法
- 六石管理学:入门机会只有一次,先把产品做好
猜你喜欢
Five data structures of Redis and their corresponding usage scenarios
特拉维夫大学 | Efficient Long-Text Understanding with Short-Text Models(使用短文本模型进行高效的长文本理解)
Translate My Wonderful | July Moli Translation Program Winners Announced
实现fashion_minst服装图像分类
Brain-computer interface 003 | Musk said that he has realized a virtual self-dialogue with the cloud, and related concept shares have risen sharply
实战:10 种实现延迟任务的方法,附代码!
Mysql安装流程 【压缩版】
太魔人招新啦|快来加入我们吧!
Redis 5 种数据结构及对应使用场景
SQL Server实现group_concat功能
随机推荐
Parse common methods in the Collection interface that are overridden by subclasses
ABAP语法小复习
技术分享 | Apache Linkis 快速集成网页IDE工具 Scriptis
如何ES源码中添加一个自己的API 流程梳理
SQL 入门之第一讲——MySQL 8.0.29安装教程(windows 64位)
译出我精彩 | 7月墨力翻译计划获奖名单公布
Async的线程池使用的哪个?
SQL 嵌套 N 层太长太难写怎么办?
分享一个 web 应用版本监测 (更新) 的工具库
「 每日一练,快乐水题 」1374. 生成每种字符都是奇数个的字符串
SQL Server数据类型转换函数cast()和convert()详解
当TIME_WAIT状态的TCP正常挥手,收到SYN后…
Wintun:一款惊艳的 WireGuard 虚拟网卡接口驱动
软考 ----- UML设计与分析(下)
什么是乙二醇二乙酸酯(EGDA)?
J9 digital theory: the Internet across chain bridge has what effect?
VMware虚拟机无法上网
golang源码分析:time/rate
setup syntax sugar defineProps defineEmits defineExpose
golang刷leetcode 经典(9)为运算表达式设计优先级