concurrency pattern
前言
现在我们已经非常熟练的掌握了goroutine channel sync同步的原理,现在我们就要利用这些基础来帮助我们更好的编写并发程序
并发模式
1. for select循环模式
for { //for无限循环 或者 for range循环
select{
//通过一个channel控制
}
}
举例,如监控狗无限循环
for{
select <- done:
return
default:
// 执行具体的任务
}
有限循环:
for _, s:= range []int{}{
select{
case <- done:
return
case resultCh <- s:
}
}
2. select timeout 模式
package main
import (
"fmt"
"time"
)
func main() {
result:=make(chan string)
go func() {
//模拟网络请求
time.Sleep(8*time.Second)
result <- "服务端结果"
}()
select {
case v:=<- result:
fmt.Println(v)
case <-time.After(time.Second*5):
fmt.Println("网络访问超时")
}
}
如果能使用Context的func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
则优先使用Context.WithTimeout
3. pipline模式
工序之间相互依赖,上个工序做完才能做下一个
整个生产线可能有成百上千道工序,每道工序都是只做自己的事情,最终会经过一步步工序的组装,最后组装成一件产品
从技术上看,每道工序的输出就是下道工序的输入,工序之间传递的东西就是数据,这种模式则称为流水线模式,而传递的数据则称为数据流
从以上的流水线示意图我们可以看出,从最开始的生产到工序1234至成品,这就是一条比较形象的流水线,也就是pipline,下面以手机组装威力,来模拟流水线的生产
假设组装手机有以下3道工序,分别是配件的采购 配件的组装和 打包成品,如下图所示:
通过这个示意图我们可以看到,我们采购的配件是通过channel的方式传递给工序2进行组装,然后又通过channel的方式传递给工序3打包,它们之间都是通过通道来传递数据的。
相对于工序2来说,工序1是生产者,工序3是消费者;相对工序1来说,工序2是消费者;相对于工序3来说,工序2是生产者,下面用代码来演示
package main
import "fmt"
func main() {
coms := buy(10) //采购10套配件
phones := assemble(coms) //组打包10部手机
packs := pack(phones) //打包他们以便售卖
for p := range packs {
fmt.Println(p)
}
}
//工序1:采购
func buy(n int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for i := 0; i < n; i++ {
out <- fmt.Sprintf("配件%d", i)
}
}()
return out
}
//工序2:组装
func assemble(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for c := range in {
out <- "组装(" + c + ")"
}
}()
return out
}
//工序3:打包
func pack(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for c := range in {
out <- "打包(" + c + ")"
}
}()
return out
}
通过上面的流水线模式示例,我们可以总结以下:
4. 扇入和扇出模式
经过一段时间的运转,组织者发现产能提不上去
通过调查发现:工序2过慢,导致上游工序1配件采购速度不得不降下来,下游工序3没太多事情做,不得不闲下来,这就是产能降低的原因
为了提高手机产能,组织者决定对工序2增加2班人手
工序1采购的配件会被工序2的3班人手同时进行组装,这3班人手组装好的手机也会同时传递给merge这个组件进行汇聚,然后再传递给工序3打包成品
在这个流程中会产生2种模式,也就是扇入和扇出
package main
import (
"fmt"
"sync"
)
func main() {
coms := buy(100) //采购100套配件
//3班人同时组装100部手机
phones1 := assemble(coms)
phones2 := assemble(coms)
phones3 := assemble(coms)
//汇聚3个channel成1个
phones := merge(phones1,phones2,phones3) //打包他们以便售卖
packs:= pack(phones)
for p := range packs {
fmt.Println(p)
}
}
//工序1:采购
func buy(n int) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for i := 0; i < n; i++ {
out <- fmt.Sprintf("配件%d", i)
}
}()
return out
}
//工序2:组装
func assemble(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for c := range in {
out <- "组装(" + c + ")"
}
}()
return out
}
//工序3:打包
func pack(in <-chan string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
for c := range in {
out <- "打包(" + c + ")"
}
}()
return out
}
//扇入函数(组件),把多个channel中的数据发送到一个channel中
func merge(ins ...<- chan string) <-chan string {
var wg sync.WaitGroup
out:=make(chan string)
//把一个channel中的数据发送到out中
p:= func(in <- chan string) {
defer wg.Done()
for c:=range in {
out<- c
}
}
wg.Add(len(ins))
//扇入,需要启动多个goroutine用于处理多个channel中的数据
for _, cs := range ins{
go p(cs)
}
//等待所有输入的数据ins处理完,再关闭输出out
go func() {
wg.Wait()
close(out)
}()
return out
}
运行:
打包(组装(配件2))
打包(组装(配件3))
打包(组装(配件1))
打包(组装(配件5))
打包(组装(配件0))
打包(组装(配件4))
打包(组装(配件7))
打包(组装(配件6))
打包(组装(配件9))
打包(组装(配件8))
打包(组装(配件11))
打包(组装(配件10))
打包(组装(配件13))
打包(组装(配件12))
打包(组装(配件14))
打包(组装(配件15))
打包(组装(配件16))
打包(组装(配件18))
打包(组装(配件17))
打包(组装(配件20))
打包(组装(配件19))
打包(组装(配件22))
打包(组装(配件21))
打包(组装(配件23))
打包(组装(配件24))
打包(组装(配件25))
打包(组装(配件26))
打包(组装(配件28))
打包(组装(配件27))
打包(组装(配件30))
打包(组装(配件29))
打包(组装(配件32))
打包(组装(配件31))
打包(组装(配件34))
打包(组装(配件33))
打包(组装(配件36))
打包(组装(配件35))
打包(组装(配件38))
打包(组装(配件37))
打包(组装(配件40))
打包(组装(配件39))
打包(组装(配件42))
打包(组装(配件41))
打包(组装(配件43))
打包(组装(配件45))
打包(组装(配件44))
打包(组装(配件47))
打包(组装(配件46))
打包(组装(配件49))
打包(组装(配件48))
打包(组装(配件51))
打包(组装(配件50))
打包(组装(配件53))
打包(组装(配件55))
打包(组装(配件54))
打包(组装(配件52))
打包(组装(配件57))
打包(组装(配件56))
打包(组装(配件59))
打包(组装(配件58))
打包(组装(配件60))
打包(组装(配件62))
打包(组装(配件61))
打包(组装(配件64))
打包(组装(配件63))
打包(组装(配件66))
打包(组装(配件65))
打包(组装(配件68))
打包(组装(配件67))
打包(组装(配件70))
打包(组装(配件69))
打包(组装(配件72))
打包(组装(配件71))
打包(组装(配件73))
打包(组装(配件75))
打包(组装(配件74))
打包(组装(配件77))
打包(组装(配件76))
打包(组装(配件79))
打包(组装(配件78))
打包(组装(配件81))
打包(组装(配件80))
打包(组装(配件83))
打包(组装(配件82))
打包(组装(配件85))
打包(组装(配件84))
打包(组装(配件87))
打包(组装(配件86))
打包(组装(配件89))
打包(组装(配件88))
打包(组装(配件91))
打包(组装(配件90))
打包(组装(配件93))
打包(组装(配件92))
打包(组装(配件95))
打包(组装(配件94))
打包(组装(配件97))
打包(组装(配件96))
打包(组装(配件99))
打包(组装(配件98))
5. future模式
在实际需求中,有大量的任务之间相互独立、没有依赖,所以为了提高性能,这些独立的任务就可以并发的执行
package main
import (
"fmt"
"time"
)
func main() {
vegetableCh := washVegetables()
waterCh := boilWater()
fmt.Println("已经安排洗菜和烧水了 我先眯一会...")
time.Sleep(2*time.Second)
fmt.Println("要做火锅了,看看菜和水好了吗")
vegetables := <- vegetableCh
water := <- waterCh
fmt.Println("准备好了,可以做火锅了:", vegetables,water)
}
//洗菜
func washVegetables() <-chan string {
vegetables:=make(chan string)
go func() {
time.Sleep(time.Second*5)
vegetables<-"洗好的菜"
}()
return vegetables
}
//烧水
func boilWater() <-chan string {
water:=make(chan string)
go func() {
time.Sleep(time.Second*5)
water<-"烧好的水"
}()
return water
}
如果一个大任务可以拆分成一个个独立的小任务,并且可以通过小任务的结果得出大任务的结果,那么就可以使用future模式