XStream流
何时使用
- 当需要处理来自 API 的流数据,特别是服务器发送事件(SSE)时
- 当使用提供流式响应的 AI 服务时
- 当需要将二进制流转换为结构化数据时
- 当需要高效处理实时更新时
代码示例
默认协议(SSE)
XStream 的默认协议处理器是为服务器发送事件(SSE)设计的。它可以自动将 SSE 流解析为结构化数据。
自定义协议
XStream 通过转换函数支持自定义协议。您可以通过提供自己的转换逻辑来处理任何流格式。
代码演示
API#
XStream 静态方法
方法 | 说明 | 类型 | 默认值 |
---|---|---|---|
CreateAsync |
使用自定义选项创建流处理器 | XStreamOptions<TOutput> |
- |
CreateAsync(stream, cancellationToken) | 使用默认 SSE 处理创建流处理器 | Stream |
- |
XStreamOptions
属性 | 说明 | 类型 | 默认值 |
---|---|---|---|
ReadableStream | 输入二进制流 | Stream |
- |
TransformFunction | 自定义转换函数 | Func<IAsyncEnumerable<string>, IAsyncEnumerable<TOutput>>? |
null |
Encoding | 文本解码编码 | Encoding |
Encoding.UTF8 |
BufferSize | 读取缓冲区大小 | int |
4096 |
SSEOutput
属性 | 说明 | 类型 | 默认值 |
---|---|---|---|
Data | 事件的数据字段 | string? |
- |
Event | 事件类型 | string? |
- |
Id | 事件 ID | string? |
- |
Retry | 重试时间(毫秒) | string? |
- |
扩展方法
方法 | 说明 | 类型 |
---|---|---|
ToListAsync | 将所有流项收集到列表中 | Task<List<T>> |
WhereAsync | 基于谓词过滤流 | IAsyncEnumerable<T> |
SelectAsync | 转换流中的每个项 | IAsyncEnumerable<TResult> |
设计考虑
- 使用现代 C# async enumerable 模式实现高效流处理
- 为 SSE 提供内置支持,同时支持其他协议的扩展
- 实现适当的错误处理和资源清理
- 支持长时间运行操作的取消
FAQ
什么时候应该使用自定义转换?
在以下情况使用自定义转换:
- 处理非 SSE 协议时
- 需要修改数据格式时
- 想要实现自定义解析逻辑时
如何处理错误?
XStream 提供多种错误处理机制:
- 为常见错误情况抛出特定异常
- 支持在流处理周围使用 try-catch 块
- 允许取消长时间运行的操作
是否线程安全?
XStream 设计用于异步操作,但对于同一流实例的并发访问不是线程安全的。每个流应该由单个消费者使用。