I/O流是Node.js处理文件读写、网络数据传输的核心机制。流将数据分割成小块,避免一次性加载大文件造成内存溢出。本文基于Node.js内置stream模块,详细讲解可读流、可写流、双工流和转换流的创建、属性、方法、事件及典型应用场景。
- const stream = require('stream');
复制代码
一、流的基本概念
流分为四种基本类型:Readable(可读流)、Writable(可写流)、Duplex(可读可写流,即双工流)和Transform(转换流,继承自Duplex,在读写过程中可修改数据)。流通过Buffer(内存区域)缓存数据,Buffer大小由构造函数的highWaterMark选项决定,默认值通常为16KB(对象模式为16个对象)。当可读缓冲区达到highWaterMark阈值时,流暂停从数据源读取,直到缓冲区数据被释放。
二、可读流
2.1 读取模式与状态
可读流有两种模式:流动模式(flowing)和暂停模式(paused)。所有可读流以暂停模式开始,此时可通过read()方法按需读取;流动模式则会根据highWaterMark持续读取,直至文件读完。模式可通过data事件、resume()、pipe()切换为流动模式,通过pause()、unpipe()切换为暂停模式。
可读流的状态有三种:null(初始状态)、true(流动状态)、false(非流动状态)。初始状态下没有数据消费者;监听data事件或调用pipe()/resume()会变为流动状态(true),流开始主动产生数据。调用pause()或unpipe()变为非流动状态(false),之后即使再监听data事件也不会再变回流动状态。
2.2 创建可读流
方式一:直接使用stream模块的Readable类- const readable = new stream.Readable();
复制代码 方式二:使用fs模块的createReadStream方法(最常用)- const fs = require('fs');
- const read = fs.createReadStream('春夜喜雨.txt', {
- flags: 'r',
- encoding: null,
- fd: null,
- mode: 0o666,
- autoClose: true,
- emitClose: true,
- start: 0,
- end: Infinity,
- highWaterMark: 64 * 1024 // 64KB
- });
复制代码
2.3 核心属性、方法和事件
常用属性:destroyed(是否销毁)、readable(是否可读)、readableEncoding(编码)、readableEnded(数据是否读完)、readableFlowing(当前状态)、readableHighWaterMark(highWaterMark值)、readableLength(队列中字节数)。
常用方法:
read([size]) — 从流中读取指定字节的数据。
setEncoding(encoding) — 设置编码,如utf8,使读取数据直接返回字符串而非Buffer。
pause() — 暂停data事件,切换为非流动模式。
resume() — 恢复data事件,切换为流动模式。
ispaused() — 判断当前流是否处于暂停状态。
destroy([error]) — 销毁流,可选参数用于触发error事件。
pipe(destination[, options]) — 将可读流绑定到可写流,自动切换流动模式,数据直接传输到可写流。
unpipe([destination]) — 解绑可写流。
常用事件:close(关闭)、data(传送数据块)、end(无数据)、error(错误)、pause(暂停)、readable(有数据可读)、resume(恢复)。
2.4 常见操作示例
读取数据(暂停模式):- const fs = require('fs');
- const read = fs.createReadStream('春夜喜雨.txt', {encoding: 'utf-8'});
- read.on('readable', () => {
- let chunk;
- while (null !== (chunk = read.read(25))) {
- console.log(chunk);
- }
- });
复制代码
设置编码后直接得到字符串:- const fs = require('fs');
- const read = fs.createReadStream('春夜喜雨.txt');
- read.setEncoding('utf8');
- read.on('readable', function () {
- let chunk;
- while (null !== (chunk = read.read(25))) {
- console.log(chunk);
- }
- });
复制代码
暂停与恢复(流动模式):- const fs = require('fs');
- const read = fs.createReadStream('春夜喜雨.txt', {highWaterMark: 25});
- read.setEncoding('utf8');
- read.on('data', (chunk) => {
- console.log(chunk.toString());
- read.pause();
- setTimeout(() => { read.resume(); }, 1000);
- });
- read.on('close', () => { console.log('读取完毕'); });
复制代码
判断状态:- const readable = new stream.Readable();
- console.log(readable.ispaused()); // true(初始状态为暂停)
- readable.pause();
- console.log(readable.ispaused()); // true
复制代码
销毁流(会导致数据清空):- const read = fs.createReadStream('春夜喜雨.txt');
- read.setEncoding('utf8');
- read.on('data', (chunk) => { console.log(chunk.toString()); });
- read.destroy(); // 没有任何输出,因为流已被销毁
复制代码
绑定与解绑可写流:- const fs = require('fs');
- const read = fs.createReadStream('春夜喜雨.txt');
- const write = fs.createWriteStream('aaa.txt', {flags: 'a'});
- read.pipe(write); // 将可读流数据追加到aaa.txt
- console.log('已完成');
复制代码- const fs = require('fs');
- const read = fs.createReadStream('春夜喜雨.txt');
- const write = fs.createWriteStream('bbb.txt', {flags: 'a'});
- read.pipe(write);
- console.log('已绑定可写流');
- read.unpipe(write);
- console.log('已解绑可写流');
复制代码
三、可写流
3.1 创建可写流
使用stream.Writable类(需重写write方法):- const stream = require('stream');
- const writable = new stream.Writable({
- write: function (chunk, encoding, next) {
- console.log(chunk.toString());
- next();
- }
- });
复制代码 使用fs.createWriteStream(更常用):- const fs = require('fs');
- const write = fs.createWriteStream('demo.txt', {
- flags: 'w', // 若要追加内容改为'a'
- encoding: null,
- fd: null,
- mode: 0o666,
- autoClose: true,
- emitClose: true,
- start: 0,
- highWaterMark: 16 * 1024
- });
复制代码
3.2 核心属性、方法和事件
常用属性:destroyed、writable、writableEnded、writableCorked、writableFinished、writableHighWaterMark、writableLength、writableNeedDrain。
常用方法:
write(chunk[, encoding][, callback]) — 写入数据。
end([chunk][, encoding][, callback]) — 通知写结束,关闭流。
setDefaultEncoding(encoding) — 设置默认编码。
destroy([error]) — 销毁流。
cork() — 强制将所有写入的数据缓冲到内存。
uncork() — 输出cork后缓冲的所有数据。
常用事件:close、open、drain(写入缓冲区空)、error、finish(end后缓冲区数据已写入底层)、pipe、unpipe。
3.3 常见操作示例
写入数据(追加):- const fs = require('fs');
- let txt = '这首诗抓住了边塞风光景物的一些特点...';
- let decr = fs.createWriteStream('凉州词.txt', {flags: 'a'});
- decr.write('\n鉴赏:\n' + txt, 'utf8');
复制代码
设置编码:- const writeSteam = fs.createWriteStream('demo.txt');
- writeSteam.setDefaultEncoding('utf8');
- writeSteam.write('测试数据');
复制代码
关闭流(end后不可再写入):- const writeSteam = fs.createWriteStream('demo.txt');
- writeSteam.setDefaultEncoding('utf8');
- writeSteam.write('测试数据');
- writeSteam.end('写入完成');
- // writeSteam.write('继续写入'); // 此行会报错
复制代码
销毁流(导致写入失效):- const writeSteam = fs.createWriteStream('demo.txt');
- writeSteam.setDefaultEncoding('utf8');
- writeSteam.write('测试数据');
- writeSteam.destroy(); // demo.txt为空
复制代码
缓冲写入(cork/uncork):- const stream = require('stream');
- const writable = new stream.Writable({
- write: function (chunk, encoding, next) {
- console.log(chunk.toString());
- next();
- }
- });
- writable.write('天气晴朗');
- writable.cork();
- writable.write('阳光明媚'); // 此时不会输出
- writable.uncork(); // 输出'阳光明媚'
复制代码
四、双工流
双工流(Duplex)同时具备可读和可写功能,实现Readable和Writable。需要继承Duplex并实现_read()和_write()方法。
- const Duplex = require('stream').Duplex;
- const myDuplex = new Duplex({
- _read(size) {
- // 可读逻辑
- },
- _write(chunk, encoding, callback) {
- // 可写逻辑
- }
- });
复制代码
示例:- const stream = require('stream');
- const duplexStream = new stream.Duplex();
- duplexStream._read = function () {
- this.push('读取数据');
- this.push(null);
- };
- duplexStream._write = function (data, enc, next) {
- console.log(data.toString());
- next();
- };
- duplexStream.on('data', data => console.log('监听:' + data.toString()));
- duplexStream.on('end', data => console.log('监听:读取完成'));
- duplexStream.write('写入数据');
- duplexStream.end();
- duplexStream.on('finish', data => console.log('监听:写入完成'));
复制代码
输出结果:- 写入数据
- 监听:读取数据
- 监听:写入完成
- 监听:读取完成
复制代码
五、转换流
转换流(Transform)继承自Duplex,但可读端和可写端自动关联,数据经过_transform()方法处理后再输出。常用于压缩(zlib)、加密等场景。
实现步骤:
1. 继承Transform类
2. 实现_transform(data, encoding, callback)方法。在该方法内调用this.push(data)输出数据,或通过callback(err, data)输出。必须调用callback(无错误时第一个参数为null)。
- const stream = require('stream');
- class TransformReverse extends stream.Transform {
- constructor() {
- super();
- }
- _transform(data, encoding, callback) {
- this.push(data);
- callback();
- }
- }
复制代码
使用示例:- const Stream = require('stream');
- class MyTransform extends Stream.Transform {
- constructor() {
- super();
- }
- _transform(chunk, encoding, callback) {
- // 假设我们要将数据转为大写(仅为演示)
- this.push(chunk.toString().toUpperCase());
- callback();
- }
- }
- const transform = new MyTransform();
- transform.write('hello');
- transform.end();
- transform.on('data', chunk => console.log(chunk.toString())); // 输出: HELLO
复制代码
小结
Node.js的流机制是高效处理大文件和数据管道的基石。可读流和可写流分别负责输入和输出,通过pipe()实现自动背压控制;双工流和转换流则扩展了流的能力,允许同时读写或数据变换。理解流的模式切换、缓冲策略(highWaterMark)以及cork/uncork机制,能帮助开发者写出内存友好、性能优良的Node.js应用程序。 |