Beginner tutorials are ok for getting a handle of the basics. But, as in every language, there is a big difference between simple tutorials we see, such as Go by example, on StackOverflow, or even by tech bloggers that simply repost the basics, and the kind of code that belongs in Production. Many of these concepts can be transposed into languages with asynchronous I/O such as node.js and python.
Read on if you want to see an opinionated pattern for:
Performant processing for I/O bound tasks
Elegant usage of Channels that adds concurrency (and maybe parallelism)
Fan-out/Fan-in with Channels for additional parallel processing
Usage of Contexts with Channels
Naive processing
consider the following pseudo code block to read a CSV, do something with the values, and then write out the file. Something like this can be found in beginner tutorials.
csvtransform.go:
lines := csv.ReadAll("data.csv")
outputLines := make([]string, len(lines))
for _, line := range lines {
outputLines = append(outputLines, transform(line))
}
csv.WriteLines("output.csv", outputLines)
Here at Elegant Engineer we consider the pros/cons of code and love patterns which deliver far superior properties despite being similar difficulty to implement.
Now lets consider the pros and cons of such code:
Pro: It’s simple to understand and reason about when errors happen (each step is all or nothing)
Con: The entirety of the
data.csv
file will be stored in memory multiple times. Imagine you had to process a 10GB file? (This was a production bug at a previous employer)Con: The entirety of the
data.csv
file must be read before the first transform() call occurs. For a 10GB file that could be20 seconds
of reading from an SSD.Con: The cpu will sit largely idle while all records are read from the CSV. Then a single core will be pinned while all the records are passed to transform() calls. Then it will be largely idle again when writing out.
Using Channels for Concurrency (Opinionated)
… and maybe parallelism too. (“Concurrency is not Parallelism”).
Consider the next block of pseudocode
csv_with_channels.go:
records := csv.Open("data.csv") // records is <-chan DataRecord
outRecords := transform(records) // outRecords is <-chan OutputRecord
csv.Write(outRecords) // reads all, writing each as received
func transform(inputs <-chan DataRecord) <-chan OutputRecord {
output := make(chan OutputRecord)
go func() {
defer close(output)
for input := range inputs {
output <- OutputRecord{
input.Name,
input.Salary * 2,
}
}
}()
return output
}
It contains a few opinionated patterns. Firstly, the scope which creates a channel should be the only one which closes it. That is, the producer on a channel is the only one which closes it, and only when it’s signaling that production of values on the channel is done. Second, the function returns a read-only version of that channel. No other producers should exist on that channel. It’s owned by that invocation of the function. Finally, we short return the channel allowing a go routine to concurrently begin producing items on the channel.
Now lets consider the pros and cons of such code:
Con: Uses a less familiar concept — channels. This is an O(1) cost as we learn to write Go, but benefits us many times over it’s execution and our careers.
Con: ultra low level performance details like processor/cache affinity or minimizing allocs will be even harder to control/predict in this design (advice: if you need that level of performance, Go may not be the right tool for your use case)
Pro: Only about 3 records will be in memory at a given time (depending on buffering in the io.Reader)
Pro: The first call to transform will happen immediately after the first record is read (eg: maybe 10K of data read instead of 10GB, 1 million times reduction)
Pro: Both in and out data streams will be concurrently active. Potentially allowing downstream first byte processing to occur sooner (if the output is a TCP connection, for example)
Pro: The CPU will have small spikes in usage as each record is produced, potentially using multiple cores (something like one for each step) though it could still be I/O bound in the case of SSD/Network access.
Fan-out/Fan-in for performance
… it’s not named after your CPU cooler.
Fan out / Fan in has a more formal meaning in logic gates. Used here Fan Out is when we spread out work across many processors. Fan In is when we fold in many streams into a single consumer. Here’s example code for Fan-Out, Fan-In:
fan_out.go:
// Example of fan-out into 10 output channels
func produceItemsFanOut(records csv.Reader) []<-chan []string {
outputs := make([]chan []string, 10)
go func() {
for i := 0; i < len(outputs); i++ {
defer close(outputs[i]) // close all channels when done
}
for i := 0; true; i = (i + 1) % 10 {
record, err := records.Read()
if err == io.EOF {
return
}
outputs[i] <- record // Round Robin the records across channels
// NB: buffered channels would avoid blocking on a channel
}
}()
// slice of ReadWrite is not compatible with slice of ReadOnly
returnVal := make([]<-chan []string, len(outputs))
for i := range outputs {
returnVal = append(returnVal, outputs[i])
}
return returnVal
}
fan_in.go:
// Fan in all the channels and add the values to emit in a single channel of output
func consumeItemsFanIn(out csv.Writer, inputChans ...<-chan []string) <-chan int {
output := make(chan int)
go func() {
var result int
defer close(output)
for len(inputChans) > 0 { // while remaining channels
for i := range inputChans {
select {
case record, open := <-inputChans[i]:
if !open {
// Remove the channel from the slice
inputChans = append(inputChans[:i], inputChans[i+1:]...)
}
out.Write(record)
result += 1 // counting records
default:
continue // do not block, go to next channel
}
}
}
output <- result
}()
return output
}
While we may be tempted to go for higher performance, we should always instrument our apps and ensure we have the data that this is a CPU heavy process and that we’re not ultimately I/O bound on the disk or network before doing something as complex as Fan-in/Fan-out.
Usage of Contexts with Channels
Ok. We’re still not production ready though. In production 𝖘𝖍𝖎𝖙 happens. HTTP requests timeout, users cancel operations, errors occur, and we may realize that processing a file has become useless due to expired deadlines. Contexts can be of help in such scenarios. And despite being about 7.5 yrs old, there still seems to be some mystery and confusion around the context package. This is going to brief, but comment below if you want a full write up of contexts later. The gist is that when a producer encounters a Done/Err context, close the channel allowing down stream code to unblock on their synchronous channel reads. Further, context.Cause added in go 1.20 can be used to pass along reasons the context ended.
// When looping over a channel (var inputs chan int)
for val := range inputs { // Ends when inputs is closed
if ctx.Err() != nil { // See context.Cancelled which is an error
return // break out of value production
}
outputs <- 2 * val
}
// When using select
select { // Blocks on first available channel
case <-ctx.Done():
return
case val := <-inputs[i]:
outputs <- 2 * val
}