斷續(xù)器
計(jì)時(shí)器 是當(dāng)想在未來(lái)做一些事情 - tickers 是用于定期做一些事情。這里是一個(gè)例行程序,周期性執(zhí)行直到停止。
代碼使用與計(jì)時(shí)器的機(jī)制類(lèi)似:發(fā)送值到通道。這里我們將使用通道上的一個(gè)范圍內(nèi)來(lái)迭代,每隔500ms發(fā)送一次。
代碼可以像定時(shí)器一樣停止,當(dāng)代碼停止后,它不會(huì)再其通道上接收值。
package main
import (
"time"
"fmt"
)
func main(){
ticker := time.NewTicker(time.Millisecond * 500)
go func() {
for t := range ticker.C{
fmt.Println("Tick at ",t)
}
}()
time.Sleep(time.Millisecond * 1600 )
ticker.Stop()
fmt.Println("Ticker stopped")
}
Go 工作池
在這個(gè)例子中,我們將實(shí)現(xiàn)如何使用 goroutine 和 channel 實(shí)現(xiàn)一個(gè)工作池。
這里是工作程序(worker),我們將運(yùn)行幾個(gè)并發(fā)實(shí)例。這些工作程序(worker)將在工作 chan 上接收工作,并將發(fā)送相應(yīng)的結(jié)果。這里使用 延時(shí)1s的方式模擬工作的過(guò)程。
為了使用工作程序(worker)池,需要向他們發(fā)送任務(wù)并收集相關(guān)結(jié)果。這里實(shí)現(xiàn)的時(shí)候使用了兩個(gè)通道。這啟動(dòng)了 3 個(gè)worker,最初被阻止,因?yàn)闆](méi)有任務(wù)。
然后手機(jī)作業(yè)的所有結(jié)果。
package main
import (
"fmt"
"time"
)
//worker本體函數(shù)
func worker(id int,job <-chan int, result chan<- int){
for j:=range job{
fmt.Println("worker",id,"started job",j)
time.Sleep(time.Second)
fmt.Println("worker",id,"finished job",j)
result<- j*2
}
}
func main(){
jobs:= make(chan int,100)
results := make(chan int,100)
//創(chuàng)建3個(gè)worker
for w:=1 ; w<= 3;w++{
go worker(w,jobs,results)
}
//分配5個(gè)任務(wù)
for j:=1 ;j<= 5 ; j++{
jobs <- j
}
close(jobs)
//等待所有工作完成
for a :=1 ; a<=5 ; a++{
<- results
}
}
Go 速率限制
速率限制是控制資源利用和維持服務(wù)質(zhì)量的重要機(jī)制。通過(guò) goroutines,channel,ticker 都可以?xún)?yōu)雅的支持速率限制。
首先我們來(lái)看一下基本速率限制。假設(shè)想限制對(duì)傳入請(qǐng)求的處理。我們需要在同一個(gè)通道上處理。
這個(gè)限制器通道將 2000ms 接收一個(gè)值。這是速率限制方案中的調(diào)節(jié)器。
通過(guò)在服務(wù)每個(gè)請(qǐng)求之前阻塞來(lái)自限制器信道的接收,我們限制自己每200ms接收一個(gè)請(qǐng)求。
我們可能希望在速率限制方案中允許端脈沖串請(qǐng)求,同時(shí)保持總體速率限制。可以通過(guò)緩沖的限制器通道來(lái)實(shí)現(xiàn)。這個(gè) burstyLimiter通道將允許最多 3 個(gè)事件的突發(fā)。
填充通道以表示允許突發(fā)。
每2000ms,將嘗試向 burstyLimiter添加一個(gè)新值,最大限制為 3 。現(xiàn)在模擬 5個(gè)更多的傳入請(qǐng)求。這些傳入請(qǐng)求的前三個(gè)未超過(guò)burstyLimiter 值。
package main
import (
"time"
"fmt"
)
func main(){
requests := make(chan int , 5)
for i:= 1 ; i<= 5 ; i++{
requests <- i
}
close(requests)
limiter := time.Tick(time.Millisecond * 2000)
for req := range requests{
<- limiter
fmt.Println("request",req,time.Now())
}
burstyLimiter := make(chan time.Time , 3)
for i:= 0 ; i<3;i++{
burstyLimiter <- time.Now()
}
go func() {
for t:= range time.Tick(time.Millisecond * 2000){
burstyLimiter <- t
}
}()
burstyRequests := make(chan int , 5)
for i:=1 ; i<= 5 ; i++{
burstyRequests <- i
}
close(burstyRequests)
for req := range burstyRequests{
<- burstyLimiter
fmt.Println("request",req,time.Now())
}
}
Go原子計(jì)數(shù)器
go語(yǔ)言中管理狀態(tài)的主要機(jī)制是通過(guò)通道進(jìn)行通信。在過(guò)去的文章中,我們已經(jīng)看到了這一點(diǎn),例如工作池。還有一些其他選項(xiàng)用于管理狀態(tài)。這里我們將使用 sync/atomic 包來(lái)實(shí)現(xiàn)由多個(gè) goroutine 訪問(wèn)的原子計(jì)數(shù)器。
使用一個(gè)無(wú)符號(hào)整數(shù)表示計(jì)數(shù)器(正數(shù))
為了模擬并發(fā)更新,將啟動(dòng) 50個(gè) goroutine , 每個(gè)增量計(jì)數(shù)器大學(xué)是 1ms。
為了原子地遞增計(jì)數(shù)器,這里使用 AddUint64() 函數(shù),在 ops 計(jì)數(shù)器的內(nèi)存地址上使用 & 語(yǔ)法。
為了安全地使用計(jì)數(shù)器,同時(shí)它任然被其他 goroutine 更新。通過(guò) LoadUint64提取一個(gè)當(dāng)前值的副本到 opsFinal。如上所述,需要獲取值的內(nèi)存地址 &ops 給這個(gè)函數(shù)。
運(yùn)行程序顯示執(zhí)行了大約 40000次操作。根據(jù)自己機(jī)器性能可以嘗試其他更nice的操作。
package main
import (
"sync/atomic"
"time"
"fmt"
)
func main(){
var ops uint64 = 0
for i:= 0 ; i< 50 ; i++{
go func() {
for{
atomic.AddUint64(&ops,1)
time.Sleep(time.Millisecond * 1 )
}
}()
}
time.Sleep(time.Second * 10)
opsFinal := atomic.LoadUint64(&ops)
fmt.Println("ops",opsFinal)
}