Golang 通道,同步等待組 并發(fā)爬蟲

Golang:通道,同步等待組 并發(fā)爬蟲

在Go的并發(fā)編程中有一句很經(jīng)典的話:不要以共享內(nèi)存的方式去通信,而要以通信的方式去共享內(nèi)存。

在Go語言中并不鼓勵用鎖保護(hù)共享狀態(tài)的方式在不同的Goroutine中分享信息(以共享內(nèi)存的方式去通信)。而是鼓勵通過channel將共享狀態(tài)或共享狀態(tài)的變化在各個Goroutine之間傳遞(以通信的方式去共享內(nèi)存),這樣同樣能像用鎖一樣保證在同一的時間只有一個Goroutine訪問共享狀態(tài)。

當(dāng)然,在主流的編程語言中為了保證多線程之間共享數(shù)據(jù)安全性和一致性,都會提供一套基本的同步工具集,如鎖,條件變量,原子操作等等。Go語言標(biāo)準(zhǔn)庫也毫不意外的提供了這些同步機(jī)制,使用方式也和其他語言也差不多。

image

WaitGroup

WaitGroup,同步等待組。

在類型上,它是一個結(jié)構(gòu)體。一個WaitGroup的用途是等待一個goroutine的集合執(zhí)行完成。主goroutine調(diào)用了Add()方法來設(shè)置要等待的goroutine的數(shù)量。然后,每個goroutine都會執(zhí)行并且執(zhí)行完成后調(diào)用Done()這個方法。與此同時,可以使用Wait()方法來阻塞,直到所有的goroutine都執(zhí)行完成。

Add()方法

Add這個方法,用來設(shè)置到WaitGroup的計數(shù)器的值。我們可以理解為每個waitgroup中都有一個計數(shù)器 用來表示這個同步等待組中要執(zhí)行的goroutin的數(shù)量。

如果計數(shù)器的數(shù)值變?yōu)?,那么就表示等待時被阻塞的goroutine都被釋放,如果計數(shù)器的數(shù)值為負(fù)數(shù),那么就會引發(fā)恐慌,程序就報錯了。

Done()方法

Done()方法,就是當(dāng)WaitGroup同步等待組中的某個goroutine執(zhí)行完畢后,設(shè)置這個WaitGroup的counter數(shù)值減1。

Wait()方法

Wait()方法,表示讓當(dāng)前的goroutine等待,進(jìn)入阻塞狀態(tài)。一直到WaitGroup的計數(shù)器為零。才能解除阻塞, 這個goroutine才能繼續(xù)執(zhí)行。

示例代碼


package main

import (
    "fmt"
    "sync"
)
var wg sync.WaitGroup // 創(chuàng)建同步等待組對象
func main()  {
    /*
    WaitGroup:同步等待組
        可以使用Add(),設(shè)置等待組中要 執(zhí)行的子goroutine的數(shù)量,
        
        在main 函數(shù)中,使用wait(),讓主程序處于等待狀態(tài)。直到等待組中子程序執(zhí)行完畢。解除阻塞

        子gorotuine對應(yīng)的函數(shù)中。wg.Done(),用于讓等待組中的子程序的數(shù)量減1
     */
    //設(shè)置等待組中,要執(zhí)行的goroutine的數(shù)量
    wg.Add(2)
    go fun1()
    go fun2()
    fmt.Println("main進(jìn)入阻塞狀態(tài)。。。等待wg中的子goroutine結(jié)束。。")
    wg.Wait() //表示main goroutine進(jìn)入等待,意味著阻塞
    fmt.Println("main,解除阻塞。。")

}
func fun1()  {
    for i:=1;i<=10;i++{
        fmt.Println("fun1.。。i:",i)
    }
    wg.Done() //給wg等待中的執(zhí)行的goroutine數(shù)量減1.同Add(-1)
}
func fun2()  {
    defer wg.Done()
    for j:=1;j<=10;j++{
        fmt.Println("\tfun2..j,",j)
    }
}

channel通道

通道可以被認(rèn)為是Goroutines通信的管道。類似于管道中的水從一端到另一端的流動,數(shù)據(jù)可以從一端發(fā)送到另一端,通過通道接收。

在前面講Go語言的并發(fā)時候,我們就說過,當(dāng)多個Goroutine想實現(xiàn)共享數(shù)據(jù)的時候,雖然也提供了傳統(tǒng)的同步機(jī)制,但是Go語言強(qiáng)烈建議的是使用Channel通道來實現(xiàn)Goroutines之間的通信。

“不要通過共享內(nèi)存來通信,而應(yīng)該通過通信來共享內(nèi)存” 這是一句風(fēng)靡golang社區(qū)的經(jīng)典語

接收和發(fā)送

一個通道發(fā)送和接收數(shù)據(jù),默認(rèn)是阻塞的。當(dāng)一個數(shù)據(jù)被發(fā)送到通道時,在發(fā)送語句中被阻塞,直到另一個Goroutine從該通道讀取數(shù)據(jù)。相對地,當(dāng)從通道讀取數(shù)據(jù)時,讀取被阻塞,直到一個Goroutine將數(shù)據(jù)寫入該通道。

示例代碼:以下代碼加入了睡眠,可以更好的理解channel的阻塞

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan int)
    done := make(chan bool) // 通道
    go func() {
        fmt.Println("子goroutine執(zhí)行。。。")
        time.Sleep(3 * time.Second)
        data := <-ch1 // 從通道中讀取數(shù)據(jù)
        fmt.Println("data:", data)
        done <- true
    }()
    // 向通道中寫數(shù)據(jù)。。
    time.Sleep(5 * time.Second)
    ch1 <- 100

    <-done
    fmt.Println("main。。over")

}

在上面的程序中,我們先創(chuàng)建了一個chan bool通道。然后啟動了一條子Goroutine,并循環(huán)打印10個數(shù)字。然后我們向通道ch1中寫入輸入true。
然后在主goroutine中,我們從ch1中讀取數(shù)據(jù)。這一行代碼是阻塞的,這意味著在子Goroutine將數(shù)據(jù)寫入到該通道之前,主goroutine將不會執(zhí)行到下一行代碼。

因此,我們可以通過channel實現(xiàn)子goroutine和主goroutine之間的通信。當(dāng)子goroutine執(zhí)行完畢前,主goroutine會因為讀取ch1中的數(shù)據(jù)而阻塞。從而保證了子goroutine會先執(zhí)行完畢。這就消除了對時間的需求。

在之前的程序中,我們要么讓主goroutine進(jìn)入睡眠,以防止主要的Goroutine退出。要么通過WaitGroup來保證子goroutine先執(zhí)行完畢,主goroutine才結(jié)束。

死鎖

使用通道時要考慮的一個重要因素是死鎖。如果Goroutine在一個通道上發(fā)送數(shù)據(jù),那么預(yù)計其他的Goroutine應(yīng)該接收數(shù)據(jù)。如果這種情況不發(fā)生,那么程序?qū)⒃谶\(yùn)行時出現(xiàn)死鎖。

類似地,如果Goroutine正在等待從通道接收數(shù)據(jù),那么另一些Goroutine將會在該通道上寫入數(shù)據(jù),否則程序?qū)梨i。

示例代碼
package main

func main() {  
    ch := make(chan int)
    ch <- 5
}
報錯:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
    /Users/ruby/go/src/l_goroutine/demo08_chan.go:5 +0x50

Goroutine

Goroutine 是實際并發(fā)執(zhí)行的實體,它底層是使用協(xié)程(coroutine)實現(xiàn)并發(fā),coroutine是一種運(yùn)行在用戶態(tài)的用戶線程,類似于 greenthread,go底層選擇使用coroutine的出發(fā)點(diǎn)是因為,它具有以下特點(diǎn):

用戶空間 避免了內(nèi)核態(tài)和用戶態(tài)的切換導(dǎo)致的成本
可以由語言和框架層進(jìn)行調(diào)度
更小的??臻g允許創(chuàng)建大量的實例

Goroutine 調(diào)度器

Go并發(fā)調(diào)度: G-P-M模型

在操作系統(tǒng)提供的內(nèi)核線程之上,Go搭建了一個特有的兩級線程模型。goroutine機(jī)制實現(xiàn)了M : N的線程模型,goroutine機(jī)制是協(xié)程(coroutine)的一種實現(xiàn),golang內(nèi)置的調(diào)度器,可以讓多核CPU中每個CPU執(zhí)行一個協(xié)程。

以上內(nèi)容來自 https://github.com/rubyhan1314/Golang-100-Days
主要說明一下同步等待組和通道的基本使用,以及 go 是如何處理并發(fā)的,更多可以繼續(xù)參考以上,來自千峰的 go 教程。

實戰(zhàn)爬蟲

前面說了這么多只不過是為這個腳本做鋪墊,要不然則來的太唐突。
我這里寫了一個爬蟲腳本,用到了通道來做并發(fā),并有同步等待組做 awit() 操作

直接來看代碼

獲取html
func HttpGet(url string) (result string, err error) {
    resp, err1 := http.Get(url)
    if err != nil {
        err = err1
        return
    }
    defer resp.Body.Close()
    //讀取網(wǎng)頁的body內(nèi)容
    buf := make([]byte, 4*1024)
    for true {
        n, err := resp.Body.Read(buf)
        if err != nil {
            if err == io.EOF{
                break
            }else {
                fmt.Println("resp.Body.Read err = ", err)
                break
            }
        }
        result += string(buf[:n])
    }
    return
}
爬取網(wǎng)頁存為 .html 文件
func spiderPage(url string) string {

    fmt.Println("正在爬取", url)
    //爬,將所有的網(wǎng)頁內(nèi)容爬取下來
    result, err := HttpGet(url)
    if err != nil {
        fmt.Println(err)
    }
    //把內(nèi)容寫入到文件
    filename := strconv.Itoa(rand.Int()) + ".html"
    f, err1 := os.Create(filename)
    if err1 != nil{
        fmt.Println(err1)
    }
    //寫內(nèi)容
    f.WriteString(result)
    //關(guān)閉文件
    f.Close()
    return url + " 抓取成功"

}

爬取方法方面就寫完了,接下來就到了重要的部分了

定義一個工作者函數(shù)
func doWork(start, end int,wg *sync.WaitGroup) {
    fmt.Printf("正在爬取第%d頁到%d頁\n", start, end)
    //因為很有可能爬蟲還沒有結(jié)束下面的循環(huán)就已經(jīng)結(jié)束了,所以這里就需要且到通道
    page := make(chan string,100)
    results := make(chan string,100)


    go sendResult(results,start,end)

    go func() {

        for i := 0; i <= 20; i++ {
            wg.Add(1)
            go asyn_worker(page, results, wg)
        }
    }()

    for i := start; i <= end; i++ {
            url := "https://tieba.baidu.com/f?kw=%E7%BB%9D%E5%9C%B0%E6%B1%82%E7%94%9F&ie=utf-8&pn=" + strconv.Itoa((i-1)*50)
            page <- url
            println("加入" + url + "到page")
        }
        println("關(guān)閉通道")
        close(page)

    wg.Wait()
    //time.Sleep(time.Second * 5)
    println(" Main 退出 。。。。。")
}
從通道取出數(shù)據(jù)
func asyn_worker(page chan string, results chan string,wg *sync.WaitGroup){

    defer wg.Done()  //defer wg.Done()必須放在go并發(fā)函數(shù)內(nèi)

    for{
        v, ok := <- page //顯示的調(diào)用close方法關(guān)閉通道。
        if !ok{
            fmt.Println("已經(jīng)讀取了所有的數(shù)據(jù),", ok)
            break
        }
        //fmt.Println("取出數(shù)據(jù):",v, ok)
        results <- spiderPage(v)
    }


    //for n := range page {
    //  results <- spiderPage(n)
    //}
}
發(fā)送抓取結(jié)果
func sendResult(results chan string,start,end int)  {

    //for i := start; i <= end; i++ {
    //  fmt.Println(<-results)
    //}

    // 發(fā)送抓取結(jié)果
    for{
        v, ok := <- results
        if !ok{
            fmt.Println("已經(jīng)讀取了所有的數(shù)據(jù),", ok)
            break
        }
        fmt.Println(v)

    }
}

大體思路是這樣的:

可以看到我定義了兩個通道,一個是用來存入 url 的,另一個是用來存入爬取結(jié)果的,緩沖空間是 100
在方法 doWork 中, sendResult 會阻塞等待 results 通道的輸出,匿名函數(shù)則是等待 page 通道的輸出

緊接著下面就是把 200 個 url 寫入 page 通道,匿名函數(shù)得到 page 的輸出就會執(zhí)行 asyn_worker 函數(shù),也就是爬取 html 的函數(shù)了(將其存入results 通道)

然后 sendResult 函數(shù)得到 results 通道的輸出,將結(jié)果打印出來

可以看到 我在匿名函數(shù)中并發(fā)了 20 個 goroution,并且啟用了同步等待組作為參數(shù)傳入,理論上可以根據(jù)機(jī)器的性能來定義 并發(fā)數(shù)

main函數(shù)

func main() {
    start_time := time.Now().UnixNano()

    var wg sync.WaitGroup

    doWork(1,200, &wg)
    //輸出執(zhí)行時間,單位為毫秒。
    fmt.Printf("執(zhí)行時間: %ds",(time.Now().UnixNano() - start_time) / 1000)

}

運(yùn)行爬蟲并計算運(yùn)行時間,這個時間因機(jī)器而異,但應(yīng)該不會相差太多

完整代碼

package main

import (
    "fmt"
    "io"
    "sync"
    "math/rand"
    "net/http"
    "os"
    "strconv"
    "time"
)



func HttpGet(url string) (result string, err error) {
    resp, err1 := http.Get(url)
    if err != nil {
        err = err1
        return
    }
    defer resp.Body.Close()
    //讀取網(wǎng)頁的body內(nèi)容
    buf := make([]byte, 4*1024)
    for true {
        n, err := resp.Body.Read(buf)
        if err != nil {
            if err == io.EOF{
                break
            }else {
                fmt.Println("resp.Body.Read err = ", err)
                break
            }
        }
        result += string(buf[:n])
    }
    return
}


//爬取網(wǎng)頁
func spiderPage(url string) string {

    fmt.Println("正在爬取", url)
    //爬,將所有的網(wǎng)頁內(nèi)容爬取下來
    result, err := HttpGet(url)
    if err != nil {
        fmt.Println(err)
    }
    //把內(nèi)容寫入到文件
    filename := strconv.Itoa(rand.Int()) + ".html"
    f, err1 := os.Create(filename)
    if err1 != nil{
        fmt.Println(err1)
    }
    //寫內(nèi)容
    f.WriteString(result)
    //關(guān)閉文件
    f.Close()
    return url + " 抓取成功"

}

func asyn_worker(page chan string, results chan string,wg *sync.WaitGroup){

    defer wg.Done()  //defer wg.Done()必須放在go并發(fā)函數(shù)內(nèi)

    for{
        v, ok := <- page //顯示的調(diào)用close方法關(guān)閉通道。
        if !ok{
            fmt.Println("已經(jīng)讀取了所有的數(shù)據(jù),", ok)
            break
        }
        //fmt.Println("取出數(shù)據(jù):",v, ok)
        results <- spiderPage(v)
    }

    //for n := range page {
    //  results <- spiderPage(n)
    //}
}

func doWork(start, end int,wg *sync.WaitGroup) {
    fmt.Printf("正在爬取第%d頁到%d頁\n", start, end)
    //因為很有可能爬蟲還沒有結(jié)束下面的循環(huán)就已經(jīng)結(jié)束了,所以這里就需要且到通道
    page := make(chan string,100)
    results := make(chan string,100)


    go sendResult(results,start,end)

    go func() {

        for i := 0; i <= 20; i++ {
            wg.Add(1)
            go asyn_worker(page, results, wg)
        }
    }()


    for i := start; i <= end; i++ {
            url := "https://tieba.baidu.com/f?kw=%E7%BB%9D%E5%9C%B0%E6%B1%82%E7%94%9F&ie=utf-8&pn=" + strconv.Itoa((i-1)*50)
            page <- url
            println("加入" + url + "到page")
        }
        println("關(guān)閉通道")
        close(page)

    wg.Wait()
    //time.Sleep(time.Second * 5)
    println(" Main 退出 。。。。。")
}


func sendResult(results chan string,start,end int)  {

    //for i := start; i <= end; i++ {
    //  fmt.Println(<-results)
    //}

    // 發(fā)送抓取結(jié)果
    for{
        v, ok := <- results
        if !ok{
            fmt.Println("已經(jīng)讀取了所有的數(shù)據(jù),", ok)
            break
        }
        fmt.Println(v)

    }
}

func main() {
    start_time := time.Now().UnixNano()

    var wg sync.WaitGroup

    doWork(1,200, &wg)
    //輸出執(zhí)行時間,單位為毫秒。
    fmt.Printf("執(zhí)行時間: %ds",(time.Now().UnixNano() - start_time) / 1000)

}

總體來說,這個腳本就是為了弄清楚 Go 語言的并發(fā)原理 以及 通道,同步等待組的基本使用,或者只用 go 語言的鎖,目的都是為了防止 臨界資源的安全問題。

有了 channel 和 goroutine 之后,Go 的并發(fā)編程變得異常容易和安全,得以讓程序員把注意力留到業(yè)務(wù)上去,實現(xiàn)開發(fā)效率的提升。

歡迎轉(zhuǎn)載,但要聲明出處,不然我順著網(wǎng)線過去就是一拳。
個人技術(shù)博客:http://www.gzky.live

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容