http客戶端并發(fā)訪問一個(gè)服務(wù)端
當(dāng)你要通過http請(qǐng)求群發(fā)100萬(wàn)郵件(發(fā)送郵件的服務(wù)器不需要你考慮資源消耗),越快發(fā)送完越好,很自然你會(huì)通過調(diào)節(jié)http.client的pool資源池的大小,從而單位時(shí)間內(nèi)發(fā)送更多的請(qǐng)求出去,這里便是值得注意的地方,你要向同一個(gè)目標(biāo)服務(wù)器發(fā)送100萬(wàn)請(qǐng)求,哪怕你把pool調(diào)大到500,想的是順時(shí)hi并發(fā)100個(gè)請(qǐng)求發(fā)送出去,但是其實(shí)只有2個(gè),因?yàn)閔ttp.client中MaxIdleConnsPerHost默認(rèn)值為2,它的作用是像同一個(gè)目標(biāo)機(jī)器發(fā)送請(qǐng)求的最大并發(fā)量。

image.png
以下實(shí)戰(zhàn)demo
package main
import (
"bytes"
"encoding/json"
"fmt"
"github.com/360EntSecGroup-Skylar/excelize/v2"
"github.com/go-resty/resty/v2"
"net/http"
"os"
"strings"
"sync"
"time"
)
var client = resty.New().SetDebug(false).SetHeader("Content-Type", "application/json").SetTimeout(10 * time.Second)
func init() {
t2 := http.DefaultTransport.(*http.Transport).Clone()
t2.MaxIdleConns = 100
t2.MaxConnsPerHost = 100
t2.MaxIdleConnsPerHost = 100 //無(wú)論你連接池多大,但是如果你是并發(fā)一個(gè)服務(wù)器,請(qǐng)調(diào)大次參數(shù),它是允許你客戶端向同一個(gè)服務(wù)端發(fā)送并發(fā)請(qǐng)求個(gè)數(shù)的控制參數(shù),連接池100,是針對(duì)100個(gè)不同服務(wù)器的并發(fā)個(gè)數(shù)控制
client.SetTransport(t2)
}
type Task struct {
textgoApi string
filePath string
sheets map[string][]int
result chan string
result2 chan string
resultPath string
resultPath2 string
}
func main() {
task := Task{
filePath: "",
sheets: map[string][]int{},
result: make(chan string, 100),
result2: make(chan string, 100),
textgoApi: "",
resultPath: "",
resultPath2: "",
}
fmt.Println("start run***")
task.run()
fmt.Println("start end***")
}
func (t *Task) run() {
f, err := excelize.OpenFile(t.filePath)
if err != nil {
fmt.Println(err, "run openfile發(fā)生了錯(cuò)誤")
return
}
go func() {
defer func() {
close(t.result)
close(t.result2)
}()
for key, val := range t.sheets {
fmt.Println(key, val)
t.handle(f, key, val)
}
}()
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
n := 0
for res := range t.result2 {
n++
t.appendWrite(t.resultPath2, res)
}
t.appendWrite(t.resultPath2, fmt.Sprintf("總共沒違規(guī)合計(jì):%d", n))
}()
go func() {
defer wg.Done()
num := 0
for res := range t.result {
num++
t.appendWrite(t.resultPath, res)
}
t.appendWrite(t.resultPath, fmt.Sprintf("總共違規(guī)合計(jì):%d", num))
}()
wg.Wait()
}
func (t *Task) handle(file *excelize.File, sheet string, columns []int) {
rows, err := file.Rows(sheet)
if err != nil {
fmt.Println(err, "file.Rows(sheet)")
return
}
for rows.Next() {
data, err := rows.Columns()
if err != nil {
fmt.Println(err, "rows.Columns()")
continue
}
t.call(data, columns)
}
}
func (t *Task) call(data []string, columns []int) {
defer func() {
if err := recover(); err != nil {
fmt.Println(err, "recover---")
}
}()
for k, v := range data {
if t.exitsElem(columns, k+1) {
strs := strings.Split(v, ";")
isVio, s := t.isViolation(strs)
if isVio {
//違規(guī)
t.result <- fmt.Sprintf("違規(guī)類容:%s |--|--| 原始類容: %s", s, strings.Join(data, ""))
return
}
}
}
t.result2 <- fmt.Sprintf("沒有違規(guī)類容:%s |--|--| ", strings.Join(data, ""))
}
func (t *Task) isViolation(text []string) (b bool, s string) {
for _, v := range text {
if v == "" {
continue
}
resp := t.isVoi(v)
if resp {
s = v
b = true
return
}
}
return
}
type RequestParam struct {
Text string `json:"text"`
}
type Resp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data struct {
OutputInfo []RespData `json:"output_info"`
} `json:"data"`
}
type RespData struct {
IsIllegal int `json:"is_illegal"`
IllegalType string `json:"illegal_type"`
Label int `json:"label"`
Score float64 `json:"score"`
}
//{
// "code": 0,
// "msg": "success",
// "data": {
// "output_info": [
// {
// "is_illegal": 1,
// "illegal_type": "涉政",
// "label": 3,
// "score": 0.99994314
// }
// ]
// }
//}
func (t *Task) isVoi(text string) (b bool) {
texts := []string{}
byteText := []rune(text)
by := bytes.Buffer{}
strText := ""
for _, v := range byteText {
by.WriteRune(v)
strText = by.String()
if len(strText) > 90 {
texts = append(texts, strText)
by = bytes.Buffer{}
}
}
texts = append(texts, strText)
for _, ttt := range texts {
b = t.post(ttt)
if b {
return
}
}
return
}
func (t *Task) post(ttt string) (b bool) {
data := Resp{}
for i := 0; i < 3; i++ {
reqParam := RequestParam{Text: ttt}
req, _ := json.Marshal(reqParam)
resp, err := client.R().SetBody(string(req)).Post(t.textgoApi)
if err != nil {
fmt.Println(err, "= client.R().SetBody(string(req)).Post(t.textgoApi)")
return
}
err = json.Unmarshal(resp.Body(), &data)
if err != nil {
return
}
if data.Code != 0 {
time.Sleep(time.Millisecond * 100)
fmt.Println("我是3次都失敗了的數(shù)據(jù)", data)
continue
}
if len(data.Data.OutputInfo) <= 0 {
return
}
if data.Data.OutputInfo[0].IsIllegal == 1 {
b = true
return
}
return
}
fmt.Println("我是3次都失敗了的數(shù)據(jù)", data, ttt)
return
}
func (t *Task) exitsElem(data []int, i int) (b bool) {
for _, v := range data {
if v == i {
return true
}
}
return
}
//追加寫
func (t *Task) appendWrite(path string, content string) (err error) {
f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return
}
defer f.Close()
_, err = fmt.Fprintln(f, content)
return
}
// Pool Goroutine Pool
type Pool struct {
queue chan int
wg *sync.WaitGroup
}
// newPoll 新建一個(gè)協(xié)程池
func newPoll(size int) *Pool {
if size <= 0 {
size = 1
}
return &Pool{
queue: make(chan int, size),
wg: &sync.WaitGroup{},
}
}
// Add 新增一個(gè)執(zhí)行
func (p *Pool) Add(delta int) {
// delta為正數(shù)就添加
for i := 0; i < delta; i++ {
p.queue <- 1
}
// delta為負(fù)數(shù)就減少
for i := 0; i > delta; i-- {
<-p.queue
}
p.wg.Add(delta)
}
// Done 執(zhí)行完成減一
func (p *Pool) Done() {
<-p.queue
p.wg.Done()
}
// Wait 等待Goroutine執(zhí)行完畢
func (p *Pool) Wait() {
p.wg.Wait()
}
參考文獻(xiàn)
原來這樣使用 Go HTTP 客戶端才能獲取更高性能