Go: Streaming Data via io.Pipe()

In a production app, you may have come across a scenario where a large number of records sits in a database and you need to send them over the network without blowing up the application's memory. A typical example is extracting all of that data and dumping it into S3, or sending it to an HTTP endpoint. The Go standard library has a simple yet powerful tool for this: io.Pipe().

io.Pipe() creates a synchronous, in-memory pipe and returns its two ends: an *io.PipeReader and an *io.PipeWriter. Data flows in one direction, from the writer to the reader, and the pipe has no internal buffer: each Write is handed directly to one or more Reads. If you have used Unix pipes before, the idea is similar, except that a Unix pipe is backed by a kernel buffer. Here is how it works.

Send data to stdout

In this simple demonstration, a goroutine writes a stream of records into the pipe. The main goroutine reads the records from the other end as they become available and copies them to stdout.

package main

import (
	"encoding/json"
	"io"
	"log"
	"os"
	"time"
)

func main() {
	rdr, wrtr := io.Pipe()

	go func() {
		// The writer must be closed to signal end of stream (io.EOF) to the reader.
		// CloseWithError keeps the first error, so this Close won't overwrite it.
		defer wrtr.Close()

		enc := json.NewEncoder(wrtr)

		// Encode a []string so each batch is written as a JSON array,
		// not as one quoted (escaped) JSON string.
		data := []string{"list record 1", "list record 2"}
		if err := enc.Encode(data); err != nil {
			wrtr.CloseWithError(err) // the reader's next Read returns err
			return
		}

		time.Sleep(1 * time.Second) // simulate a slow producer

		data = []string{"list record 3", "list record 4"}
		if err := enc.Encode(data); err != nil {
			wrtr.CloseWithError(err)
			return
		}
	}()

	defer rdr.Close()
	// io.Copy prints each batch as soon as it is written and returns at io.EOF.
	if _, err := io.Copy(os.Stdout, rdr); err != nil {
		log.Fatal(err)
	}
}
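Because the writer runs in its own goroutine, calling log.Fatalf there terminates the whole program on a write error. A gentler option is PipeWriter.CloseWithError, which makes the reader's next Read return that error instead of io.EOF. The sketch below assumes a hypothetical data source that fails after its first batch (errSource and copyUntilFailure are made up for illustration) and shows the failure arriving at io.Copy on the reading side, along with the data that made it through.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// errSource is a hypothetical failure of the data source (e.g. a lost DB connection).
var errSource = errors.New("source failed after first batch")

// copyUntilFailure streams one batch through a pipe, then fails the stream.
func copyUntilFailure() (string, error) {
	rdr, wrtr := io.Pipe()

	go func() {
		if _, err := io.WriteString(wrtr, "batch 1\n"); err != nil {
			wrtr.CloseWithError(err)
			return
		}
		// Instead of log.Fatal, hand the error to the reader.
		wrtr.CloseWithError(errSource)
	}()

	var out bytes.Buffer
	_, err := io.Copy(&out, rdr) // returns errSource rather than nil
	return out.String(), err
}

func main() {
	got, err := copyUntilFailure()
	fmt.Printf("%q %v\n", got, err) // "batch 1\n" source failed after first batch
}
```

The reader keeps everything written before the failure, so it can decide whether a partial result is usable.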

Send data over HTTP

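Similarly, you can stream data to an HTTP endpoint without buffering the whole request body. Because the body is an io.Reader of unknown length, net/http sends it with chunked transfer encoding when the connection uses HTTP/1.1.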

package main

import (
	"encoding/json"
	"io"
	"log"
	"net/http"
	"os"
	"time"
)

func main() {
	rdr, wrtr := io.Pipe()

	go func() {
		defer wrtr.Close() // writer must be closed to signal end of stream to reader

		enc := json.NewEncoder(wrtr)

		log.Println("sending batch 1")
		data := []string{"list record 1", "list record 2"}
		if err := enc.Encode(data); err != nil {
			wrtr.CloseWithError(err) // the reader sees err instead of io.EOF
			return
		}

		time.Sleep(1 * time.Second)

		log.Println("sending batch 2")
		data = []string{"list record 3", "list record 4"}
		if err := enc.Encode(data); err != nil {
			wrtr.CloseWithError(err)
			return
		}
	}()

	// The HTTP transport closes the request body (rdr) itself, even on errors.
	resp, err := http.Post("https://httpbin.org/anything", "application/json", rdr)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// Display the response.
	if _, err := io.Copy(os.Stdout, resp.Body); err != nil {
		log.Fatal(err)
	}
}

This approach can be used to build a memory-efficient workflow that pulls data in chunks from one source and sends it over the network without allocating a buffer for the whole dataset. For example, you could page through a large table in the database and stream the rows directly to S3 via a multipart upload.

Points to note

  • The reader blocks until the writer sends bytes to the pipe or the write end of the pipe is closed.
  • A write call blocks until the reader consumes all the bytes written to the pipe or the read end is closed.
  • Due to the synchronous blocking nature, read and write operations must be performed in separate goroutines to prevent deadlocks.
  • io.Pipe() is probably overkill for simple cases with small payloads; those are better served by an in-memory buffer such as bytes.Buffer.

Under the hood

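The current implementation (src/io/pipe.go) synchronizes the two ends with channels. A Write sends its byte slice to the reader over one channel; each Read copies as much as fits into its own buffer and reports the count back over a second channel, and the Write repeats until its whole slice has been consumed. A mutex serializes concurrent Writes, and the data is copied once, directly from the writer's slice into the reader's buffer.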