Go
Concurrency

Concurrency

Concurrency vs Parallelism

Concurrency is the composition of independently executing processes, while parallelism is the simultaneous execution of (possibly related) computations.

Concurrency means multiple tasks which start, run, and complete in overlapping time periods, in no specific order. Parallelism is when multiple tasks OR several part of a unique task literally run at the same time, e.g. on a multi-core processor. Remember that Concurrency and parallelism are NOT the same thing.

The idea of concurrent programming is old. It has been used in many languages like oCaml, Erlang, etc. Go, however, introduces Channels as a primary concept of concurrency.

graph TD
    subgraph Concurrency
        A[Task 1] --> C{Switch}
        B[Task 2] --> C
        C --> A
        C --> B
        C -.-> D[Task 3]
        D -.-> C
    end

    subgraph Parallelism
        E[Task 1] --> G[Task 1 Complete]
        F[Task 2] --> H[Task 2 Complete]
        I[Task 3] --> J[Task 3 Complete]
    end

    style Concurrency fill:#118098,stroke:#333,stroke-width:2px
    style Parallelism fill:#7C5576,stroke:#333,stroke-width:2px

Process, Threads & Goroutines

  • Processes are independent execution units with their own resources, isolated from each other for security.
  • Kernel-level threads are managed by the operating system, allowing concurrent execution within a process and can utilize multiple CPU cores.
  • Green threads, (a.k.a user-level threads) managed in user space by a runtime library, are lightweight and offer efficient context switching but may not fully leverage multicore architectures as kernel threads do.

Each method offers different trade-offs in performance, isolation, and resource utilization.

Go’s Approach to Concurrency

Go has built-in support for concurrency, which is achieved using goroutines and channels. A goroutine is a lightweight thread managed by the Go runtime. Channels are a typed conduit through which you can send and receive values with the channel operator (<-).

Go implements a hybrid approach to concurrency that combines aspects of green threads and kernel threads. Goroutines behave similarly to green threads: they are lightweight, managed by the Go runtime, and allow for the creation of thousands of concurrent operations with minimal overhead.

Instead of being purely user-level threads, the Go scheduler maps these goroutines onto a smaller number of OS threads (kernel-level threads). This mapping allows Go programs to efficiently utilize multiple CPU cores while abstracting the complexity of direct thread management and synchronization. The runtime dynamically manages the allocation of goroutines to threads, enabling concurrent execution with efficient use of system resources. This approach allows Go to offer the benefits of green threads, such as low overhead and high scalability, while still leveraging the parallel execution capabilities of kernel threads.

An independently executing function, launched by a go statement. It is very cheap, it is practical to have even hundred of thousands of goroutines. It is not a thread, instead goroutines are multiplexed dynamically onto threads as needed to keep all the goroutines running. – Rob Pike

To create a goroutine, use the keyword go followed by a function invocation.

Go’s Concurrency relation to CPU

When a Go program is executed, the Go runtime takes advantage of the available CPU cores through its scheduler, which automatically distributes goroutines across available OS threads (logical processors). The number of OS threads that can execute user-level Go code concurrently is controlled by the GOMAXPROCS variable. By default, GOMAXPROCS is set to the number of CPU cores available on the machine, which means the Go scheduler can potentially run goroutines on all cores simultaneously.

Goroutines are multiplexed onto the available OS threads. Even if GOMAXPROCS is set to 1, multiple goroutines can still run concurrently on that single core, with the scheduler managing their execution time-slicing. This allows for efficient concurrency even on single-core machines, but these goroutines will not run in parallelβ€”they will share the CPU time allocated to the single thread.

If GOMAXPROCS is greater than 1, goroutines can run in parallel across multiple cores, with each core processing different goroutines simultaneously. This setup is beneficial for computationally intensive tasks that can be parallelized to improve performance significantly.

for example:

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    // Set GOMAXPROCS to 2, allowing the scheduler to use two cores
    runtime.GOMAXPROCS(2)

    var wg sync.WaitGroup
    // Launching 10 goroutines
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(num int) {
            defer wg.Done()
            // Each goroutine will print its ID
            fmt.Printf("Goroutine %d running\n", num)
        }(i)
    }

    wg.Wait()
    fmt.Println("All goroutines completed.")
}

In the example above, the Go runtime is configured to use up to two CPU cores (GOMAXPROCS(2)), and ten goroutines are launched. These goroutines are managed by the Go scheduler, which distributes them across the two cores.

graph TD;
    Core1(Core 1) -->|Goroutine 1| G1(Goroutine 1)
    Core1 -->|Goroutine 2| G2(Goroutine 2)
    Core1 -->|Goroutine 3| G3(Goroutine 3)
    Core1 -->|Goroutine 4| G4(Goroutine 4)
    Core1 -->|Goroutine 5| G5(Goroutine 5)
    Core2(Core 2) -->|Goroutine 6| G6(Goroutine 6)
    Core2 -->|Goroutine 7| G7(Goroutine 7)
    Core2 -->|Goroutine 8| G8(Goroutine 8)
    Core2 -->|Goroutine 9| G9(Goroutine 9)
    Core2 -->|Goroutine 10| G10(Goroutine 10)

    style Core1 fill:#e0f9c4,stroke:#333,stroke-width:2px
    style Core2 fill:#a7c7fb,stroke:#333,stroke-width:2px

The Go scheduler dynamically adjusts to the workload. If all running goroutines are consuming small amounts of CPU, the scheduler can effectively handle more goroutines concurrently on each processor. The scheduler uses a work-stealing algorithm, which helps in balancing the load across processors.

Goroutine Lifecycle

When a new goroutine is spawned using the go keyword, it begins executing concurrently and operates independently of the initiating thread. This goroutine will continue running until it either completes its assigned task, returns from the function, or encounters a runtime error. This design enables the main thread to proceed without needing to wait for the goroutine to finish.

However, the autonomy of goroutines in Go’s runtime environment needs to take into consideration the program’s lifecycle, particularly its termination. If the main thread concludes and the program exits while a background goroutine is still active, the termination of the program will abruptly halt all goroutines, irrespective of their completion status.

To prevent premature termination and ensure all essential background tasks are completed, developers must strategically manage the lifetime of the main thread. This is typically achieved using synchronization tools such as channels or sync.WaitGroup, which facilitate the main thread in waiting for signals of completion from background goroutines before the program is allowed to gracefully exit.

Please note, the concept of parent-child relationships in goroutines is not applicable in Go. Unlike traditional processes in operating systems, where child processes are terminated when the parent process exits. Each goroutine is scheduled independently by the Go scheduler, and the termination of one does not directly affect the execution of others. Therefore, as long as the main goroutine remains active, any other goroutines, irrespective of their origin, will continue to run. This setup ensures that all goroutines have the opportunity to complete, provided that the main goroutine is still running.

Channels

Don’t communicate by sharing memory. Share memory by communicating. ⎯ Rob Pike

Channels in Go are a powerful feature used for communication between goroutines. You can think of channels as pipes through which you can send and receive values between different parts of a Go program that are running concurrently. Channels ensure that data is safely shared between goroutines, helping to avoid common pitfalls like race conditions.

To declare a channel, you use the chan keyword followed by the type of values that the channel will carry. The direction of the channel (send or receive) can be specified but is optional in a general declaration. Here’s how you declare a channel:


var myChannel chan int

// another way to declare a channel is to use the make function
// this is an unbuffered channel. More on it later
myChannel := make(chan int)

// and this is a buffered channel. This is a channel with a capacity of 10.
myBufferedChannel := make(chan int, 10)

To send a value into a channel, you use the channel variable, followed by the arrow operator (<-), and then the value to send:

myChannel <- 5 // Sends the integer 5 into myChannel

This operation will block if it’s an unbuffered channel and there’s no goroutine ready to receive the value, or if it’s a buffered channel that is already full.

To receive a value from a channel, you use the arrow operator (<-) followed by the channel variable:

value := <-myChannel // Receives a value from myChannel

This operation will block if the channel is empty until a value is sent on the channel.

You can close a channel when no more values will be sent on it. Closing a channel is done with the close function:

close(myChannel)

In function signatures, you can specify whether a channel is meant only to send or receive values. This is done by placing the arrow operator before or after the chan keyword:

func sendOnly(ch chan<- int) {
    ch <- 5 // This function can only send values to the channel
}

func receiveOnly(ch <-chan int) int {
    return <-ch // This function can only receive values from the channel
}

Types of Channels

A sender and a receiver must both be ready to play their part in the communication. Otherwise we wait until they are. Thus channels both communicate and synchronize. ⎯ Rob Pike

Unbuffered Channels

flowchart LR
    A[Goroutine A] -- "send value" --> B(Unbuffered Channel)
    B -- "waits for receive" --> C[Blocked Send]
    B -- "if receiver ready" --> D[Successful Send/Receive]
    E[Goroutine B] -- "receive value" --> B
    B -- "waits for send" --> F[Blocked Receive]
    B -- "if sender ready" --> D

    classDef goroutine fill:#748796,stroke:#333,stroke-width:2px;
    classDef operation fill:#077fed,stroke:#393,stroke-width:2px;
    class A,E goroutine;
    class B channel;
    class C,F operation;

Unbuffered channels are the most straightforward type of channel. When you send a value to an unbuffered channel, the send operation blocks until another goroutine reads from the channel. Similarly, when you read from an unbuffered channel, the read blocks until some goroutine writes a value to that channel.

package main

import (
    "fmt"
    "time"
)

func main() {
    message := make(chan string) // Creating an unbuffered channel

    go func() {
        message <- "Hello, World!" // Sending a value to the channel (blocks until received)
    }()

    time.Sleep(time.Second) // Simulating work to show blocking behavior
    received := <-message   // Receiving a value from the channel
    fmt.Println(received)
}

Buffered Channels

flowchart LR
    A[Goroutine A] -- "send value" --> B(Buffered Channel)
    B -- "holds value" --> C{Buffer}
    C -- "if full" --> D[Blocked Send]
    C -- "if not full" --> E[Successful Send]
    F[Goroutine B] -- "receive value" --> B
    B -- "if empty" --> G[Blocked Receive]
    B -- "if not empty" --> H[Successful Receive]

    classDef goroutine fill:#748796,stroke:#333,stroke-width:2px;
    classDef operation fill:#077fed,stroke:#393,stroke-width:2px;
    class A,F goroutine;
    class B channel;
    class D,E,G,H operation;

Buffered channels, on the other hand, have a capacity 1, meaning they can hold one or more values before blocking. When you send a value to a buffered channel, the operation only blocks if the channel’s buffer is full. Similarly, reading from a buffered channel only blocks if the channel is empty. Buffered channels are useful when you want to limit the amount of blocking, allowing a certain number of values to queue up in the channel.

package main

import "fmt"

func main() {
    message := make(chan string, 2) // Creating a buffered channel with capacity of 2

    message <- "Hello" // Sending a value to the channel (does not block if buffer is not full)
    message <- "World"    // Sending another value

    fmt.Println(<-message) // Receiving a value from the channel (Hello)
    fmt.Println(<-message) // Receiving another value (World)
}

Select

The select statement in Go is a powerful feature used for simultaneously waiting on multiple communication operations involving channels. It allows a goroutine to wait on multiple channel operations, blocking until one of its cases can proceed, which makes it particularly useful for concurrent programming to synchronize or manage multiple channel operations.

Here’s how select works:

  • The select statement blocks until one of its cases can proceed.
  • If multiple cases can proceed, one is chosen at random to execute.
  • It is often used with a default case, which is executed immediately if no other case is ready, making it possible to avoid blocking.

The select statement is essential for implementing complex concurrency patterns and handling asynchronous I/O operations without resorting to busy waiting or polling mechanisms.

package main

import (
    "fmt"
    "time"
)

func main() {
    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(2 * time.Second)
        c1 <- "one"
    }()

    go func() {
        time.Sleep(1 * time.Second)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("Received", msg1)
        case msg2 := <-c2:
            fmt.Println("Received", msg2)
        }
    }
}

Patterns

Generator: function that returns a channel

A generator is a way to produce a series of values, one at a time, using a combination of functions, goroutines, and channels.

c := boring("boring!") // function returning a channel
for i := 0; i < 5; i++ {
  fmt.Printf("You say %q\n", <-c)
}
fmt.Println("You're boring; I'm leaving")

func boring(msg string) <-chan string{
  c:= make(chan string)
  go func() {
    for i := 0; i++ {
      c <- fmt.Sprintf("%s %d", msg, i)
      time.Sleep(time.Duration(rand.Intn(1e3)) * time.Milleseocond)
    }
  }()
  return c // Return the channel to the caller
}

Multiplexing

However if you have two channels, one will still block the other. Multiplexing refers to combining multiple channel operations into a single operation, typically using the select statement. This allows a goroutine to wait on multiple channels at once, responding as soon as one of the channel operations is ready.

We can instead use a fan-in function to let whoever is ready to talk.

func fanIn(input1, input2 <-chan string) <-chan string{
  c:= make(chan string)
  go func() {for {c <- <-input1}}()
  go func() {for {c <- <-input2}}()

  return c
}

func main() {
  c:= fanIn(boring("Joe"), boring("Ann"))
  for i := 0; i < 10; i++ {
     fmt.Println(<-c)
  }

  fmt.Println("You are both boring; I'm leaving.")
}

Ticker and Timer

A time ticker is a mechanism provided by Go’s time package that emits a value at regular intervals, which can trigger actions in a goroutine. This pattern is commonly used for tasks such as periodic cleanup, data polling, or updating state at fixed intervals.

  1. Create the Ticker: Initialize a ticker with the desired interval using the time.NewTicker function.

  2. Handle Ticker Events: Use a goroutine to listen for events emitted by the ticker. This goroutine will perform the desired action each time the ticker fires.

  3. Stop the Ticker: Ensure the ticker is stopped when it’s no longer needed to avoid resource leaks.

func main() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    done := make(chan bool)

    // Goroutine to handle ticker events
    go func() {
        for {
            select {
            case <-done:
                return
            case t := <-ticker.C:
                fmt.Println("Tick at", t)
                // Perform desired action here
            }
        }
    }()

    // Simulate work for 5 seconds
    time.Sleep(5 * time.Second)
    done <- true // Stop the ticker
    fmt.Println("Ticker stopped")
}

In simple cases, when only one channel is involved, you can range loop the ticker channel directly. This simplifies the code by removing the need for a select statement to handle multiple channels. Here’s an example of using a range loop with a ticker channel

func main() {
    ticker := time.NewTicker(1 * time.Second)
    go func() {
        // Range over ticker.C channel
        for t := range ticker.C {
            fmt.Println("Tick at", t)
            // Perform desired action here
        }
        fmt.Println("Ticker stopped")
    }()

    // Simulate work for 5 seconds
    time.Sleep(5 * time.Second)
    ticker.Stop() // Stop the ticker
}

Worker Pool

A worker pool is a common pattern used in concurrent programming to manage a group of worker goroutines that process tasks concurrently. This pattern is useful when you have a large number of tasks to process and want to limit the number of concurrent operations to avoid resource exhaustion.

The type of channel used to communicate tasks to the worker pool can be either buffered or unbuffered, depending on the desired behavior. As covered above, a buffered channel can help to smooth out bursts of tasks, while an unbuffered channel can provide more control over the flow of tasks.

// Job represents a task that can be executed by a worker
type Job func()
// worker represents a worker that executes jobs
// the jobs channel is used to receive tasks, we use a WaitGroup to wait for all jobs to complete
func worker(id int, jobs <-chan Job, wg *sync.WaitGroup) {
    for job := range jobs {
        fmt.Printf("Worker %d started job\n", id)
        job()
        fmt.Printf("Worker %d finished job\n", id)
        wg.Done()
    }
}

func main() {
    const numWorkers = 3
    jobs := make(chan Job, 5)
    // We will talk about WaitGroup later
    // but for know, just know this:
    // wg it is used to wait for all jobs to complete
    var wg sync.WaitGroup

    // Create and start workers.
    for i := 0; i < numWorkers; i++ {
        go worker(i, jobs, &wg)
    }

    // Enqueue jobs
    for j := 0; j < 10; j++ {
        wg.Add(1)
        jobs <- func() {
            time.Sleep(2 * time.Second) // Simulate time-consuming task
            fmt.Println("Job completed")
        }
    }

    // Wait for all jobs to complete
    wg.Wait()
    close(jobs) // Close the job channel to terminate workers
    fmt.Println("All jobs completed")
}

Sync

The sync package in Go provides synchronization primitives such as mutexes, condition variables, and once operations, among others.

Mutex and RWMutex

The sync package provides two types of mutexes: Mutex and RWMutex. A Mutex is a mutual exclusion lock, which means it provides a way to protect shared resources from being accessed by multiple goroutines at the same time. An RWMutex is a reader/writer mutual exclusion lock, which means it provides a way to protect shared resources from being accessed by multiple goroutines at the same time, but with the added ability to allow multiple readers to access the resource at the same time.

package main

import (
    "fmt"
    "sync"
    "time"
)

// Counter holds an integer value and a mutex for synchronization
type Counter struct {
    value int
    mutex sync.Mutex
}

// Increment safely increments the counter's value using the mutex
func (c *Counter) Increment() {
    c.mutex.Lock()   // Lock the mutex before accessing the value
    c.value++        // Increment the counter value
    c.mutex.Unlock() // Unlock the mutex after updating the value
}

func main() {
    counter := Counter{} // Create an instance of Counter

    // Start 100 goroutines to increment the counter
    for i := 0; i < 100; i++ {
        go counter.Increment()
    }

    // Wait a bit for all goroutines to finish
    time.Sleep(100 * time.Millisecond)

    fmt.Printf("Final Counter Value: %d\n", counter.value)
}

WaitGroup

The sync package provides a WaitGroup type that is useful for synchronizing goroutines. A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, the main goroutine calls Wait to block until all goroutines have finished. Again, back to our worker pool example:

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // Notify the WaitGroup that the worker is done

    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second) // Simulate an expensive task
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    // Start 5 worker goroutines
    for i := 1; i <= 5; i++ {
        wg.Add(1) // Increment the WaitGroup counter for each worker
        go worker(i, &wg)
    }

    wg.Wait() // Wait for all workers to finish
    fmt.Println("All workers completed")
}

References


  1. The capacity of a channel is the number of values that can be held in the channel’s buffer. It is specified as the second argument to the make function when creating a channel. If no capacity is specified, the channel is unbuffered. ↩︎