記一個(gè)簡(jiǎn)單的協(xié)程池
github地址
https://github.com/JeonYang/chanPool
包結(jié)構(gòu)
|channal
|dispatcher.go
|error.go
|job.go
|pool.go
|pool_test.go
|worker.go
dispatcher.go
package channal
import (
"errors"
)
// 協(xié)調(diào)器
type Dispatcher interface {
Start()
Stop()
AddJob(job Job) error
JobQueueLen() int
WorkerPool() chan Worker
}
// 調(diào)度員
type dispatcher struct {
workerPool chan Worker
jobQueue chan Job
stopSignal chan struct{}
stop bool
stoped bool
}
// 創(chuàng)建調(diào)度器
func NewDispatcher(workerPool chan Worker, jobQueue chan Job) Dispatcher {
return &dispatcher{workerPool: workerPool, jobQueue: jobQueue, stopSignal: make(chan struct{})}
}
// 分派工作給自由工人
func (dis *dispatcher) Start() {
dis.stoped = false
dis.stop = false
go func() {
for {
select {
// 監(jiān)聽調(diào)度器的工作通道
case job := <-dis.jobQueue:
worker := <-dis.workerPool
worker.AddJob(job)
// 監(jiān)聽調(diào)度器的停止信號(hào)
case <-dis.stopSignal:
for i := 0; i < len(dis.workerPool); i++ {
worker := <-dis.workerPool
worker.Stop()
}
dis.stopSignal <- struct{}{}
return
}
}
}()
}
func (dis *dispatcher) Stop() {
dis.stop = true
dis.stopSignal <- struct{}{}
<-dis.stopSignal
dis.stoped = true
}
func (dis *dispatcher) JobQueueLen() int {
return len(dis.jobQueue)
}
func (dis *dispatcher) WorkerPool() chan Worker {
return dis.workerPool
}
func (dis *dispatcher) AddJob(job Job) error {
if dis.stop {
errors.New(Stoped)
}
dis.jobQueue <- job
return nil
}
error.go
package channal
var Stoped = "STOPED"
job.go
package channal
// 工作
type Job func()
pool.go
package channal
import (
"sync"
"errors"
)
type Pool interface {
Start()
Stop()
AddJob(job Job) error
WaitForAll()
EnableWaitForAll(enable bool)
}
type pool struct {
dispatcher Dispatcher
wg sync.WaitGroup
enableWaitForAll bool // 啟用所有等待
workerNum int // 工人總數(shù)
jobNum int // 工作數(shù)
workerCount int // 正在工作工人的數(shù)量
stoped bool
stop bool
}
//workerNum 工人池中的工人數(shù)量
//
//jobNum job池中的job數(shù)量
func NewPool(workerNum, jobNum int) Pool {
workers := make(chan Worker, workerNum)
jobs := make(chan Job, jobNum)
return &pool{
dispatcher: NewDispatcher(workers, jobs),
enableWaitForAll: false,
workerNum: workerNum,
jobNum: jobNum,
}
}
// 添加一個(gè)job到j(luò)ob池中
func (p *pool) AddJob(job Job) error {
if p.stop {
return errors.New(Stoped)
}
if p.enableWaitForAll {
p.wg.Add(1)
}
err := p.dispatcher.AddJob(func() {
job()
if p.enableWaitForAll {
p.wg.Done()
}
})
if err != nil {
return err
}
if p.dispatcher.JobQueueLen() > 0 {
if p.workerCount < p.workerNum {
worker := NewWorker(p.dispatcher.WorkerPool())
worker.Start()
p.workerCount++
}
}
return nil
}
// 等待所有協(xié)程操作完成
func (p *pool) WaitForAll() {
if p.enableWaitForAll {
p.wg.Wait()
}
}
// 停止所有進(jìn)程
func (p *pool) Stop() {
p.stop = true
p.dispatcher.Stop()
p.stoped = true
p.workerCount = 0
}
// 是否允許等待所有
func (p *pool) EnableWaitForAll(enable bool) {
p.enableWaitForAll = enable
}
//Start worker pool and dispatch
func (p *pool) Start() {
p.dispatcher.Start()
p.stoped = false
p.stop = false
}
pool_test.go
package channal
import (
"testing"
"fmt"
"time"
)
func TestNewPool(t *testing.T) {
pool := NewPool(10, 10)
pool.Start()
pool.EnableWaitForAll(false)
pool.AddJob(job_.do)
pool.AddJob(job_.do)
pool.AddJob(job_.do)
pool.WaitForAll()
//time.Sleep(time.Second)
pool.Stop()
fmt.Println(" pool.AddJob(do)", pool.AddJob(do))
pool.Start()
time.Sleep(time.Minute)
}
func do() {
fmt.Println("=========")
}
var job_ job=job{"123","321"}
type job struct {
name string
val string
}
func (job job) do() {
fmt.Println("name=========",job.name)
fmt.Println("val=========",job.val)
}
worker.go
package channal
// 工人
type Worker interface {
Start()
Stop()
AddJob(job Job)
}
type worker struct {
workerPool chan Worker
jobQueue chan Job
stopSignal chan struct{}
stoped bool
stop bool
}
func NewWorker(workerPool chan Worker) *worker {
return &worker{
workerPool: workerPool,
jobQueue: make(chan Job),
stopSignal: make(chan struct{}),
}
}
// 工人開始工作
func (w *worker) Start() {
go func() {
for {
// 將本身注冊(cè)給相應(yīng)的工人池
w.workerPool <- w
select {
// 監(jiān)聽工人的工作通道
case job := <-w.jobQueue:
job()
//監(jiān)聽工人停止信號(hào)
case <-w.stopSignal:
w.stopSignal <- struct{}{}
return
}
}
}()
}
// 工人開始工作
func (w *worker) Stop() {
w.stopSignal <- struct{}{}
<-w.stopSignal
close(w.stopSignal)
close(w.jobQueue)
}
// 工人開始工作
func (w *worker) AddJob(job Job) {
w.jobQueue <- job
}