NodeJS Streams:在 Pipeline 中优雅地提前结束读取流

nodejs streams:在 pipeline 中优雅地提前结束读取流

本文探讨了在使用 NodeJS Streams 的 pipeline 处理大型文件时,如何在满足特定条件后提前结束读取流,同时确保已读取的数据块能够完成处理。文章提供了两种解决方案:一种是在转换流中“吞噬”后续数据,另一种是利用 AbortController 中止 pipeline,并详细讲解了实现方法和注意事项,旨在帮助开发者更有效地处理流数据。

在使用 NodeJS streams 的 pipeline 处理大型文件时,有时需要在特定条件满足时提前结束读取流,但同时又希望已读取的数据块能够继续完成处理。直接销毁读取流可能会导致 ERR_STREAM_PREMATURE_CLOSE 错误,并且不够优雅。本文将介绍两种更佳的解决方案,帮助你安全且高效地实现这一需求。

方案一:在转换流中“吞噬”后续数据

这种方法的核心思想是在检测到需要停止读取的条件后,让转换流“吞噬”后续的所有数据,使其不再向下传递。这样,读取流会一直读取到文件末尾,但下游的流只处理到满足停止条件之前的数据。

以下是示例代码:

const { Transform } = require("node:stream");const { pipeline } = require("node:stream/promises");const fs = require("node:fs");let shouldStop = false;const firstStream = fs.createReadStream("./lg.txt");const secondStream = new Transform({    transform(chunk, encoding, callback) {        if (shouldStop) {            // 吞噬剩余数据            callback(null, "");        } else {            const text = chunk.toString();            const foundText = text.search("CHAPTER 9") !== -1;            if (foundText) {                // 设置标志位,吞噬剩余数据                shouldStop = true;            }            callback(null, text.toUpperCase());        }    },});const lastStream = process.stdout;pipeline(firstStream, secondStream, lastStream)    .then(() => console.log("Pipeline completed successfully."))    .catch(err => console.error("Pipeline failed.", err));

代码解释:

shouldStop 变量用于标记是否需要停止处理数据。在 secondStream 的 transform 函数中,如果 shouldStop 为 true,则直接调用 callback(null, “”),表示吞噬当前数据块,不向下传递。如果 shouldStop 为 false,则检查当前数据块中是否包含目标文本。如果包含,则设置 shouldStop 为 true,并对数据进行转换后向下传递。

优点:

逻辑简单,易于理解和实现。不需要中断 pipeline,避免了潜在的错误。

缺点:

Jenni AI Jenni AI

使用最先进的 AI 写作助手为您的写作增光添彩。

Jenni AI 48 查看详情 Jenni AI 读取流会一直读取到文件末尾,可能会浪费一些资源。

方案二:使用 AbortController 中止 Pipeline

AbortController 提供了一种更优雅的方式来中止 pipeline,并且可以进行清理工作。通过 AbortController,可以向 pipeline 发送一个中止信号,pipeline 会在完成当前数据块的处理后停止。

以下是示例代码:

const { Transform } = require("node:stream");const { pipeline } = require("node:stream/promises");const fs = require("node:fs");const firstStream = fs.createReadStream("./lg.txt");const ac = new AbortController();const signal = ac.signal;const secondStream = new Transform({    transform(chunk, encoding, callback) {        const text = chunk.toString();        const foundText = text.search("CHAPTER 9") !== -1;        callback(null, text.toUpperCase());        if (foundText) {            ac.abort(new Error("reading terminated, match found"));        }    },});const lastStream = process.stdout;pipeline(firstStream, secondStream, lastStream, { signal }).then(() => {    console.log("nall done without match");}).catch((err) => {    if (err.code === "ABORT_ERR") {        console.log(`n${signal.reason.message}`);    } else {        console.log(err);    }});

代码解释:

创建 AbortController 实例 ac,并获取其 signal 属性。在 secondStream 的 transform 函数中,如果检测到目标文本,则调用 ac.abort(new Error(“reading terminated, match found”)),发送中止信号。在调用 pipeline 函数时,将 signal 作为选项传递。使用 try…catch 捕获 pipeline 函数可能抛出的错误。如果错误代码为 ABORT_ERR,则表示 pipeline 被中止,可以获取中止原因。

优点:

更优雅地中止 pipeline,可以进行清理工作。可以获取中止原因,方便调试。

缺点:

代码相对复杂一些。

注意事项:

跨 Chunk 边界问题: 在搜索目标文本时,需要注意目标文本可能跨越 chunk 边界的情况。为了避免漏检,可以保留每个 chunk 的最后 N-1 个字符,并将其添加到下一个 chunk 的开头,其中 N 为目标文本的长度。错误处理: 在使用 pipeline 函数时,需要注意错误处理。可以使用 try…catch 语句捕获可能抛出的错误,并进行相应的处理。资源释放: 在中止 pipeline 后,需要确保所有资源都得到正确释放。

总结

本文介绍了两种在 NodeJS Streams 的 pipeline 中提前结束读取流的解决方案。第一种方案是在转换流中“吞噬”后续数据,逻辑简单,但可能会浪费一些资源。第二种方案是使用 AbortController 中止 pipeline,更优雅,可以进行清理工作,但代码相对复杂。选择哪种方案取决于具体的应用场景和需求。同时,需要注意跨 chunk 边界问题和错误处理,确保程序的稳定性和可靠性。

以上就是NodeJS Streams:在 Pipeline 中优雅地提前结束读取流的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/752406.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年11月25日 21:07:18
下一篇 2025年11月25日 21:07:40

相关推荐

发表回复

登录后才能评论
关注微信