Nodejs(③Stream)
Transform
就像水管中间加了个 “净水器”,水流过的时候会被处理:
const { Transform } = require('stream');// 想象成一个“过滤器”:只保留数字,过滤掉其他字符
const numberFilter = new Transform({transform(chunk, encoding, callback) {// chunk 是流过来的“一小块数据”(比如用户输入的一段文字)const data = chunk.toString();// 只保留数字const numbersOnly = data.replace(/[^0-9]/g, '');// 把处理后的数据传到下一段“水管”this.push(numbersOnly);callback(); // 告诉系统“这块数据处理完了,可以传下一块了”}
});// 用法:键盘输入(水龙头开)→ 过滤器 → 控制台输出(接水)
process.stdin.pipe(numberFilter).pipe(process.stdout);
测试:运行后在键盘输入 abc123def456
,会直接输出 123456
(其他字符被过滤了)
pipeline
如果需要多个 “过滤器”(比如先过滤→再加密),用 pipeline
把它们连起来,就像接水管:
const { pipeline } = require('stream/promises');
const fs = require('fs');
const { Transform } = require('stream');// 第一个过滤器:只保留字母
const letterFilter = new Transform({transform(chunk, encoding, callback) {const letters = chunk.toString().replace(/[^a-zA-Z]/g, '');this.push(letters);callback();}
});// 第二个过滤器:转成大写
const upperCaseFilter = new Transform({transform(chunk, encoding, callback) {this.push(chunk.toString().toUpperCase());callback();}
});// 用 pipeline 连接:读文件 → 过滤字母 → 转大写 → 写入新文件
async function run() {await pipeline(fs.createReadStream('input.txt'), // 读文件(水源)letterFilter, // 第一步处理upperCaseFilter, // 第二步处理fs.createWriteStream('output.txt') // 写文件(终点));console.log('处理完成!');
}run();
背压控制
你往一个杯子里倒水(生产者),杯子下面有个小洞在漏水(消费者)。
如果倒水速度(生产)比漏水速度(消费)慢,杯子里的水不会满,一切正常。
如果倒水太快,杯子里的水会越积越多(缓冲),快满的时候,你肯定会下意识放慢倒水速度(这就是背压控制在起作用),否则水就会溢出(内存爆炸)。
比如用 fs.createReadStream 读文件(生产者,速度快),用 process.stdout.write 打印(消费者,速度慢,因为打印要等终端处理)。
当读得太快,数据会先存在 Stream 内部的缓冲区里。
当缓冲区快满时,Stream 会自动告诉生产者:“慢点!我处理不过来了!”,生产者就会暂停读取,直到缓冲区有空间了再继续。
const fs = require('fs');
// 读一个大文件(生产者)
const readStream = fs.createReadStream('超大文件.txt');
// 打印到控制台(消费者,速度慢)
readStream.on('data', (chunk) => {// 如果控制台处理不过来,write 会返回 false,Stream 会自动暂停读取const canWrite = process.stdout.write(chunk);if (!canWrite) {readStream.pause(); // 暂停读取process.stdout.once('drain', () => {readStream.resume(); // 等控制台处理完了再继续读});}
});
缓冲区(Buffer)在 Node.js 里就是一块内存区域,专门用来临时存放数据,起到 “中转站” 的作用。