Closing a channel of unknown length

I am not able to close a channel when I don't know its length in advance, that is, how many values will be sent on it.

    package main

    import (
        "fmt"
        "time"
    )

    func gen(ch chan int) {
        var i int
        for {
            time.Sleep(time.Millisecond * 10)
            ch <- i
            i++
            // when no more data (e.g. from db, or event stream)
            if i > 100 {
                break
            }
        }
        // how to close it properly?
        close(ch)
    }

    func receiver(ch chan int) {
        for i := range ch {
            fmt.Println("received:", i)
        }
    }

    func main() {
        ch := make(chan int)
        for i := 0; i < 10; i++ {
            go gen(ch)
        }
        receiver(ch)
    }

It gives me this error:

    panic: send on closed channel

    goroutine 8 [running]:
    main.gen(0xc82001a0c0)
        /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:12 +0x57
    created by main.main
        /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:35 +0xbd

    goroutine 1 [panicwait]:
    runtime.gopark(0x0, 0x0, 0x50b8e0, 0x9, 0x10, 0x1)
        /usr/lib/go/src/runtime/proc.go:185 +0x163
    runtime.main()
        /usr/lib/go/src/runtime/proc.go:121 +0x2f4
    runtime.goexit()
        /usr/lib/go/src/runtime/asm_amd64.s:1696 +0x1

    goroutine 6 [sleep]:
    time.Sleep(0x989680)
        /usr/lib/go/src/runtime/time.go:59 +0xf9
    main.gen(0xc82001a0c0)
        /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29
    created by main.main
        /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:33 +0x79

    goroutine 7 [sleep]:
    time.Sleep(0x989680)
        /usr/lib/go/src/runtime/time.go:59 +0xf9
    main.gen(0xc82001a0c0)
        /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:11 +0x29
    created by main.main
        /home/exu/src/github.com/exu/go-workshops/100-concurrency-channels/16-close-problem.go:34 +0x9b
    exit status 2

This is logical: the first goroutine closes the channel while the second is still trying to send on it. What would be the best approach to closing the channel in this situation?

2 answers

Once a channel is closed, you cannot send any more values on it; attempting to do so panics. This is what you are experiencing.

This happens because you start multiple goroutines that all send values on the same channel, and each of them closes it. Since they are not synchronized, as soon as the first goroutine reaches the point where it closes the channel, the others can (and will) continue to send values on it: panic!

You can close a channel only once (trying to close an already closed channel also panics). And you must do it only when all the goroutines that send values on it are done. To do this, you need to detect when all senders have finished. An idiomatic way of detecting this is to use sync.WaitGroup.
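Both panics mentioned above can be reproduced in isolation. The following is a minimal sketch, not part of the original answer; the helper name panics is hypothetical and exists only to recover from the panic so that the program can report it.

```go
package main

import "fmt"

// panics is a hypothetical helper for this illustration: it reports
// whether calling f panics, recovering so the program can continue.
func panics(f func()) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	f()
	return false
}

func main() {
	ch := make(chan int, 1)
	close(ch)

	// Sending on a closed channel panics, even if the buffer has room.
	fmt.Println("send on closed channel panics:", panics(func() { ch <- 1 }))

	// Closing an already closed channel panics as well.
	fmt.Println("second close panics:", panics(func() { close(ch) }))
}
```

In real code you would normally avoid these panics by design (a single, well-defined place that closes the channel) rather than recover from them.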

For each sender goroutine we start, we add 1 to the WaitGroup using WaitGroup.Add(). And every goroutine that is done sending values signals this by calling WaitGroup.Done(). This is best done as a deferred statement, so that if your goroutine terminates abruptly (for example, it panics), WaitGroup.Done() is still called and does not leave the other goroutines hanging (waiting for a "missing" WaitGroup.Done() call that would never come...).

WaitGroup.Wait() blocks until all sender goroutines are done, and only after that, and only once, do we close the channel. We want to detect this "global" done event and close the channel while the values sent on it are still being processed, so the waiting and closing must happen in its own goroutine.

The receiver runs until the channel is closed, since it uses the for ... range construct on the channel. And since it runs in the main goroutine, the program will not exit until all values have been properly received and processed from the channel. The for ... range construct loops until every value that was sent before the channel was closed has been received.
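The draining behaviour of for ... range can be checked with a small sketch. It assumes a buffered channel so that everything fits in one goroutine; the function name drain is hypothetical and not part of the answer.

```go
package main

import "fmt"

// drain (hypothetical name) receives from ch until it is closed
// and returns every value it saw, in order.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch) // values already in the buffer are still delivered

	fmt.Println(drain(ch)) // prints [1 2 3]
}
```

Closing the channel does not discard pending values; it only tells receivers that no more will follow.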

Note that the solution below works with both buffered and unbuffered channels without modification (try a buffered channel with ch := make(chan int, 100)).

The correct solution (try it on the Go Playground):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func gen(ch chan int, wg *sync.WaitGroup) {
        // Signal completion even if this goroutine exits abnormally.
        defer wg.Done()
        var i int
        for {
            time.Sleep(time.Millisecond * 10)
            ch <- i
            i++
            // when no more data (e.g. from db, or event stream)
            if i > 100 {
                break
            }
        }
    }

    func receiver(ch chan int) {
        for i := range ch {
            fmt.Println("received:", i)
        }
    }

    func main() {
        ch := make(chan int)
        wg := &sync.WaitGroup{}

        for i := 0; i < 10; i++ {
            wg.Add(1)
            go gen(ch, wg)
        }

        // Close the channel exactly once, after all senders are done.
        go func() {
            wg.Wait()
            close(ch)
        }()

        receiver(ch)
    }

Note:

It is important that receiver(ch) runs in the main goroutine, and that the code which waits on the WaitGroup and closes the channel runs in its own (non-main) goroutine, and not the other way around. If you swap the two, it may cause an "early exit": not all values may be received and processed from the channel. The reason is that a Go program exits when the main goroutine ends (spec: Program execution). It does not wait for other (non-main) goroutines to finish. So if the waiting and closing happened in the main goroutine, the program could exit at any moment after closing the channel, without waiting for the other goroutine, which in this case would still be looping to receive values from the channel.
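If the receiver really has to run in a non-main goroutine, one possible arrangement (an assumption on my part, not something the answer above prescribes) is to let main wait on an extra signal channel until the receiver has finished. The names produce, run and done are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// produce (hypothetical) sends n values on ch and signals wg when finished.
func produce(ch chan<- int, n int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < n; i++ {
		ch <- i
	}
}

// run (hypothetical) starts the given number of senders and returns
// how many values the receiver goroutine processed.
func run(senders, perSender int) int {
	ch := make(chan int)
	done := make(chan struct{})
	var wg sync.WaitGroup
	count := 0

	// The receiver runs in its own goroutine this time and closes
	// done once the channel has been closed and fully drained.
	go func() {
		defer close(done)
		for range ch {
			count++
		}
	}()

	for i := 0; i < senders; i++ {
		wg.Add(1)
		go produce(ch, perSender, &wg)
	}

	wg.Wait() // all senders are done
	close(ch) // safe: nobody sends any more
	<-done    // without this wait, the caller could return before the receiver finishes
	return count
}

func main() {
	fmt.Println("received:", run(10, 100))
}
```

The extra done channel is what prevents the early exit described above: main does not proceed until the receiver has seen the channel close.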


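Alternatively, receive from the channel explicitly instead of using range, and check the second return value to see whether the channel has been closed:

    for {
        i, ok := <-ch
        if !ok {
            break // channel is closed and drained
        }
        // process i
    }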
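As a runnable sketch of this explicit receive loop (the function name sum is hypothetical): the ok value should be checked before the received value is used, because a receive from a closed, drained channel yields the zero value, which is indistinguishable from a genuinely sent 0 without ok.

```go
package main

import "fmt"

// sum (hypothetical name) adds up every value received from ch,
// stopping when the channel is closed and drained.
func sum(ch <-chan int) int {
	total := 0
	for {
		v, ok := <-ch
		if !ok {
			// Closed and empty: v is just the zero value here.
			return total
		}
		total += v
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 0 // a real zero, received with ok == true
	ch <- 7
	close(ch)

	fmt.Println(sum(ch)) // prints 7
}
```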