
本文探讨了在使用 NodeJS Streams 的 pipeline 处理大型文件时,如何在满足特定条件后提前结束读取流,同时确保已读取的数据块能够完成处理。文章提供了两种解决方案:一种是在转换流中“吞噬”后续数据,另一种是利用 AbortController 中止 pipeline,并详细讲解了实现方法和注意事项,旨在帮助开发者更有效地处理流数据。
在使用 NodeJS streams 的 pipeline 处理大型文件时,有时需要在特定条件满足时提前结束读取流,但同时又希望已读取的数据块能够继续完成处理。直接销毁读取流可能会导致 ERR_STREAM_PREMATURE_CLOSE 错误,并且不够优雅。本文将介绍两种更佳的解决方案,帮助你安全且高效地实现这一需求。
方案一:在转换流中“吞噬”后续数据
这种方法的核心思想是在检测到需要停止读取的条件后,让转换流“吞噬”后续的所有数据,使其不再向下传递。这样,读取流会一直读取到文件末尾,但下游的流只处理到满足停止条件之前的数据。
以下是示例代码:
const { Transform } = require("node:stream");const { pipeline } = require("node:stream/promises");const fs = require("node:fs");let shouldStop = false;const firstStream = fs.createReadStream("./lg.txt");const secondStream = new Transform({ transform(chunk, encoding, callback) { if (shouldStop) { // 吞噬剩余数据 callback(null, ""); } else { const text = chunk.toString(); const foundText = text.search("CHAPTER 9") !== -1; if (foundText) { // 设置标志位,吞噬剩余数据 shouldStop = true; } callback(null, text.toUpperCase()); } },});const lastStream = process.stdout;pipeline(firstStream, secondStream, lastStream) .then(() => console.log("Pipeline completed successfully.")) .catch(err => console.error("Pipeline failed.", err));
代码解释:
shouldStop 变量用于标记是否需要停止处理数据。在 secondStream 的 transform 函数中,如果 shouldStop 为 true,则直接调用 callback(null, “”),表示吞噬当前数据块,不向下传递。如果 shouldStop 为 false,则检查当前数据块中是否包含目标文本。如果包含,则设置 shouldStop 为 true,并对数据进行转换后向下传递。
优点:
逻辑简单,易于理解和实现。不需要中断 pipeline,避免了潜在的错误。
缺点:
Jenni AI
使用最先进的 AI 写作助手为您的写作增光添彩。
48 查看详情
读取流会一直读取到文件末尾,可能会浪费一些资源。
方案二:使用 AbortController 中止 Pipeline
AbortController 提供了一种更优雅的方式来中止 pipeline,并且可以进行清理工作。通过 AbortController,可以向 pipeline 发送一个中止信号,pipeline 会在完成当前数据块的处理后停止。
以下是示例代码:
const { Transform } = require("node:stream");const { pipeline } = require("node:stream/promises");const fs = require("node:fs");const firstStream = fs.createReadStream("./lg.txt");const ac = new AbortController();const signal = ac.signal;const secondStream = new Transform({ transform(chunk, encoding, callback) { const text = chunk.toString(); const foundText = text.search("CHAPTER 9") !== -1; callback(null, text.toUpperCase()); if (foundText) { ac.abort(new Error("reading terminated, match found")); } },});const lastStream = process.stdout;pipeline(firstStream, secondStream, lastStream, { signal }).then(() => { console.log("nall done without match");}).catch((err) => { if (err.code === "ABORT_ERR") { console.log(`n${signal.reason.message}`); } else { console.log(err); }});
代码解释:
创建 AbortController 实例 ac,并获取其 signal 属性。在 secondStream 的 transform 函数中,如果检测到目标文本,则调用 ac.abort(new Error(“reading terminated, match found”)),发送中止信号。在调用 pipeline 函数时,将 signal 作为选项传递。使用 try…catch 捕获 pipeline 函数可能抛出的错误。如果错误代码为 ABORT_ERR,则表示 pipeline 被中止,可以获取中止原因。
优点:
更优雅地中止 pipeline,可以进行清理工作。可以获取中止原因,方便调试。
缺点:
代码相对复杂一些。
注意事项:
跨 Chunk 边界问题: 在搜索目标文本时,需要注意目标文本可能跨越 chunk 边界的情况。为了避免漏检,可以保留每个 chunk 的最后 N-1 个字符,并将其添加到下一个 chunk 的开头,其中 N 为目标文本的长度。错误处理: 在使用 pipeline 函数时,需要注意错误处理。可以使用 try…catch 语句捕获可能抛出的错误,并进行相应的处理。资源释放: 在中止 pipeline 后,需要确保所有资源都得到正确释放。
总结
本文介绍了两种在 NodeJS Streams 的 pipeline 中提前结束读取流的解决方案。第一种方案是在转换流中“吞噬”后续数据,逻辑简单,但可能会浪费一些资源。第二种方案是使用 AbortController 中止 pipeline,更优雅,可以进行清理工作,但代码相对复杂。选择哪种方案取决于具体的应用场景和需求。同时,需要注意跨 chunk 边界问题和错误处理,确保程序的稳定性和可靠性。
以上就是NodeJS Streams:在 Pipeline 中优雅地提前结束读取流的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/752406.html
微信扫一扫
支付宝扫一扫