当前位置: 首页 > news >正文

Go 多进程编程-管道

多进程编程–管道

文章目录

  • 多进程编程--管道
    • 管道
      • 简单使用
      • 用管道连接命令
    • 命名管道
      • 简单示例
      • 在go中使用命名管道

文章是书籍: 《 Go 并发编程实战 》的读书笔记.

管道

管道(pipe), 一种半双工(或者说是单向)通信方式, 之可以用于父进程子进程之间, 以及同祖先的子进程之间的通信.

比如 shell 命令:

ps aux | grep go

shell 会为每个命令都创建一个进程, 然后将左侧命令的标准输出与右边命令的标准输入连起来.

管道优点就是很简单, 缺点就是只能够单向通信以及对于通信双方的关系有严格限制.

简单使用

Go为管道提供了支持, 可以用os/exec包中的API创建管道

cmd0 := exec.Command("echo", "-n", "My first command comes from golang.")

对应于shell命令就是:

echo -n "My first command comes from golang."

exec.Cmd类型上有一个 Start 方法, 使用该函数以启动命令:

cmd0 := exec.Command("echo", "-n", "My first command comes from golang.")if err := cmd0.Start(); err != nil {fmt.Printf("Error: The command No.0 can not be startup: %s\n", err)return
}

为了创建一个能够获取此命令的输出管道, 需要在 if 语句之前加入一个管道创建语句:

cmd0 := exec.Command("echo", "-n", "My first command comes from golang.")stdout0, err := cmd0.StdoutPipe()
if err != nil {fmt.Printf("Error: Couldn't obtain the stdout pipe for command No.0: %s\n", err)return
}if err := cmd0.Start(); err != nil {fmt.Printf("Error: The command No.0 can not be startup: %s\n", err)return
}

变量cmd0StdoutPipe方法将会返回一个输出管道, 此处赋值给了变量 stdout0, stdout0 类型为io.ReadCloser, 这是一个接口, 他扩展了接口 io.Reader, 在此之上定义了可关闭的数据读取行为.

有了 stdout0, 启动上边这个命令之后, 可以调用 Read 方法获取命令输出:

cmd0 := exec.Command("echo", "-n", "My first command comes from golang.")stdout0, err := cmd0.StdoutPipe()
if err != nil {fmt.Printf("Error: Couldn't obtain the stdout pipe for command No.0: %s\n", err)return
}if err := cmd0.Start(); err != nil {fmt.Printf("Error: The command No.0 can not be startup: %s\n", err)return
}output0 := make([]byte, 30)
n, err := stdout0.Read(output0)
if err != nil {fmt.Printf("Error: Couldn't read data from the pipr: %s\n", err)return
}
fmt.Printf("%s\n", output0[:n])

调用 stdout0Read 方法, 将读出的输出数据写入切片 output0 ,

n根据情况不同而不同:

  • 如果命令的输出小于 output0 的长度 , 那么n就是 命令实际输出的字节数,
  • 否则n就是output0的长度(在这就是30)

一般来说发生第二种情况意味着没有将输出管道中的数据读取完(只读了一部分),

这时候需要再读一次或多次(用循环遍历读取),

如果输出管道中已经没有可以读的数据了(读完了), 那么Read方法的第二个返回值就将是io.EOF. 以此判断是否读完.

cmd0 := exec.Command("echo", "-n", "My first command comes from golang.")stdout0, err := cmd0.StdoutPipe()
if err != nil {fmt.Printf("Error: Couldn't obtain the stdout pipe for command No.0: %s\n", err)return
}if err := cmd0.Start(); err != nil {fmt.Printf("Error: The command No.0 can not be startup: %s\n", err)return
}var outputBuf0 bytes.Bufferfor {tempOutput := make([]byte, 5)n, err := stdout0.Read(tempOutput)if err != nil {if err == io.EOFbreakelse {fmt.Printf("Error: Couldn't read data from the pipe: %s\n", err)return}}if n > 0 {outputBuf0.Write(tempOutput[:n])}
}fmt.Printf("%s\n", outputBuf0.String())

为了观察方便将 tempOutput 设置为了一个较小值5, 为了收集每次迭代读取到的输出内容,将他们依次存放在一个缓冲区 outputBuf0

实际上为了方便起见, 可以一开始就使用带缓冲的读取器从输出管道中读数据:

cmd0 := exec.Command("echo", "-n", "My first command comes from golang.")stdout0, err := cmd0.StdoutPipe()
if err != nil {fmt.Printf("Error: Couldn't obtain the stdout pipe for command No.0: %s\n", err)return
}if err := cmd0.Start(); err != nil {fmt.Printf("Error: The command No.0 can not be startup: %s\n", err)return
}outputBuf0 := bufio.NewReader(stdout0)
output0, _, err := outputBuf0.ReadLine()
if err != nil {fmt.Printf("Error: Couldn't read data from the pipe: %s\n", err)
}fmt.Printf("%s\n", String(output0))

stdout0是一个 io.Reader 类型的变量, 因此能把他作为 bufio.NewReader 函数的参数传入,

函数 bufio.NewReader 返回一个 bufio.Reader 类型的值, 也就是一个缓冲读取器.

默认情况下, 这个缓冲读取器有一个长度为4096的缓冲区, 也就是说最大可以存放4096个字节.

cmd0 命令执行后只会输出一行内容, 所以直接用 ReadLine 读取即可.

ReadLine 第二个参数为bool类型, 代表当前行是否读取完毕, 如果为false那么还要继续读, 例子是一定能读完所以将该结果弃置.

用缓冲读取器的好处就是灵活且方便, 不使用的话就只能读出所有内容之后再处理.

用管道连接命令

管道最大作用就是将一个命令的输出作为另一个命令的输入:

cmd1 := exec.Command("ps", "aux")
cmd2 := exec.Command("grep", "apipe")

下面首先设置了 cmd1Stdout 字段, 然后启动 cmd1 等待其运行完毕

var outputBuf1 bytes.Buffer
cmd1.Stdout = &outputBuf1if err := cmd1.Start(); err != nil {fmt.Printf("Error: The first command can not be startup %s\n", err)return
}
if err := cmd1.Wait(); err != nil {fmt.Printf("Error: Couldn't wait for the first command: %s\n", err)return
}

cmd1Wait 方法调用将一直阻塞直到 cmd1 完全运行结束为止

之后再设置 cmd2StdinStdout 字段, 启动 cmd2 等待其运行完毕

cmd2.Stdin = &outputBuf1var outputBuf2 bytes.Buffer
cmd2.Stdout = &outputBuf2if err := cmd2.Start(); err != nil {fmt.Printf("Error: The second command can not be startup %s\n", err)return
}
if err := cmd2.Wait(); err != nil {fmt.Printf("Error: Couldn't wait for the second command: %s\n", err)return
}

在以上的两段代码中, outputBuf1 起到了管道的作用, 原因在于 *bytes.Buffer 类型实现了 io.Reader 接口, 因此可以把它赋值给 cmd1.Stdout 以及 cmd2.Stdin.

命名管道

简单示例

mkfifo -m 644 myfifo1
tee dst.log < myfifo1 &
cat src.log > myfifo1

在上面的示例中, 使用命令 mkfifo 在当前目录下创建了一个命名管道 myfifo1, 然后使用该命名管道和命令 teesrc.log 文件中的内容写入到了 dst.log 中.

作为一个简单的示例只是使用命名管道搬运了数据, 实际上还能在此基础上实现许多功能, 比如:

  • 数据过滤
  • 数据转换
  • 管道多路复用

命名管道默认为阻塞式, 也就是只有对一个命名管道的读操作和写操作都准备就绪之后数据才能流转

命名管道仍然是一个单向操作, 由因为能够实现多路复用, 所以有时要考虑多个进程同时向命名管道写数据情况下操作的原子性问题.

在go中使用命名管道

Go 在标准库 os 中提供了创建独立管道的 API :

reader, writer, err := os.Pipe()

函数 os.Pipe() 返回三个值:

  • reader 是一个 *os.File 类型的值, 代表了该管道输出端
  • writer 是一个 *os.File 类型的值, 代表了该管道输入端
  • 以上两者共同成为数据传递的渠道
  • err 代表可能发生的错误, 无错误时显然是 nil

Go 用系统函数创建管道, 将两端封装成两个 *os.File 类型的值, 例如有以下两段代码:

n, err := writer.Write(input)
if err != nil {fmt.Printf("Error: Couldn't write data to the named pipe: %s\n", err)
}
fmt.Printf("Written %d byte(s). [file-based pipe]\n", n)
output := make([]byte, 100)
m, err := reader.Read(output)
if err != nil {fmt.Printf("Error: Couldn't read data from the named pipe: %s\n", err)
}
fmt.Printf("Written %d byte(s). [file-based pipe]\n", n)

如果二者是并发运行的, 那么在 reader 之上调用 Read 方法就可以按顺序获取到之前通过调用 writerWrite 方法写入的数据.

此处强调并发运行, 原因在于系统提供的命名管道是一个阻塞的管道, Go 提供的也同样是系统风格的命名管道, 在这意味着如果串行执行, 那么代码百分百会阻塞在先调用的方法那里( (n, err := writer.Write(input)) 或者是 m, err := reader.Read(output)其一 ).

这里要注意: Go 通过 os.Pipe 函数生成的管道底层都是系统级别的管道, 也就意味着他们在关键行为上和系统管道保持一致, 比如:

  • 匿名管道会在管道缓冲区被写满之后使写数据的进程阻塞
  • 命名管道会在其中一段未就绪之前阻塞另一端的进程

命名管道是可以被多路复用的, 所以当有多个输入端同时写入数据时, 必须要考虑操作的原子性问题.

系统提供的管道不支持原子操作, 但是Go标准库的 io 包提供了一个基于内存的有原子性操作保证的管道(下面称为内存管道):

reader, writer := io.Pipe()

函数 io.Pipe() 返回二值:

  • 第一个值是类型为 *io.PipeReader 的值, 代表了该管道输出端, 只能调用 Read 方法从管道读数据
  • 第一个值是类型为 *io.PipeWriter 的值, 代表了该管道输入端, 只能调用 Write 方法向管道写数据

使用 Close 方法关闭管道某一端之后, 在另一端读或写将会得到一个预定义的 error 类型的值. 也可以通过调用 CloseWithError 自定义这种情况下得到的 error 类型的值.

Go 保证内存管道内部的操作都是原子的(通过 sync 包的API完成), 因此可以放心地并发读写数据. 这种管道(内存管道)不是基于文件系统的, 没有缓冲区作为中介, 所以通过该管道传递的数据只会被复制一次, 这样就提高了数据传递效率.

道输入端, 只能调用 Write 方法向管道写数据

使用 Close 方法关闭管道某一端之后, 在另一端读或写将会得到一个预定义的 error 类型的值. 也可以通过调用 CloseWithError 自定义这种情况下得到的 error 类型的值.

Go 保证内存管道内部的操作都是原子的(通过 sync 包的API完成), 因此可以放心地并发读写数据. 这种管道(内存管道)不是基于文件系统的, 没有缓冲区作为中介, 所以通过该管道传递的数据只会被复制一次, 这样就提高了数据传递效率.

http://www.dtcms.com/a/326123.html

相关文章:

  • C++方向知识汇总(三)
  • 面试实战 问题二十三 如何判断索引是否生效,什么样的sql会导致索引失效
  • git:分支
  • 3Ds Max的魔改利器:RailClone - 程序化建模的革命者
  • MySQL 经典练习 50 题(完美解答版,提供解题思路)
  • Spring Framework源码解析——DisposableBean
  • Oracle数据库中的Library cache lock和pin介绍
  • Java多线程并发控制:使用ReentrantLock实现生产者-消费者模型
  • js异步操作 Promise :fetch API 带来的网络请求变革—仙盟创梦IDE
  • 机器翻译:Bahdanau注意力和Luong注意力详解
  • 【浮点数存储】double类型注意点
  • 理解LangChain — Part 3:链式工作流与输出解析器
  • Notepad--:国产跨平台文本编辑器,Notepad++ 的理想替代方案
  • 写一篇Ping32和IP-Guard的对比,重点突出Ping32
  • 循环控制:break和continue用法
  • 鸿蒙flutter项目接入极光推送
  • Java项目基本流程(三)
  • Orange的运维学习日记--38.MariaDB详解与服务部署
  • linux安装和使用git
  • Elasticsearch 官方 Node.js 从零到生产
  • docker部署elasticsearch-8.11.1
  • 网络的基本概念、通信原理以及网络安全问题
  • YOLOv6深度解析:实时目标检测的新突破
  • 时序数据库为什么选IoTDB?
  • 爬虫与数据分析结合案例
  • STM32 HAL驱动MPU6050传感器
  • p6spy和p6spy-spring-boot-starter的SpringBoot3集成配置
  • 高性能Web服务器
  • java基础概念(二)----变量(附练习题)
  • Go 语言三大核心数据结构深度解析:数组、切片(Slice)与映射(Map)