Go并发模式之pipeline(管道)
pipeline pipeline 是你可以用来在系统中形成抽象的另一种工具。特别是当程序需要流式处理 或批处理数据时,它是一个非常强大的工具。 pipeline 只不过是一系列将数据输入, 执行操作并将结果数据传回的系统。 将这些操作称为 pipeline 的一个 stage.
通过使用pipeline, 可以分离每个stage的关注点, 这提供了很多好处。如可以可以相互独立地修改各个stage, 还可以混合搭配stage(而无需修改stage);还可以 将每个stage 同时处理到上游 或下游 stage; 还可以扇出(或限制)pipeline
func main() {
//给 数组每个值 乘上 一个 常数 返回
multiply := func(values []int, multiplier int) []int {
multipliedValues := make([]int , len(values))
for i, v := range values{
multipliedValues[i] = v * multiplier
}
return multipliedValues
}
add := func(values []int , additive int) []int {
addedValues := make([]int, len(values))
for i, v := range values{
addedValues[i] = v + additive
}
return addedValues
}
ints := []int{1,2,3,4}
for _, v := range add(multiply(ints, 2), 1){
fmt.Println(v)
}
}
//3
//5
//7
//9
具备哪些 属性 是stage? pipeline stage 属性是什么? 1。 消耗 并 返回相同的 类型 2。 他能 很方便的用语言来表达,可以方便地 被传递(函数可以 很方便被传递); pipeline stage 事实上与函数式编程密切相关 pipeline stage 有趣特性之一 : 在不改变stage本身的情况下,将我们的stage 结合到更高层次 变得 非常容易。例子如下: ints := []int{1, 2, 3, 4} for _, v := range multiply(add(multiply(ints, 2), 1), 2) { fmt.Println(v) } 分析上面这些 例子: 这些操作 有 "批处理操作的特点" 可认为就是 批处理 操作; 还有另一种类型的pipeline stage 执行 流操作,特点是 stage 一次只接收和处理一个元素。
批处理和流处理 优点 和 缺点 批处理的特点/缺点: 任何时候(数组/片)的内存占用量 都是我们发送pipeline 开始处 片 的 大小的两倍。 将stage 转换为 流处理 代码如下:
func main(){
multiply := func(value, multiplier int) int {
return value * multiplier
}
add := func(value, additive int) int{
return value + additive
}
ints :=[]int{1, 2, 3, 4}
for _, v := range ints{
fmt.Println(multiply(add(multiply(v, 2), 1), 2))
}
}
优点是: 内存占用 只回落到只有 pipeline输入 的大小 以上写法的缺点: 让 range 语句为我们的pipeline 进行繁重的提升,这也限制了我们的扩展能力。 每次迭代 都三次函数调用,这也很不爽
构建pipeline的最佳模式/ 最佳实践
func main(){
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers{
select {
case <-done :
return
case intStream <- i:
}
}
}()
return intStream
}
multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <- chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream{
select{
case <-done :
return
case multipliedStream <- i*multiplier:
}
}
}()
return multipliedStream
}
add := func(done <-chan interface{}, intStream<- chan int, additive int)<-chan int{
addedStream := make(chan int)
go func(){
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i+ additive:
}
}
}()
return addedStream
}
done := make(chan interface{})
defer close(done)
intStream := generator(done , 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
for v := range pipeline{
fmt.Println(v)
}
}
//6
//10
//14
//18
分析: generator 函数将一组离散值转换为一个channel 上的数据流。在使用流水线时, 会经常看到这个 "生成器" 的存在 一些 便利的 生成器:
一些 便利的 生成器:
*/
var repeat = func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for{
for _, v := range values{
select {
case <-done:
return
case valueStream <- v :
}
}
}
}()
return valueStream
}
//从流中取出 1-- num 的值
var take = func(done <- chan interface{}, valueStream <-chan interface{}, num int,) <- chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i:=0; i< num; i++{
select{
case <- done:
return
case takeStream <- <-valueStream :
}
}
}()
return takeStream
}
//重复调用函数的生成器
var repeatFn = func(done<- chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for{
select {
case <- done:
return
case valueStream<- fn():
}
}
}()
return valueStream
}
func main(){
//repeat 移到外部
//take 移到外部
//repeatFn 重复调用函数的生成器
done := make(chan interface{})
defer close(done)
for num := range take(done, repeat(done , 1,2, 3), 10){
fmt.Printf("%v ", num)
}
rand := func() interface{} { return rand.Int()}
for num := range take(done, repeatFn(done, rand),10){
fmt.Println(num)
}
}
/**
当需要处理特定的类型时, 你可以放置一个执行类型断言的 stage。 这个stage 的开销可以忽略不计。
当需要处理特定的类型时, 你可以放置一个执行类型断言的 stage。 这个stage 的开销可以忽略不计。
var toString = func(done <-chan interface{}, valueStream <- chan interface{})<-chan string{
stringStream := make(chan string)
go func(){
defer close(stringStream)
for v := range valueStream{
select {
case <- done :
return
case stringStream <- v.(string) :
}
}
}()
return stringStream
}
非类型相关的测试:
pipeline_test.go
//非类型 相关的 测试
func BenchmarkGeneric(b *testing.B){
done := make(chan interface{})
defer close(done)
b.ResetTimer()
for range toString(done, take(done, repeat(done, "a"), b.N)){
}
}
//BenchmarkGeneric-8 1000000 2494 ns/op
//类型相关的测试
func BenchmarkTyped(b *testing.B){
repeat := func(done <-chan interface{}, values ...string)<-chan string {
valueStream := make(chan string)
go func() {
defer close(valueStream)
for {
for _, v := range values{
select{
case <- done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan string, num int) <-chan string {
takeStream := make(chan string)
go func() {
defer close(takeStream)
for i := num; i>0 || i == -1;{
if i != -1{
i--
}
select{
case <-done :
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
done := make(chan interface{})
defer close(done)
b.ResetTimer()
for range take(done, repeat(done, "a"), b.N){
}
}
//BenchmarkTyped-8 1000000 1265 ns/op
分析: 类型相关的stage 运行速度 是 非类型相关的 两倍。 一般来说,pipeline 上限制因素将是你的生成器,或者是计算密集型的一个 stage
扇入, 扇出 有时候,pipeline 中的某些个 stage 可能计算上特别耗时, 这可能会拖慢 整个pipeline的速度, 如何解决这个问题? 可以重新调整 stage的顺序,还可以重复使用 某个 stage, 开启多个goroutine 来并行执行 某些个stage. 扇出: 启动多个goroutine 来处理 来自 pipeline 的输入。 扇入: 将多个结果组合到一个channel的过程。 什么情况下: 适合 扇出,扇入 这种模式? 1 他不依赖于之前 stage 计算的值 2 运行需要很长时间
举个例子: 计算素数比较耗时,不使用扇出扇入 代码如下:
var toInt = func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int{
intStream := make(chan int)
go func(){
defer close(intStream)
for v := range valueStream{
select {
case <-done :
return
case intStream <- v.(int) :
}
}
}()
return intStream
}
var primeFinder = func(done <-chan interface{}, intStream <-chan int) <-chan interface{} {
primeStream := make(chan interface{})
go func() {
defer close(primeStream)
for integer := range intStream{
integer -= 1
prime := true
for divisor := integer -1 ; divisor > 1 ; divisor--{
if integer%divisor == 0{
prime = false
break
}
}
if prime{
select {
case <-done :
return
case primeStream <- integer:
}
}
}
}()
return primeStream
}
func main(){
rand := func() interface{}{ return rand.Intn(50000000)}
done := make(chan interface{})
defer close(done)
start := time.Now()
randIntStream := toInt(done, repeatFn(done, rand))
fmt.Println("Primes:")
for prime := range take(done, primeFinder(done, randIntStream) , 10){
fmt.Printf("\t%d\n", prime)
}
fmt.Printf("Search took:%v", time.Since(start))
}
//Primes:
//24941317
//36122539
//6410693
//10128161
//25511527
//2107939
//14004383
//7190363
//45931967
//2393161
//Search took:29.760974984s
下面是加入 扇出, 扇入 改造提升后的代码如下:
下面是加入 扇出, 扇入 改造提升后的代码如下:
*/
var fanIn = func(done<-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
var wg sync.WaitGroup
multiplexedStream := make(chan interface{})
multiplex := func(c<-chan interface{}) {
defer wg.Done()
for i := range c {
select {
case <- done :
return
case multiplexedStream <-i:
}
}
}
// select from all the channels
wg.Add(len(channels))
for _, c := range channels{
go multiplex(c)
}
//Wait for all reads to complete
go func() {
wg.Wait()
close(multiplexedStream)
}()
return multiplexedStream
}
func main(){
done := make(chan interface{})
defer close(done)
start := time.Now()
rand := func() interface{} { return rand.Intn(50000000)}
randIntStream := toInt(done, repeatFn(done, rand))
numFinders := runtime.NumCPU()
fmt.Printf("Spinning up %d prime finders.\n", numFinders)
finders := make([]<-chan interface{}, numFinders) // chan 组成的数组
fmt.Println("Primes:")
for i:=0; i< numFinders; i++{
finders[i] = primeFinder(done, randIntStream)
}
for prime := range take(done, fanIn(done, finders...), 10){
fmt.Printf("\t%d\n", prime)
}
fmt.Printf("Search took:%v", time.Since(start))
}
//
//Spinning up 8 prime finders.
//Primes:
//6410693
//24941317
//10128161
//36122539
//25511527
//2107939
//14004383
//7190363
//2393161
//45931967
//Search took:9.450233287s
注意: 上面的例子(扇出 扇入) 不保证 读取-输出项目 的顺序!!! 分析 : 扇出 + 扇入 模式 如下图: