Golang:通道,同步等待組 并發(fā)爬蟲
在Go的并發(fā)編程中有一句很經(jīng)典的話:不要以共享內(nèi)存的方式去通信,而要以通信的方式去共享內(nèi)存。
在Go語言中并不鼓勵用鎖保護(hù)共享狀態(tài)的方式在不同的Goroutine中分享信息(以共享內(nèi)存的方式去通信)。而是鼓勵通過channel將共享狀態(tài)或共享狀態(tài)的變化在各個Goroutine之間傳遞(以通信的方式去共享內(nèi)存),這樣同樣能像用鎖一樣保證在同一的時間只有一個Goroutine訪問共享狀態(tài)。
當(dāng)然,在主流的編程語言中為了保證多線程之間共享數(shù)據(jù)安全性和一致性,都會提供一套基本的同步工具集,如鎖,條件變量,原子操作等等。Go語言標(biāo)準(zhǔn)庫也毫不意外的提供了這些同步機(jī)制,使用方式也和其他語言也差不多。

WaitGroup
WaitGroup,同步等待組。
在類型上,它是一個結(jié)構(gòu)體。一個WaitGroup的用途是等待一個goroutine的集合執(zhí)行完成。主goroutine調(diào)用了Add()方法來設(shè)置要等待的goroutine的數(shù)量。然后,每個goroutine都會執(zhí)行并且執(zhí)行完成后調(diào)用Done()這個方法。與此同時,可以使用Wait()方法來阻塞,直到所有的goroutine都執(zhí)行完成。
Add()方法
Add這個方法,用來設(shè)置到WaitGroup的計數(shù)器的值。我們可以理解為每個waitgroup中都有一個計數(shù)器 用來表示這個同步等待組中要執(zhí)行的goroutin的數(shù)量。
如果計數(shù)器的數(shù)值變?yōu)?,那么就表示等待時被阻塞的goroutine都被釋放,如果計數(shù)器的數(shù)值為負(fù)數(shù),那么就會引發(fā)恐慌,程序就報錯了。
Done()方法
Done()方法,就是當(dāng)WaitGroup同步等待組中的某個goroutine執(zhí)行完畢后,設(shè)置這個WaitGroup的counter數(shù)值減1。
Wait()方法
Wait()方法,表示讓當(dāng)前的goroutine等待,進(jìn)入阻塞狀態(tài)。一直到WaitGroup的計數(shù)器為零。才能解除阻塞, 這個goroutine才能繼續(xù)執(zhí)行。
示例代碼
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup // 創(chuàng)建同步等待組對象
func main() {
/*
WaitGroup:同步等待組
可以使用Add(),設(shè)置等待組中要 執(zhí)行的子goroutine的數(shù)量,
在main 函數(shù)中,使用wait(),讓主程序處于等待狀態(tài)。直到等待組中子程序執(zhí)行完畢。解除阻塞
子gorotuine對應(yīng)的函數(shù)中。wg.Done(),用于讓等待組中的子程序的數(shù)量減1
*/
//設(shè)置等待組中,要執(zhí)行的goroutine的數(shù)量
wg.Add(2)
go fun1()
go fun2()
fmt.Println("main進(jìn)入阻塞狀態(tài)。。。等待wg中的子goroutine結(jié)束。。")
wg.Wait() //表示main goroutine進(jìn)入等待,意味著阻塞
fmt.Println("main,解除阻塞。。")
}
func fun1() {
for i:=1;i<=10;i++{
fmt.Println("fun1.。。i:",i)
}
wg.Done() //給wg等待中的執(zhí)行的goroutine數(shù)量減1.同Add(-1)
}
func fun2() {
defer wg.Done()
for j:=1;j<=10;j++{
fmt.Println("\tfun2..j,",j)
}
}
channel通道
通道可以被認(rèn)為是Goroutines通信的管道。類似于管道中的水從一端到另一端的流動,數(shù)據(jù)可以從一端發(fā)送到另一端,通過通道接收。
在前面講Go語言的并發(fā)時候,我們就說過,當(dāng)多個Goroutine想實現(xiàn)共享數(shù)據(jù)的時候,雖然也提供了傳統(tǒng)的同步機(jī)制,但是Go語言強(qiáng)烈建議的是使用Channel通道來實現(xiàn)Goroutines之間的通信。
“不要通過共享內(nèi)存來通信,而應(yīng)該通過通信來共享內(nèi)存” 這是一句風(fēng)靡golang社區(qū)的經(jīng)典語
接收和發(fā)送
一個通道發(fā)送和接收數(shù)據(jù),默認(rèn)是阻塞的。當(dāng)一個數(shù)據(jù)被發(fā)送到通道時,在發(fā)送語句中被阻塞,直到另一個Goroutine從該通道讀取數(shù)據(jù)。相對地,當(dāng)從通道讀取數(shù)據(jù)時,讀取被阻塞,直到一個Goroutine將數(shù)據(jù)寫入該通道。
示例代碼:以下代碼加入了睡眠,可以更好的理解channel的阻塞
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan int)
done := make(chan bool) // 通道
go func() {
fmt.Println("子goroutine執(zhí)行。。。")
time.Sleep(3 * time.Second)
data := <-ch1 // 從通道中讀取數(shù)據(jù)
fmt.Println("data:", data)
done <- true
}()
// 向通道中寫數(shù)據(jù)。。
time.Sleep(5 * time.Second)
ch1 <- 100
<-done
fmt.Println("main。。over")
}
在上面的程序中,我們先創(chuàng)建了一個chan bool通道。然后啟動了一條子Goroutine,并循環(huán)打印10個數(shù)字。然后我們向通道ch1中寫入輸入true。
然后在主goroutine中,我們從ch1中讀取數(shù)據(jù)。這一行代碼是阻塞的,這意味著在子Goroutine將數(shù)據(jù)寫入到該通道之前,主goroutine將不會執(zhí)行到下一行代碼。
因此,我們可以通過channel實現(xiàn)子goroutine和主goroutine之間的通信。當(dāng)子goroutine執(zhí)行完畢前,主goroutine會因為讀取ch1中的數(shù)據(jù)而阻塞。從而保證了子goroutine會先執(zhí)行完畢。這就消除了對時間的需求。
在之前的程序中,我們要么讓主goroutine進(jìn)入睡眠,以防止主要的Goroutine退出。要么通過WaitGroup來保證子goroutine先執(zhí)行完畢,主goroutine才結(jié)束。
死鎖
使用通道時要考慮的一個重要因素是死鎖。如果Goroutine在一個通道上發(fā)送數(shù)據(jù),那么預(yù)計其他的Goroutine應(yīng)該接收數(shù)據(jù)。如果這種情況不發(fā)生,那么程序?qū)⒃谶\(yùn)行時出現(xiàn)死鎖。
類似地,如果Goroutine正在等待從通道接收數(shù)據(jù),那么另一些Goroutine將會在該通道上寫入數(shù)據(jù),否則程序?qū)梨i。
示例代碼
package main
func main() {
ch := make(chan int)
ch <- 5
}
報錯:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/Users/ruby/go/src/l_goroutine/demo08_chan.go:5 +0x50
Goroutine
Goroutine 是實際并發(fā)執(zhí)行的實體,它底層是使用協(xié)程(coroutine)實現(xiàn)并發(fā),coroutine是一種運(yùn)行在用戶態(tài)的用戶線程,類似于 greenthread,go底層選擇使用coroutine的出發(fā)點(diǎn)是因為,它具有以下特點(diǎn):
用戶空間 避免了內(nèi)核態(tài)和用戶態(tài)的切換導(dǎo)致的成本
可以由語言和框架層進(jìn)行調(diào)度
更小的??臻g允許創(chuàng)建大量的實例
Goroutine 調(diào)度器
Go并發(fā)調(diào)度: G-P-M模型
在操作系統(tǒng)提供的內(nèi)核線程之上,Go搭建了一個特有的兩級線程模型。goroutine機(jī)制實現(xiàn)了M : N的線程模型,goroutine機(jī)制是協(xié)程(coroutine)的一種實現(xiàn),golang內(nèi)置的調(diào)度器,可以讓多核CPU中每個CPU執(zhí)行一個協(xié)程。
以上內(nèi)容來自 https://github.com/rubyhan1314/Golang-100-Days
主要說明一下同步等待組和通道的基本使用,以及 go 是如何處理并發(fā)的,更多可以繼續(xù)參考以上,來自千峰的 go 教程。
實戰(zhàn)爬蟲
前面說了這么多只不過是為這個腳本做鋪墊,要不然則來的太唐突。
我這里寫了一個爬蟲腳本,用到了通道來做并發(fā),并有同步等待組做 awit() 操作
直接來看代碼
獲取html
func HttpGet(url string) (result string, err error) {
resp, err1 := http.Get(url)
if err != nil {
err = err1
return
}
defer resp.Body.Close()
//讀取網(wǎng)頁的body內(nèi)容
buf := make([]byte, 4*1024)
for true {
n, err := resp.Body.Read(buf)
if err != nil {
if err == io.EOF{
break
}else {
fmt.Println("resp.Body.Read err = ", err)
break
}
}
result += string(buf[:n])
}
return
}
爬取網(wǎng)頁存為 .html 文件
func spiderPage(url string) string {
fmt.Println("正在爬取", url)
//爬,將所有的網(wǎng)頁內(nèi)容爬取下來
result, err := HttpGet(url)
if err != nil {
fmt.Println(err)
}
//把內(nèi)容寫入到文件
filename := strconv.Itoa(rand.Int()) + ".html"
f, err1 := os.Create(filename)
if err1 != nil{
fmt.Println(err1)
}
//寫內(nèi)容
f.WriteString(result)
//關(guān)閉文件
f.Close()
return url + " 抓取成功"
}
爬取方法方面就寫完了,接下來就到了重要的部分了
定義一個工作者函數(shù)
func doWork(start, end int,wg *sync.WaitGroup) {
fmt.Printf("正在爬取第%d頁到%d頁\n", start, end)
//因為很有可能爬蟲還沒有結(jié)束下面的循環(huán)就已經(jīng)結(jié)束了,所以這里就需要且到通道
page := make(chan string,100)
results := make(chan string,100)
go sendResult(results,start,end)
go func() {
for i := 0; i <= 20; i++ {
wg.Add(1)
go asyn_worker(page, results, wg)
}
}()
for i := start; i <= end; i++ {
url := "https://tieba.baidu.com/f?kw=%E7%BB%9D%E5%9C%B0%E6%B1%82%E7%94%9F&ie=utf-8&pn=" + strconv.Itoa((i-1)*50)
page <- url
println("加入" + url + "到page")
}
println("關(guān)閉通道")
close(page)
wg.Wait()
//time.Sleep(time.Second * 5)
println(" Main 退出 。。。。。")
}
從通道取出數(shù)據(jù)
func asyn_worker(page chan string, results chan string,wg *sync.WaitGroup){
defer wg.Done() //defer wg.Done()必須放在go并發(fā)函數(shù)內(nèi)
for{
v, ok := <- page //顯示的調(diào)用close方法關(guān)閉通道。
if !ok{
fmt.Println("已經(jīng)讀取了所有的數(shù)據(jù),", ok)
break
}
//fmt.Println("取出數(shù)據(jù):",v, ok)
results <- spiderPage(v)
}
//for n := range page {
// results <- spiderPage(n)
//}
}
發(fā)送抓取結(jié)果
func sendResult(results chan string,start,end int) {
//for i := start; i <= end; i++ {
// fmt.Println(<-results)
//}
// 發(fā)送抓取結(jié)果
for{
v, ok := <- results
if !ok{
fmt.Println("已經(jīng)讀取了所有的數(shù)據(jù),", ok)
break
}
fmt.Println(v)
}
}
大體思路是這樣的:
可以看到我定義了兩個通道,一個是用來存入 url 的,另一個是用來存入爬取結(jié)果的,緩沖空間是 100
在方法 doWork 中, sendResult 會阻塞等待 results 通道的輸出,匿名函數(shù)則是等待 page 通道的輸出
緊接著下面就是把 200 個 url 寫入 page 通道,匿名函數(shù)得到 page 的輸出就會執(zhí)行 asyn_worker 函數(shù),也就是爬取 html 的函數(shù)了(將其存入results 通道)
然后 sendResult 函數(shù)得到 results 通道的輸出,將結(jié)果打印出來
可以看到 我在匿名函數(shù)中并發(fā)了 20 個 goroution,并且啟用了同步等待組作為參數(shù)傳入,理論上可以根據(jù)機(jī)器的性能來定義 并發(fā)數(shù)
main函數(shù)
func main() {
start_time := time.Now().UnixNano()
var wg sync.WaitGroup
doWork(1,200, &wg)
//輸出執(zhí)行時間,單位為毫秒。
fmt.Printf("執(zhí)行時間: %ds",(time.Now().UnixNano() - start_time) / 1000)
}
運(yùn)行爬蟲并計算運(yùn)行時間,這個時間因機(jī)器而異,但應(yīng)該不會相差太多
完整代碼
package main
import (
"fmt"
"io"
"sync"
"math/rand"
"net/http"
"os"
"strconv"
"time"
)
func HttpGet(url string) (result string, err error) {
resp, err1 := http.Get(url)
if err != nil {
err = err1
return
}
defer resp.Body.Close()
//讀取網(wǎng)頁的body內(nèi)容
buf := make([]byte, 4*1024)
for true {
n, err := resp.Body.Read(buf)
if err != nil {
if err == io.EOF{
break
}else {
fmt.Println("resp.Body.Read err = ", err)
break
}
}
result += string(buf[:n])
}
return
}
//爬取網(wǎng)頁
func spiderPage(url string) string {
fmt.Println("正在爬取", url)
//爬,將所有的網(wǎng)頁內(nèi)容爬取下來
result, err := HttpGet(url)
if err != nil {
fmt.Println(err)
}
//把內(nèi)容寫入到文件
filename := strconv.Itoa(rand.Int()) + ".html"
f, err1 := os.Create(filename)
if err1 != nil{
fmt.Println(err1)
}
//寫內(nèi)容
f.WriteString(result)
//關(guān)閉文件
f.Close()
return url + " 抓取成功"
}
func asyn_worker(page chan string, results chan string,wg *sync.WaitGroup){
defer wg.Done() //defer wg.Done()必須放在go并發(fā)函數(shù)內(nèi)
for{
v, ok := <- page //顯示的調(diào)用close方法關(guān)閉通道。
if !ok{
fmt.Println("已經(jīng)讀取了所有的數(shù)據(jù),", ok)
break
}
//fmt.Println("取出數(shù)據(jù):",v, ok)
results <- spiderPage(v)
}
//for n := range page {
// results <- spiderPage(n)
//}
}
func doWork(start, end int,wg *sync.WaitGroup) {
fmt.Printf("正在爬取第%d頁到%d頁\n", start, end)
//因為很有可能爬蟲還沒有結(jié)束下面的循環(huán)就已經(jīng)結(jié)束了,所以這里就需要且到通道
page := make(chan string,100)
results := make(chan string,100)
go sendResult(results,start,end)
go func() {
for i := 0; i <= 20; i++ {
wg.Add(1)
go asyn_worker(page, results, wg)
}
}()
for i := start; i <= end; i++ {
url := "https://tieba.baidu.com/f?kw=%E7%BB%9D%E5%9C%B0%E6%B1%82%E7%94%9F&ie=utf-8&pn=" + strconv.Itoa((i-1)*50)
page <- url
println("加入" + url + "到page")
}
println("關(guān)閉通道")
close(page)
wg.Wait()
//time.Sleep(time.Second * 5)
println(" Main 退出 。。。。。")
}
func sendResult(results chan string,start,end int) {
//for i := start; i <= end; i++ {
// fmt.Println(<-results)
//}
// 發(fā)送抓取結(jié)果
for{
v, ok := <- results
if !ok{
fmt.Println("已經(jīng)讀取了所有的數(shù)據(jù),", ok)
break
}
fmt.Println(v)
}
}
func main() {
start_time := time.Now().UnixNano()
var wg sync.WaitGroup
doWork(1,200, &wg)
//輸出執(zhí)行時間,單位為毫秒。
fmt.Printf("執(zhí)行時間: %ds",(time.Now().UnixNano() - start_time) / 1000)
}
總體來說,這個腳本就是為了弄清楚 Go 語言的并發(fā)原理 以及 通道,同步等待組的基本使用,或者只用 go 語言的鎖,目的都是為了防止 臨界資源的安全問題。
有了 channel 和 goroutine 之后,Go 的并發(fā)編程變得異常容易和安全,得以讓程序員把注意力留到業(yè)務(wù)上去,實現(xiàn)開發(fā)效率的提升。
歡迎轉(zhuǎn)載,但要聲明出處,不然我順著網(wǎng)線過去就是一拳。
個人技術(shù)博客:http://www.gzky.live