使用 TransformStream 将 ReadableStream 分割成行

使用 transformstream 将 readablestream 分割成行

本文介绍了如何使用 JavaScript 的 Streams API 中的 TransformStream 将 ReadableStream 对象分割成行。通过创建一个自定义的 LineSplitter 类,该类继承自 TransformStream,可以有效地处理跨越多个数据块的行,并确保每一行都完整地传递给下游的消费者。该方法避免了简单地按块分割可能导致的不完整行的问题,提供了一种可靠的按行读取 ReadableStream 的解决方案。

使用 TransformStream 分割 ReadableStream 为行的详细教程

ReadableStream 是一种强大的 API,用于处理流式数据,例如从网络请求返回的数据。然而,直接按块读取数据可能不方便,特别是当需要按行处理数据时。本教程将演示如何使用 TransformStream 将 ReadableStream 分割成行,确保每一行都完整地传递给下游的消费者。

理解 TransformStream

TransformStream 允许我们在读取流的过程中转换数据。它接受一个 transform 函数和一个 flush 函数作为参数。

transform 函数: 接收每个数据块,并决定如何处理它。它可以将数据块转换为新的数据块,并将其传递给下游。flush 函数: 在流结束时调用,允许我们处理任何剩余的数据。

实现 LineSplitter 类

以下是一个自定义的 LineSplitter 类,它继承自 TransformStream,用于将 ReadableStream 分割成行。

function concatArrayBuffers(chunks: Uint8Array[]): Uint8Array {    const result = new Uint8Array(chunks.reduce((a, c) => a + c.length, 0));    let offset = 0;    for (const chunk of chunks) {        result.set(chunk, offset);        offset += chunk.length;    }    return result;}class LineSplitter extends TransformStream {    protected _buffer: Uint8Array[] = [];    constructor() {        super({            transform: (chunk, controller) => {                let index;                let rest = chunk;                while ((index = rest.indexOf(0x0a)) !== -1) {                    controller.enqueue(concatArrayBuffers([...this._buffer, rest.slice(0, index + 1)]));                    rest = rest.slice(index + 1);                    this._buffer = [];                }                if (rest.length > 0) {                    this._buffer.push(rest);                }            },            flush: (controller) => {                if (this._buffer.length > 0) {                    controller.enqueue(concatArrayBuffers(this._buffer));                }            }        });    }}

代码解释:

concatArrayBuffers 函数: 该函数用于将多个 Uint8Array 合并成一个 Uint8Array。这是因为我们需要将缓冲区中的数据块和当前块的一部分合并成一行。

Levity Levity

AI帮你自动化日常任务

Levity 206 查看详情 Levity

LineSplitter 类:

_buffer 属性: 用于存储尚未完成的行的 Uint8Array 块。transform 方法:在每个块中查找换行符 (0x0a) 的索引。如果找到换行符,则将缓冲区中的所有块与当前块中换行符之前的部分合并,并将其添加到控制器中。更新剩余的块。如果当前块中还有剩余的数据,则将其添加到缓冲区中。flush 方法:在流结束时调用,将缓冲区中剩余的数据添加到控制器中。

使用 LineSplitter

以下是如何使用 LineSplitter 的示例:

const linesStream = (await fetch('http://example.com')).body    .pipeThrough(new LineSplitter());for await (const line of linesStream.pipeThrough(new TextDecoderStream())) {    console.log(line);}

代码解释:

fetch(‘http://example.com’): 发起一个网络请求,获取 ReadableStream 对象。.pipeThrough(new LineSplitter()): 将 ReadableStream 通过 LineSplitter 进行转换,将其分割成行的 ReadableStream。.pipeThrough(new StringDecoderStream()): 使用 TextDecoderStream 将 Uint8Array 转换为字符串。for await (const line of linesStream): 循环遍历行的 ReadableStream,并打印每一行。

注意事项

此示例假设行尾使用换行符 (n, 0x0a) 作为分隔符。如果需要支持其他换行符,例如回车符 (r, 0x0d) 或回车换行符 (rn),则需要修改 LineSplitter 类中的逻辑。此示例使用 TextDecoderStream 将 Uint8Array 转换为字符串。可以根据需要使用其他解码器。

总结

通过使用 TransformStream,我们可以轻松地将 ReadableStream 分割成行,从而更方便地处理流式数据。LineSplitter 类提供了一个可重用的解决方案,可以用于处理各种类型的 ReadableStream,并确保每一行都完整地传递给下游的消费者。这种方法避免了简单地按块分割可能导致的不完整行的问题,提供了一种可靠的按行读取 ReadableStream 的解决方案。

以上就是使用 TransformStream 将 ReadableStream 分割成行的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/742928.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年11月25日 16:16:12
下一篇 2025年11月25日 16:16:33

相关推荐

发表回复

登录后才能评论
关注微信