首页 > 编程语言 >[NodeJS] Streams流式数据处理

[NodeJS] Streams流式数据处理

时间:2024-07-08 18:21:18浏览次数:23  
标签:fs http NodeJS res 流式 pipe Streams 可读

在现代应用开发中,数据处理的效率和资源管理尤为重要。NodeJS作为一种高效的JavaScript运行时环境,通过其强大的流(Stream)功能,提供了处理大规模数据的便捷方式。流式数据处理不仅能够优化内存使用,还可以提高数据处理的实时性和效率。下文将介绍NodeJS中的流概念、流的类型以及如何利用流来进行数据传输和处理。

流的基本概念

流式数据的特点是将数据分成一个一个的chunk,每次操作只针对其中的一小部分。

因此流式数据的读写操作不需要将整个数据保存在内存中(处理完就丢掉)。

常用于视频这种包含大量数据的应用场景,也可以在时间和空间角度上更有效地处理数据:

  • 时间:从开始读到流就可以处理数据并反馈给用户了,不需要等待全部数据到达,例如:ChatGPT的回答,就是流式数据传输,一个字一个字地显示出来;
  • 空间:如上文所说,在某些场景下不需要将整个数据保存在内存中。

NodeJS提供的API

NodeJS中的node:stream模块提供了对流数据进行处理的抽象接口。

NodeJS中的所有流对象都可以监听和触发事件,都是EventEmitter的实例对象。

下面的表格列出了每一种基本流常用且重要的事件

NodeJS中有四种基本的流类型:可读流、可写流、双工流和转换流。

描述 案例 事件 方法
可读流 Readable Streams 可用于读(消费)数据 1. http request
2. fs read streams
data
end
pipe()
read
可写流 Writable Streams 可用于写(生产)数据 1. http responses
2. fs write streams
drain
finish
write()
end()
双工流 Duplex Streams 可读可写 net 网络套接字
转换流 Transform Streams 双工流,在读写的时候可修改 zlib Gzip creation

流式数据传输案例

简介:创建一个比较大的文本文件,使用NodeJS启动一个服务,接口分别以三种方法返回文件内容。

代码

方法一 不使用流

读取整个文件的内容之后再返回;

读取大文件的时候不推荐这样写,因为整个文件会先被完整地从磁盘读取到内存中,再返回给客户端。

import fs from 'node:fs';
import http from 'node:http';

const server = http.createServer();

server.on('request', (req, res)=>{
  // CORS
  res.setHeader('Access-Control-Allow-Origin', '*');

  // Solution 1
  fs.readFile('test.txt', (err, data)=>{
    if(err)console.log(err);
    res.end(data);
  });
});

server.listen(3000, ()=>{
  console.log('listening...');
});
方法二 可读流

使用可读流,优点是边读文件边返回,只有当前处理的chunk会占据内存;

import fs from 'node:fs';
import http from 'node:http';

const server = http.createServer();

server.on('request', (req, res)=>{
  // CORS
  res.setHeader('Access-Control-Allow-Origin', '*');

  // Solution 2: Streams
  const readable = fs.createReadStream('test.txt');
  readable.on('data', (chunk)=>{
    res.write(chunk);
  });
  readable.on('end', ()=>{
    res.end();
  });
  readable.on('error', (err)=>{
    console.log(err);
    res.statusCode = 500;
    res.end('File reading error!');
  });
});

server.listen(3000, ()=>{
  console.log('listening...');
});
backpressure

这里介绍一下流控(Flow Controll)领域中的一个名词:Backpressure(翻译为 反压/背压)。

在Node.js和其他流处理系统中,backpressure(反压/背压)是指生产者生成数据的速度超过消费者处理数据的速度时产生的一种控制机制。

当可读流(Readable Stream)读取数据的速度快于可写流(Writable Stream)写入数据的速度时,就会产生backpressure。为了防止这种情况,可读流会根据可写流的消费能力进行控制,暂停或减慢读取数据的速度。

具体机制

  1. 可写流的缓冲区:可写流内部有一个缓冲区,用于暂存数据。如果这个缓冲区被填满,流会返回 false,表示消费者已经无法及时处理更多的数据。
  2. 暂停和恢复:当可写流返回 false 时,可读流会暂停读取数据。只有在可写流的缓冲区有足够的空间后,可读流才会恢复读取。
  3. 事件驱动:Node.js 流通过事件驱动的方式处理backpressure。当可写流的缓冲区有空间时,会触发 drain 事件,通知可读流继续读取数据。

示例代码:通过手动暂停和恢复合理利用缓冲区,避免数据丢失、内存溢出和资源耗尽。

import fs from 'node:fs';

const readableStream = fs.createReadStream('input.txt');
const writableStream = fs.createWriteStream('output.txt');

readableStream.on('data', (chunk)=>{
  const canWrite = writableStream.write(chunk);
  // 可写流的缓冲区空间不够了,暂停读数据(生产)
  if(!canWrite){
    readableStream.pause();
  }
});

// 当可写流的缓冲区空间足够,会触发`drain`事件
// 可以继续读数据
writableStream.on('drain', ()=>{
  readableStream.resume();
});

// 读取结束,停止写入
readableStream.on('end', ()=>{
  writableStream.end();
  console.log('done.');
});
pipe

在 Node.js 中,pipe 方法提供了一种更简单和自动化的方式来处理流之间的 backpressure 问题。pipe 方法可以连接可读流和可写流,并自动处理 backpressure,无需手动暂停和恢复流。

示例代码

import fs from 'node:fs';

const readableStream = fs.createReadStream('input.txt');
const writableStream = fs.createWriteStream('output.txt');

// 统一错误处理函数
function handleError(err) {
  console.error('发生错误:', err);
}

// 使用 pipe 连接可读流和可写流,并处理错误
readableStream.pipe(writableStream)
  .on('error', handleError);

// 处理可读流和可写流的错误
readableStream.on('error', handleError);
writableStream.on('error', handleError);

语法是:

readableSource.pipe(writableDestination);

接下来回到上文的关于流式数据网络传输的案例。

方法三 pipe

使用pipe可以简化许多代码,核心代码就是

readable.pipe(res);

示例代码:

import fs from 'node:fs';
import http from 'node:http';

const server = http.createServer();

server.on('request', (req, res)=>{
  // CORS
  res.setHeader('Access-Control-Allow-Origin', '*');

  // Solution 3: Pipe
  const readable = fs.createReadStream('test.txt');
  readable.pipe(res).on('error', ()=>{
    res.statusCode = 500;
    res.end('File reading error!');
  });
});

server.listen(3000, ()=>{
  console.log('listening...');
});

总结

  • 流(Stream)在NodeJS中的工作原理是将数据分成一个个小块进行处理,这样无需将整个数据加载到内存中,从而优化了内存使用和数据处理效率。
  • 流在NodeJS中有四种基本类型:可读流、可写流、双工流和转换流,每种类型都有其特定的应用场景和事件机制。
  • 流的应用场景主要包括视频播放、文件处理、实时数据传输等。在这些场景中,流通过边读边写、边处理边传输的方式,可以有效地提高数据处理的实时性和系统的性能。

参考

[1] B站 - NodeJS教程
[2] 知乎 - 如何形象的描述反应式编程中的背压(Backpressure)机制?

标签:fs,http,NodeJS,res,流式,pipe,Streams,可读
From: https://www.cnblogs.com/feixianxing/p/18290492/node-js-streams

相关文章

  • nodejs编写退出登录的接口逻辑
    目录1.安装必要的依赖2.登录成功生成和返回JWT3.在服务器端维护一个黑名单列表,记录已失效的JWT4.在验证JWT时检查黑名单5.退出登录时将JWT添加到黑名单中完整代码nodejs实现退出登录接口的逻辑,通常包括以下步骤:安装必要的依赖登录成功生成和返回JWT。在服务器......
  • nodejs登录成功生成token并验证
    目录1.安装必要的依赖包2.创建Express应用3.生成token4.使用`express-jwt`验证Token5.错误处理在Node.js中,nodejs登录成功生成token并验证通常涉及以下几个步骤:安装必要的依赖包:常用的库包括 `express`用于创建服务器,`jsonwebtoken`用于处理JWT(JSONWebToken),`expr......
  • [NodeJS] NodeJS运行原理简记
    NodeJS的基本组成NodeJS是JavaScript运行时,主要由V8引擎和libuv组成,其中V8使用javascript和c++编写,而libuv是纯c++编写的,二者都是开源的。V8引擎用于将javascript代码转换为计算机可以执行的机器码;而libuv则负责完成异步IO、与操作系统交互(文件系统和网络模块)、事件循......
  • nodejs和npm安装与配置
    nodejs官网:http://nodejs.cn/百度网盘下载链接:https://pan.baidu.com/s/1RfjeN1bt-I-tf351xi8cgw提取码:sybk下载官网的稳定版msi安装包nodejs默认安装配置了npm进入cmd命令行(以管理员身份打开)node-v npm-v        查看版本,检查安装设置npm淘宝镜像......
  • 将nodejs迁移到D盘
    参考双击安装,指定d盘查看文件夹查看环境变量,自动添加如下Path=D:\software\nodejs\打开cmd验证C:\Users\dogle>node-vv18.14.2C:\Users\dogle>npm-v9.5.0C:\Users\dogle>echo%PATH%C:\ProgramFiles(x86)\CommonFiles\Oracle\Java\javapath;C:\W......
  • nodejs 安装使用ip2region - 实时精准的IP地址到区域运营商查询
    ip2region简介ip2region是一个高性能且高准确度的离线IP地址定位库和IP定位数据管理框架。它能够根据IP地址解析出对应的位置信息,包括国家、地区、省份、城市以及互联网服务提供商(ISP)。以下是ip2region的一些关键特性:高准确率:它声称有99.9%的准确率,这使得它......
  • windows安装以及切换使用nodejs多版本
    1安装nvmnvm是一个简单的bash脚本,它是用来管理系统中多个已存的Node.js版本。可以先把系统已有的node卸载掉,也可不卸载,但是以防没必要的冲突,尽量还是卸掉。1.1下载nvm下载地址:https://github.com/coreybutler/nvm-windows/releases,下载.zip后缀的这个文件,下载后解压安装即可......
  • [NodeJS] timers阶段的源码解析
    timers阶段是Nodejs事件循环中的一个阶段,这一阶段主要是检查是否有到期的定时器,如果有则执行其回调。相关源码位置:timers阶段:node/deps/uv/src/timer.catmain·nodejs/node(github.com)timers阶段的代码比较少,这里直接贴出来,你也可以点进去上面的源码看自己感兴趣的部分......
  • [NodeJS] NodeJS事件循环
    JS是单线程的,如果出现阻塞会严重影响代码执行效率。NodeJS通过事件循环,尽可能地将耗时任务委派给系统内核来实现非阻塞IO。NodeJS提供了许多和异步相关的API,除了语言标准规定的setTimeout和setInterval,还有setImmediate和process.nextTick。经常和这几个出现在面试题里的还有Pr......
  • nodejs删除和重新安装
    若重新安装nodejs本人使用卸载并重新安装的方法,简单暴力卸载1.找到以前安装nodejs的文件路径,直接删除2.例如我的在D盘路径,直接卸载3.然后删除配置环境:右键此电脑——属性——高级系统设置——高级——环境变量4.找到用户变量在path关于node与npm并删除5.系统变......