Cancelling Multiple Goroutines
When Go was first released, there were established ways of coordinating concurrent work. As time has gone on, some of those idioms have changed, and the Context package is one of the biggest changes. :)
This article doesn’t go into all of the ways of doing concurrency but will focus on one problem and take you through a few different solutions so you can see how things have evolved.
The Problem
The problem I’d like to address here is being able to cancel multiple goroutines. There are many blog posts out there (I curate @CuratedGo, please follow) which show how to cancel just one goroutine, but my use-case was slightly more complicated. The rest of this article summarises my progress through getting this to work.
The way we’re going to decide when to quit is by listening for a C-c keypress. Of course, at that point we want to make sure we tidy things up nicely. For example, if we’re currently streaming tweets from Twitter, we’d rather tell them we’re finished than just drop the connection.
Let’s get started.
A Main without Tidying up
package main
import (
"fmt"
"time"
)
func main() {
ticker := time.NewTicker(3 * time.Second)
for {
select {
case now := <-ticker.C:
fmt.Printf("tick %s\n", now.UTC().Format("20060102-150405.000000000"))
}
}
}
And let’s run it and C-c it.
$ go run 01/tidy.go
tick 20170612-213112.045887655
tick 20170612-213115.045986150
tick 20170612-213118.045993591
^Csignal: interrupt
Here you can see we have sent the interrupt signal. Make a mental note of that name. However, we haven’t actually tidied up the ticker. There are a few ways we could do it, and the easiest for this program is to defer ticker.Stop() so it gets run at the end of main().
package main
import (
"fmt"
"time"
)
func main() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
fmt.Printf("tick %s\n", now.UTC().Format("20060102-150405.000000000"))
}
}
}
There is no discernible difference in the output, but you are being a good citizen. :)
$ go run 02/tidy.go
tick 20170612-213456.385205269
tick 20170612-213459.385180852
tick 20170612-213502.385222563
^Csignal: interrupt
We said earlier that we want to run multiple goroutines and we want to listen for C-c, so let’s do the C-c first.
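Using the os/signal package, we can tell Go to listen for (you guessed it) OS signals such as os.Interrupt. (The os package also defines os.Kill, but a kill signal cannot be caught by a program, so we only listen for os.Interrupt.) Let’s see what that looks like: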
package main
import (
"fmt"
"os"
"os/signal"
"time"
)
func main() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
for {
select {
case now := <-ticker.C:
fmt.Printf("tick %s\n", now.UTC().Format("20060102-150405.000000000"))
case <-c:
fmt.Println("Received C-c - shutting down")
return
}
}
}
And when we run it, instead of seeing the default message Go provides when it receives an interrupt signal, we can see our own message:
$ go run 03/tidy.go
tick 20170612-214602.313917282
tick 20170612-214605.313950300
tick 20170612-214608.313950904
^CReceived C-c - shutting down
Excellent, so let’s start moving the program closer to what we want - running multiple goroutines and stopping them cleanly.
Signalling a Goroutine to Stop
Even though we only have one task at the moment, we will put it into its own goroutine and signal it to stop when we have received the C-c. I’m going to use the first half of a post called “Stopping Goroutines” by the excellent Mat Ryer as the basis for this process. Note when that post was written - 2015 - and be sure we’ll change a few things by the time we’ve finished this article.
The next example shows the ticker in its own goroutine. Notice that instead of keeping the signal receiver as a case <-c inside the for/select, we’ll just change it to a plain <-c since that’s the only thing we’re going to leave in main(). I will prefix the messages with either main or tick so you can see what’s going on.
package main
import (
"fmt"
"os"
"os/signal"
"time"
)
func main() {
// a channel to tell `tick()` to stop, and one to tell us they've stopped
stopChan := make(chan struct{})
stoppedChan := make(chan struct{})
go tick(stopChan, stoppedChan)
// listen for C-c
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
fmt.Println("main: received C-c - shutting down")
// tell the goroutine to stop
fmt.Println("main: telling goroutines to stop")
close(stopChan)
// and wait for them to reply back
<-stoppedChan
fmt.Println("main: goroutine has told us they've finished")
}
func tick(stop, stopped chan struct{}) {
// tell the caller we've stopped
defer close(stopped)
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
fmt.Printf("tick: tick %s\n", now.UTC().Format("20060102-150405.000000000"))
case <-stop:
fmt.Println("tick: caller has told us to stop")
return
}
}
}
Once you press C-c here, you can see the exchange of messages.
$ go run 04/tidy.go
tick: tick 20170612-220018.345218301
tick: tick 20170612-220021.345202622
tick: tick 20170612-220024.345147172
^Cmain: received C-c - shutting down
main: telling goroutines to stop
tick: caller has told us to stop
main: goroutine has told us they've finished
So far so good. It works.
But I can see one problem on the horizon. When we add another goroutine, we’ll have to create another stopped channel for the second goroutine to tell us when it has stopped. (Side-note: I originally also created a new stop chan too, but we can re-use that channel for both goroutines.)
Let’s see what the extra stopped channel looks like. In this example our second goroutine tock() is very similar to the first, except it tocks every 5s instead of ticking every 3s.
package main
import (
"fmt"
"os"
"os/signal"
"time"
)
func main() {
// a channel to tell `tick()` and `tock()` to stop
stopChan := make(chan struct{})
// a channel for `tick()` to tell us they've stopped
tickStoppedChan := make(chan struct{})
go tick(stopChan, tickStoppedChan)
// a channel for `tock()` to tell us they've stopped
tockStoppedChan := make(chan struct{})
go tock(stopChan, tockStoppedChan)
// listen for C-c
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
fmt.Println("main: received C-c - shutting down")
// tell the goroutines to stop
fmt.Println("main: telling goroutines to stop")
close(stopChan)
// and wait for them to reply back
<-tickStoppedChan
<-tockStoppedChan
fmt.Println("main: all goroutines have told us they've finished")
}
func tick(stop, stopped chan struct{}) {
// tell the caller we've stopped
defer close(stopped)
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
fmt.Printf("tick: tick %s\n", now.UTC().Format("20060102-150405.000000000"))
case <-stop:
fmt.Println("tick: caller has told us to stop")
return
}
}
}
func tock(stop, stopped chan struct{}) {
// tell the caller we've stopped
defer close(stopped)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
fmt.Printf("tock: tock %s\n", now.UTC().Format("20060102-150405.000000000"))
case <-stop:
fmt.Println("tock: caller has told us to stop")
return
}
}
}
It’s starting to look unwieldy. However, let’s take a look at the output for completeness:
$ go run 05/tidy.go
tick: tick 20170612-220618.466725240
tock: tock 20170612-220620.466789888
tick: tick 20170612-220621.466756817
tick: tick 20170612-220624.466762771
^Cmain: received C-c - shutting down
main: telling goroutines to stop
tock: caller has told us to stop
tick: caller has told us to stop
main: all goroutines have told us they've finished
Even though it’s looking a bit nasty, it still works as it should.
sync.WaitGroup
Let’s try to tidy up and simplify a bit here. The reason is that if we’d like to add another goroutine to this program - or indeed another 10, 20 or a hundred - we’re going to have a headache with all the channels we need to create.
So instead of channels, let’s try another concurrency fundamental that Go provides, which is sync.WaitGroup. Here we create just one WaitGroup (instead of two channels) and use that for the goroutines to signal they’ve finished. Remember, once we create the WaitGroup we shouldn’t copy it, so we need to pass a pointer to it.
package main
import (
"fmt"
"os"
"os/signal"
"sync"
"time"
)
func main() {
// a channel to tell `tick()` and `tock()` to stop
stopChan := make(chan struct{})
// a WaitGroup for the goroutines to tell us they've stopped
wg := sync.WaitGroup{}
// add `tick()` to the WaitGroup before starting it
wg.Add(1)
go tick(stopChan, &wg)
// add `tock()` to the WaitGroup before starting it
wg.Add(1)
go tock(stopChan, &wg)
// listen for C-c
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
fmt.Println("main: received C-c - shutting down")
// tell the goroutines to stop
fmt.Println("main: telling goroutines to stop")
close(stopChan)
// and wait for them both to reply back
wg.Wait()
fmt.Println("main: all goroutines have told us they've finished")
}
func tick(stop chan struct{}, wg *sync.WaitGroup) {
// tell the caller we've stopped
defer wg.Done()
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
fmt.Printf("tick: tick %s\n", now.UTC().Format("20060102-150405.000000000"))
case <-stop:
fmt.Println("tick: caller has told us to stop")
return
}
}
}
func tock(stop chan struct{}, wg *sync.WaitGroup) {
// tell the caller we've stopped
defer wg.Done()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
fmt.Printf("tock: tock %s\n", now.UTC().Format("20060102-150405.000000000"))
case <-stop:
fmt.Println("tock: caller has told us to stop")
return
}
}
}
The output is exactly the same as the previous program, so we should be on the right lines. The program itself has a few lines removed, a few lines added and looks very similar, however adding new goroutines is a little simpler now. We just need to call wg.Add(1) and pass both the stop channel and the WaitGroup to the new goroutine. As I said, it’s only a little simpler but that’s good, right?
$ go run 06/tidy.go
tick: tick 20170612-221717.992723221
tock: tock 20170612-221719.992700713
tick: tick 20170612-221720.992722592
tick: tick 20170612-221723.992745407
^Cmain: received C-c - shutting down
main: telling goroutines to stop
tock: caller has told us to stop
tick: caller has told us to stop
main: all goroutines have told us they've finished
So far, so good. However, there is another problem on the horizon. Let’s imagine we want to also create a webserver in a goroutine. In the past we used to create one using the following code. The problem here though is that http.ListenAndServe() blocks the goroutine it runs in and gives us no way to ask it to stop.
package main
import (
"fmt"
"net/http"
)
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, World!")
})
http.ListenAndServe(":8080", nil)
}
So the question is, how do we also tell the web server to stop?
Context
In Go 1.7, the context package was added to the standard library and that is our next secret (it used to live at https://godoc.org/golang.org/x/net/context). In Go 1.8, http.Server gained a Shutdown method which takes a Context, and that gives us a way to tell a webserver to stop. Using a Context has become the swiss-army knife of concurrency control in Go over the past few years.
Let’s have a very quick look at how we can create and cancel a Context:
// create a context that we can cancel
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// pass this ctx to our goroutines - each of which would select on `<-ctx.Done()`
go tick(ctx, ...)
// sometime later ... in our case after a `C-c`
cancel()
(Side note: if you haven’t seen JustForFunc by Francesc Campoy yet, you should watch it - Francesc talks about the Context package in episodes 9 and 10.)
One major advantage of using a Context over a stop channel shows up when any of the goroutines also create other goroutines to do the work for them. With stop channels we’d have to create more stop channels to tell the child goroutines to finish, and we’d also have to tie much of this together to make it work. When we use a Context however, each goroutine derives a Context from the one it was given, and cancelling the parent Context cancels every Context derived from it.
Before we try adding a webserver, let’s change our example above to use a Context. The first thing we’ll need to do is pass the context to each goroutine instead of the channel. Instead of selecting on the channel, each goroutine will select on <-ctx.Done() and still signal back to main() when it has tidied up.
package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"time"
)
func main() {
// create a context that we can cancel
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// a WaitGroup for the goroutines to tell us they've stopped
wg := sync.WaitGroup{}
// add `tick()` to the WaitGroup before starting it
wg.Add(1)
go tick(ctx, &wg)
// add `tock()` to the WaitGroup before starting it
wg.Add(1)
go tock(ctx, &wg)
// listen for C-c
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
fmt.Println("main: received C-c - shutting down")
// tell the goroutines to stop
fmt.Println("main: telling goroutines to stop")
cancel()
// and wait for them both to reply back
wg.Wait()
fmt.Println("main: all goroutines have told us they've finished")
}
func tick(ctx context.Context, wg *sync.WaitGroup) {
// tell the caller we've stopped
defer wg.Done()
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
fmt.Printf("tick: tick %s\n", now.UTC().Format("20060102-150405.000000000"))
case <-ctx.Done():
fmt.Println("tick: caller has told us to stop")
return
}
}
}
func tock(ctx context.Context, wg *sync.WaitGroup) {
// tell the caller we've stopped
defer wg.Done()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
fmt.Printf("tock: tock %s\n", now.UTC().Format("20060102-150405.000000000"))
case <-ctx.Done():
fmt.Println("tock: caller has told us to stop")
return
}
}
}
There is very little difference between this program and the previous one, however we now have the ability to:
- create a webserver that we can cancel with the Context
- pass the same context to sub goroutines which will also cancel their work when told
And again, the output is the same. We must be doing something right.
$ go run 07/tidy.go
tick: tick 20170612-223954.341894561
tock: tock 20170612-223956.341886006
tick: tick 20170612-223957.341887182
tick: tick 20170612-224000.341927373
^Cmain: received C-c - shutting down
main: telling goroutines to stop
tock: caller has told us to stop
tick: caller has told us to stop
main: all goroutines have told us they've finished
Now let’s get onto the beast and tell our program to also serve HTTP requests.
The Webserver
Before we show the entire program, let’s take a look at what the webserver goroutine would look like. The magic here is that instead of calling http.ListenAndServe() we explicitly create the webserver, and by doing this we can eventually signal it to stop. We’re going to model this on the excellent HTTP server connection draining section of this article by Tyler Christensen.
func server(ctx context.Context, wg *sync.WaitGroup) {
// tell the caller that we've stopped
defer wg.Done()
// create a new mux and handler
mux := http.NewServeMux()
mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Println("server: received request")
time.Sleep(3 * time.Second)
io.WriteString(w, "Finished!\n")
fmt.Println("server: request finished")
}))
// create a server
srv := &http.Server{Addr: ":8080", Handler: mux}
go func() {
// service connections
if err := srv.ListenAndServe(); err != nil {
fmt.Printf("Listen : %s\n", err)
}
}()
<-ctx.Done()
fmt.Println("server: caller has told us to stop")
// shut down gracefully, but wait no longer than 5 seconds before halting
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// ignore error since it will be "Err shutting down server : context canceled"
srv.Shutdown(shutdownCtx)
fmt.Println("server gracefully stopped")
}
For this func, the only two lines we added in main() were:
// run `server` in its own goroutine
wg.Add(1)
go server(ctx, &wg)
For the output of this program, I will send a request to the server with curl localhost:8080 after the first tick and you should see the request start and finish either side of the 2nd tick. And as usual we’ll just show three ticks (and one tock):
$ go run 08/tidy.go
tick: tick 20170612-230003.228960866
server: received request
tock: tock 20170612-230005.228893119
tick: tick 20170612-230006.228868513
server: request finished
tick: tick 20170612-230009.228863351
^Cmain: received C-c - shutting down
main: telling goroutines to stop
server: caller has told us to stop
tick: caller has told us to stop
server gracefully stopped
tock: caller has told us to stop
main: all goroutines have told us they've finished
And as we expected the server also shut down correctly. This time though, I’ll send a request after the 2nd tick but C-c the program before the 3rd tick to demonstrate the server gracefully shutting down.
$ go run 08/tidy.go
tick: tick 20170612-230408.026717601
tock: tock 20170612-230410.026710464
tick: tick 20170612-230411.026700385
server: received request
^Cmain: received C-c - shutting down
main: telling goroutines to stop
tick: caller has told us to stop
tock: caller has told us to stop
server: caller has told us to stop
Listen : http: Server closed
server: request finished
server gracefully stopped
main: all goroutines have told us they've finished
Notice that both tick() and tock() finished first, then we had a couple of seconds where we waited for the webserver to finish its request and then finally shut down. In the previous example the server shut down while it wasn’t servicing any requests, and no Listen message appeared in the output. In this example the server was servicing a request and the http: Server closed message appeared above. That message is not a failure: srv.ListenAndServe() returns http.ErrServerClosed once Shutdown has been called. The request finished message appeared after it, which proves the request was still in progress. However, it did finish, the client received the response and everything shut down as expected.
$ curl localhost:8080
Finished!
And that’s it! I hope you’ve enjoyed following along in this rather long article, and I hope we demonstrated not just how to use a Context to cancel multiple goroutines, but also how the way we write concurrent Go programs has changed over the years. As with everything, there are many ways to do all of this and I’m sure I’ve missed some, but I hope this has given you a taster to play more with concurrency and Context.
defer follow.Me(“andychilton”)
I’m Andrew Chilton and I enjoy being a part of the Go community. Please follow me or @CuratedGo for interesting Go articles, blog posts, and videos.