
Concurrency

Concurrency vs Parallelism

Concurrency is the composition of independently executing processes, while parallelism is the simultaneous execution of (possibly related) computations.

Concurrency means multiple tasks start, run, and complete in overlapping time periods, in no specific order. Parallelism is when multiple tasks, or several parts of a single task, literally run at the same time, e.g. on a multi-core processor. Remember that concurrency and parallelism are NOT the same thing.

The idea of concurrent programming is old: it has been used in many languages, such as OCaml and Erlang. Go, however, makes channels a primary concurrency primitive.

graph TD
    subgraph Concurrency
        A[Task 1] --> C{Switch}
        B[Task 2] --> C
        C --> A
        C --> B
        C -.-> D[Task 3]
        D -.-> C
    end

    subgraph Parallelism
        E[Task 1] --> G[Task 1 Complete]
        F[Task 2] --> H[Task 2 Complete]
        I[Task 3] --> J[Task 3 Complete]
    end

    style Concurrency fill:#118098,stroke:#333,stroke-width:2px
    style Parallelism fill:#7C5576,stroke:#333,stroke-width:2px

Processes, Threads & Goroutines

  • Processes are independent execution units with their own resources, isolated from each other for security.
  • Kernel-level threads are managed by the operating system, allowing concurrent execution within a process and can utilize multiple CPU cores.
  • Green threads (a.k.a. user-level threads) are managed in user space by a runtime library; they are lightweight and offer efficient context switching, but may not fully leverage multicore architectures as kernel threads do.

Each method offers different trade-offs in performance, isolation, and resource utilization.

Go’s Approach to Concurrency

Go has built-in support for concurrency through goroutines and channels. A goroutine is a lightweight thread managed by the Go runtime. A channel is a typed conduit through which you can send and receive values with the channel operator (<-).

Go implements a hybrid approach to concurrency that combines aspects of green threads and kernel threads. Goroutines behave similarly to green threads: they are lightweight, managed by the Go runtime, and allow for the creation of thousands of concurrent operations with minimal overhead.

Goroutines are not purely user-level threads, however: the Go scheduler maps them onto a smaller number of OS threads (kernel-level threads). This mapping lets Go programs use multiple CPU cores efficiently while hiding the complexity of direct thread management. The runtime dynamically assigns goroutines to threads, so Go offers the benefits of green threads (low overhead, high scalability) while still exploiting the parallel execution capabilities of kernel threads.

An independently executing function, launched by a go statement. It is very cheap; it is practical to have even hundreds of thousands of goroutines. It is not a thread; instead, goroutines are multiplexed dynamically onto threads as needed to keep all the goroutines running. – Rob Pike

To create a goroutine, use the keyword go followed by a function invocation.
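A minimal sketch of the go statement follows. The helpers square and sumOfSquares are hypothetical, and collecting results over a channel is one illustrative choice. Note that main does not wait for goroutines on its own, so some synchronization (here, channel receives) is needed before results are used.

```go
package main

import "fmt"

// square is a hypothetical helper launched as a goroutine: it sends n*n on out.
func square(n int, out chan int) {
    out <- n * n
}

// sumOfSquares starts one goroutine per input and adds up their results.
func sumOfSquares(nums []int) int {
    out := make(chan int)
    for _, n := range nums {
        go square(n, out) // "go" followed by a function call starts a goroutine
    }
    total := 0
    for range nums {
        total += <-out // each receive waits until one goroutine has sent its result
    }
    return total
}

func main() {
    fmt.Println(sumOfSquares([]int{1, 2, 3, 4})) // 30
    // An anonymous function can be launched the same way: go func() { ... }()
}
```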

Channels

Don’t communicate by sharing memory. Share memory by communicating.

Channels in Go are a powerful feature used for communication between goroutines. You can think of channels as pipes through which you can send and receive values between different parts of a Go program that are running concurrently. Channels ensure that data is safely shared between goroutines, helping to avoid common pitfalls like race conditions.

To declare a channel, you use the chan keyword followed by the type of values that the channel will carry. The direction of the channel (send or receive) can be specified but is optional in a general declaration. Here’s how you declare a channel:


var myChannel chan int // declared but nil: sends and receives on a nil channel block forever

// initialize it with the make function
// this is an unbuffered channel. More on it later
myChannel = make(chan int)

// declaration and initialization in one step:
// a buffered channel with a capacity of 10
myBufferedChannel := make(chan int, 10)

To send a value into a channel, you use the channel variable, followed by the arrow operator (<-), and then the value to send:

myChannel <- 5 // Sends the integer 5 into myChannel

This operation will block if it’s an unbuffered channel and there’s no goroutine ready to receive the value, or if it’s a buffered channel that is already full.

To receive a value from a channel, you use the arrow operator (<-) followed by the channel variable:

value := <-myChannel // Receives a value from myChannel

This operation blocks while there is no value to receive: on an unbuffered channel, until a sender is ready; on a buffered channel, while its buffer is empty.

You can close a channel when no more values will be sent on it. Closing a channel is done with the close function:

close(myChannel)
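Closing matters to receivers. A sketch of the two standard ways to observe it: a for ... range loop over a channel stops once the channel is closed and drained, and the two-value receive v, ok := <-ch yields the zero value and ok == false on a closed, empty channel. By convention only the sender closes a channel (sending on a closed channel panics). The helpers produce and drain are hypothetical names.

```go
package main

import "fmt"

// produce is a hypothetical helper: it sends 1..n on a new channel, then closes it.
func produce(n int) chan int {
    ch := make(chan int)
    go func() {
        defer close(ch) // the sending side closes the channel when it is done
        for i := 1; i <= n; i++ {
            ch <- i
        }
    }()
    return ch
}

// drain collects every value; range stops automatically once ch is closed and empty.
func drain(ch chan int) []int {
    var got []int
    for v := range ch {
        got = append(got, v)
    }
    return got
}

func main() {
    fmt.Println(drain(produce(3))) // [1 2 3]

    v, ok := <-produce(0) // closed with no values sent
    fmt.Println(v, ok)    // 0 false
}
```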

In function signatures, you can specify whether a channel is meant only to send or receive values. This is done by placing the arrow operator before or after the chan keyword:

func sendOnly(ch chan<- int) {
    ch <- 5 // This function can only send values to the channel
}

func receiveOnly(ch <-chan int) int {
    return <-ch // This function can only receive values from the channel
}
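A short sketch of the two functions above working together. A bidirectional channel created with make converts implicitly to either direction when passed as an argument, and the compiler rejects misuse; the commented-out lines would not compile. The helper roundTrip is a hypothetical name.

```go
package main

import "fmt"

func sendOnly(ch chan<- int) {
    ch <- 5
    // v := <-ch // does not compile: cannot receive from a send-only channel
}

func receiveOnly(ch <-chan int) int {
    // ch <- 1 // does not compile: cannot send to a receive-only channel
    return <-ch
}

// roundTrip passes one bidirectional channel to both functions.
func roundTrip() int {
    ch := make(chan int) // bidirectional; converts implicitly to chan<- int or <-chan int
    go sendOnly(ch)
    return receiveOnly(ch)
}

func main() {
    fmt.Println(roundTrip()) // 5
}
```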

Types of Channels

A sender and a receiver must both be ready to play their part in the communication. Otherwise we wait until they are. Thus channels both communicate and synchronize. – Rob Pike

Unbuffered Channels

flowchart LR
    A[Goroutine A] -- "send value" --> B(Unbuffered Channel)
    B -- "waits for receive" --> C[Blocked Send]
    B -- "if receiver ready" --> D[Successful Send/Receive]
    E[Goroutine B] -- "receive value" --> B
    B -- "waits for send" --> F[Blocked Receive]
    B -- "if sender ready" --> D

    classDef goroutine fill:#748796,stroke:#333,stroke-width:2px;
    classDef operation fill:#077fed,stroke:#393,stroke-width:2px;
    class A,E goroutine;
    class B channel;
    class C,F operation;

Unbuffered channels are the most straightforward type of channel. When you send a value to an unbuffered channel, the send operation blocks until another goroutine reads from the channel. Similarly, when you read from an unbuffered channel, the read blocks until some goroutine writes a value to that channel.

package main

import (
    "fmt"
    "time"
)

func main() {
    message := make(chan string) // Creating an unbuffered channel

    go func() {
        message <- "Hello, World!" // Sending a value to the channel (blocks until received)
    }()

    time.Sleep(time.Second) // Simulating work to show blocking behavior
    received := <-message   // Receiving a value from the channel
    fmt.Println(received)
}
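Because an unbuffered send and its matching receive complete together, an unbuffered channel is often used purely as a completion signal. In the sketch below (runAndWait is a hypothetical name), the caller waits exactly as long as needed rather than for a fixed time.Sleep. Under the Go memory model, a send happens before the corresponding receive completes, so the goroutine's write is visible to the caller afterwards.

```go
package main

import "fmt"

// runAndWait starts a goroutine and blocks on an unbuffered channel until it signals.
func runAndWait() []string {
    var steps []string
    done := make(chan struct{}) // struct{} carries no data; only the event matters
    go func() {
        steps = append(steps, "goroutine finished")
        done <- struct{}{} // blocks until the receive below is ready
    }()
    <-done // after this receive, the append made by the goroutine is visible here
    steps = append(steps, "caller continues")
    return steps
}

func main() {
    fmt.Println(runAndWait()) // [goroutine finished caller continues]
}
```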

Buffered Channels

flowchart LR
    A[Goroutine A] -- "send value" --> B(Buffered Channel)
    B -- "holds value" --> C{Buffer}
    C -- "if full" --> D[Blocked Send]
    C -- "if not full" --> E[Successful Send]
    F[Goroutine B] -- "receive value" --> B
    B -- "if empty" --> G[Blocked Receive]
    B -- "if not empty" --> H[Successful Receive]

    classDef goroutine fill:#748796,stroke:#333,stroke-width:2px;
    classDef operation fill:#077fed,stroke:#393,stroke-width:2px;
    class A,F goroutine;
    class B channel;
    class D,E,G,H operation;

Buffered channels, on the other hand, have a capacity [1], meaning they can hold up to that many values before a send blocks. A send to a buffered channel blocks only if the channel's buffer is full; similarly, a receive blocks only if the buffer is empty. Buffered channels are useful when you want to limit blocking, allowing a certain number of values to queue up in the channel.

package main

import "fmt"

func main() {
    message := make(chan string, 2) // Creating a buffered channel with capacity of 2

    message <- "Hello" // Sending a value to the channel (does not block if buffer is not full)
    message <- "World" // Sending another value

    fmt.Println(<-message) // Receiving a value from the channel (Hello)
    fmt.Println(<-message) // Receiving another value (World)
}
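The built-in len and cap functions make the buffer visible: len reports how many values are currently queued and cap the buffer size. A sketch with a hypothetical helper occupancy, which must not send more values than the capacity, since with no receiver the extra send would block forever.

```go
package main

import "fmt"

// occupancy is a hypothetical helper: it sends n values (n <= capacity) into a new
// buffered channel and reports len (values queued) and cap (buffer size).
func occupancy(capacity, n int) (int, int) {
    ch := make(chan int, capacity)
    for i := 0; i < n; i++ {
        ch <- i // does not block while len(ch) < cap(ch)
    }
    return len(ch), cap(ch)
}

func main() {
    fmt.Println(occupancy(2, 0)) // 0 2
    fmt.Println(occupancy(2, 2)) // 2 2: the buffer is full, a third send would block
}
```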

Select

The select statement lets a goroutine wait on multiple channel operations at once, blocking until one of its cases can proceed. This makes it particularly useful for synchronizing and coordinating several channels in concurrent code.

Here’s how select works:

  • The select statement blocks until one of its cases can proceed.
  • If multiple cases can proceed, one is chosen at random to execute.
  • It is often used with a default case, which is executed immediately if no other case is ready, making it possible to avoid blocking.

The select statement is essential for implementing complex concurrency patterns and handling asynchronous I/O operations without resorting to busy waiting or polling mechanisms.

package main

import (
    "fmt"
    "time"
)

func main() {
    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(2 * time.Second)
        c1 <- "one"
    }()

    go func() {
        time.Sleep(1 * time.Second)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("Received", msg1)
        case msg2 := <-c2:
            fmt.Println("Received", msg2)
        }
    }
}

Patterns

Generator: function that returns a channel

A generator is a way to produce a series of values, one at a time, using a combination of functions, goroutines, and channels.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    c := boring("boring!") // function returning a channel
    for i := 0; i < 5; i++ {
        fmt.Printf("You say %q\n", <-c)
    }
    fmt.Println("You're boring; I'm leaving")
}

// boring returns a receive-only channel fed forever by its own goroutine
func boring(msg string) <-chan string {
    c := make(chan string)
    go func() {
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c // Return the channel to the caller
}

Multiplexing

However, if you read from two such channels one after the other, a slow one holds up the other even when the other already has a value ready. Multiplexing refers to combining multiple channel operations into a single operation, typically using the select statement. This allows a goroutine to wait on multiple channels at once, responding as soon as one of the channel operations is ready.

We can instead use a fan-in function to let whoever is ready talk.

// fanIn merges two channels into one; it reuses boring from the generator example
func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() { for { c <- <-input1 } }()
    go func() { for { c <- <-input2 } }()
    return c
}

func main() {
    c := fanIn(boring("Joe"), boring("Ann"))
    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }

    fmt.Println("You are both boring; I'm leaving.")
}
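Since multiplexing is typically done with select, here is a hedged variant of fanIn that uses a single goroutine and select instead of one goroutine per input. To keep it self-contained and finite, source is a hypothetical stand-in for boring that closes its channel after n messages. When an input is closed it is set to nil: operations on a nil channel block forever, so select simply stops choosing that case.

```go
package main

import "fmt"

// source is a hypothetical stand-in for boring: n labeled messages, then close.
func source(name string, n int) <-chan string {
    c := make(chan string)
    go func() {
        defer close(c)
        for i := 0; i < n; i++ {
            c <- fmt.Sprintf("%s %d", name, i)
        }
    }()
    return c
}

// fanIn merges two channels with one goroutine and select; the output is closed
// once both inputs are closed.
func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        defer close(c)
        for input1 != nil || input2 != nil {
            select {
            case s, ok := <-input1:
                if !ok {
                    input1 = nil // disable this case
                    continue
                }
                c <- s
            case s, ok := <-input2:
                if !ok {
                    input2 = nil // disable this case
                    continue
                }
                c <- s
            }
        }
    }()
    return c
}

func main() {
    for msg := range fanIn(source("Joe", 3), source("Ann", 3)) {
        fmt.Println(msg) // the interleaving varies from run to run
    }
}
```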

Ticker and Timer

A time ticker is a mechanism provided by Go’s time package that emits a value at regular intervals, which can trigger actions in a goroutine. This pattern is commonly used for tasks such as periodic cleanup, data polling, or updating state at fixed intervals.

  1. Create the Ticker: Initialize a ticker with the desired interval using the time.NewTicker function.

  2. Handle Ticker Events: Use a goroutine to listen for events emitted by the ticker. This goroutine will perform the desired action each time the ticker fires.

  3. Stop the Ticker: Ensure the ticker is stopped when it’s no longer needed to avoid resource leaks.

package main

import (
    "fmt"
    "time"
)

func main() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop() // Release the ticker when main returns

    done := make(chan bool)

    // Goroutine to handle ticker events
    go func() {
        for {
            select {
            case <-done:
                return
            case t := <-ticker.C:
                fmt.Println("Tick at", t)
                // Perform desired action here
            }
        }
    }()

    // Simulate work for 5 seconds
    time.Sleep(5 * time.Second)
    done <- true // Tell the goroutine to exit; the deferred Stop stops the ticker
    fmt.Println("Ticker stopped")
}

In simple cases, when only one channel is involved, you can range over the ticker channel directly, which removes the need for a select statement. Keep in mind that Stop does not close ticker.C, so such a loop never ends on its own. Here's an example of using a range loop with a ticker channel:

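package main

import (
    "fmt"
    "time"
)

func main() {
    ticker := time.NewTicker(1 * time.Second)
    go func() {
        // Range over ticker.C. Stop does not close ticker.C, so this loop never
        // exits by itself; the goroutine simply lives until the program ends.
        for t := range ticker.C {
            fmt.Println("Tick at", t)
            // Perform desired action here
        }
    }()

    // Simulate work for 5 seconds
    time.Sleep(5 * time.Second)
    ticker.Stop() // Stop the ticker: no more ticks are sent, but ticker.C stays open
    fmt.Println("Ticker stopped")
}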

Worker Pool

A worker pool is a common pattern used in concurrent programming to manage a group of worker goroutines that process tasks concurrently. This pattern is useful when you have a large number of tasks to process and want to limit the number of concurrent operations to avoid resource exhaustion.

The type of channel used to communicate tasks to the worker pool can be either buffered or unbuffered, depending on the desired behavior. As covered above, a buffered channel can help smooth out bursts of tasks, while an unbuffered channel applies back-pressure: each send waits until a worker is ready to take the task.

package main

import (
    "fmt"
    "sync"
    "time"
)

// Job represents a task that can be executed by a worker
type Job func()

// worker represents a worker that executes jobs
// the jobs channel is used to receive tasks, we use a WaitGroup to wait for all jobs to complete
func worker(id int, jobs <-chan Job, wg *sync.WaitGroup) {
    for job := range jobs { // the loop ends once jobs is closed and drained
        fmt.Printf("Worker %d started job\n", id)
        job()
        fmt.Printf("Worker %d finished job\n", id)
        wg.Done()
    }
}

func main() {
    const numWorkers = 3
    jobs := make(chan Job, 5)
    // We will talk about WaitGroup later,
    // but for now, just know this:
    // wg is used to wait for all jobs to complete
    var wg sync.WaitGroup

    // Create and start workers.
    for i := 0; i < numWorkers; i++ {
        go worker(i, jobs, &wg)
    }

    // Enqueue jobs
    for j := 0; j < 10; j++ {
        wg.Add(1)
        jobs <- func() {
            time.Sleep(2 * time.Second) // Simulate time-consuming task
            fmt.Println("Job completed")
        }
    }

    // Wait for all jobs to complete
    wg.Wait()
    close(jobs) // Close the job channel to terminate workers
    fmt.Println("All jobs completed")
}
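The pool above only signals completion. A common extension, sketched here under the assumption that each job produces exactly one result, adds a results channel. squareAll is a hypothetical name, numWorkers must be at least 1, and the order of results depends on scheduling. The results channel is buffered to len(nums) so workers never wait on the collector, which is what makes it safe to wait for the workers before draining.

```go
package main

import (
    "fmt"
    "sync"
)

// squareAll squares each input using numWorkers goroutines (numWorkers >= 1).
func squareAll(nums []int, numWorkers int) []int {
    jobs := make(chan int)
    results := make(chan int, len(nums)) // room for every result
    var wg sync.WaitGroup

    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for n := range jobs { // exits when jobs is closed
                results <- n * n
            }
        }()
    }

    for _, n := range nums {
        jobs <- n
    }
    // No more jobs: the workers' range loops end.
    close(jobs)
    // Every worker has returned, so nothing else will send on results.
    wg.Wait()
    close(results)

    var out []int
    for r := range results {
        out = append(out, r)
    }
    return out
}

func main() {
    fmt.Println(len(squareAll([]int{1, 2, 3, 4, 5}, 3))) // 5
}
```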

Sync

The sync package in Go provides synchronization primitives such as mutexes, condition variables, and once operations, among others.
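As an example of the "once" operations mentioned here, sync.Once runs a function exactly once even when many goroutines race to call it, and every Do call returns only after that function has finished. A sketch, assuming lazy initialization of a shared value; getConfig, callConcurrently, config, and initCount are hypothetical names.

```go
package main

import (
    "fmt"
    "sync"
)

var (
    once      sync.Once
    config    map[string]string // hypothetical shared value, built lazily
    initCount int               // how many times the initializer actually ran
)

// getConfig initializes config exactly once, no matter how many goroutines call it.
func getConfig() map[string]string {
    once.Do(func() {
        initCount++
        config = map[string]string{"mode": "demo"}
    })
    return config
}

// callConcurrently calls getConfig from n goroutines and waits for them all.
func callConcurrently(n int) {
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _ = getConfig()
        }()
    }
    wg.Wait()
}

func main() {
    callConcurrently(100)
    fmt.Println(initCount, getConfig()["mode"]) // 1 demo
}
```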

Mutex and RWMutex

The sync package provides two types of mutexes: Mutex and RWMutex. A Mutex is a mutual exclusion lock: only one goroutine can hold it at a time, which protects shared resources from concurrent access. An RWMutex is a reader/writer mutual exclusion lock: any number of readers can hold it at the same time, but a writer gets exclusive access.

package main

import (
    "fmt"
    "sync"
)

// Counter holds an integer value and a mutex for synchronization
type Counter struct {
    value int
    mutex sync.Mutex
}

// Increment safely increments the counter's value using the mutex
func (c *Counter) Increment() {
    c.mutex.Lock()         // Lock the mutex before accessing the value
    defer c.mutex.Unlock() // Unlock the mutex when the method returns
    c.value++              // Increment the counter value
}

func main() {
    counter := Counter{} // Create an instance of Counter (the zero Mutex is ready to use)
    var wg sync.WaitGroup // Tracks the goroutines (see WaitGroup below)

    // Start 100 goroutines to increment the counter
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    // Unlike sleeping for a fixed time, Wait guarantees every goroutine has finished
    wg.Wait()

    fmt.Printf("Final Counter Value: %d\n", counter.value) // always 100
}
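The example above uses a plain Mutex. A sketch of RWMutex for a read-heavy structure: Get takes the shared RLock, so concurrent readers do not block each other, while Set takes the exclusive Lock. Cache, NewCache, Get, and Set are hypothetical names chosen for illustration.

```go
package main

import (
    "fmt"
    "sync"
)

// Cache is a hypothetical map guarded by an RWMutex.
type Cache struct {
    mu   sync.RWMutex
    data map[string]int
}

func NewCache() *Cache {
    return &Cache{data: make(map[string]int)}
}

func (c *Cache) Get(key string) (int, bool) {
    c.mu.RLock() // shared: many readers may hold it at once
    defer c.mu.RUnlock()
    v, ok := c.data[key]
    return v, ok
}

func (c *Cache) Set(key string, v int) {
    c.mu.Lock() // exclusive: waits for active readers and blocks new ones
    defer c.mu.Unlock()
    c.data[key] = v
}

func main() {
    c := NewCache()
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(2)
        go func(i int) {
            defer wg.Done()
            c.Set(fmt.Sprint("k", i), i)
        }(i)
        go func() {
            defer wg.Done()
            c.Get("k0")
        }()
    }
    wg.Wait()
    fmt.Println(c.Get("k9")) // 9 true
}
```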

WaitGroup

The sync package provides a WaitGroup type that is useful for synchronizing goroutines. A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for; each goroutine then runs and calls Done when finished. Meanwhile, the main goroutine calls Wait to block until all of them have finished. The worker pool above already relies on this; here is the mechanism on its own:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // Notify the WaitGroup that the worker is done

    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second) // Simulate an expensive task
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    // Start 5 worker goroutines
    for i := 1; i <= 5; i++ {
        wg.Add(1) // Increment the WaitGroup counter for each worker
        go worker(i, &wg)
    }

    wg.Wait() // Wait for all workers to finish
    fmt.Println("All workers completed")
}

References


  1. The capacity of a channel is the number of values that can be held in the channel's buffer. It is specified as the second argument to the make function when creating a channel. If no capacity is specified, the channel is unbuffered.