Go Concurrency Patterns - A Deep Dive into Producer-Consumer, Fan-out/Fan-in, and Pipelines
Wenhao Wang
Dev Intern · Leapcell

Go's built-in concurrency features, primarily goroutines and channels, provide a powerful yet elegant way to write highly concurrent and parallel applications. Compared with traditional thread-based models, Go's approach simplifies concurrent programming and, used carefully, makes common pitfalls like deadlocks and race conditions easier to avoid. This article delves into several core concurrency patterns in Go: Producer-Consumer, Fan-out/Fan-in, and Pipelines, illustrating their implementation and benefits with practical examples.
The Foundations: Goroutines and Channels
Before diving into patterns, let's briefly recap the building blocks:
- Goroutines: Lightweight, independently executing functions. They are multiplexed onto a smaller number of OS threads, making them very efficient. You launch a goroutine by prepending `go` to a function call: `go myFunction()`.
- Channels: Typed conduits through which goroutines communicate and synchronize. They are the "Go way" to share memory by communicating, rather than communicating by sharing memory. Think of them as pipes connecting concurrent components. You create a channel using `make(chan Type)`, send with `ch <- value`, and receive with `value := <-ch`. Channels can be buffered (`make(chan Type, capacity)`) or unbuffered (`make(chan Type)`).
Pattern 1: Producer-Consumer
The Producer-Consumer pattern is a classic concurrency design where one or more "producers" generate data and place it into a shared buffer, while one or more "consumers" retrieve data from the buffer and process it. In Go, channels naturally serve as this shared buffer.
Why use it?
- Decoupling: Producers don't need to know how data is consumed, and consumers don't need to know how data is produced.
- Load Smoothing: If producers generate data at an irregular rate, a buffer can smooth out the flow to consumers.
- Concurrency: Producers and consumers can operate concurrently, potentially speeding up overall processing.
Example: File Processing with a Bounded Buffer
Let's imagine a scenario where we need to read lines from a large file (producer) and then process each line (consumer).
```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// LineProducer reads lines from a file and sends them to a channel.
func LineProducer(filePath string, lines chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	defer close(lines) // Important: close the channel to signal no more data

	file, err := os.Open(filePath)
	if err != nil {
		fmt.Printf("Error opening file: %v\n", err)
		return
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lines <- scanner.Text() // Send line to channel
	}
	if err := scanner.Err(); err != nil {
		fmt.Printf("Error reading file: %v\n", err)
	}
}

// LineConsumer processes lines received from a channel.
func LineConsumer(id int, lines <-chan string, processedCount *int64, wg *sync.WaitGroup) {
	defer wg.Done()
	for line := range lines {
		// Simulate some CPU-intensive processing
		time.Sleep(time.Millisecond * 10)
		if _, err := strconv.Atoi(strings.TrimSpace(line)); err == nil {
			// Safely increment the shared counter with an atomic operation
			atomic.AddInt64(processedCount, 1)
		}
	}
	fmt.Printf("Consumer %d finished.\n", id)
}

func main() {
	const (
		numConsumers = 5
		bufferSize   = 100 // Buffered channel to smooth out producer/consumer rates
		filePath     = "data.txt"
	)

	// Create a dummy data.txt for demonstration
	createDummyFile(filePath, 1000)

	linesChannel := make(chan string, bufferSize) // Buffered channel
	var wg sync.WaitGroup
	var processed int64 // Shared counter, updated atomically by consumers

	// Start the producer
	wg.Add(1)
	go LineProducer(filePath, linesChannel, &wg)

	// Start consumers
	for i := 0; i < numConsumers; i++ {
		wg.Add(1)
		go LineConsumer(i+1, linesChannel, &processed, &wg)
	}

	// Wait for all goroutines to finish
	wg.Wait()
	fmt.Printf("All producers and consumers finished. Processed %d lines.\n", processed)
}

// createDummyFile writes numLines integers to a file, one per line.
func createDummyFile(filePath string, numLines int) {
	file, err := os.Create(filePath)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	writer := bufio.NewWriter(file)
	for i := 0; i < numLines; i++ {
		fmt.Fprintf(writer, "%d\n", i)
	}
	writer.Flush()
	fmt.Printf("Created dummy file: %s with %d lines.\n", filePath, numLines)
}
```
In this example:
- `LineProducer` is the producer, reading lines and sending them to `linesChannel`.
- `LineConsumer` instances are consumers, receiving lines from `linesChannel` and processing them.
- `linesChannel` acts as the bounded buffer. The `bufferSize` prevents the producer from running too far ahead of the consumers and potentially exhausting memory.
- `sync.WaitGroup` is crucial for `main` to wait until all producers and consumers have completed their work before exiting.
- Closing `linesChannel` in `LineProducer` is vital. It signals to consumers that no more data will be sent, allowing their `for line := range lines` loops to terminate gracefully.
Pattern 2: Fan-out / Fan-in
The Fan-out / Fan-in pattern is about distributing a set of tasks among multiple worker goroutines (fan-out) and then collecting their results back into a single channel (fan-in). This pattern is excellent for parallelizing computations.
Why use it?
- Parallelism: Leverage multiple CPU cores or distribute work across a network.
- Scalability: Easily add more workers to handle increased load.
- Work Distribution: Break down a large problem into smaller, independent sub-problems.
Example: Parallel Squaring of Numbers
Let's say we have a list of numbers and we want to square them in parallel.
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker takes numbers from 'in', squares them, and sends results to 'out'.
func worker(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for n := range in {
		// fmt.Printf("Worker %d: processing %d -> %d\n", id, n, n*n)
		time.Sleep(time.Millisecond * 50) // Simulate work
		out <- n * n
	}
	fmt.Printf("Worker %d finished.\n", id)
}

func main() {
	const (
		numJobs    = 20
		numWorkers = 3
	)

	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs) // Buffer results channel for fan-in

	var workerWG sync.WaitGroup

	// Fan-out: start workers, all reading from the same jobs channel
	for w := 1; w <= numWorkers; w++ {
		workerWG.Add(1)
		go worker(w, jobs, results, &workerWG)
	}

	// Send jobs to the jobs channel
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs) // No more jobs to send

	// Wait for all workers to finish their current jobs, then close
	// results. Closing AFTER all workers are done signals the fan-in
	// collector that no more results will be produced.
	workerWG.Wait()
	close(results)

	// Fan-in: collect results
	fmt.Println("\nCollecting results:")
	for r := range results {
		fmt.Printf("Collected result: %d\n", r)
	}
	fmt.Println("All done!")
}
```
Explanation:
- `jobs` channel: where the initial tasks (numbers to be squared) are sent.
- `results` channel: where the squared numbers from all workers are collected.
- Fan-out: We launch `numWorkers` goroutines (the `worker` function), all reading from the `jobs` channel.
- Job distribution: The main goroutine sends numbers into the `jobs` channel, and the Go runtime hands each one to whichever available `worker` goroutine receives it first.
- Fan-in: The main goroutine then reads from the `results` channel. Because `results` is closed only after all workers have finished and sent their last results, the `for r := range results` loop in `main` correctly receives every produced result and then terminates. `workerWG` ensures we wait for all workers.
Pattern 3: Pipelines
A pipeline is a series of stages, where the output of one stage becomes the input for the next. Each stage typically operates concurrently. In Go, pipelines are elegantly constructed using channels to connect the stages.
Why use it?
- Modularity: Break down complex operations into smaller, manageable, and reusable components.
- Concurrency: Each stage can run concurrently, processing data as it becomes available from the previous stage.
- Throughput: Data flows through the pipeline, often leading to higher throughput than sequential processing.
Example: Number Processing Pipeline
Let's build a pipeline that:
- Generates a sequence of numbers (producer).
- Filters out even numbers.
- Squares the remaining odd numbers.
- Prints the final results.
```go
package main

import (
	"fmt"
	"time"
)

// Generator stage: produces numbers.
func generate(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// Filter stage: filters out even numbers.
func filterOdd(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			if n%2 != 0 { // Keep only odd numbers
				select {
				case out <- n:
				case <-done:
					return
				}
			}
		}
	}()
	return out
}

// Square stage: squares numbers.
func square(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

func main() {
	// A done channel for graceful shutdown of all goroutines
	done := make(chan struct{})
	defer close(done) // Ensure done is closed when main exits

	// Stage 1: Generate numbers
	numbers := generate(done, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

	// Stage 2: Filter out even numbers
	oddNumbers := filterOdd(done, numbers)

	// Stage 3: Square the odd numbers
	squaredOddNumbers := square(done, oddNumbers)

	// Final stage: Consume and print results
	fmt.Println("Pipeline results:")
	for result := range squaredOddNumbers {
		fmt.Printf("Result: %d\n", result)
		time.Sleep(time.Millisecond * 10) // Simulate final processing
	}
	fmt.Println("Pipeline finished.")
}
```
Key aspects of this pipeline:
- Chained channels: `generate` sends to a channel, which is passed as input to `filterOdd`, whose output channel is passed to `square`.
- `<-chan int` and `chan<- int`: Using these directional channel types improves safety and readability, clearly indicating whether a function sends or receives on a channel.
- Graceful shutdown (the `done` channel): The `done` channel is a common pattern for signaling all goroutines in a pipeline to stop processing and exit. When `main` exits, `defer close(done)` ensures that all goroutines selecting on `done` gracefully return, preventing goroutine leaks. This is especially important in long-running pipelines or if an error occurs early in the pipeline.
- Each stage is an independent goroutine, working concurrently. As soon as `generate` produces a number, `filterOdd` can process it, and then `square` can square it, without waiting for the entire input to be generated.
Combining Patterns and Best Practices
These patterns are not mutually exclusive; they can be combined to build sophisticated concurrent systems. For instance, a stage in a pipeline could itself be a fan-out/fan-in operation to parallelize a sub-task.
General Best Practices for Go Concurrency:
- Share Memory by Communicating, Not by Communicating Through Shared Memory: This is Go's mantra ("Do not communicate by sharing memory; instead, share memory by communicating"). Use channels for communication and synchronization.
- Goroutines Are Cheap, Use Them Liberally: Don't be afraid to launch many goroutines.
- Close Channels to Signal Completion: Always close a channel, from the sender's side, when no more data will be sent. This unblocks `for ... range` loops on the receiving end.
- Use `sync.WaitGroup` for Waiting on Goroutines: Essential for ensuring all goroutines complete before the main program exits.
- Handle Errors and Graceful Shutdowns: Implement mechanisms like the `done` channel or `context` for canceling operations and ensuring all goroutines clean up.
- Avoid Global State Where Possible: If shared state is unavoidable, protect it with `sync.Mutex` or `sync.RWMutex`, or better yet, serialize access through a single goroutine (e.g., a "monitor" goroutine).
- Consider the `context` Package for Cancellations and Deadlines: For more complex scenarios involving timeouts, deadlines, or cascaded cancellations, the `context` package is indispensable.
- Buffer Channels Appropriately: Use buffered channels to smooth out bursts or to allow producers to get ahead without blocking, but be mindful of memory usage. Unbuffered channels enforce strict synchronization (a rendezvous).
- Test Concurrency Thoroughly: Concurrency bugs can be subtle. Use the `-race` flag to enable the Go race detector (`go run -race filename.go` or `go test -race ./...`).
Conclusion
Go's concurrency model, built upon goroutines and channels, provides an intuitive and powerful way to design concurrent applications. By understanding and applying patterns like Producer-Consumer, Fan-out/Fan-in, and Pipelines, developers can build robust, scalable, and efficient systems that effectively utilize modern multi-core processors. These patterns encourage modularity and maintainability, making concurrent programming in Go a much more enjoyable and less error-prone experience. Embrace them, and your Go applications will naturally become more concurrent and performant.

