ants的基本使用

簡介

ants是一個高性能的 goroutine 池,實現(xiàn)了對大規(guī)模 goroutine 的調度管理、goroutine 復用,允許使用者在開發(fā)并發(fā)程序的時候限制 goroutine 數(shù)量,復用資源,達到更高效執(zhí)行任務的效果。

功能

  • 自動調度海量的 goroutines,復用 goroutines
  • 定期清理過期的 goroutines,進一步節(jié)省資源
  • 提供了大量有用的接口:任務提交、獲取運行中的 goroutine 數(shù)量、動態(tài)調整 Pool 大小、釋放 Pool、重啟 Pool
  • 優(yōu)雅處理 panic,防止程序崩潰
  • 資源復用,極大節(jié)省內存使用量;在大規(guī)模批量并發(fā)任務場景下比原生 goroutine 并發(fā)具有更高的性能
  • 非阻塞機制

地址

https://github.com/panjf2000/ants

項目中引入地址:

github.com/panjf2000/ants/v2

使用

ants中pool有2種方式NewPool和NewPoolWithFunc這2種方式創(chuàng)建。defaultPool也是NewPool的一種。這兩種方式的區(qū)別在于,NewPool創(chuàng)建的pool傳入的任務是task函數(shù)方法,用Submit方法提交一個task任務。而NewPoolWithFunc創(chuàng)建的pool傳入的任務是參數(shù),task函數(shù)是固定,在NewPoolWithFunc聲明的時候已經(jīng)傳入了函數(shù),用Invoke(args interface{})提交參數(shù)任務。
因此適用的場景也是不一樣的,NewPool適用于多個goroutine處理的事是不一樣的(傳入不同的函數(shù)可以體現(xiàn)出來),NewPoolWithFunc適用于多個goroutine處理的事是一樣的,只不過是要分批處理(函數(shù)是固定的,參數(shù)是要多次傳入的可以體現(xiàn)出來)。

  • 文檔demo
    demo中為我們介紹了defaultPool和NewPoolWithFunc的用法
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/panjf2000/ants/v2"
)

var sum int32

func myFunc(i interface{}) {
    n := i.(int32)
    atomic.AddInt32(&sum, n)
    fmt.Printf("run with %d\n", n)
}

func demoFunc() {
    time.Sleep(10 * time.Millisecond)
    fmt.Println("Hello World!")
}

func main() {
    defer ants.Release()

    runTimes := 1000

    // Use the common pool.
    var wg sync.WaitGroup
    syncCalculateSum := func() {
        demoFunc()
        wg.Done()
    }
    for i := 0; i < runTimes; i++ {
        wg.Add(1)
        _ = ants.Submit(syncCalculateSum)
    }
    wg.Wait()
    fmt.Printf("running goroutines: %d\n", ants.Running())
    fmt.Printf("finish all tasks.\n")

    // Use the pool with a function,
    // set 10 to the capacity of goroutine pool and 1 second for expired duration.
    p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
        myFunc(i)
        wg.Done()
    })
    defer p.Release()
    // Submit tasks one by one.
    for i := 0; i < runTimes; i++ {
        wg.Add(1)
        _ = p.Invoke(int32(i))
    }
    wg.Wait()
    fmt.Printf("running goroutines: %d\n", p.Running())
    fmt.Printf("finish all tasks, result is %d\n", sum)
}
  • 設置最大等待隊列長度
package main

import (
    "fmt"
    "github.com/panjf2000/ants/v2"
    "sync"
    "time"
)

func wrapper(i int, wg *sync.WaitGroup) func() {
    return func() {
        fmt.Printf("hello from task:%d\n", i)
        time.Sleep(1 * time.Second)
        wg.Done()
    }
}

// 阻塞
//我們設置 goroutine 池的容量為 4,最大阻塞隊列長度為 2。然后一個 for 提交 8 個任務,期望結果是:4 個任務在執(zhí)行,2 個任務在等待,2 個任務提交失敗。
func main() {
    p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))
    defer p.Release()

    var wg sync.WaitGroup
    wg.Add(8)
    for i := 1; i <= 8; i++ {
        go func(i int) {
            err := p.Submit(wrapper(i, &wg))
            if err != nil {
                fmt.Printf("task:%d err:%v\n", i, err)
                wg.Done()
            }
        }(i)
    }

    wg.Wait()
}

我們設置 goroutine 池的容量為 4,最大阻塞隊列長度為 2。然后一個 for 提交 8 個任務,期望結果是:4 個任務在執(zhí)行,2 個任務在等待,2 個任務提交失敗。

  • 非阻塞
package main

import (
    "fmt"
    "github.com/panjf2000/ants/v2"
    "sync"
    "time"
)

func wrapper(i int, wg *sync.WaitGroup) func() {
    return func() {
        fmt.Printf("hello from task:%d\n", i)
        time.Sleep(1 * time.Second)
        wg.Done()
    }
}

// 阻塞
//我們設置 goroutine 池的容量為 4,最大阻塞隊列長度為 2。然后一個 for 提交 8 個任務,期望結果是:4 個任務在執(zhí)行,2 個任務在等待,2 個任務提交失敗。
func main() {
    p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))
    defer p.Release()

    var wg sync.WaitGroup
    wg.Add(8)
    for i := 1; i <= 8; i++ {
        go func(i int) {
            err := p.Submit(wrapper(i, &wg))
            if err != nil {
                fmt.Printf("task:%d err:%v\n", i, err)
                wg.Done()
            }
        }(i)
    }

    wg.Wait()
}

ants池容量設置為 2。連續(xù)提交 3 個任務,期望結果前兩個任務正常執(zhí)行,第 3 個任務提交時返回錯誤

  • panic 處理器
package main

import (
    "fmt"
    "github.com/panjf2000/ants/v2"
    "sync"
    "time"
)

func wrapper(i int, wg *sync.WaitGroup) func() {
    return func() {
        fmt.Printf("hello from task:%d\n", i)
        if i%2 == 0 {
            panic(fmt.Sprintf("panic from task:%d", i))
        }
        wg.Done()
    }
}

//我們讓偶數(shù)個任務觸發(fā)panic。提交兩個任務,第二個任務一定會觸發(fā)panic。觸發(fā)panic之后,我們還可以繼續(xù)提交任務 3、5。注意這里沒有 4,提交任務 4 還是會觸發(fā)panic。
func main() {
    p, _ := ants.NewPool(2)
    defer p.Release()

    var wg sync.WaitGroup
    wg.Add(3)
    for i := 1; i <= 2; i++ {
        p.Submit(wrapper(i, &wg))
    }

    time.Sleep(1 * time.Second)
    p.Submit(wrapper(3, &wg))
    p.Submit(wrapper(5, &wg))
    wg.Wait()
}
  • 默認池
package main

import (
    "fmt"
    "github.com/panjf2000/ants/v2"
    "sync"
    "time"
)

func wrapper(i int, wg *sync.WaitGroup) func() {
    return func() {
        fmt.Printf("hello from task:%d\n", i)
        time.Sleep(1 * time.Second)
        wg.Done()
    }
}

func main() {
    defer ants.Release()

    var wg sync.WaitGroup
    wg.Add(2)
    for i := 1; i <= 2; i++ {
        ants.Submit(wrapper(i, &wg))
    }
    wg.Wait()
}

參考

1、Go 每日一庫之 ants
2、Go 每日一庫之 ants(源碼賞析)

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容