Linux 管道(pipe/FIFO)全指南:概念、语义、原子性、阻塞规则、实战代码与最佳实践
文章目录
- Linux 管道(pipe/FIFO)全指南:概念、语义、原子性、阻塞规则、实战代码与最佳实践
- 1. 管道是什么?(匿名管道 & 命名管道)
- 1.1 匿名管道(anonymous pipe)
- 1.2 命名管道(FIFO)
- 2. 标准用法:Shell 管道与 `dup2` 重定向
- 2.1 Shell 管道 `|`
- 2.2 自己在 C 里做一次
- 3. 阻塞语义与 EOF:open/read/write 的行为
- 3.1 打开 FIFO 的阻塞规则
- 3.2 读写与 EOF
- 3.3 非阻塞
- 4. 原子性(atomicity)与 `PIPE_BUF`
- 4.1 原子写的保证
- 4.2 `PIPE_BUF` vs `_PC_PIPE_BUF`
- 4.3 > `PIPE_BUF` 的写入怎么做?
- 5. 管道容量:满了会怎样?能否调大?
- 6. `pipe2`、`O_CLOEXEC`、`O_NONBLOCK`
- 7. 多路复用:`select` / `poll` / `epoll` 监控管道
- 8. FIFO 读写的健壮模板
- 8.1 写端(producer)
- 8.2 读端(consumer)
- 9. 进阶:双向通信、与 stdio 配合、零拷贝
- 9.1 双向通信
- 9.2 与 `stdio` 缓冲配合
- 9.3 零拷贝:`splice/tee`
- 10. 常见坑与排错
- 11. 与其它 IPC 的取舍
- 12. 参考小抄:关键 API 原型
- 13. 最后,用两段能直接运行的小示例收个尾
- 13.1 FIFO 写(10MB,原子分块 + 忽略 SIGPIPE)
- 13.2 FIFO 读(统计字节并打印行数)
- 一句话总结
Linux 管道(pipe/FIFO)全指南:概念、语义、原子性、阻塞规则、实战代码与最佳实践
一篇就够:从 shell 的
|到 C 语言的pipe()、mkfifo(),再到原子写入、阻塞/非阻塞、容量、dup2重定向、select/poll/epoll多路复用、SIGPIPE、pipe2、F_SETPIPE_SZ扩容、splice/tee零拷贝……把管道讲清楚、用明白。
1. 管道是什么?(匿名管道 & 命名管道)
1.1 匿名管道(anonymous pipe)
- 由
pipe(int fd[2])创建,返回两个文件描述符:fd[0](读端)、fd[1](写端)。 - 只存在于内存,没有路径名;通常用于 父子进程(或兄弟进程)间通信。
- 半双工:一个方向一条管道;想全双工用两条管道或直接用
socketpair()。
示意:
父进程 ──fork──▶ 子进程│ │├─写:fd[1] ─────┼─读:fd[0]└─读:fd[0] ─────┤
1.2 命名管道(FIFO)
- 由
mkfifo(const char* path, mode_t mode)创建,有文件路径(例如/tmp/myfifo)。 - 可以在无亲缘关系的进程间通信:一端
open(..., O_WRONLY),另一端open(..., O_RDONLY)。 - 依然是半双工;想双向通信要两条 FIFO。
2. 标准用法:Shell 管道与 dup2 重定向
2.1 Shell 管道 |
ls -l | grep foo | wc -l
- shell 背后做了:为每个
|建立pipe(),在对应子进程中用dup2(fd[1], STDOUT_FILENO)把标准输出接到管道写端;用dup2(fd[0], STDIN_FILENO)把标准输入接到管道读端,然后exec各个程序。
2.2 自己在 C 里做一次
int p[2]; pipe(p);
pid_t pid = fork();
if (pid == 0) { // 子:把 stdin 接到管道读端close(p[1]);dup2(p[0], STDIN_FILENO);close(p[0]);execlp("wc", "wc", "-l", (char*)NULL);_exit(127);
} else { // 父:往管道写端发送数据close(p[0]);dprintf(p[1], "a\nb\nc\n");close(p[1]); // 写端关掉,子端 read 才会看到 EOF 结束waitpid(pid, NULL, 0);
}
关键点:dup2 让“写标准输出/读标准输入”的代码不用改,就能通过管道通信。
3. 阻塞语义与 EOF:open/read/write 的行为
3.1 打开 FIFO 的阻塞规则
open(path, O_RDONLY):若无写端打开 → 阻塞(除非O_NONBLOCK)。open(path, O_WRONLY):若无读端打开 → 阻塞(除非O_NONBLOCK)。open(path, O_RDWR):既读又写,可避免阻塞,但很少用于真实通信(更像自连通道)。
3.2 读写与 EOF
- 读端:
read()在无数据但写端仍在时阻塞;当所有写端都关闭时,read()返回 0(EOF)。 - 写端:当没有任何读端时,
write()触发SIGPIPE(默认终止进程),且write()返回 -1/EPIPE。
如果不想被信号杀死:signal(SIGPIPE, SIG_IGN);让write()返回 EPIPE 自行处理。
3.3 非阻塞
fcntl(fd, F_SETFL, O_NONBLOCK)或在open里加O_NONBLOCK。- 非阻塞读:无数据返回 -1/EAGAIN;非阻塞写:缓冲满返回 -1/EAGAIN。
4. 原子性(atomicity)与 PIPE_BUF
4.1 原子写的保证
- POSIX 要求:写入长度 ≤
PIPE_BUF的一次write(),对同一管道是原子的——不会被其他进程的写入穿插。 - 常见
PIPE_BUF至少 4096 字节(4KB),但实际值可更大。
4.2 PIPE_BUF vs _PC_PIPE_BUF
PIPE_BUF:编译期常量(最小保证值)。_PC_PIPE_BUF:用于pathconf/fpathconf(fd, _PC_PIPE_BUF)的查询项,可得到运行时实际值。
long atom = fpathconf(fd, _PC_PIPE_BUF); // 运行时查询
4.3 > PIPE_BUF 的写入怎么做?
- 需要手动分块(每块 ≤ 原子写大小),循环写完;并处理部分写与 EINTR。
ssize_t write_all(int fd, const void* buf, size_t len) {const char* p = buf;while (len) {ssize_t n = write(fd, p, len);if (n < 0) { if (errno == EINTR) continue; return -1; }p += n; len -= n;}return 0;
}
5. 管道容量:满了会怎样?能否调大?
- 管道有容量(缓存区大小),写端写满后会在 阻塞模式下阻塞,在 非阻塞模式下返回 -1/EAGAIN。
- Linux 可用
fcntl(fd, F_GETPIPE_SZ/F_SETPIPE_SZ)查询/设置容量(需要权限;有上限)。
int sz = fcntl(fd, F_GETPIPE_SZ);
fcntl(fd, F_SETPIPE_SZ, 1<<20); // 尝试设置为 1MB
6. pipe2、O_CLOEXEC、O_NONBLOCK
-
pipe2(int fd[2], int flags):在创建时就设置标志,原子且更安全。O_CLOEXEC: 在exec时自动关闭,防“fd 泄漏到子进程”。O_NONBLOCK: 直接创建为非阻塞。
int p[2];
pipe2(p, O_CLOEXEC | O_NONBLOCK);
7. 多路复用:select / poll / epoll 监控管道
- 适合同时监听多个管道/套接字。
- 读就绪:管道中有数据可读或对端已关闭(读到 0)。
- 写就绪:管道有空间可写或对端关闭(写返回 EPIPE)。
fd_set r; FD_ZERO(&r); FD_SET(fd, &r);
select(fd+1, &r, NULL, NULL, NULL);
if (FD_ISSET(fd, &r)) { /* read */ }
8. FIFO 读写的健壮模板
8.1 写端(producer)
int fd = open("/tmp/myfifo", O_WRONLY); // 可能阻塞,等读端就绪
signal(SIGPIPE, SIG_IGN); // 避免被杀
long atom = fpathconf(fd, _PC_PIPE_BUF); // 原子写上限(兜底4096)
size_t chunk = (atom > 0 ? atom : 4096);char* buf = malloc(chunk);
memset(buf, 'A', chunk);for (long long left = 10LL<<20; left > 0; ) { // 写 10MBsize_t n = left < chunk ? left : chunk;ssize_t m = write(fd, buf, n);if (m < 0) { if (errno == EINTR) continue; if (errno == EPIPE) break; perror("write"); break; }left -= m;
}
close(fd);
8.2 读端(consumer)
int fd = open("/tmp/myfifo", O_RDONLY); // 可能阻塞,等写端就绪
char buf[8192];
for (;;) {ssize_t n = read(fd, buf, sizeof(buf));if (n == 0) break; // 写端都关闭了 → EOFif (n < 0) { if (errno == EINTR) continue; perror("read"); break; }// 处理数据(注意二进制无 '\0')
}
close(fd);
9. 进阶:双向通信、与 stdio 配合、零拷贝
9.1 双向通信
- 两条管道(A→B 一条,B→A 一条),或直接用
socketpair(AF_UNIX, SOCK_STREAM, 0, sv)(更简单,真全双工)。
9.2 与 stdio 缓冲配合
fdopen()可把 fd 包装成FILE*,用fgets/fprintf等:
FILE* in = fdopen(p[0], "r");
FILE* out = fdopen(p[1], "w");
setvbuf(out, NULL, _IOLBF, 0); // 行缓冲
fprintf(out, "hello\n"); fflush(out);
- 注意
stdio有用户态缓冲,需要fflush()才可及时见到对端数据(或用行缓冲)。
9.3 零拷贝:splice/tee
splice():在两个 fd 之间移动数据,绕过用户态缓冲,提高吞吐。tee():把一个管道的内容“复制”一份到另一个管道(也零拷贝)。
这些属于高阶优化,写日志/IPC 大多不必用。
10. 常见坑与排错
- 忘记关闭无用端:父子进程里不需要的
fd[0]/fd[1]不关,会导致对端收不到 EOF。 - 写入 >
PIPE_BUF:多生产者会交错,务必分块。 SIGPIPE被默认杀死:给写端signal(SIGPIPE, SIG_IGN),或捕获处理。O_NONBLOCK:EAGAIN 不是错误,表示“此刻不可写/无数据”,需要重试或select/poll等待。argv[0]误用:传参给 exec 后的子程序时,参数从argv[1]开始。- FIFO 打开时机:单独
open(O_WRONLY)没读端会阻塞;调试时可以让读端先运行。 - 权限与安全:FIFO 是文件,有权限;在多用户环境注意 MODE 与 umask。
- 观察:
lsof -p <PID>看打开的 fd;ls -l /proc/<PID>/fd;strace -f看系统调用。
11. 与其它 IPC 的取舍
| IPC | 优点 | 局限/适用 |
|---|---|---|
| 管道/FIFO | 简单、顺序字节流、内核缓冲、与命令行/stdio 天然契合 | 半双工;无“消息边界”;大数据吞吐一般 |
| Unix 域套接字 | 真双工、面向字节/流/数据报、可传文件描述符 | 接口略复杂,但最通用 |
| 共享内存 | 最快、适合大数据 | 需同步原语(信号量/互斥量);编程复杂 |
| 消息队列 | 有“消息边界”、排队 | 内核资源管理、限制多;结构化消息 |
| 信号 | 事件通知、打断 | 不传数据或仅少量附加数据;要配合别的通道 |
12. 参考小抄:关键 API 原型
// 匿名管道
int pipe(int fd[2]);
int pipe2(int fd[2], int flags); // O_CLOEXEC, O_NONBLOCK// 命名管道
int mkfifo(const char *path, mode_t mode);
int open(const char *path, int flags, ...); // O_RDONLY/O_WRONLY/O_NONBLOCK
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);
int close(int fd);// 属性与控制
long fpathconf(int fd, int name); // _PC_PIPE_BUF
int fcntl(int fd, int cmd, ...); // F_GETFL/F_SETFL(O_NONBLOCK), F_GETPIPE_SZ/F_SETPIPE_SZ
int dup2(int oldfd, int newfd);// 多路复用
int select(int nfds, fd_set *r, fd_set *w, fd_set *e, struct timeval *to);
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
int epoll_create1(int flags); int epoll_ctl(...); int epoll_wait(...);// 信号
typedef void (*sighandler_t)(int);
sighandler_t signal(int signum, sighandler_t handler);
13. 最后,用两段能直接运行的小示例收个尾
13.1 FIFO 写(10MB,原子分块 + 忽略 SIGPIPE)
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>int main() {const char *path = "/tmp/myfifo";mkfifo(path, 0666);int fd = open(path, O_WRONLY); // 等读端if (fd == -1) { perror("open"); return 1; }signal(SIGPIPE, SIG_IGN); // 避免被杀long atom = fpathconf(fd, _PC_PIPE_BUF);if (atom < 0) atom = 4096;char *buf = malloc(atom);for (long i=0;i<atom;i++) buf[i] = 'A' + (i%26);long long left = 10LL<<20; // 10MBwhile (left > 0) {size_t n = left < atom ? left : atom;ssize_t m = write(fd, buf, n);if (m < 0) { if (errno==EINTR) continue; perror("write"); break; }left -= m;}free(buf); close(fd);return 0;
}
13.2 FIFO 读(统计字节并打印行数)
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>int main() {const char *path = "/tmp/myfifo";mkfifo(path, 0666);int fd = open(path, O_RDONLY); // 等写端if (fd == -1) { perror("open"); return 1; }char buf[8192];long long total=0, lines=0;for (;;) {ssize_t n = read(fd, buf, sizeof buf);if (n == 0) break; // EOFif (n < 0) { if (errno==EINTR) continue; perror("read"); break; }total += n;for (ssize_t i=0;i<n;i++) if (buf[i]=='\n') lines++;}printf("received %lld bytes, %lld lines\n", total, lines);close(fd);return 0;
}
一句话总结
- 管道是最“Unix 味”的 IPC:顺序字节流、简单、与 shell/stdio 天然契合。
- 记住三件事:阻塞规则(open/read/write)、原子性(
PIPE_BUF)、EOF 与SIGPIPE。 - 再加上
dup2、select/poll/epoll、O_NONBLOCK、O_CLOEXEC与适度的容量调优,就能写出健壮、可维护、可扩展的管道程序。
