不依賴外部庫(kù)的情況下,限流算法有什么實(shí)現(xiàn)的思路?本文介紹了3種實(shí)現(xiàn)限流的方式。
一、漏桶算法
- 算法思想 與令牌桶是“反向”的算法,當(dāng)有請(qǐng)求到來(lái)時(shí)先放到木桶中,worker以固定的速度從木桶中取出請(qǐng)求進(jìn)行相應(yīng)。如果木桶已經(jīng)滿了,直接返回請(qǐng)求頻率超限的錯(cuò)誤碼或者頁(yè)面
-
適用場(chǎng)景
流量最均勻的限流方式,一般用于流量“整形”,例如保護(hù)數(shù)據(jù)庫(kù)的限流。先把對(duì)數(shù)據(jù)庫(kù)的訪問(wèn)加入到木桶中,worker再以db能夠承受的qps從木桶中取出請(qǐng)求,去訪問(wèn)數(shù)據(jù)庫(kù)。不太適合電商搶購(gòu)和微博出現(xiàn)熱點(diǎn)事件等場(chǎng)景的限流,一是應(yīng)對(duì)突發(fā)流量不是很靈活,二是為每個(gè)user_id/ip維護(hù)一個(gè)隊(duì)列(木桶),workder從這些隊(duì)列中拉取任務(wù),資源的消耗會(huì)比較大。
-
go語(yǔ)言實(shí)現(xiàn)
通常使用隊(duì)列來(lái)實(shí)現(xiàn),在go語(yǔ)言中可以通過(guò)buffered channel來(lái)快速實(shí)現(xiàn),任務(wù)加入channel,開(kāi)啟一定數(shù)量的worker從channel中獲取任務(wù)執(zhí)行。
package mainimport ("fmt""sync""time")// 每個(gè)請(qǐng)求來(lái)了,把需要執(zhí)行的業(yè)務(wù)邏輯封裝成Task,放入木桶,等待worker取出執(zhí)行type Task struct {handler func() Result // worker從木桶中取出請(qǐng)求對(duì)象后要執(zhí)行的業(yè)務(wù)邏輯函數(shù)resChan chan Result // 等待worker執(zhí)行并返回結(jié)果的channeltaskID int}// 封裝業(yè)務(wù)邏輯的執(zhí)行結(jié)果type Result struct {}// 模擬業(yè)務(wù)邏輯的函數(shù)func handler() Result {time.Sleep(300 * time.Millisecond)return Result{}}func NewTask(id int) Task {return Task{handler: handler,resChan: make(chan Result),taskID: id,}}// 漏桶type LeakyBucket struct {BucketSize int // 木桶的大小NumWorker int // 同時(shí)從木桶中獲取任務(wù)執(zhí)行的worker數(shù)量bucket chan Task // 存方任務(wù)的木桶}func NewLeakyBucket(bucketSize int, numWorker int) *LeakyBucket {return &LeakyBucket{BucketSize: bucketSize,NumWorker: numWorker,bucket: make(chan Task, bucketSize),}}func (b *LeakyBucket) validate(task Task) bool {// 如果木桶已經(jīng)滿了,返回falseselect {case b.bucket <- task:default:fmt.Printf("request[id=%d] is refused ", task.taskID)return false}// 等待worker執(zhí)行<-task.resChanfmt.Printf("request[id=%d] is run ", task.taskID)return true}func (b *LeakyBucket) Start() {// 開(kāi)啟worker從木桶拉取任務(wù)執(zhí)行go func() {for i := 0; i < b.NumWorker; i++ {go func() {for {task := <-b.bucketresult := task.handler()task.resChan <- result}}()}}()}func main() {bucket := NewLeakyBucket(10, 4)bucket.Start()var wg sync.WaitGroupfor i := 0; i < 20; i++ {wg.Add(1)go func(id int) {defer wg.Done()task := NewTask(id)bucket.validate(task)}(i)}wg.Wait()}
二、令牌桶算法
-
算法思想
想象有一個(gè)木桶,以固定的速度往木桶里加入令牌,木桶滿了則不再加入令牌。服務(wù)收到請(qǐng)求時(shí)嘗試從木桶中取出一個(gè)令牌,如果能夠得到令牌則繼續(xù)執(zhí)行后續(xù)的業(yè)務(wù)邏輯;如果沒(méi)有得到令牌,直接返回反問(wèn)頻率超限的錯(cuò)誤碼或頁(yè)面等,不繼續(xù)執(zhí)行后續(xù)的業(yè)務(wù)邏輯
- 特點(diǎn):由于木桶內(nèi)只要有令牌,請(qǐng)求就可以被處理,所以令牌桶算法可以支持突發(fā)流量。同時(shí)由于往木桶添加令牌的速度是固定的,且木桶的容量有上限,所以單位時(shí)間內(nèi)處理的請(qǐng)求書也能夠得到控制,起到限流的目的。假設(shè)加入令牌的速度為 1token/10ms,桶的容量為500,在請(qǐng)求比較的少的時(shí)候(小于每10毫秒1個(gè)請(qǐng)求)時(shí),木桶可以先"攢"一些令牌(最多500個(gè))。當(dāng)有突發(fā)流量時(shí),一下把木桶內(nèi)的令牌取空,也就是有500個(gè)在并發(fā)執(zhí)行的業(yè)務(wù)邏輯,之后要等每10ms補(bǔ)充一個(gè)新的令牌才能接收一個(gè)新的請(qǐng)求。
- 參數(shù)設(shè)置:木桶的容量 - 考慮業(yè)務(wù)邏輯的資源消耗和機(jī)器能承載并發(fā)處理多少業(yè)務(wù)邏輯。生成令牌的速度 - 太慢的話起不到“攢”令牌應(yīng)對(duì)突發(fā)流量的效果。
-
適用場(chǎng)景:
適合電商搶購(gòu)或者微博出現(xiàn)熱點(diǎn)事件這種場(chǎng)景,因?yàn)樵谙蘖鞯耐瑫r(shí)可以應(yīng)對(duì)一定的突發(fā)流量。如果采用均勻速度處理請(qǐng)求的算法,在發(fā)生熱點(diǎn)時(shí)間的時(shí)候,會(huì)造成大量的用戶無(wú)法訪問(wèn),對(duì)用戶體驗(yàn)的損害比較大。
-
go語(yǔ)言實(shí)現(xiàn):
假設(shè)每100ms生產(chǎn)一個(gè)令牌,按user_id/IP記錄訪問(wèn)最近一次訪問(wèn)的時(shí)間戳 t_last 和令牌數(shù),每次請(qǐng)求時(shí)如果 now - last > 100ms, 增加 (now - last) / 100ms個(gè)令牌。然后,如果令牌數(shù) > 0,令牌數(shù) -1 繼續(xù)執(zhí)行后續(xù)的業(yè)務(wù)邏輯,否則返回請(qǐng)求頻率超限的錯(cuò)誤碼或頁(yè)面。
package mainimport ("fmt""sync""time")// 并發(fā)訪問(wèn)同一個(gè)user_id/ip的記錄需要上鎖var recordMu map[string]*sync.RWMutexfunc init() {recordMu = make(map[string]*sync.RWMutex)}func max(a, b int) int {if a > b {return a}return b}type TokenBucket struct {BucketSize int // 木桶內(nèi)的容量:最多可以存放多少個(gè)令牌TokenRate time.Duration // 多長(zhǎng)時(shí)間生成一個(gè)令牌records map[string]*record // 報(bào)錯(cuò)user_id/ip的訪問(wèn)記錄}// 上次訪問(wèn)時(shí)的時(shí)間戳和令牌數(shù)type record struct {last time.Timetoken int}func NewTokenBucket(bucketSize int, tokenRate time.Duration) *TokenBucket {return &TokenBucket{BucketSize: bucketSize,TokenRate: tokenRate,records: make(map[string]*record),}}func (t *TokenBucket) getUidOrIp() string {// 獲取請(qǐng)求用戶的user_id或者ip地址return "127.0.0.1"}// 獲取這個(gè)user_id/ip上次訪問(wèn)時(shí)的時(shí)間戳和令牌數(shù)func (t *TokenBucket) getRecord(uidOrIp string) *record {if r, ok := t.records[uidOrIp]; ok {return r}return &record{}}// 保存user_id/ip最近一次請(qǐng)求時(shí)的時(shí)間戳和令牌數(shù)量func (t *TokenBucket) storeRecord(uidOrIp string, r *record) {t.records[uidOrIp] = r}// 驗(yàn)證是否能獲取一個(gè)令牌func (t *TokenBucket) validate(uidOrIp string) bool {// 并發(fā)修改同一個(gè)用戶的記錄上寫鎖rl, ok := recordMu[uidOrIp]if !ok {var mu sync.RWMutexrl = &murecordMu[uidOrIp] = rl}rl.Lock()defer rl.Unlock()r := t.getRecord(uidOrIp)now := time.Now()if r.last.IsZero() {// 第一次訪問(wèn)初始化為最大令牌數(shù)r.last, r.token = now, t.BucketSize} else {if r.last.Add(t.TokenRate).Before(now) {// 如果與上次請(qǐng)求的間隔超過(guò)了token rate// 則增加令牌,更新lastr.token += max(int(now.Sub(r.last) / t.TokenRate), t.BucketSize)r.last = now}}var result boolif r.token > 0 {// 如果令牌數(shù)大于1,取走一個(gè)令牌,validate結(jié)果為truer.token--result = true}// 保存最新的recordt.storeRecord(uidOrIp, r)return result}// 返回是否被限流func (t *TokenBucket) IsLimited() bool {return !t.validate(t.getUidOrIp())}func main() {tokenBucket := NewTokenBucket(5, 100*time.Millisecond)for i := 0; i< 6; i++ {fmt.Println(tokenBucket.IsLimited())}time.Sleep(100 * time.Millisecond)fmt.Println(tokenBucket.IsLimited())}
三、滑動(dòng)時(shí)間窗口算法
-
算法思想
滑動(dòng)時(shí)間窗口算法,是從對(duì)普通時(shí)間窗口計(jì)數(shù)的優(yōu)化。
使用普通時(shí)間窗口時(shí),我們會(huì)為每個(gè)user_id/ip維護(hù)一個(gè)KV: uidOrIp: timestamp_requestCount。假設(shè)限制1秒1000個(gè)請(qǐng)求,那么第100ms有一個(gè)請(qǐng)求,這個(gè)KV變成 uidOrIp: timestamp_1,遞200ms有1個(gè)請(qǐng)求,我們先比較距離記錄的timestamp有沒(méi)有超過(guò)1s,如果沒(méi)有只更新count,此時(shí)KV變成 uidOrIp: timestamp_2。當(dāng)?shù)?100ms來(lái)一個(gè)請(qǐng)求時(shí),更新記錄中的timestamp并重置計(jì)數(shù),KV變成 uidOrIp: newtimestamp_1
普通時(shí)間窗口有一個(gè)問(wèn)題,假設(shè)有500個(gè)請(qǐng)求集中在前1s的后100ms,500個(gè)請(qǐng)求集中在后1s的前100ms,其實(shí)在這200ms沒(méi)就已經(jīng)請(qǐng)求超限了,但是由于時(shí)間窗每經(jīng)過(guò)1s就會(huì)重置計(jì)數(shù),就無(wú)法識(shí)別到此時(shí)的請(qǐng)求超限。
對(duì)于滑動(dòng)時(shí)間窗口,我們可以把1ms的時(shí)間窗口劃分成10個(gè)time slot, 每個(gè)time slot統(tǒng)計(jì)某個(gè)100ms的請(qǐng)求數(shù)量。每經(jīng)過(guò)100ms,有一個(gè)新的time slot加入窗口,早于當(dāng)前時(shí)間100ms的time slot出窗口。窗口內(nèi)最多維護(hù)10個(gè)time slot,儲(chǔ)存空間的消耗同樣是比較低的。
-
適用場(chǎng)景
與令牌桶一樣,有應(yīng)對(duì)突發(fā)流量的能力
-
go語(yǔ)言實(shí)現(xiàn)
主要就是實(shí)現(xiàn)sliding window算法??梢詤⒖糂ilibili開(kāi)源的kratos框架里circuit breaker用循環(huán)列表保存time slot對(duì)象的實(shí)現(xiàn),他們這個(gè)實(shí)現(xiàn)的好處是不用頻繁的創(chuàng)建和銷毀time slot對(duì)象。下面給出一個(gè)簡(jiǎn)單的基本實(shí)現(xiàn):
package mainimport ("fmt""sync""time")var winMu map[string]*sync.RWMutexfunc init() {winMu = make(map[string]*sync.RWMutex)}type timeSlot struct {timestamp time.Time // 這個(gè)timeSlot的時(shí)間起點(diǎn)count int // 落在這個(gè)timeSlot內(nèi)的請(qǐng)求數(shù)}func countReq(win []*timeSlot) int {var count intfor _, ts := range win {count += ts.count}return count}type SlidingWindowLimiter struct {SlotDuration time.Duration // time slot的長(zhǎng)度WinDuration time.Duration // sliding window的長(zhǎng)度numSlots int // window內(nèi)最多有多少個(gè)slotwindows map[string][]*timeSlotmaxReq int // win duration內(nèi)允許的最大請(qǐng)求數(shù)}func NewSliding(slotDuration time.Duration, winDuration time.Duration, maxReq int) *SlidingWindowLimiter {return &SlidingWindowLimiter{SlotDuration: slotDuration,WinDuration: winDuration,numSlots: int(winDuration / slotDuration),windows: make(map[string][]*timeSlot),maxReq: maxReq,}}// 獲取user_id/ip的時(shí)間窗口func (l *SlidingWindowLimiter) getWindow(uidOrIp string) []*timeSlot {win, ok := l.windows[uidOrIp]if !ok {win = make([]*timeSlot, 0, l.numSlots)}return win}func (l *SlidingWindowLimiter) storeWindow(uidOrIp string, win []*timeSlot) {l.windows[uidOrIp] = win}func (l *SlidingWindowLimiter) validate(uidOrIp string) bool {// 同一user_id/ip并發(fā)安全mu, ok := winMu[uidOrIp]if !ok {var m sync.RWMutexmu = &mwinMu[uidOrIp] = mu}mu.Lock()defer mu.Unlock()win := l.getWindow(uidOrIp)now := time.Now()// 已經(jīng)過(guò)期的time slot移出時(shí)間窗timeoutOffset := -1for i, ts := range win {if ts.timestamp.Add(l.WinDuration).After(now) {break}timeoutOffset = i}if timeoutOffset > -1 {win = win[timeoutOffset+1:]}// 判斷請(qǐng)求是否超限var result boolif countReq(win) < l.maxReq {result = true}// 記錄這次的請(qǐng)求數(shù)var lastSlot *timeSlotif len(win) > 0 {lastSlot = win[len(win)-1]if lastSlot.timestamp.Add(l.SlotDuration).Before(now) {lastSlot = &timeSlot{timestamp: now, count: 1}win = append(win, lastSlot)} else {lastSlot.count++}} else {lastSlot = &timeSlot{timestamp: now, count: 1}win = append(win, lastSlot)}l.storeWindow(uidOrIp, win)return result}func (l *SlidingWindowLimiter) getUidOrIp() string {return "127.0.0.1"}func (l *SlidingWindowLimiter) IsLimited() bool {return !l.validate(l.getUidOrIp())}func main() {limiter := NewSliding(100*time.Millisecond, time.Second, 10)for i := 0; i < 5; i++ {fmt.Println(limiter.IsLimited())}time.Sleep(100 * time.Millisecond)for i := 0; i < 5; i++ {fmt.Println(limiter.IsLimited())}fmt.Println(limiter.IsLimited())for _, v := range limiter.windows[limiter.getUidOrIp()] {fmt.Println(v.timestamp, v.count)}fmt.Println("a thousand years later...")time.Sleep(time.Second)for i := 0; i < 7; i++ {fmt.Println(limiter.IsLimited())}for _, v := range limiter.windows[limiter.getUidOrIp()] {fmt.Println(v.timestamp, v.count)}}
原文標(biāo)題:幾種限流算法的go語(yǔ)言實(shí)現(xiàn)
文章出處:【微信公眾號(hào):馬哥Linux運(yùn)維】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
審核編輯:湯梓紅
聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。
舉報(bào)投訴
-
算法
+關(guān)注
關(guān)注
23文章
4800瀏覽量
98487 -
go語(yǔ)言
+關(guān)注
關(guān)注
1文章
159瀏覽量
9839
原文標(biāo)題:幾種限流算法的go語(yǔ)言實(shí)現(xiàn)
文章出處:【微信號(hào):magedu-Linux,微信公眾號(hào):馬哥Linux運(yùn)維】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
熱點(diǎn)推薦
一文詳解藍(lán)牙模塊原理與結(jié)構(gòu)
電子發(fā)燒友網(wǎng)站提供《一文詳解藍(lán)牙模塊原理與結(jié)構(gòu).pdf》資料免費(fèi)下載
發(fā)表于 11-26 16:40
?94次下載
常用限流方式分析 怎么設(shè)計(jì)出高并發(fā)限流方案
,而對(duì)于超過(guò)限制的流量,則通過(guò)拒絕服務(wù)的方式保證整體系統(tǒng)的可用性。 根據(jù)限流作用范圍,可以分為 單機(jī)限流和分布式限流 ;根據(jù)限流
Redis實(shí)現(xiàn)限流的三種方式分享
當(dāng)然,限流有許多種實(shí)現(xiàn)的方式,Redis具有很強(qiáng)大的功能,我用Redis實(shí)踐了三種的實(shí)現(xiàn)方式,可以較為簡(jiǎn)單的
限流方案常用算法 常用的限流方案
需要注意的是借助Redis實(shí)現(xiàn)的限流方案可用于分布式系統(tǒng),而guava實(shí)現(xiàn)的限流只能應(yīng)用于單機(jī)環(huán)境。如果你覺(jué)得服務(wù)器端限流麻煩,可以在不改任
發(fā)表于 04-08 10:50
?780次閱讀
為什么需要限流?常見(jiàn)的限流算法有哪些
計(jì)數(shù)器法是限流算法里最簡(jiǎn)單也是最容易實(shí)現(xiàn)的一種算法,具體規(guī)則為:在指定周期內(nèi)累加訪問(wèn)次數(shù),當(dāng)訪問(wèn)的次數(shù)達(dá)到我們?cè)O(shè)定的閾值時(shí),觸發(fā)
詳解從均值濾波到非局部均值濾波算法的原理及實(shí)現(xiàn)方式
將再啰嗦一次,詳解從均值濾波到非局部均值濾波算法的原理及實(shí)現(xiàn)方式。 細(xì)數(shù)主要的2D降噪算法,如下
Redis實(shí)現(xiàn)分布式多規(guī)則限流的方式介紹
市面上很多介紹 Redis 如何實(shí)現(xiàn)限流的,但是大部分都有一個(gè)缺點(diǎn),就是只能實(shí)現(xiàn)單一的限流,比如
一文詳解限流算法的實(shí)現(xiàn)方式
評(píng)論