2013-06-05 20 views
11

Bu yüzden, bir tüketici ve birçok üreticiyi Go - klasik fanIn işlevini Concurrency in Go numaralı konuşmadan gerçekleştirmenin birçok yolunu gördüm.Go: Bir üretici, birçok tüketici

Ne istemek bir fanOut işlevidir. Bir değeri okuduğu bir kanal olarak alır ve bu değerin kopyalarını yazan bir kanal dilimini döndürür.

Bunu uygulamak için doğru/önerilen bir yol var mı?

cevap

13

Bunu yapmanın en iyi yolunu hemen hemen tarif ettiniz, ancak burada küçük bir kod örneği var.

git oyun alanı: Giriş kanalı tükendiğinde biz çıkış kanallarını kapatmak nasıl https://play.golang.org/p/jwdtDXVHJk

package main 

import (
    "fmt" 
    "time" 
) 

func producer(iters int) <-chan int { 
    c := make(chan int) 
    go func() { 
     for i := 0; i < iters; i++ { 
      c <- i 
      time.Sleep(1 * time.Second) 
     } 
     close(c) 
    }() 
    return c 
} 

func consumer(cin <-chan int) { 
    for i := range cin { 
     fmt.Println(i) 
    } 
} 

func fanOut(ch <-chan int, size, lag int) []chan int { 
    cs := make([]chan int, size) 
    for i, _ := range cs { 
     // The size of the channels buffer controls how far behind the recievers 
     // of the fanOut channels can lag the other channels. 
     cs[i] = make(chan int, lag) 
    } 
    go func() { 
     for i := range ch { 
      for _, c := range cs { 
       c <- i 
      } 
     } 
     for _, c := range cs { 
      // close all our fanOut channels when the input channel is exhausted. 
      close(c) 
     } 
    }() 
    return cs 
} 

func fanOutUnbuffered(ch <-chan int, size int) []chan int { 
    cs := make([]chan int, size) 
    for i, _ := range cs { 
     // The size of the channels buffer controls how far behind the recievers 
     // of the fanOut channels can lag the other channels. 
     cs[i] = make(chan int) 
    } 
    go func() { 
     for i := range ch { 
      for _, c := range cs { 
       c <- i 
      } 
     } 
     for _, c := range cs { 
      // close all our fanOut channels when the input channel is exhausted. 
      close(c) 
     } 
    }() 
    return cs 
} 

func main() { 
    c := producer(10) 
    chans := fanOutUnbuffered(c, 3) 
    go consumer(chans[0]) 
    go consumer(chans[1]) 
    consumer(chans[2]) 
} 

önemli bir parçası dikkat etmektir. Ayrıca, çıkış kanallarından biri gönderim sırasında bloke olursa, diğer çıkış kanallarındaki gönderimi de tutacaktır. Kanalların tampon boyutunu ayarlayarak gecikme miktarını kontrol ederiz.

+1

! Teşekkür ederim. Beni rahatsız eden kanalların kapanışıydı. Gelecekte buna ihtiyaç duyanlara bir teşekkür ve hızlı bir referans olarak, burada çalışan bir sürüm: http://play.golang.org/p/jwdtDXVHJk – Carl

2

aşağıda Bu çözüm biraz yapmacık, ama benim için çalışıyor: Mükemmel

package main 

import (
    "fmt" 
    "time" 
    "crypto/rand" 
    "encoding/binary" 
) 

func handleNewChannels(arrchangen chan [](chan uint32), 
         intchangen chan (chan uint32)) { 
    currarr := []chan uint32{} 
    arrchangen <- currarr 
    for { 
     newchan := <-intchangen 
     currarr = append(currarr, newchan) 
     arrchangen <- currarr 
    } 
} 

func sendToChannels(arrchangen chan [](chan uint32)) { 
    tick := time.Tick(1 * time.Second) 
    currarr := <-arrchangen 
    for { 
     select { 
     case <-tick: 
      sent := false 
      var n uint32 
      binary.Read(rand.Reader, binary.LittleEndian, &n) 
      for i := 0 ; i < len(currarr) ; i++ { 
       currarr[i] <- n 
       sent = true 
      } 
      if sent { 
       fmt.Println("Sent generated ", n) 
      } 
     case newarr := <-arrchangen: 
      currarr = newarr 
     } 
    } 
} 
func handleChannel(tchan chan uint32) { 
    for { 
     val := <-tchan 
     fmt.Println("Got the value ", val) 
    } 
} 

func createChannels(intchangen chan (chan uint32)) { 
    othertick := time.Tick(5 * time.Second) 
    for { 
     <-othertick 
     fmt.Println("Creating new channel! ") 
     newchan := make(chan uint32) 
     intchangen <- newchan 
     go handleChannel(newchan) 
    } 
} 

func main() { 
    arrchangen := make(chan [](chan uint32)) 
    intchangen := make(chan (chan uint32)) 
    go handleNewChannels(arrchangen, intchangen) 
    go sendToChannels(arrchangen) 
    createChannels(intchangen) 
}