Cancelling Multiple Goroutines

When Go was first released, there was an established way of doing certain things in concurrency. As time has gone on, various things have changed, the context package for one. :)

This article doesn’t go into all of the ways of doing concurrency but will focus on one problem and take you through a few different solutions so you can see how things have evolved.

The Problem

The problem I’d like to address here is being able to cancel multiple goroutines. There are many blog posts out there (I curate @CuratedGo, please follow) which show how to cancel just one goroutine, but my use-case was slightly more complicated. The rest of this article summarises my progress through getting this to work.

The way we’re going to decide when to quit is by listening for a C-c keypress. Of course, at that point we want to make sure we tidy things up nicely. For example, if we’re currently streaming tweets from Twitter, we’d rather tell them we’ve finished than just drop the connection.

Let’s get started.

A Main without Tidying up

package main

import (
    "fmt"
    "time"
)

func main() {
    ticker := time.NewTicker(3 * time.Second)

    for {
        select {
        case now := <-ticker.C:
            fmt.Printf("tick %s\n", now.UTC().Format("20060102-150405.000000000"))
        }
    }
}

And let’s run it and C-c it.

$ go run 01/tidy.go 
tick 20170612-213112.045887655
tick 20170612-213115.045986150
tick 20170612-213118.045993591
^Csignal: interrupt

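Here you can see we have sent the interrupt signal. Make a mental note of that name. However, we haven’t actually tidied up the ticker. There are a few ways we could do it, and the easiest for this program is to defer ticker.Stop() so that it runs when main() returns. (In this version main() never returns and C-c kills the process outright, so the deferred call only starts to matter once we handle the signal ourselves.)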

package main

import (
    "fmt"
    "time"
)

func main() {
    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case now := <-ticker.C:
            fmt.Printf("tick %s\n", now.UTC().Format("20060102-150405.000000000"))
        }
    }
}

There is no discernible difference in the output, but you are being a good citizen. :)

$ go run 02/tidy.go 
tick 20170612-213456.385205269
tick 20170612-213459.385180852
tick 20170612-213502.385222563
^Csignal: interrupt

We said earlier that we want to run multiple goroutines and we want to listen for C-c, so let’s do the C-c first.

Using the os/signal package, we can tell Go to listen for (you guessed it) OS signals such as os.Interrupt. (os.Kill also exists, but it can’t be caught by a program.) Let’s see what that looks like:

package main

import (
    "fmt"
    "os"
    "os/signal"
    "time"
)

func main() {
    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)

    for {
        select {
        case now := <-ticker.C:
            fmt.Printf("tick %s\n", now.UTC().Format("20060102-150405.000000000"))
        case <-c:
            fmt.Println("Received C-c - shutting down")
            return
        }
    }
}

And when we run it, instead of seeing the default message Go provides when it receives an interrupt signal, we can see our own message:

$ go run 03/tidy.go 
tick 20170612-214602.313917282
tick 20170612-214605.313950300
tick 20170612-214608.313950904
^CReceived C-c - shutting down

Excellent, so let’s start moving the program closer to what we want - running multiple goroutines and stopping them cleanly.

Signalling a Goroutine to Stop

Even though we only have one task at the moment, we will put it into its own goroutine and signal it to stop when we have received the C-c. I’m going to use the first half of a post called “Stopping Goroutines” by the excellent Mat Ryer as the basis for this process. Note when that post was written (2015), and rest assured we’ll change a few things by the time we’ve finished this article.

The next example shows the ticker in its own goroutine. Notice that the signal receiver no longer needs to be a case <-c inside a for/select loop: a plain <-c is enough, since waiting for the signal is the only thing left in main(). I will prefix the messages with either main or tick so you can see what’s going on.

package main

import (
    "fmt"
    "os"
    "os/signal"
    "time"
)

func main() {
    // a channel to tell `tick()` to stop, and one to tell us they've stopped
    stopChan := make(chan struct{})
    stoppedChan := make(chan struct{})
    go tick(stopChan, stoppedChan)

    // listen for C-c
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c
    fmt.Println("main: received C-c - shutting down")

    // tell the goroutine to stop
    fmt.Println("main: telling goroutines to stop")
    close(stopChan)
    // and wait for them to reply back
    <-stoppedChan
    fmt.Println("main: goroutine has told us they've finished")
}

func tick(stop, stopped chan struct{}) {
    // tell the caller we've stopped
    defer close(stopped)

    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case now := <-ticker.C:
            fmt.Printf("tick: tick %s\n", now.UTC().Format("20060102-150405.000000000"))
        case <-stop:
            fmt.Println("tick: caller has told us to stop")
            return
        }
    }
}

Once you press C-c here, you can see the exchange of messages.

$ go run 04/tidy.go 
tick: tick 20170612-220018.345218301
tick: tick 20170612-220021.345202622
tick: tick 20170612-220024.345147172
^Cmain: received C-c - shutting down
main: telling goroutines to stop
tick: caller has told us to stop
main: goroutine has told us they've finished

So far so good. It works.

But I can see one problem on the horizon. When we add another goroutine, we’ll have to create another stopped channel for the second goroutine to tell us when they’ve stopped. (Side-note: I originally also created a new stop chan too, but we can re-use that channel for both goroutines.)
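The reason one stop channel is enough is that a receive on a closed channel returns immediately for every receiver, so a single close() acts as a broadcast. Here is a minimal sketch of that behaviour; stopAll() is a hypothetical helper written for this illustration and is not part of the programs in this article.

```go
package main

import "fmt"

// stopAll starts n goroutines that all block on the same stop channel,
// closes that channel once, and returns how many goroutines reported back.
func stopAll(n int) int {
	stop := make(chan struct{})
	stopped := make(chan struct{}, n)

	for i := 0; i < n; i++ {
		go func() {
			<-stop // unblocks in every goroutine once stop is closed
			stopped <- struct{}{}
		}()
	}

	// a single close is seen by all n receivers
	close(stop)

	count := 0
	for i := 0; i < n; i++ {
		<-stopped
		count++
	}
	return count
}

func main() {
	fmt.Println("goroutines stopped:", stopAll(3))
}
```

Sending a value on the channel instead would wake only one receiver, which is why the programs here close the channel rather than send on it.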

Let’s see what the extra stopped channel looks like. In this example our second goroutine tock() is very similar to the first, except it tocks every 5s instead of ticks every 3s.

package main

import (
    "fmt"
    "os"
    "os/signal"
    "time"
)

func main() {
    // a channel to tell `tick()` and `tock()` to stop
    stopChan := make(chan struct{})

    // a channel for `tick()` to tell us they've stopped
    tickStoppedChan := make(chan struct{})
    go tick(stopChan, tickStoppedChan)

    // a channel for `tock()` to tell us they've stopped
    tockStoppedChan := make(chan struct{})
    go tock(stopChan, tockStoppedChan)

    // listen for C-c
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c
    fmt.Println("main: received C-c - shutting down")

    // tell the goroutines to stop
    fmt.Println("main: telling goroutines to stop")
    close(stopChan)
    // and wait for them to reply back
    <-tickStoppedChan
    <-tockStoppedChan
    fmt.Println("main: all goroutines have told us they've finished")
}

func tick(stop, stopped chan struct{}) {
    // tell the caller we've stopped
    defer close(stopped)

    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case now := <-ticker.C:
            fmt.Printf("tick: tick %s\n", now.UTC().Format("20060102-150405.000000000"))
        case <-stop:
            fmt.Println("tick: caller has told us to stop")
            return
        }
    }
}

func tock(stop, stopped chan struct{}) {
    // tell the caller we've stopped
    defer close(stopped)

    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case now := <-ticker.C:
            fmt.Printf("tock: tock %s\n", now.UTC().Format("20060102-150405.000000000"))
        case <-stop:
            fmt.Println("tock: caller has told us to stop")
            return
        }
    }
}

It’s starting to look unwieldy. However, let’s take a look at the output for completeness:

$ go run 05/tidy.go 
tick: tick 20170612-220618.466725240
tock: tock 20170612-220620.466789888
tick: tick 20170612-220621.466756817
tick: tick 20170612-220624.466762771
^Cmain: received C-c - shutting down
main: telling goroutines to stop
tock: caller has told us to stop
tick: caller has told us to stop
main: all goroutines have told us they've finished

Even though it’s looking a bit nasty, it still works as it should.

sync.WaitGroup

Let’s try to tidy up and simplify a bit here. If we’d like to add another goroutine to this program - or indeed another 10, 20 or a hundred - we’re going to have a headache with all the channels we need to create.

So instead of channels, let’s try another concurrency fundamental that Go provides, which is sync.WaitGroup. Here we create just one WaitGroup (instead of two channels) and use that for the goroutines to signal they’ve finished. Remember, once a WaitGroup is in use we mustn’t copy it, so we pass a pointer to it rather than the value.

package main

import (
    "fmt"
    "os"
    "os/signal"
    "sync"
    "time"
)

func main() {
    // a channel to tell `tick()` and `tock()` to stop
    stopChan := make(chan struct{})

    // a WaitGroup for the goroutines to tell us they've stopped
    wg := sync.WaitGroup{}

    // register `tick()` with the WaitGroup before starting it
    wg.Add(1)
    go tick(stopChan, &wg)

    // register `tock()` with the WaitGroup before starting it
    wg.Add(1)
    go tock(stopChan, &wg)

    // listen for C-c
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c
    fmt.Println("main: received C-c - shutting down")

    // tell the goroutines to stop
    fmt.Println("main: telling goroutines to stop")
    close(stopChan)
    // and wait for them both to reply back
    wg.Wait()
    fmt.Println("main: all goroutines have told us they've finished")
}

func tick(stop chan struct{}, wg *sync.WaitGroup) {
    // tell the caller we've stopped
    defer wg.Done()

    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case now := <-ticker.C:
            fmt.Printf("tick: tick %s\n", now.UTC().Format("20060102-150405.000000000"))
        case <-stop:
            fmt.Println("tick: caller has told us to stop")
            return
        }
    }
}

func tock(stop chan struct{}, wg *sync.WaitGroup) {
    // tell the caller we've stopped
    defer wg.Done()

    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case now := <-ticker.C:
            fmt.Printf("tock: tock %s\n", now.UTC().Format("20060102-150405.000000000"))
        case <-stop:
            fmt.Println("tock: caller has told us to stop")
            return
        }
    }
}

The output is exactly the same as the previous program, so we should be on the right lines. The program itself has a few lines removed, a few lines added and looks very similar, however adding new goroutines is a little simpler now. We just need to call wg.Add(1) and pass both the stop channel and the WaitGroup to the new goroutine. As I said, it’s only a little simpler but that’s good, right?

$ go run 06/tidy.go 
tick: tick 20170612-221717.992723221
tock: tock 20170612-221719.992700713
tick: tick 20170612-221720.992722592
tick: tick 20170612-221723.992745407
^Cmain: received C-c - shutting down
main: telling goroutines to stop
tock: caller has told us to stop
tick: caller has told us to stop
main: all goroutines have told us they've finished
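To show how this scales past two goroutines, here is a minimal sketch that starts any number of workers in a loop with one stop channel and one WaitGroup. The names worker() and runWorkers(), and the finished counter, are assumptions made for this illustration; worker() is only a stand-in for tick() and tock().

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// worker is a hypothetical stand-in for tick() or tock(): it loops until
// stop is closed, then records that it finished.
func worker(interval time.Duration, stop <-chan struct{}, wg *sync.WaitGroup, finished *int32) {
	// tell the caller we've stopped
	defer wg.Done()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// the real work would go here
		case <-stop:
			atomic.AddInt32(finished, 1)
			return
		}
	}
}

// runWorkers starts n workers, tells them all to stop, waits for them and
// returns how many reported that they finished.
func runWorkers(n int) int {
	stop := make(chan struct{})
	var wg sync.WaitGroup
	var finished int32

	for i := 0; i < n; i++ {
		wg.Add(1) // one Add per goroutine, before it starts
		go worker(time.Duration(i+1)*time.Millisecond, stop, &wg, &finished)
	}

	close(stop)
	wg.Wait()
	return int(atomic.LoadInt32(&finished))
}

func main() {
	fmt.Println("workers finished:", runWorkers(5))
}
```

Note that main() no longer needs to know how many goroutines there are when it waits: wg.Wait() returns once every Add has been matched by a Done.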

So far, so good. However, there is another problem on the horizon. Let’s imagine we want to also create a webserver in a goroutine. In the past we used to create one using the following code. The problem here though is that the server blocks the goroutine until it has finished.

package main

import (
    "fmt"
    "net/http"
)

func main() {
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintln(w, "Hello, World!")
    })
    http.ListenAndServe(":8080", nil)
}

So the question is, how do we also tell the web server to stop?

Context

In Go v1.7, the context package was added to the standard library (it used to live at https://godoc.org/golang.org/x/net/context), and that is our next secret. The ability to gracefully stop a webserver using a context followed in Go v1.8 with http.Server’s Shutdown method. Using a Context has become the Swiss Army knife of concurrency control in Go over the past few years.

Let’s have a very quick look at how we can create and cancel a Context:

    // create a context that we can cancel
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // pass this ctx to our goroutines - each of which would select on `<-ctx.Done()`
    go tick(ctx, ...)

    // sometime later ... in our case after a `C-c`
    cancel()

(Side note: if you haven’t seen JustForFunc by Francesc Campoy yet, you should watch it - Francesc talks about the Context package in episodes 9 and 10.)

One major advantage of using a Context over a stop channel shows up when any of the goroutines also create other goroutines to do work for them. With stop channels we’d have to create more stop channels to tell the child goroutines to finish, and we’d have to tie much of this together ourselves to make it work. With a Context, however, each goroutine can derive a new Context from the one it was given, and cancelling the parent cancels every Context derived from it.
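Here is a minimal sketch of that propagation, using only context.WithCancel. The function names cancelParent() and cancelChildOnly() are made up for this illustration; they show that cancelling a parent reaches a child and a grandchild, while cancelling a child leaves its parent alone.

```go
package main

import (
	"context"
	"fmt"
)

// cancelParent derives a child and a grandchild Context, cancels only the
// parent, and reports whether both descendants were cancelled as well.
func cancelParent() (bool, bool) {
	parent, cancel := context.WithCancel(context.Background())
	child, cancelChild := context.WithCancel(parent)
	defer cancelChild()
	grandchild, cancelGrandchild := context.WithCancel(child)
	defer cancelGrandchild()

	cancel() // only the parent is cancelled explicitly

	<-child.Done()
	<-grandchild.Done()
	return child.Err() == context.Canceled, grandchild.Err() == context.Canceled
}

// cancelChildOnly cancels only the child and reports whether the parent is
// still live afterwards.
func cancelChildOnly() bool {
	parent, cancel := context.WithCancel(context.Background())
	defer cancel()
	child, cancelChild := context.WithCancel(parent)

	cancelChild()
	<-child.Done()
	return parent.Err() == nil
}

func main() {
	childDone, grandchildDone := cancelParent()
	fmt.Println("child cancelled:", childDone)
	fmt.Println("grandchild cancelled:", grandchildDone)
	fmt.Println("parent survives child cancel:", cancelChildOnly())
}
```

All three lines print true. Cancellation only flows downwards, which is what lets a goroutine stop its own helpers early without affecting its siblings.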

Before we try adding a webserver, let’s change our example above to use a Context. The first thing we’ll need to do is pass the context to each goroutine instead of the channel. Instead of selecting on the channel, it’ll select on <-ctx.Done() and still signal back to main() when it has tidied up.

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync"
    "time"
)

func main() {
    // create a context that we can cancel
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // a WaitGroup for the goroutines to tell us they've stopped
    wg := sync.WaitGroup{}

    // register `tick()` with the WaitGroup before starting it
    wg.Add(1)
    go tick(ctx, &wg)

    // register `tock()` with the WaitGroup before starting it
    wg.Add(1)
    go tock(ctx, &wg)

    // listen for C-c
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c
    fmt.Println("main: received C-c - shutting down")

    // tell the goroutines to stop
    fmt.Println("main: telling goroutines to stop")
    cancel()

    // and wait for them both to reply back
    wg.Wait()
    fmt.Println("main: all goroutines have told us they've finished")
}

func tick(ctx context.Context, wg *sync.WaitGroup) {
    // tell the caller we've stopped
    defer wg.Done()

    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case now := <-ticker.C:
            fmt.Printf("tick: tick %s\n", now.UTC().Format("20060102-150405.000000000"))
        case <-ctx.Done():
            fmt.Println("tick: caller has told us to stop")
            return
        }
    }
}

func tock(ctx context.Context, wg *sync.WaitGroup) {
    // tell the caller we've stopped
    defer wg.Done()

    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case now := <-ticker.C:
            fmt.Printf("tock: tock %s\n", now.UTC().Format("20060102-150405.000000000"))
        case <-ctx.Done():
            fmt.Println("tock: caller has told us to stop")
            return
        }
    }
}

There is very little difference between this program and the previous one, however we now have the ability to:

  1. create a webserver that we can cancel with the Context
  2. pass the same context to sub goroutines which will also cancel their work when told

And again, the output is the same. We must be doing something right.

$ go run 07/tidy.go 
tick: tick 20170612-223954.341894561
tock: tock 20170612-223956.341886006
tick: tick 20170612-223957.341887182
tick: tick 20170612-224000.341927373
^Cmain: received C-c - shutting down
main: telling goroutines to stop
tock: caller has told us to stop
tick: caller has told us to stop
main: all goroutines have told us they've finished
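As an aside, the signal channel and the cancellable Context can be combined into one step with signal.NotifyContext, which was added to os/signal in Go v1.16 and is therefore not used by the programs in this article. The following is only a sketch under that assumption; waitForInterrupt() is a hypothetical helper, and its timeout exists purely so that the example ends by itself if nobody presses C-c.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"
)

// waitForInterrupt blocks until C-c arrives or maxWaitMillis elapses, and
// reports which of the two happened.
func waitForInterrupt(maxWaitMillis int) string {
	// ctx is cancelled when os.Interrupt is received; stop() unregisters
	// the signal handler again
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	select {
	case <-ctx.Done():
		return "interrupted"
	case <-time.After(time.Duration(maxWaitMillis) * time.Millisecond):
		return "timed out"
	}
}

func main() {
	fmt.Println("main: press C-c within 10 seconds")
	fmt.Println("main:", waitForInterrupt(10000))
}
```

In a program shaped like the one above, the ctx returned by signal.NotifyContext could be passed straight to tick() and tock() in place of the one created with context.WithCancel.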

Now let’s get onto the beast and tell our program to also serve HTTP requests.

The Webserver

Before we show the entire program, let’s take a look at what the webserver goroutine would look like. The magic here is that instead of calling http.ListenAndServe() we explicitly create the webserver and by doing this we can eventually signal to it to stop. We’re going to model this on the excellent HTTP server connection draining section of this article by Tyler Christensen.

func server(ctx context.Context, wg *sync.WaitGroup) {
    // tell the caller that we've stopped
    defer wg.Done()

    // create a new mux and handler
    mux := http.NewServeMux()
    mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        fmt.Println("server: received request")
        time.Sleep(3 * time.Second)
        io.WriteString(w, "Finished!\n")
        fmt.Println("server: request finished")
    }))

    // create a server
    srv := &http.Server{Addr: ":8080", Handler: mux}

    go func() {
        // service connections
        if err := srv.ListenAndServe(); err != nil {
            fmt.Printf("Listen : %s\n", err)
        }
    }()

    <-ctx.Done()
    fmt.Println("server: caller has told us to stop")

    // shut down gracefully, but wait no longer than 5 seconds before halting
    shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Shutdown returns nil once in-flight requests have finished, or the context's
    // error if the 5 second timeout expires first; we ignore it here for brevity
    srv.Shutdown(shutdownCtx)

    fmt.Println("server gracefully stopped")
}

For this func, the only two lines we added in main() were:

    // run `server` in its own goroutine
    wg.Add(1)
    go server(ctx, &wg)
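A quick aside on the 5 second limit in server(): the general shape is "wait for the work to finish, but not forever". Here is a minimal sketch of that shape using context.WithTimeout. It is not how srv.Shutdown() is implemented; boundedWait() is a hypothetical helper and the sleep is a stand-in for an in-flight request.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// boundedWait starts some work that takes workMillis and waits for it, but
// gives up once limitMillis has passed.
func boundedWait(workMillis, limitMillis int) string {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(limitMillis)*time.Millisecond)
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		// stand-in for an in-flight request
		time.Sleep(time.Duration(workMillis) * time.Millisecond)
	}()

	select {
	case <-done:
		return "finished in time"
	case <-ctx.Done():
		return "gave up: " + ctx.Err().Error()
	}
}

func main() {
	fmt.Println(boundedWait(10, 1000))
	fmt.Println(boundedWait(1000, 10))
}
```

The first call prints "finished in time" and the second prints "gave up: context deadline exceeded", which is the same error a timed-out Context reports to whoever is waiting on it.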

For the output of this program, I will send a request to the server with curl localhost:8080 after the first tick, and you should see the request start and finish either side of the 2nd tick. As usual we’ll just show three ticks (and one tock):

$ go run 08/tidy.go 
tick: tick 20170612-230003.228960866
server: received request
tock: tock 20170612-230005.228893119
tick: tick 20170612-230006.228868513
server: request finished
tick: tick 20170612-230009.228863351
^Cmain: received C-c - shutting down
main: telling goroutines to stop
server: caller has told us to stop
tick: caller has told us to stop
server gracefully stopped
tock: caller has told us to stop
main: all goroutines have told us they've finished

And as we expected the server also shut down correctly. This time though, I’ll send a request after the 2nd tick but C-c the program before the 3rd tick to demonstrate the server gracefully shutting down.

$ go run 08/tidy.go 
tick: tick 20170612-230408.026717601
tock: tock 20170612-230410.026710464
tick: tick 20170612-230411.026700385
server: received request
^Cmain: received C-c - shutting down
main: telling goroutines to stop
tick: caller has told us to stop
tock: caller has told us to stop
server: caller has told us to stop
Listen : http: Server closed
server: request finished
server gracefully stopped
main: all goroutines have told us they've finished

Notice that both tick() and tock() finished first, then we had a couple of seconds where we waited for the webserver to finish its request and then finally shut down. In the previous example the server shut down while it wasn’t servicing any requests and the Listen line never appeared. That is most likely a matter of timing rather than a missing error: srv.ListenAndServe() always returns a non-nil error, and after Shutdown that error is http.ErrServerClosed, but with nothing in flight main() can exit before the goroutine gets a chance to print it. In this example the server was servicing a request, so the http: Server closed message had time to appear, after which the request finished message appeared to prove the request was still in progress. However, it did finish, the client received the response and everything shut down as expected.

$ curl localhost:8080
Finished!
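Since http: Server closed is what the server reports after every graceful shutdown, a program may prefer to treat it as a normal exit rather than print it as an error. Here is a minimal sketch of that check using errors.Is and http.ErrServerClosed. The helper serveAndShutdown(), the random local port and the NotFoundHandler are assumptions made for this illustration, not part of the program above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// serveAndShutdown starts a server on a random local port, shuts it down
// after runMillis, and describes the error returned by Serve.
func serveAndShutdown(runMillis int) string {
	// port 0 lets the OS pick a free port
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "listen failed: " + err.Error()
	}
	srv := &http.Server{Handler: http.NotFoundHandler()}

	serveErr := make(chan error, 1)
	go func() { serveErr <- srv.Serve(ln) }()

	time.Sleep(time.Duration(runMillis) * time.Millisecond)

	// shut down gracefully, but wait no longer than 5 seconds
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		return "shutdown failed: " + err.Error()
	}

	// Serve always returns a non-nil error; after Shutdown it is
	// http.ErrServerClosed, which signals a clean stop
	err = <-serveErr
	if errors.Is(err, http.ErrServerClosed) {
		return "clean shutdown"
	}
	return "unexpected error: " + err.Error()
}

func main() {
	fmt.Println("server:", serveAndShutdown(100))
}
```

Collecting the error over a channel, instead of printing it from the goroutine, also means the result can’t be lost if main() exits first.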

And that’s it! I hope you’ve enjoyed following along in this rather long article, and that it demonstrated not just how to use a Context to cancel multiple goroutines, but also how the way we write concurrent Go programs has changed over the years. As with everything, there are many ways to do all of this and I’m sure I’ve missed some, but I hope this has given you a taste for playing more with concurrency and Context.

defer follow.Me(“andychilton”)

I’m Andrew Chilton and I enjoy being a part of the Go community. Please follow me or @CuratedGo for interesting Go articles, blog posts, and videos.

(Ends)

20170612-231322.428325625