查看: 110|回复: 1

Node.js I/O流操作实战:可读流、可写流与双工流详解

[复制链接]
发表于 昨天 23:00 | 显示全部楼层 |阅读模式
I/O流是Node.js处理文件读写、网络数据传输的核心机制。流将数据分割成小块,避免一次性加载大文件造成内存溢出。本文基于Node.js内置stream模块,详细讲解可读流、可写流、双工流和转换流的创建、属性、方法、事件及典型应用场景。
  1. const stream = require('stream');
复制代码

一、流的基本概念

流分为四种基本类型:Readable(可读流)、Writable(可写流)、Duplex(可读可写流,即双工流)和Transform(转换流,继承自Duplex,在读写过程中可修改数据)。流通过Buffer(内存区域)缓存数据,Buffer大小由构造函数的highWaterMark选项决定,默认值通常为16KB(对象模式为16个对象)。当可读缓冲区达到highWaterMark阈值时,流暂停从数据源读取,直到缓冲区数据被释放。

二、可读流

2.1 读取模式与状态
可读流有两种模式:流动模式(flowing)和暂停模式(paused)。所有可读流以暂停模式开始,此时可通过read()方法按需读取;流动模式则会根据highWaterMark持续读取,直至文件读完。模式可通过data事件、resume()、pipe()切换为流动模式,通过pause()、unpipe()切换为暂停模式。

可读流的状态有三种:null(初始状态)、true(流动状态)、false(非流动状态)。初始状态下没有数据消费者;监听data事件或调用pipe()/resume()会变为流动状态(true),流开始主动产生数据。调用pause()或unpipe()变为非流动状态(false),之后即使再监听data事件也不会再变回流动状态。

2.2 创建可读流
方式一:直接使用stream模块的Readable类
  1. const readable = new stream.Readable();
复制代码
方式二:使用fs模块的createReadStream方法(最常用)
  1. const fs = require('fs');
  2. const read = fs.createReadStream('春夜喜雨.txt', {
  3.   flags: 'r',
  4.   encoding: null,
  5.   fd: null,
  6.   mode: 0o666,
  7.   autoClose: true,
  8.   emitClose: true,
  9.   start: 0,
  10.   end: Infinity,
  11.   highWaterMark: 64 * 1024  // 64KB
  12. });
复制代码

2.3 核心属性、方法和事件
常用属性:destroyed(是否销毁)、readable(是否可读)、readableEncoding(编码)、readableEnded(数据是否读完)、readableFlowing(当前状态)、readableHighWaterMark(highWaterMark值)、readableLength(队列中字节数)。

常用方法:
read([size]) — 从流中读取指定字节的数据。
setEncoding(encoding) — 设置编码,如utf8,使读取数据直接返回字符串而非Buffer。
pause() — 暂停data事件,切换为非流动模式。
resume() — 恢复data事件,切换为流动模式。
ispaused() — 判断当前流是否处于暂停状态。
destroy([error]) — 销毁流,可选参数用于触发error事件。
pipe(destination[, options]) — 将可读流绑定到可写流,自动切换流动模式,数据直接传输到可写流。
unpipe([destination]) — 解绑可写流。

常用事件:close(关闭)、data(传送数据块)、end(无数据)、error(错误)、pause(暂停)、readable(有数据可读)、resume(恢复)。

2.4 常见操作示例

读取数据(暂停模式):
  1. const fs = require('fs');
  2. const read = fs.createReadStream('春夜喜雨.txt', {encoding: 'utf-8'});
  3. read.on('readable', () => {
  4.   let chunk;
  5.   while (null !== (chunk = read.read(25))) {
  6.     console.log(chunk);
  7.   }
  8. });
复制代码

设置编码后直接得到字符串:
  1. const fs = require('fs');
  2. const read = fs.createReadStream('春夜喜雨.txt');
  3. read.setEncoding('utf8');
  4. read.on('readable', function () {
  5.   let chunk;
  6.   while (null !== (chunk = read.read(25))) {
  7.     console.log(chunk);
  8.   }
  9. });
复制代码

暂停与恢复(流动模式):
  1. const fs = require('fs');
  2. const read = fs.createReadStream('春夜喜雨.txt', {highWaterMark: 25});
  3. read.setEncoding('utf8');
  4. read.on('data', (chunk) => {
  5.   console.log(chunk.toString());
  6.   read.pause();
  7.   setTimeout(() => { read.resume(); }, 1000);
  8. });
  9. read.on('close', () => { console.log('读取完毕'); });
复制代码

判断状态:
  1. const readable = new stream.Readable();
  2. console.log(readable.ispaused());  // true(初始状态为暂停)
  3. readable.pause();
  4. console.log(readable.ispaused());  // true
复制代码

销毁流(会导致数据清空):
  1. const read = fs.createReadStream('春夜喜雨.txt');
  2. read.setEncoding('utf8');
  3. read.on('data', (chunk) => { console.log(chunk.toString()); });
  4. read.destroy();  // 没有任何输出,因为流已被销毁
复制代码

绑定与解绑可写流:
  1. const fs = require('fs');
  2. const read = fs.createReadStream('春夜喜雨.txt');
  3. const write = fs.createWriteStream('aaa.txt', {flags: 'a'});
  4. read.pipe(write);  // 将可读流数据追加到aaa.txt
  5. console.log('已完成');
复制代码
  1. const fs = require('fs');
  2. const read = fs.createReadStream('春夜喜雨.txt');
  3. const write = fs.createWriteStream('bbb.txt', {flags: 'a'});
  4. read.pipe(write);
  5. console.log('已绑定可写流');
  6. read.unpipe(write);
  7. console.log('已解绑可写流');
复制代码

三、可写流

3.1 创建可写流
使用stream.Writable类(需重写write方法):
  1. const stream = require('stream');
  2. const writable = new stream.Writable({
  3.   write: function (chunk, encoding, next) {
  4.     console.log(chunk.toString());
  5.     next();
  6.   }
  7. });
复制代码
使用fs.createWriteStream(更常用):
  1. const fs = require('fs');
  2. const write = fs.createWriteStream('demo.txt', {
  3.   flags: 'w',   // 若要追加内容改为'a'
  4.   encoding: null,
  5.   fd: null,
  6.   mode: 0o666,
  7.   autoClose: true,
  8.   emitClose: true,
  9.   start: 0,
  10.   highWaterMark: 16 * 1024
  11. });
复制代码

3.2 核心属性、方法和事件
常用属性:destroyed、writable、writableEnded、writableCorked、writableFinished、writableHighWaterMark、writableLength、writableNeedDrain。

常用方法:
write(chunk[, encoding][, callback]) — 写入数据。
end([chunk][, encoding][, callback]) — 通知写结束,关闭流。
setDefaultEncoding(encoding) — 设置默认编码。
destroy([error]) — 销毁流。
cork() — 强制将所有写入的数据缓冲到内存。
uncork() — 输出cork后缓冲的所有数据。

常用事件:close、open、drain(写入缓冲区空)、error、finish(end后缓冲区数据已写入底层)、pipe、unpipe。

3.3 常见操作示例

写入数据(追加):
  1. const fs = require('fs');
  2. let txt = '这首诗抓住了边塞风光景物的一些特点...';
  3. let decr = fs.createWriteStream('凉州词.txt', {flags: 'a'});
  4. decr.write('\n鉴赏:\n' + txt, 'utf8');
复制代码

设置编码:
  1. const writeSteam = fs.createWriteStream('demo.txt');
  2. writeSteam.setDefaultEncoding('utf8');
  3. writeSteam.write('测试数据');
复制代码

关闭流(end后不可再写入):
  1. const writeSteam = fs.createWriteStream('demo.txt');
  2. writeSteam.setDefaultEncoding('utf8');
  3. writeSteam.write('测试数据');
  4. writeSteam.end('写入完成');
  5. // writeSteam.write('继续写入');  // 此行会报错
复制代码

销毁流(导致写入失效):
  1. const writeSteam = fs.createWriteStream('demo.txt');
  2. writeSteam.setDefaultEncoding('utf8');
  3. writeSteam.write('测试数据');
  4. writeSteam.destroy();  // demo.txt为空
复制代码

缓冲写入(cork/uncork):
  1. const stream = require('stream');
  2. const writable = new stream.Writable({
  3.   write: function (chunk, encoding, next) {
  4.     console.log(chunk.toString());
  5.     next();
  6.   }
  7. });
  8. writable.write('天气晴朗');
  9. writable.cork();
  10. writable.write('阳光明媚');  // 此时不会输出
  11. writable.uncork();  // 输出'阳光明媚'
复制代码

四、双工流

双工流(Duplex)同时具备可读和可写功能,实现Readable和Writable。需要继承Duplex并实现_read()和_write()方法。
  1. const Duplex = require('stream').Duplex;
  2. const myDuplex = new Duplex({
  3.   _read(size) {
  4.     // 可读逻辑
  5.   },
  6.   _write(chunk, encoding, callback) {
  7.     // 可写逻辑
  8.   }
  9. });
复制代码

示例:
  1. const stream = require('stream');
  2. const duplexStream = new stream.Duplex();
  3. duplexStream._read = function () {
  4.   this.push('读取数据');
  5.   this.push(null);
  6. };
  7. duplexStream._write = function (data, enc, next) {
  8.   console.log(data.toString());
  9.   next();
  10. };
  11. duplexStream.on('data', data => console.log('监听:' + data.toString()));
  12. duplexStream.on('end', data => console.log('监听:读取完成'));
  13. duplexStream.write('写入数据');
  14. duplexStream.end();
  15. duplexStream.on('finish', data => console.log('监听:写入完成'));
复制代码

输出结果:
  1. 写入数据
  2. 监听:读取数据
  3. 监听:写入完成
  4. 监听:读取完成
复制代码

五、转换流

转换流(Transform)继承自Duplex,但可读端和可写端自动关联,数据经过_transform()方法处理后再输出。常用于压缩(zlib)、加密等场景。

实现步骤:
1. 继承Transform类
2. 实现_transform(data, encoding, callback)方法。在该方法内调用this.push(data)输出数据,或通过callback(err, data)输出。必须调用callback(无错误时第一个参数为null)。
  1. const stream = require('stream');
  2. class TransformReverse extends stream.Transform {
  3.   constructor() {
  4.     super();
  5.   }
  6.   _transform(data, encoding, callback) {
  7.     this.push(data);
  8.     callback();
  9.   }
  10. }
复制代码

使用示例:
  1. const Stream = require('stream');
  2. class MyTransform extends Stream.Transform {
  3.   constructor() {
  4.     super();
  5.   }
  6.   _transform(chunk, encoding, callback) {
  7.     // 假设我们要将数据转为大写(仅为演示)
  8.     this.push(chunk.toString().toUpperCase());
  9.     callback();
  10.   }
  11. }
  12. const transform = new MyTransform();
  13. transform.write('hello');
  14. transform.end();
  15. transform.on('data', chunk => console.log(chunk.toString()));  // 输出: HELLO
复制代码

小结

Node.js的流机制是高效处理大文件和数据管道的基石。可读流和可写流分别负责输入和输出,通过pipe()实现自动背压控制;双工流和转换流则扩展了流的能力,允许同时读写或数据变换。理解流的模式切换、缓冲策略(highWaterMark)以及cork/uncork机制,能帮助开发者写出内存友好、性能优良的Node.js应用程序。
回复

使用道具 举报

发表于 昨天 23:10 | 显示全部楼层

Re: Node.js I/O流操作实战:可读流、可写流与双工流详解

楼主这篇Node.js I/O流的讲解非常详尽,从基本概念到四种流类型的区别、可读流的双模式切换,还有实际代码示例,都梳理得很清楚。特别是对readableFlowing状态的三种情况说明,以及暂停/恢复操作的演示,对新手理解流的工作机制很有帮助。 我试着跑了一下“暂停与恢复”那段代码,发现结合setTimeout实现限速读取确实很直观。另外想问一下楼主:文中提到通过pipe()绑定可写流时,可读流会自动切换到流动模式,那如果我自己在pipe之前手动调用了pause(),pipe还会生效吗?还是说pipe会覆盖pause状态?希望楼主能再指点一下这块的细节。 再次感谢分享,收藏了!
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

指导单位

江苏省公安厅

江苏省通信管理局

浙江省台州刑侦支队

DEFCON GROUP 86025

Hacking Group 021A

旗下站点

态势感知中心

应急响应中心

红盟安全

联系我们

官方QQ群:112851260

官方邮箱:security#ihonker.org(#改成@)

官方核心成员

关注微信公众号

Archiver|手机版|小黑屋| ( 沪ICP备2021026908号 )

GMT+8, 2026-6-13 01:04 , Processed in 0.039346 second(s), 18 queries , Gzip On, Redis On.

Powered by ihonker.com

Copyright © 2015-现在.

  • 返回顶部