Go: Streaming Data via io.Pipe()
Go: Streaming Data via io.Pipe()
In your production app, you might have come across a scenario where you have a large number of records sitting in a database and you need to send it over a network, without blowing up the application memory. A typical example is wanting to extract all of that data and dump it into S3, or send it over an HTTP endpoint. The go standard library has a simple, yet powerful tool that can be leveraged for this.
io.Pipe() provides an interface to create a unidirectional channel for synchronous flow of data between a sender and a receiver, without the additional overhead of a temporary in-memory buffer. If you have used unix pipes before, the idea is exactly the same. Here is how it works.
Send data to stdout
In this simple demonstration, a stream of records are written to the channel within a goroutine. Another goroutine (the main goroutine) reads the records from the stream, as they become available, and passes it to stdout for display.
package main
import (
"encoding/json"
"io"
"log"
"os"
"time"
)
func main() {
rdr, wrtr := io.Pipe()
go func() {
defer wrtr.Close() // writer must be closed to signal end of stream to reader
data := `["list record 1", "list record 2"]`
if writeE := json.NewEncoder(wrtr).Encode(data); writeE != nil {
log.Fatalf("error writing to pipe: %s\n", writeE)
}
time.Sleep(1 * time.Second)
data = `["list record 3", "list record 4"]`
if writeE := json.NewEncoder(wrtr).Encode(data); writeE != nil {
log.Fatalf("error writing to pipe: %s\n", writeE)
}
}()
defer rdr.Close()
if _, err := io.Copy(os.Stdout, rdr); err != nil {
log.Fatal(err)
}
}
Send data over HTTP
Similarly, you can send data over an HTTP endpoint in chunks without explicit buffering.
package main
import (
"encoding/json"
"io"
"log"
"net/http"
"os"
"time"
)
func main() {
rdr, wrtr := io.Pipe()
go func() {
defer wrtr.Close() // writer must be closed to signal end of stream to reader
log.Println("sending batch 1")
data := `["list record 1", "list record 2"]`
if writeE := json.NewEncoder(wrtr).Encode(data); writeE != nil {
log.Fatalf("error writing to pipe: %s\n", writeE)
}
time.Sleep(1 * time.Second)
log.Println("sending batch 2")
data = `["list record 3", "list record 4"]`
if writeE := json.NewEncoder(wrtr).Encode(data); writeE != nil {
log.Fatalf("error writing to pipe: %s\n", writeE)
}
}()
defer rdr.Close()
resp, err := http.Post("https://httpbin.org/anything", "application/json", rdr)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
io.Copy(os.Stdout, resp.Body) // display the response
}
This approach can be used in orchestrating a memory efficient workflow to pull data in chunks from one source, and sending it over the network without allocating unnecessary in-memory buffers. An example would be pulling a large dataset from the database and streaming it directly to S3 via a multipart upload.
Points to note
- The reader blocks until the writer sends bytes to the pipe or the write end of the pipe is closed.
- A write call blocks until the reader consumes all the bytes written to the pipe or the read end is closed.
- Due to the synchronous blocking nature, read and write operations must be performed in separate goroutines to prevent deadlocks.
io.Pipe()is probably an overkill for simple use cases processing smaller payloads that are better off using an in-memory buffer.
Under the hood
Current version of Go implements the in-memory pipe using channels for synchronization and passing the data between read and write ends.