XStream流
何时使用
- 当需要处理来自 API 的流数据,特别是服务器发送事件(SSE)时
- 当使用提供流式响应的 AI 服务时
- 当需要将二进制流转换为结构化数据时
- 当需要高效处理实时更新时
代码示例
默认协议(SSE)
XStream 的默认协议处理器是为服务器发送事件(SSE)设计的。它可以自动将 SSE 流解析为结构化数据。
自定义协议
XStream 通过转换函数支持自定义协议。您可以通过提供自己的转换逻辑来处理任何流格式。
代码演示
API#
XStream 静态方法
| 方法 | 说明 | 类型 | 默认值 |
|---|---|---|---|
| CreateAsync |
使用自定义选项创建流处理器 | XStreamOptions<TOutput> |
- |
| CreateAsync(stream, cancellationToken) | 使用默认 SSE 处理创建流处理器 | Stream |
- |
XStreamOptions
| 属性 | 说明 | 类型 | 默认值 |
|---|---|---|---|
| ReadableStream | 输入二进制流 | Stream |
- |
| TransformFunction | 自定义转换函数 | Func<IAsyncEnumerable<string>, IAsyncEnumerable<TOutput>>? |
null |
| Encoding | 文本解码编码 | Encoding |
Encoding.UTF8 |
| BufferSize | 读取缓冲区大小 | int |
4096 |
SSEOutput
| 属性 | 说明 | 类型 | 默认值 |
|---|---|---|---|
| Data | 事件的数据字段 | string? |
- |
| Event | 事件类型 | string? |
- |
| Id | 事件 ID | string? |
- |
| Retry | 重试时间(毫秒) | string? |
- |
扩展方法
| 方法 | 说明 | 类型 |
|---|---|---|
| ToListAsync | 将所有流项收集到列表中 | Task<List<T>> |
| WhereAsync | 基于谓词过滤流 | IAsyncEnumerable<T> |
| SelectAsync | 转换流中的每个项 | IAsyncEnumerable<TResult> |
设计考虑
- 使用现代 C# async enumerable 模式实现高效流处理
- 为 SSE 提供内置支持,同时支持其他协议的扩展
- 实现适当的错误处理和资源清理
- 支持长时间运行操作的取消
FAQ
什么时候应该使用自定义转换?
在以下情况使用自定义转换:
- 处理非 SSE 协议时
- 需要修改数据格式时
- 想要实现自定义解析逻辑时
如何处理错误?
XStream 提供多种错误处理机制:
- 为常见错误情况抛出特定异常
- 支持在流处理周围使用 try-catch 块
- 允许取消长时间运行的操作
是否线程安全?
XStream 设计用于异步操作,但对于同一流实例的并发访问不是线程安全的。每个流应该由单个消费者使用。