Kamal Namdeo

Concurrency Patterns in Go: A Guide to Efficient Parallelism

Go, also known as Golang, is renowned for its built-in support for concurrency. With its lightweight goroutines and channels, Go allows developers to build highly concurrent programs that scale well with minimal effort. However, writing concurrent code can quickly become complex if not approached with the right patterns.

In this blog post, we’ll explore some common and effective concurrency patterns in Go that can help you build efficient, safe, and scalable concurrent applications.

What is Concurrency in Go?

Concurrency is the ability of a program to make progress on multiple tasks over overlapping time periods (not necessarily running them simultaneously, which is parallelism). In Go, concurrency is achieved using goroutines and channels. A goroutine is a lightweight thread of execution managed by the Go runtime, and channels provide a way to communicate safely between goroutines.

Go’s concurrency model is designed around these two concepts, making it easier to write concurrent code than in many other languages.


Key Concurrency Patterns in Go

1. The Worker Pool Pattern

The worker pool pattern is a common concurrency pattern used to limit the number of concurrently running goroutines, helping avoid overwhelming system resources. It is particularly useful when you need to process a large number of tasks concurrently, but you want to ensure that only a fixed number of workers are running at any given time.

Example: Worker Pool

Here’s how you can implement a worker pool in Go:

package main

import (
    "fmt"
    "sync"
)

func worker(id int, tasks <-chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    for task := range tasks {
        fmt.Printf("Worker %d processing task: %s\n", id, task)
    }
}

func main() {
    var wg sync.WaitGroup
    tasks := make(chan string, 10)

    // Start 3 workers
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go worker(i, tasks, &wg)
    }

    // Send tasks to the workers
    tasks <- "Task 1"
    tasks <- "Task 2"
    tasks <- "Task 3"
    tasks <- "Task 4"
    tasks <- "Task 5"

    close(tasks)

    // Wait for all workers to finish
    wg.Wait()
}

Explanation:

  • Worker Goroutines: We create a fixed number of workers that process tasks from a shared channel.
  • Channel: The tasks are sent through the channel, and workers receive and process them one by one.
  • WaitGroup: We use a sync.WaitGroup to wait for all workers to finish processing tasks before the program exits.

The worker pool pattern helps in managing concurrency efficiently by controlling the number of concurrent workers.


2. Fan-Out, Fan-In Pattern

The fan-out, fan-in pattern is useful when you need to distribute tasks across multiple goroutines (fan-out) and then aggregate their results (fan-in). It is often used when you have a large amount of independent work that can be done concurrently and later combined into a final result.

Example: Fan-Out, Fan-In

package main

import (
    "fmt"
    "sync"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job: %d\n", id, job)
        results <- job * 2 // Example of processing: multiply by 2
    }
}

func main() {
    var wg sync.WaitGroup
    jobs := make(chan int, 10)
    results := make(chan int, 10)

    // Fan-out: Start 3 worker goroutines
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, jobs, results)
        }(i)
    }

    // Send jobs to workers
    go func() {
        for i := 1; i <= 5; i++ {
            jobs <- i
        }
        close(jobs)
    }()

    // Close the results channel once every worker has finished.
    go func() {
        wg.Wait()
        close(results)
    }()

    // Fan-in: collect all results in the main goroutine so the
    // program cannot exit before every result has been printed.
    for res := range results {
        fmt.Println("Result:", res)
    }
}

Explanation:

  • Fan-out: Multiple worker goroutines process tasks concurrently.
  • Fan-in: The results from all workers are merged onto a single results channel and consumed in one place.
  • Channels: Channels are used for communication between the workers and the main goroutine.

This pattern is ideal for parallel processing tasks that need to be aggregated or further processed once completed.


3. Pipeline Pattern

The pipeline pattern involves processing data through a series of stages, where each stage is a goroutine that transforms the data. Each stage in the pipeline processes the data concurrently, making this pattern ideal for stream processing or any task that can be broken down into a series of transformations.

Example: Pipeline Pattern

package main

import (
    "fmt"
)

func stage1(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for i := range in {
            out <- i * 2 // Example: double the number
        }
        close(out)
    }()
    return out
}

func stage2(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for i := range in {
            out <- i + 3 // Example: add 3 to the number
        }
        close(out)
    }()
    return out
}

func main() {
    input := make(chan int)
    go func() {
        for i := 1; i <= 5; i++ {
            input <- i
        }
        close(input)
    }()

    // Pass through stages
    result := stage2(stage1(input))

    // Collect results
    for res := range result {
        fmt.Println("Result:", res)
    }
}

Explanation:

  • Stage 1: Doubles the input numbers.
  • Stage 2: Adds 3 to the output of stage 1.
  • Channels: Data flows through channels, with each stage processing the data concurrently.

The pipeline pattern helps break down complex tasks into simpler steps, each performed in parallel, which can significantly improve performance.


4. Mutex and RWMutex Pattern

When multiple goroutines need to access shared resources, you can use a sync.Mutex to ensure that only one goroutine accesses a resource at a time, or a sync.RWMutex to allow many concurrent readers while keeping writes exclusive. This is crucial for preventing data races and maintaining consistency.

Example: Mutex

package main

import (
    "fmt"
    "sync"
)

var (
    counter int
    mu      sync.Mutex
)

func increment() {
    mu.Lock()
    counter++
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            increment()
        }()
    }
    wg.Wait()
    fmt.Println("Final counter:", counter)
}

Explanation:

  • Mutex Locking: mu.Lock() ensures that only one goroutine can modify counter at a time, avoiding race conditions.
  • Unlocking: mu.Unlock() allows other goroutines to access the critical section.

Using mutexes is essential for managing shared resources in a concurrent environment.


Best Practices for Concurrency in Go

  • Minimize Shared State: Use channels for communication instead of shared variables where possible to avoid synchronization issues.
  • Avoid Blocking: Make sure goroutines don’t block each other unnecessarily, as this can reduce the benefits of concurrency.
  • Graceful Shutdown: Always ensure proper synchronization (e.g., using sync.WaitGroup) when shutting down your program to allow goroutines to finish their tasks.
  • Test Concurrency: Concurrency can introduce subtle bugs like race conditions. Use tools like Go’s -race flag to catch these issues during development.

Conclusion

Concurrency is one of Go’s most powerful features, and understanding concurrency patterns can help you write more efficient and scalable programs. Patterns like the worker pool, fan-out, fan-in, pipeline, and mutex ensure that your concurrent applications are robust and easy to manage.

By leveraging these patterns, you can avoid common pitfalls in concurrent programming and take full advantage of Go’s concurrency model.

Happy coding and concurrent processing in Go!
