Skip to main content

concurrency pattern

前言

现在我们已经非常熟练的掌握了goroutine channel sync同步的原理,现在我们就要利用这些基础来帮助我们更好的编写并发程序

并发模式

1. for select循环模式

 for { //for无限循环 或者 for range循环
select{
//通过一个channel控制
}
}

举例,如监控狗无限循环

for{
select <- done:
return
default:
// 执行具体的任务
}

有限循环:

for _, s:= range []int{}{
select{
case <- done:
return
case resultCh <- s:

}
}

2. select timeout 模式

package main

import (
"fmt"
"time"
)

func main() {
result:=make(chan string)
go func() {
//模拟网络请求
time.Sleep(8*time.Second)
result <- "服务端结果"
}()

select {
case v:=<- result:
fmt.Println(v)
case <-time.After(time.Second*5):
fmt.Println("网络访问超时")
}
}

如果能使用Context的func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) 则优先使用Context.WithTimeout

3. pipline模式

工序之间相互依赖,上个工序做完才能做下一个

整个生产线可能有成百上千道工序,每道工序都是只做自己的事情,最终会经过一步步工序的组装,最后组装成一件产品

从技术上看,每道工序的输出就是下道工序的输入,工序之间传递的东西就是数据,这种模式则称为流水线模式,而传递的数据则称为数据流

从以上的流水线示意图我们可以看出,从最开始的生产到工序1234至成品,这就是一条比较形象的流水线,也就是pipline,下面以手机组装威力,来模拟流水线的生产

假设组装手机有以下3道工序,分别是配件的采购 配件的组装和 打包成品,如下图所示:

通过这个示意图我们可以看到,我们采购的配件是通过channel的方式传递给工序2进行组装,然后又通过channel的方式传递给工序3打包,它们之间都是通过通道来传递数据的。

相对于工序2来说,工序1是生产者,工序3是消费者;相对工序1来说,工序2是消费者;相对于工序3来说,工序2是生产者,下面用代码来演示

package main

import "fmt"

func main() {
coms := buy(10) //采购10套配件
phones := assemble(coms) //组打包10部手机
packs := pack(phones) //打包他们以便售卖

for p := range packs {
fmt.Println(p)
}
}

//工序1:采购
func buy(n int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for i := 0; i < n; i++ {
out <- fmt.Sprintf("配件%d", i)
}
}()
return out
}

//工序2:组装
func assemble(in <-chan string) <-chan string {
out := make(chan string)

go func() {
defer close(out)
for c := range in {
out <- "组装(" + c + ")"
}
}()

return out
}

//工序3:打包
func pack(in <-chan string) <-chan string {
out := make(chan string)

go func() {
defer close(out)
for c := range in {
out <- "打包(" + c + ")"
}
}()
return out
}

通过上面的流水线模式示例,我们可以总结以下:

4. 扇入和扇出模式

经过一段时间的运转,组织者发现产能提不上去

通过调查发现:工序2过慢,导致上游工序1配件采购速度不得不降下来,下游工序3没太多事情做,不得不闲下来,这就是产能降低的原因

为了提高手机产能,组织者决定对工序2增加2班人手

工序1采购的配件会被工序2的3班人手同时进行组装,这3班人手组装好的手机也会同时传递给merge这个组件进行汇聚,然后再传递给工序3打包成品

在这个流程中会产生2种模式,也就是扇入和扇出

package main

import (
"fmt"
"sync"
)

func main() {
coms := buy(100) //采购100套配件
//3班人同时组装100部手机
phones1 := assemble(coms)
phones2 := assemble(coms)
phones3 := assemble(coms)
//汇聚3个channel成1个
phones := merge(phones1,phones2,phones3) //打包他们以便售卖

packs:= pack(phones)
for p := range packs {
fmt.Println(p)
}
}

//工序1:采购
func buy(n int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for i := 0; i < n; i++ {
out <- fmt.Sprintf("配件%d", i)
}
}()
return out
}

//工序2:组装
func assemble(in <-chan string) <-chan string {
out := make(chan string)

go func() {
defer close(out)
for c := range in {
out <- "组装(" + c + ")"
}
}()

return out
}

//工序3:打包
func pack(in <-chan string) <-chan string {
out := make(chan string)

go func() {
defer close(out)
for c := range in {
out <- "打包(" + c + ")"
}
}()
return out
}

//扇入函数(组件),把多个channel中的数据发送到一个channel中
func merge(ins ...<- chan string) <-chan string {
var wg sync.WaitGroup
out:=make(chan string)

//把一个channel中的数据发送到out中
p:= func(in <- chan string) {
defer wg.Done()
for c:=range in {
out<- c
}
}

wg.Add(len(ins))

//扇入,需要启动多个goroutine用于处理多个channel中的数据
for _, cs := range ins{
go p(cs)
}

//等待所有输入的数据ins处理完,再关闭输出out
go func() {
wg.Wait()
close(out)
}()

return out
}

运行:

打包(组装(配件2))
打包(组装(配件3))
打包(组装(配件1))
打包(组装(配件5))
打包(组装(配件0))
打包(组装(配件4))
打包(组装(配件7))
打包(组装(配件6))
打包(组装(配件9))
打包(组装(配件8))
打包(组装(配件11))
打包(组装(配件10))
打包(组装(配件13))
打包(组装(配件12))
打包(组装(配件14))
打包(组装(配件15))
打包(组装(配件16))
打包(组装(配件18))
打包(组装(配件17))
打包(组装(配件20))
打包(组装(配件19))
打包(组装(配件22))
打包(组装(配件21))
打包(组装(配件23))
打包(组装(配件24))
打包(组装(配件25))
打包(组装(配件26))
打包(组装(配件28))
打包(组装(配件27))
打包(组装(配件30))
打包(组装(配件29))
打包(组装(配件32))
打包(组装(配件31))
打包(组装(配件34))
打包(组装(配件33))
打包(组装(配件36))
打包(组装(配件35))
打包(组装(配件38))
打包(组装(配件37))
打包(组装(配件40))
打包(组装(配件39))
打包(组装(配件42))
打包(组装(配件41))
打包(组装(配件43))
打包(组装(配件45))
打包(组装(配件44))
打包(组装(配件47))
打包(组装(配件46))
打包(组装(配件49))
打包(组装(配件48))
打包(组装(配件51))
打包(组装(配件50))
打包(组装(配件53))
打包(组装(配件55))
打包(组装(配件54))
打包(组装(配件52))
打包(组装(配件57))
打包(组装(配件56))
打包(组装(配件59))
打包(组装(配件58))
打包(组装(配件60))
打包(组装(配件62))
打包(组装(配件61))
打包(组装(配件64))
打包(组装(配件63))
打包(组装(配件66))
打包(组装(配件65))
打包(组装(配件68))
打包(组装(配件67))
打包(组装(配件70))
打包(组装(配件69))
打包(组装(配件72))
打包(组装(配件71))
打包(组装(配件73))
打包(组装(配件75))
打包(组装(配件74))
打包(组装(配件77))
打包(组装(配件76))
打包(组装(配件79))
打包(组装(配件78))
打包(组装(配件81))
打包(组装(配件80))
打包(组装(配件83))
打包(组装(配件82))
打包(组装(配件85))
打包(组装(配件84))
打包(组装(配件87))
打包(组装(配件86))
打包(组装(配件89))
打包(组装(配件88))
打包(组装(配件91))
打包(组装(配件90))
打包(组装(配件93))
打包(组装(配件92))
打包(组装(配件95))
打包(组装(配件94))
打包(组装(配件97))
打包(组装(配件96))
打包(组装(配件99))
打包(组装(配件98))

5. future模式

在实际需求中,有大量的任务之间相互独立、没有依赖,所以为了提高性能,这些独立的任务就可以并发的执行

package main

import (
"fmt"
"time"
)

func main() {
vegetableCh := washVegetables()
waterCh := boilWater()

fmt.Println("已经安排洗菜和烧水了 我先眯一会...")
time.Sleep(2*time.Second)

fmt.Println("要做火锅了,看看菜和水好了吗")
vegetables := <- vegetableCh
water := <- waterCh

fmt.Println("准备好了,可以做火锅了:", vegetables,water)
}

//洗菜
func washVegetables() <-chan string {
vegetables:=make(chan string)
go func() {
time.Sleep(time.Second*5)
vegetables<-"洗好的菜"
}()
return vegetables
}


//烧水
func boilWater() <-chan string {
water:=make(chan string)
go func() {
time.Sleep(time.Second*5)
water<-"烧好的水"
}()
return water
}

如果一个大任务可以拆分成一个个独立的小任务,并且可以通过小任务的结果得出大任务的结果,那么就可以使用future模式