Skip to main content

Streaming IO

前言

Go io 包提供了 io.Readerio.Writer 接口,分别用于数据的输入和输出操作,如下图所示:

Go 附带了许多 API,这些 API 支持数据源来自内存struct、文件、网络连接等资源流式 IO

这篇文章教你如何使用及自定义接口 io.Readerio.Writer

io.Reader

接口 io.Reader 可以将数据源读取到传输缓冲区中,在那里可以流式传输和使用它,如下所示:

io.Reader接口定义如下:

type Reader interface {
Read(p []byte) (n int, err error)
}
如何自定义Reader?

Read() 方法的实现应该返回读取的字节数,如果发生错误则返回错误。 如果已经读完,Read 的 err 应该返回 io.EOF。

reader都干了哪些事

Reader的行为取决于它的实现,但是通过 io.Reader 文档,当你调用read方法的时候:

  • 1, Read() 将尽可能的读满,也就是将 len(p) 读入 p。
  • 2, Read() 被调用后,n 可能小于 len(p)。
  • 3, 出错时,Read() 可能会返回缓冲区 p 中返回 n 个字节。 例如,从突然关闭的 TCP 套接字中读取。 根据你的需求,你可以选择将字节保留在 p 中或重试。
  • 4, 当 Read() 把数据全部读完时,读取器可能会返回非零 n 和 err=io.EOF。 但是,根据实现,Reader可能在流的末尾返回非零 n 和 err = nil。 在这种情况下,任何后续的Read()都必须返回 n=0,err=io.EOF。
  • 5, 最后需要注意的是, 调用 Read() 返回 n=0err=nil 并不意味着EOF作为下次Read()可能会返回更多数据。

As you can see, 直接从Reader中正确读取流可能很棘手。 幸运的是,标准库中的Reader遵循了易于流式传输的合理方法。 不过,在使用阅读器之前,最好查阅一下文档。

Streaming data from readers

利用标准库进行流式传输非常容易,方法 Read 被设计为在循环中调用,在每次迭代中,它从源数据读取一块数据并将其放入缓冲区 p 中。 此循环将继续,直到该方法返回 错误 io.EOF

下面是个利用标准库 string reader 进行流式传输的简单例子

package main

import (
"fmt"
"io"
"os"
"strings"
)

func main() {
reader := strings.NewReader("Clear is better than clever")
p := make([]byte, 4)

for {
n, err := reader.Read(p)
if err != nil {
if err == io.EOF {
fmt.Println(string(p[:n])) //should handle any remainding bytes.
break
}
fmt.Println(err)
os.Exit(1)
}
fmt.Println(string(p[:n]))
}
}

上述代码创建了一个4字节的缓冲区p,缓冲区故意保持小于源字符串的长度。 这是为了演示如何正确地从大于缓冲区的源中流式传输数据块。

Implementing a custom io.Reader

自定义一个io.Reader, 从一段数据中筛选出字母字符

package main

import (
"fmt"
"io"
)

type alphaReader struct {
src string
cur int
}

func newAlphaReader(src string) *alphaReader {
return &alphaReader{src: src}
}

func alpha(r byte) byte {
if (r >= 'A' && r <= 'Z') || (r >= 'a' && r <= 'z') {
return r
}
return 0
}

func (a *alphaReader) Read(p []byte) (int, error) {
if a.cur >= len(a.src) {
return 0, io.EOF
}

x := len(a.src) - a.cur
n, bound := 0, 0
if x >= len(p) {
bound = len(p)
} else if x <= len(p) {
bound = x
}

buf := make([]byte, bound)
for n < bound {
if char := alpha(a.src[a.cur]); char != 0 {
buf[n] = char
}
n++
a.cur++
}
copy(p, buf)
return n, nil
}

func main() {
reader := newAlphaReader("Hello! It's 9am, where is the sun?")
p := make([]byte, 4)
for {
n, err := reader.Read(p)
if err == io.EOF {
break
}
fmt.Print(string(p[:n]))
}
fmt.Println()
}

Chaining Readers

标准库已经实现了许多Reader。 将一个Reader作为另一个Reader的源是一种常见的用法。

这种Reader链允许一个Reader重用另一个Reader的逻辑,如下面的代码所示,它更新 alphaReader 以接受 io.Reader 作为其源。

这通过将流管理问题推送到根阅读器来降低代码的复杂性

package main

import (
"fmt"
"io"
"strings"
)

type alphaReader struct {
reader io.Reader
}

func newAlphaReader(reader io.Reader) *alphaReader {
return &alphaReader{reader: reader}
}

func alpha(r byte) byte {
if (r >= 'A' && r <= 'Z') || (r >= 'a' && r <= 'z') {
return r
}
return 0
}

func (a *alphaReader) Read(p []byte) (int, error) {
n, err := a.reader.Read(p)
if err != nil {
return n, err
}
buf := make([]byte, n)
for i := 0; i < n; i++ {
if char := alpha(p[i]); char != 0 {
buf[i] = char
}
}

copy(p, buf)
return n, nil
}

func main() {
// use an io.Reader as source for alphaReader
reader := newAlphaReader(strings.NewReader("Hello! It's 9am, where is the sun?"))
p := make([]byte, 4)
for {
n, err := reader.Read(p)
if err == io.EOF {
break
}
fmt.Print(string(p[:n]))
}
fmt.Println()
}

这种方法的另一个优点是 alphaReader 现在能够从任何阅读器实现中读取。 例如,下面的代码片段显示了如何将 alphaReader 与 os.File 源结合以过滤掉文件中的非字母字符:

package main

import (
"fmt"
"io"
"os"
)

type alphaReader struct {
reader io.Reader
}

func newAlphaReader(reader io.Reader) *alphaReader {
return &alphaReader{reader: reader}
}

func alpha(r byte) byte {
if (r >= 'A' && r <= 'Z') || (r >= 'a' && r <= 'z') {
return r
}
return 0
}

func (a *alphaReader) Read(p []byte) (int, error) {
n, err := a.reader.Read(p)
if err != nil {
return n, err
}
buf := make([]byte, n)
for i := 0; i < n; i++ {
if char := alpha(p[i]); char != 0 {
buf[i] = char
}
}

copy(p, buf)
return n, nil
}

func main() {
// use an os.File as source for alphaReader
file, err := os.Open("./a.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()

reader := newAlphaReader(file)
p := make([]byte, 4)
for {
n, err := reader.Read(p)
if err == io.EOF {
break
}
fmt.Print(string(p[:n]))
}
fmt.Println()
}

io.Writer

Writer的典型代表io.Writer, 从缓冲区读取数据然后写入到另一个目标源中,如下图

所有的写入流必须实现下面的接口, Write方法从buffer p中读取数据 然后写入指定到目标源中

type Writer interface {
Write(p []byte) (n int, err error)
}

Using writers

标准库提供了许多预先实现的 io.Writer 类型。 直接使用这些 writer 会使写入操作会变得非常简单,如下面的代码片段所示,它使用类型 bytes.Buffer 作为 io.Writer 将数据写入内存缓冲区。

package main

import (
"bytes"
"fmt"
"os"
)

func main() {
proverbs := []string{
"Channels orchestrate mutexes serialize",
"Cgo is not Go",
"Errors are values",
"Don't panic",
}
var writer bytes.Buffer

for _, p := range proverbs {
n, err := writer.Write([]byte(p))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if n != len(p) {
fmt.Println("failed to write data")
os.Exit(1)
}
}

fmt.Println(writer.String())
}

Implementing a custom io.Writer

接着带大家来实现一个io.Writer,这里我们叫它chanWriter,chanWriter 将其内容作为字节序列写入 Golang的channel中。

package main

import "fmt"

type chanWriter struct {
ch chan byte
}

func newChanWriter() *chanWriter {
return &chanWriter{make(chan byte, 1024)}
}

func (w *chanWriter) Chan() <-chan byte {
return w.ch
}

func (w *chanWriter) Write(p []byte) (int, error) {
n := 0
for _, b := range p {
w.ch <- b
n++
}
return n, nil
}

func (w *chanWriter) Close() error {
close(w.ch)
return nil
}

func main() {
writer := newChanWriter()
go func() {
defer writer.Close()
writer.Write([]byte("Stream "))
writer.Write([]byte("me!"))
}()
for c := range writer.Chan() {
fmt.Printf("%c", c)
}
fmt.Println()
}

由于chanWriter还实现了io.Closer接口,调用方法 writer.Close() 以正确关闭channel,以避免在访问channel时出现任何死锁。

Useful types and packages for IO

golang标准库提供了许多好用的方法和数据类型使我们操作io时变得非常容易

os.File

类型 os.File 表示本地系统上的文件。 它同时实现了 io.Readerio.Writer,因此可以在任何流式 IO 上下文中使用。

例如,以下示例显示了如何将连续的字符串切片直接写入文件:

package main

import (
"fmt"
"os"
)

func main() {
wording := []string{
"How are you?\n",
"I'm Scott\n",
"Nice to see you!\n",
"Don't panic\n",
}
file, err := os.Create("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()

for _, p := range wording {
n, err := file.Write([]byte(p))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if n != len(p) {
fmt.Println("failed to write data")
os.Exit(1)
}
}
fmt.Println("file write done")
}

io.File 不仅可以作为Writer,还可以用作Reader,从本地文件系统中流式传输文件的内容。 例如,以下代码读取文件并打印其内容

package main

import (
"fmt"
"io"
"os"
)

func main() {
file, err := os.Open("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()

p := make([]byte, 4)
for {
n, err := file.Read(p)
if err == io.EOF {
break
}
fmt.Print(string(p[:n]))
}
}

Standard output, input, and error

os 包暴露了三个变量,os.Stdoutos.Stdinos.Stderr,它们的类型为 *os.File,分别表示 OS 标准输出、输入和错误的文件句柄。

例如,以下代码直接将结果打印到标准输出:

package main

import (
"fmt"
"os"
)

func main() {
wording := []string{
"How are you?\n",
"I'm Scott\n",
"Nice to see you!\n",
"Don't panic\n",
}

for _, p := range wording {
n, err := os.Stdout.Write([]byte(p))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if n != len(p) {
fmt.Println("failed to write data")
os.Exit(1)
}
}
}

io.Copy()

函数 func Copy(dst Writer, src Reader) (written int64, err error) 可以轻松地将数据流从Reader传输到Writer中。 它抽象出 for-loop 模式(到目前为止我们已经看到)并正确处理了 io.EOF 和字节计数。

下面代码示例是之前程序的简化版本,它将Reader(内存)的内容复制到Writer(文件)中:

package main

import (
"bytes"
"fmt"
"io"
"os"
)

func main() {
wording := new(bytes.Buffer)
wording.WriteString("How are you?\n")
wording.WriteString("I'm Scott\n")
wording.WriteString("Nice to see you!\n")
wording.WriteString("Don't panic\n")

file, err := os.Create("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()

// copy from reader data into writer file
if _, err := io.Copy(file, wording); err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println("file created")
}

可以看到test.txt被成功创建

类似地,我们可以使用 io.Copy() 函数重写之前从文件读取并打印到标准输出的程序,如下所示:

package main

import (
"fmt"
"io"
"os"
)

func main() {
file, err := os.Open("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()

if _, err := io.Copy(os.Stdout, file); err != nil {
fmt.Println(err)
os.Exit(1)
}
}

io.WriteString()

此函数func WriteString(w Writer, s string) (n int, err error)提供了将字符串写入指定Writer的便利:

package main

import (
"fmt"
"io"
"os"
)

func main() {
file, err := os.Create("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()
if _, err := io.WriteString(file, "Scott is very nice!"); err != nil {
fmt.Println(err)
os.Exit(1)
}
}

text.txt内容:

运行代码之后,可以看到原来的数据被覆盖了

Pipe writers & readers

io.PipeWriterio.PipeReader 类型对 IO 操作进行建模,就像在内存管道中一样。 数据被写入管道的写入端,然后使用goroutine 在管道的读取端读取。 下面使用 io.Pipe() 创建管道 PipeWriter/PipeReader 对,然后把缓冲区的数据写入到 io.Stdout

  • func Pipe() (*PipeReader, *PipeWriter)
package main

import (
"bytes"
"io"
"os"
)

func main() {
wording := new(bytes.Buffer)
wording.WriteString("How are you?\n")
wording.WriteString("I'm Scott\n")
wording.WriteString("Nice to see you!\n")
wording.WriteString("Don't panic\n")

piper, pipew := io.Pipe()

// write in writer end of pipe
go func() {
defer pipew.Close()
io.Copy(pipew, wording)
}()

// read from reader end of pipe.
io.Copy(os.Stdout, piper)
piper.Close()
}

Buffered IO

Go 通过包 bufio 支持缓冲 IO,这使得处理文本内容变得容易。 例如,以下程序逐行读取以值 \n分隔的文件的内容:

package main

import (
"bufio"
"fmt"
"io"
"os"
)

func main() {
file, err := os.Open("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()
reader := bufio.NewReader(file) //func NewReader(rd io.Reader) *bufio.Reader

for {
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
break
} else {
fmt.Println(err)
os.Exit(1)
}
}
fmt.Print(line)
}
}

可以看到bufio.Reader在调用ReadString()方法时是有bug的:最后一句没有读出来,原因是最后一行没有分隔符

改善代码,建议用func (b *Reader) ReadLine() (line []byte, isPrefix bool, err error)来逐行读取

package main

import (
"bufio"
"fmt"
"io"
"os"
)

func main() {
file, err := os.Open("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()
reader := bufio.NewReader(file)

for {
line, _, err := reader.ReadLine()
if err != nil {
if err == io.EOF {
break
} else {
fmt.Println(err)
os.Exit(1)
}
}
fmt.Print(string(line) + "\n")
}
}

完美运行

Util package

ioutil 包是 io 的子包,为 IO 提供了几个方便的功能,下面是它的API

示例 ReadFile 读取文件并输出

package main

import (
"fmt"
"io/ioutil"
"os"
)

func main() {
bytes, err := ioutil.ReadFile("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Printf("%s", string(bytes))
}