Streaming IO
前言
Go io 包提供了 io.Reader
和 io.Writer
接口,分别用于数据的输入和输出操作,如下图所示:
Go 附带了许多 API,这些 API 支持数据源来自内存struct、文件、网络连接等资源流式 IO
这篇文章教你如何使用及自定义接口 io.Reader
和 io.Writer
io.Reader
接口 io.Reader
可以将数据源读取到传输缓冲区中,在那里可以流式传输和使用它,如下所示:
io.Reader
接口定义如下:
type Reader interface {
Read(p []byte) (n int, err error)
}
Read()
方法的实现应该返回读取的字节数,如果发生错误则返回错误。 如果已经读完,Read 的 err 应该返回 io.EOF。
reader都干了哪些事
Reader的行为取决于它的实现,但是通过 io.Reader
文档,当你调用read方法的时候:
- 1, Read() 将尽可能的读满,也就是将 len(p) 读入 p。
- 2, Read() 被调用后,n 可能小于 len(p)。
- 3, 出错时,Read() 可能会返回缓冲区 p 中返回 n 个字节。 例如,从突然关闭的 TCP 套接字中读取。 根据你的需求,你可以选择将字节保留在 p 中或重试。
- 4, 当 Read() 把数据全部读完时,读取器可能会返回非零 n 和
err=io.EOF
。 但是,根据实现,Reader可能在流的末尾返回非零 n 和 err = nil。 在这种情况下,任何后续的Read()都必须返回 n=0,err=io.EOF。 - 5, 最后需要注意的是, 调用 Read() 返回
n=0
和err=nil
并不意味着EOF作为下次Read()可能会返回更多数据。
As you can see, 直接从Reader中正确读取流可能很棘手。 幸运的是,标准库中的Reader遵循了易于流式传输的合理方法。 不过,在使用阅读器之前,最好查阅一下文档。
Streaming data from readers
利用标准库进行流式传输非常容易,方法 Read 被设计为在循环中调用,在每次迭代中,它从源数据读取一块数据并将其放入缓冲区 p 中。 此循环将继续,直到该方法返回 错误 io.EOF
。
下面是个利用标准库 string reader 进行流式传输的简单例子
package main
import (
"fmt"
"io"
"os"
"strings"
)
func main() {
reader := strings.NewReader("Clear is better than clever")
p := make([]byte, 4)
for {
n, err := reader.Read(p)
if err != nil {
if err == io.EOF {
fmt.Println(string(p[:n])) //should handle any remainding bytes.
break
}
fmt.Println(err)
os.Exit(1)
}
fmt.Println(string(p[:n]))
}
}
上述代码创建了一个4字节的缓冲区p,缓冲区故意保持小于源字符串的长度。 这是为了演示如何正确地从大于缓冲区的源中流式传输数据块。
Implementing a custom io.Reader
自定义一个io.Reader
, 从一段数据中筛选出字母字符
package main
import (
"fmt"
"io"
)
type alphaReader struct {
src string
cur int
}
func newAlphaReader(src string) *alphaReader {
return &alphaReader{src: src}
}
func alpha(r byte) byte {
if (r >= 'A' && r <= 'Z') || (r >= 'a' && r <= 'z') {
return r
}
return 0
}
func (a *alphaReader) Read(p []byte) (int, error) {
if a.cur >= len(a.src) {
return 0, io.EOF
}
x := len(a.src) - a.cur
n, bound := 0, 0
if x >= len(p) {
bound = len(p)
} else if x <= len(p) {
bound = x
}
buf := make([]byte, bound)
for n < bound {
if char := alpha(a.src[a.cur]); char != 0 {
buf[n] = char
}
n++
a.cur++
}
copy(p, buf)
return n, nil
}
func main() {
reader := newAlphaReader("Hello! It's 9am, where is the sun?")
p := make([]byte, 4)
for {
n, err := reader.Read(p)
if err == io.EOF {
break
}
fmt.Print(string(p[:n]))
}
fmt.Println()
}
Chaining Readers
标准库已经实现了许多Reader。 将一个Reader作为另一个Reader的源是一种常见的用法。
这种Reader链允许一个Reader重用另一个Reader的逻辑,如下面的代码所示,它更新 alphaReader 以接受 io.Reader 作为其源。
这通过将流管理问题推送到根阅读器来降低代码的复杂性
package main
import (
"fmt"
"io"
"strings"
)
type alphaReader struct {
reader io.Reader
}
func newAlphaReader(reader io.Reader) *alphaReader {
return &alphaReader{reader: reader}
}
func alpha(r byte) byte {
if (r >= 'A' && r <= 'Z') || (r >= 'a' && r <= 'z') {
return r
}
return 0
}
func (a *alphaReader) Read(p []byte) (int, error) {
n, err := a.reader.Read(p)
if err != nil {
return n, err
}
buf := make([]byte, n)
for i := 0; i < n; i++ {
if char := alpha(p[i]); char != 0 {
buf[i] = char
}
}
copy(p, buf)
return n, nil
}
func main() {
// use an io.Reader as source for alphaReader
reader := newAlphaReader(strings.NewReader("Hello! It's 9am, where is the sun?"))
p := make([]byte, 4)
for {
n, err := reader.Read(p)
if err == io.EOF {
break
}
fmt.Print(string(p[:n]))
}
fmt.Println()
}
这种方法的另一个优点是 alphaReader 现在能够从任何阅读器实现中读取。 例如,下面的代码片段显示了如何将 alphaReader 与 os.File 源结合以过滤掉文件中的非字母字符:
package main
import (
"fmt"
"io"
"os"
)
type alphaReader struct {
reader io.Reader
}
func newAlphaReader(reader io.Reader) *alphaReader {
return &alphaReader{reader: reader}
}
func alpha(r byte) byte {
if (r >= 'A' && r <= 'Z') || (r >= 'a' && r <= 'z') {
return r
}
return 0
}
func (a *alphaReader) Read(p []byte) (int, error) {
n, err := a.reader.Read(p)
if err != nil {
return n, err
}
buf := make([]byte, n)
for i := 0; i < n; i++ {
if char := alpha(p[i]); char != 0 {
buf[i] = char
}
}
copy(p, buf)
return n, nil
}
func main() {
// use an os.File as source for alphaReader
file, err := os.Open("./a.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()
reader := newAlphaReader(file)
p := make([]byte, 4)
for {
n, err := reader.Read(p)
if err == io.EOF {
break
}
fmt.Print(string(p[:n]))
}
fmt.Println()
}
io.Writer
Writer的典型代表io.Writer
, 从缓冲区读取数据然后写入到另一个目标源中,如下图
所有的写入流必须实现下面的接口, Write方法从buffer p中读取数据 然后写入指定到目标源中
type Writer interface {
Write(p []byte) (n int, err error)
}
Using writers
标准库提供了许多预先实现的 io.Writer
类型。 直接使用这些 writer 会使写入操作会变得非常简单,如下面的代码片段所示,它使用类型 bytes.Buffer
作为 io.Writer
将数据写入内存缓冲区。
package main
import (
"bytes"
"fmt"
"os"
)
func main() {
proverbs := []string{
"Channels orchestrate mutexes serialize",
"Cgo is not Go",
"Errors are values",
"Don't panic",
}
var writer bytes.Buffer
for _, p := range proverbs {
n, err := writer.Write([]byte(p))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if n != len(p) {
fmt.Println("failed to write data")
os.Exit(1)
}
}
fmt.Println(writer.String())
}
Implementing a custom io.Writer
接着带大家来实现一个io.Writer
,这里我们叫它chanWriter,chanWriter 将其内容作为字节序列写入 Golang的channel中。
package main
import "fmt"
type chanWriter struct {
ch chan byte
}
func newChanWriter() *chanWriter {
return &chanWriter{make(chan byte, 1024)}
}
func (w *chanWriter) Chan() <-chan byte {
return w.ch
}
func (w *chanWriter) Write(p []byte) (int, error) {
n := 0
for _, b := range p {
w.ch <- b
n++
}
return n, nil
}
func (w *chanWriter) Close() error {
close(w.ch)
return nil
}
func main() {
writer := newChanWriter()
go func() {
defer writer.Close()
writer.Write([]byte("Stream "))
writer.Write([]byte("me!"))
}()
for c := range writer.Chan() {
fmt.Printf("%c", c)
}
fmt.Println()
}
由于chanWriter
还实现了io.Closer
接口,调用方法 writer.Close()
以正确关闭channel,以避免在访问channel时出现任何死锁。
Useful types and packages for IO
golang标准库提供了许多好用的方法和数据类型使我们操作io时变得非常容易
os.File
类型 os.File
表示本地系统上的文件。 它同时实现了 io.Reader
和 io.Writer
,因此可以在任何流式 IO 上下文中使用。
例如,以下示例显示了如何将连续的字符串切片直接写入文件:
package main
import (
"fmt"
"os"
)
func main() {
wording := []string{
"How are you?\n",
"I'm Scott\n",
"Nice to see you!\n",
"Don't panic\n",
}
file, err := os.Create("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()
for _, p := range wording {
n, err := file.Write([]byte(p))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if n != len(p) {
fmt.Println("failed to write data")
os.Exit(1)
}
}
fmt.Println("file write done")
}
io.File
不仅可以作为Writer,还可以用作Reader,从本地文件系统中流式传输文件的内容。 例如,以下代码读取文件并打印其内容
package main
import (
"fmt"
"io"
"os"
)
func main() {
file, err := os.Open("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()
p := make([]byte, 4)
for {
n, err := file.Read(p)
if err == io.EOF {
break
}
fmt.Print(string(p[:n]))
}
}
Standard output, input, and error
os 包暴露了三个变量,os.Stdout
、os.Stdin
和 os.Stderr
,它们的类型为 *os.File
,分别表示 OS 标准输出、输入和错误的文件句柄。
例如,以下代码直接将结果打印到标准输出:
package main
import (
"fmt"
"os"
)
func main() {
wording := []string{
"How are you?\n",
"I'm Scott\n",
"Nice to see you!\n",
"Don't panic\n",
}
for _, p := range wording {
n, err := os.Stdout.Write([]byte(p))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if n != len(p) {
fmt.Println("failed to write data")
os.Exit(1)
}
}
}
io.Copy()
函数 func Copy(dst Writer, src Reader) (written int64, err error)
可以轻松地将数据流从Reader传输到Writer中。 它抽象出 for-loop
模式(到目前为止我们已经看到)并正确处理了 io.EOF
和字节计数。
下面代码示例是之前程序的简化版本,它将Reader(内存)的内容复制到Writer(文件)中:
package main
import (
"bytes"
"fmt"
"io"
"os"
)
func main() {
wording := new(bytes.Buffer)
wording.WriteString("How are you?\n")
wording.WriteString("I'm Scott\n")
wording.WriteString("Nice to see you!\n")
wording.WriteString("Don't panic\n")
file, err := os.Create("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()
// copy from reader data into writer file
if _, err := io.Copy(file, wording); err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println("file created")
}
可以看到test.txt被成功创建
类似地,我们可以使用 io.Copy()
函数重写之前从文件读取并打印到标准输出的程序,如下所示:
package main
import (
"fmt"
"io"
"os"
)
func main() {
file, err := os.Open("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()
if _, err := io.Copy(os.Stdout, file); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
io.WriteString()
此函数func WriteString(w Writer, s string) (n int, err error)
提供了将字符串写入指定Writer
的便利:
package main
import (
"fmt"
"io"
"os"
)
func main() {
file, err := os.Create("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()
if _, err := io.WriteString(file, "Scott is very nice!"); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
text.txt内容:
运行代码之后,可以看到原来的数据被覆盖了
Pipe writers & readers
io.PipeWriter
和 io.PipeReader
类型对 IO 操作进行建模,就像在内存管道中一样。 数据被写入管道的写入端,然后使用goroutine 在管道的读取端读取。 下面使用 io.Pipe()
创建管道 PipeWriter/PipeReader 对,然后把缓冲区的数据写入到 io.Stdout
:
func Pipe() (*PipeReader, *PipeWriter)
package main
import (
"bytes"
"io"
"os"
)
func main() {
wording := new(bytes.Buffer)
wording.WriteString("How are you?\n")
wording.WriteString("I'm Scott\n")
wording.WriteString("Nice to see you!\n")
wording.WriteString("Don't panic\n")
piper, pipew := io.Pipe()
// write in writer end of pipe
go func() {
defer pipew.Close()
io.Copy(pipew, wording)
}()
// read from reader end of pipe.
io.Copy(os.Stdout, piper)
piper.Close()
}
Buffered IO
Go 通过包 bufio 支持缓冲 IO,这使得处理文本内容变得容易。 例如,以下程序逐行读取以值 \n
分隔的文件的内容:
package main
import (
"bufio"
"fmt"
"io"
"os"
)
func main() {
file, err := os.Open("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()
reader := bufio.NewReader(file) //func NewReader(rd io.Reader) *bufio.Reader
for {
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
break
} else {
fmt.Println(err)
os.Exit(1)
}
}
fmt.Print(line)
}
}
可以看到bufio.Reader在调用ReadString()方法时是有bug的:最后一句没有读出来,原因是最后一行没有分隔符
改善代码,建议用func (b *Reader) ReadLine() (line []byte, isPrefix bool, err error)
来逐行读取
package main
import (
"bufio"
"fmt"
"io"
"os"
)
func main() {
file, err := os.Open("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer file.Close()
reader := bufio.NewReader(file)
for {
line, _, err := reader.ReadLine()
if err != nil {
if err == io.EOF {
break
} else {
fmt.Println(err)
os.Exit(1)
}
}
fmt.Print(string(line) + "\n")
}
}
完美运行
Util package
ioutil 包是 io 的子包,为 IO 提供了几个方便的功能,下面是它的API
示例 ReadFile 读取文件并输出
package main
import (
"fmt"
"io/ioutil"
"os"
)
func main() {
bytes, err := ioutil.ReadFile("./test.txt")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Printf("%s", string(bytes))
}