为了能更好地了解您的商业使用需求,请参与 Ant Design Blazor 商业应用调查,一起建设商业应用社区,为企业系统研发赋能!

XStream

何时使用

  • 当需要处理来自 API 的流数据,特别是服务器发送事件(SSE)时
  • 当使用提供流式响应的 AI 服务时
  • 当需要将二进制流转换为结构化数据时
  • 当需要高效处理实时更新时

代码示例

默认协议(SSE)

XStream 的默认协议处理器是为服务器发送事件(SSE)设计的。它可以自动将 SSE 流解析为结构化数据。

自定义协议

XStream 通过转换函数支持自定义协议。您可以通过提供自己的转换逻辑来处理任何流格式。

代码演示

SSE - https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events

XStream 默认支持 SSE 协议的流处理。你可以直接使用 XStream.CreateAsync(stream) 来处理 SSE 流,其中 stream 可以是任何实现了 Stream 的实例,比如 HttpClient 的响应流:await httpClient.GetStreamAsync(url)

示例中,我们使用 MemoryStream 模拟了一个 SSE 流,实际应用中你可能会这样使用:

using var httpClient = new HttpClient();
var stream = await httpClient.GetStreamAsync("https://api.example.com/events");
await foreach (var output in XStream.CreateAsync(stream))
{
    // 处理 SSE 事件
    Console.WriteLine($"Event: {output.Event}, Data: {output.Data}");
}
expand code expand code

在本示例中,我们将演示如何解析 SIP 协议, 该协议常用于 P2P 音视频会话协商。

传入 transformStream 流转换器,该参数需接收一个 new TransformStream(...) 实例。

expand code expand code

API#

XStream 静态方法

方法 说明 类型 默认值
CreateAsync(options, cancellationToken) 使用自定义选项创建流处理器 XStreamOptions<TOutput> -
CreateAsync(stream, cancellationToken) 使用默认 SSE 处理创建流处理器 Stream -

XStreamOptions

属性 说明 类型 默认值
ReadableStream 输入二进制流 Stream -
TransformFunction 自定义转换函数 Func<IAsyncEnumerable<string>, IAsyncEnumerable<TOutput>>? null
Encoding 文本解码编码 Encoding Encoding.UTF8
BufferSize 读取缓冲区大小 int 4096

SSEOutput

属性 说明 类型 默认值
Data 事件的数据字段 string? -
Event 事件类型 string? -
Id 事件 ID string? -
Retry 重试时间(毫秒) string? -

扩展方法

方法 说明 类型
ToListAsync 将所有流项收集到列表中 Task<List<T>>
WhereAsync 基于谓词过滤流 IAsyncEnumerable<T>
SelectAsync 转换流中的每个项 IAsyncEnumerable<TResult>

设计考虑

  • 使用现代 C# async enumerable 模式实现高效流处理
  • 为 SSE 提供内置支持,同时支持其他协议的扩展
  • 实现适当的错误处理和资源清理
  • 支持长时间运行操作的取消

FAQ

什么时候应该使用自定义转换?

在以下情况使用自定义转换:

  • 处理非 SSE 协议时
  • 需要修改数据格式时
  • 想要实现自定义解析逻辑时

如何处理错误?

XStream 提供多种错误处理机制:

  • 为常见错误情况抛出特定异常
  • 支持在流处理周围使用 try-catch 块
  • 允许取消长时间运行的操作

是否线程安全?

XStream 设计用于异步操作,但对于同一流实例的并发访问不是线程安全的。每个流应该由单个消费者使用。

ThoughtChain思维链 XRequest请求