Go Concurrency Design Patterns - Worker

Today I will continue my adventure into common Go concurrency patterns with the worker pattern.

What is it good for?

This pattern is useful when I have a known number of jobs, tasks, or processes but would like to limit the resources that process them.

Examples

  • I have a list of 1000 URLs and I would like to cap the number that can be requested simultaneously.

  • I have a large list of calculations that require heavy processing. Running them all at once would drain my CPU, so I need to limit how many run at any one time.
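
To make the throttling idea concrete, here is a minimal sketch, not the code from this post. The helper name runLimited, the 5 ms sleep standing in for real work, and the atomic counters are all my own assumptions for illustration. It hands jobs to a fixed number of goroutines and reports the highest number of jobs that were in flight at once, which should never exceed the worker count.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// runLimited is a hypothetical helper: it feeds every job to at most
// `workers` goroutines and returns the highest number of jobs that were
// being processed at the same moment.
func runLimited(jobs []string, workers int) int64 {
	input := make(chan string)
	done := make(chan struct{})
	var inFlight, peak int64

	for w := 0; w < workers; w++ {
		go func() {
			for range input {
				n := atomic.AddInt64(&inFlight, 1)
				// Remember the highest concurrency seen so far.
				for {
					p := atomic.LoadInt64(&peak)
					if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
						break
					}
				}
				time.Sleep(5 * time.Millisecond) // stand-in for a request or a calculation
				atomic.AddInt64(&inFlight, -1)
			}
			done <- struct{}{} // this worker has drained the input channel
		}()
	}

	for _, job := range jobs {
		input <- job
	}
	close(input)

	// Wait for every worker to finish before reading the peak.
	for w := 0; w < workers; w++ {
		<-done
	}
	return atomic.LoadInt64(&peak)
}

func main() {
	urls := make([]string, 1000) // placeholder list; the values are never used
	fmt.Println("peak jobs in flight:", runLimited(urls, 10))
}
```

However long the list is, the number of goroutines doing work stays fixed, so the load on the network or CPU is bounded by the worker count rather than by the size of the input.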

The Components

For this design pattern we will need:

  • An input channel (to input the jobs into)
  • An output channel (to push the completed jobs into)
  • A completed channel (to confirm that all the jobs have been completed; the main function will block on this)
  • A worker function, which will listen to the input channel, do some work, and post to the output channel; any number of these can be set up
  • An output function, which will listen on the output channel and print the completed jobs to the screen

and that is pretty much it.

Walkthrough

First, let's look at this from a high level; the full code is below if you would like to skip ahead.

The input pipeline

To create the input pipeline

input := make(chan int, 0)

To push to the input pipeline

input <- i

The output pipeline

To create the output pipeline

output := make(chan int, 0)

To push to the output pipeline

output <- i
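
Both channels above are unbuffered: make(chan int, 0) is the same as make(chan int). That means a send only completes when something is ready to receive, which is what makes the workers set the pace. The sketch below illustrates this; trySend is a hypothetical helper I made up for the demonstration, not part of the pattern.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a send without blocking
// and reports whether a receiver was ready to take the value.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	input := make(chan int, 0) // capacity 0, identical to make(chan int)

	fmt.Println("capacity:", cap(input))                     // prints "capacity: 0"
	fmt.Println("sent without receiver:", trySend(input, 1)) // prints "sent without receiver: false"

	// With a goroutine receiving, a plain send goes through.
	received := make(chan int, 1)
	go func() { received <- <-input }()
	input <- 2                           // blocks until the goroutine receives
	fmt.Println("received:", <-received) // prints "received: 2"
}
```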

The worker function

  • The function takes an input channel (this is what it will listen to)
  • The function takes an output channel (this is what it will push to)

func worker(input chan int, output chan int) {
	for id := range input {
		if id == 0 {
			continue
		}
		fmt.Printf("working on %d\n", id)
		time.Sleep(time.Second * 2)
		// do something with found entity
		output <- id
	}
}
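
As an optional refinement, the channel parameters can be given a direction so the compiler enforces which side the worker reads from and which it writes to. This is a sketch of that idea only; squareWorker and the squaring "work" are hypothetical stand-ins, not the worker used in this post.

```go
package main

import "fmt"

// squareWorker is a hypothetical worker: it can only receive from in
// (<-chan int) and only send to out (chan<- int).
func squareWorker(in <-chan int, out chan<- int) {
	for id := range in {
		out <- id * id // stand-in for real work
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go squareWorker(in, out)

	for i := 1; i <= 3; i++ {
		in <- i
		fmt.Println(<-out) // prints 1, then 4, then 9
	}
	close(in) // ends the worker's range loop
}
```

With directional types, accidentally sending on the input channel inside the worker becomes a compile error instead of a runtime surprise.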

The output function

The watchOutputChannel function watches the output channel and prints whatever it receives. Note that it only loops for the number of results we tell it to expect; after that, an empty struct is sent on the completed channel. This is what we will be waiting for in our main.go.

func watchOutputChannel(count int, output chan int, completed chan struct{}) {
	for i := 0; i < count; i++ {
		fmt.Printf("successfully completed id: %d\n", <-output)
	}
	completed <- struct{}{}
}

Those are the four basic components; now let us tie them all together.

package main

import (
	"fmt"
	"time"
)

func main() {
	// How many workers do we want to run?
	count := 6

	// We would like to request ids 1-100
	maxId := 100

	// Let's create the three channels
	input := make(chan int)
	output := make(chan int)
	completed := make(chan struct{})

	// run as a goroutine; it watches for everything passed into output
	go watchOutputChannel(maxId, output, completed)

	// loop count times and spin up a worker goroutine each time
	for i := 1; i <= count; i++ {
		go worker(i, input, output)
		fmt.Printf("spun up go routine %d\n", i)
	}

	// push all these ids into the pipeline to be processed by the six goroutines above when ready
	for i := 1; i <= maxId; i++ {
		input <- i
		fmt.Printf("pushed id %d into pipeline\n", i)
	}
	// close the input channel: no more values can be sent on it,
	// but every value already pushed in will still be processed
	close(input)
	fmt.Println("input closed")

	// Now we wait for the empty struct to be pushed into completed pipeline
	// this can be found in watchOutputChannel()
	<-completed
	fmt.Println("done")
}

func watchOutputChannel(count int, output chan int, completed chan struct{}) {
	for i := 0; i < count; i++ {
		fmt.Printf("successfully completed id: %d\n", <-output)
	}
	completed <- struct{}{}
}

func worker(workerId int, input chan int, output chan int) {
	for id := range input {
		if id == 0 {
			continue
		}
		fmt.Printf("working on id: %d in go routine: %d\n", id, workerId)
		time.Sleep(time.Second * 2)
		// do something with found entity
		output <- id
	}
}

The output

spun up go routine 1
spun up go routine 2
spun up go routine 3
spun up go routine 4
spun up go routine 5
spun up go routine 6
working on id: 1 in go routine: 6
pushed id 1 into pipeline
pushed id 2 into pipeline
pushed id 3 into pipeline
pushed id 4 into pipeline
pushed id 5 into pipeline
pushed id 6 into pipeline
working on id: 6 in go routine: 5
working on id: 2 in go routine: 1
working on id: 3 in go routine: 2
working on id: 4 in go routine: 3
working on id: 5 in go routine: 4
working on id: 7 in go routine: 5
successfully completed id: 6
successfully completed id: 1
successfully completed id: 5
successfully completed id: 4
successfully completed id: 3
successfully completed id: 2
pushed id 7 into pipeline
...
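
One thing to keep in mind with the version above: watchOutputChannel has to know exactly how many results to expect, so if a worker ever skipped a job (as it would for an id of 0), main would wait forever. As an alternative, and this is a different technique from the completed channel used in this post, a sync.WaitGroup can close the output channel once every worker has returned, so the reader simply ranges until the channel is closed. The function name process below is hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// process is a hypothetical variant of the program above: it runs
// `workers` goroutines over ids 1..maxId and returns every id that came
// out of the output channel, sorted.
func process(workers, maxId int) []int {
	input := make(chan int)
	output := make(chan int)
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range input {
				output <- id // real work would happen before this send
			}
		}()
	}

	// Feed the input channel, then close it so the workers' loops end.
	go func() {
		for i := 1; i <= maxId; i++ {
			input <- i
		}
		close(input)
	}()

	// Close output once every worker has returned.
	go func() {
		wg.Wait()
		close(output)
	}()

	var completed []int
	for id := range output { // ends when output is closed
		completed = append(completed, id)
	}
	sort.Ints(completed)
	return completed
}

func main() {
	ids := process(6, 100)
	fmt.Println("completed:", len(ids)) // prints "completed: 100"
}
```

The trade-off is one extra goroutine and a WaitGroup in exchange for not having to pass the expected count around.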