XStream
A stream processing component that transforms binary streams into structured data, with built-in support for Server-Sent Events (SSE).
When To Use
- When you need to process streaming data from APIs, especially Server-Sent Events (SSE)
- When working with AI services that provide streaming responses
- When you need to transform binary streams into structured data
- When you want to handle real-time updates efficiently
Examples
Default Protocol (SSE)
The default protocol handler in XStream is designed for Server-Sent Events (SSE). It automatically parses SSE streams into structured data.
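To make "parses SSE streams into structured data" concrete, here is a minimal standalone sketch of what such parsing involves. It is not XStream's implementation: `SseEvent` and `SseSketch.ParseAsync` are hypothetical names, and the record simply mirrors the four fields documented for `SSEOutput`.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical record mirroring the documented SSEOutput fields.
public sealed record SseEvent(string? Data, string? Event, string? Id, string? Retry);

public static class SseSketch
{
    // Reads "field: value" lines and emits one event per blank-line-terminated block.
    // Simplification: a block is emitted if any field was set, and a block that is
    // not terminated by a blank line before end of stream is dropped.
    public static async IAsyncEnumerable<SseEvent> ParseAsync(
        Stream stream,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Disposing the reader also disposes the underlying stream.
        using var reader = new StreamReader(stream, Encoding.UTF8);
        string? data = null, evt = null, id = null, retry = null;

        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            var line = await reader.ReadLineAsync();
            if (line is null) break; // end of stream

            if (line.Length == 0)
            {
                // A blank line terminates the current event.
                if (data is not null || evt is not null || id is not null || retry is not null)
                    yield return new SseEvent(data, evt, id, retry);
                data = evt = id = retry = null;
                continue;
            }

            if (line.StartsWith(':')) continue; // comment / keep-alive line

            var colon = line.IndexOf(':');
            var field = colon < 0 ? line : line[..colon];
            var value = colon < 0 ? "" : line[(colon + 1)..];
            if (value.StartsWith(' ')) value = value[1..]; // strip one leading space

            switch (field)
            {
                // Multiple data lines are joined with a newline.
                case "data": data = data is null ? value : data + "\n" + value; break;
                case "event": evt = value; break;
                case "id": id = value; break;
                case "retry": retry = value; break;
            }
        }
    }
}
```

Iterating `SseSketch.ParseAsync(stream)` with `await foreach` yields one `SseEvent` per event block, which is the shape of result the default handler is described as producing.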
Custom Protocol
XStream supports custom protocols through transformation functions. You can process any streaming format by providing your own transformation logic.
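As an illustration, the sketch below shows a transformation function for newline-delimited JSON. It matches the `Func<IAsyncEnumerable<string>, IAsyncEnumerable<TOutput>>` shape listed for `TransformFunction`, but `Tick` and `NdjsonSketch` are hypothetical names, and it assumes each input string is an arbitrary chunk of decoded text that may end mid-line.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical output type for a newline-delimited JSON feed.
public sealed record Tick(string Symbol, decimal Price);

public static class NdjsonSketch
{
    private static readonly JsonSerializerOptions Options =
        new() { PropertyNameCaseInsensitive = true };

    // Assumption: chunks are arbitrary text fragments, so a line may span
    // several chunks. Incomplete text is carried over until a newline arrives.
    public static async IAsyncEnumerable<Tick> Transform(IAsyncEnumerable<string> chunks)
    {
        var carry = "";
        await foreach (var chunk in chunks)
        {
            carry += chunk;
            int newline;
            while ((newline = carry.IndexOf('\n')) >= 0)
            {
                var line = carry[..newline].Trim();
                carry = carry[(newline + 1)..];
                if (line.Length == 0) continue;

                // Throws JsonException on malformed input.
                var tick = JsonSerializer.Deserialize<Tick>(line, Options);
                if (tick is not null) yield return tick;
            }
        }

        // Flush a final line that has no trailing newline.
        var rest = carry.Trim();
        if (rest.Length > 0)
        {
            var tick = JsonSerializer.Deserialize<Tick>(rest, Options);
            if (tick is not null) yield return tick;
        }
    }
}
```

Because `Transform` is a plain static method, it can be assigned to a `Func<IAsyncEnumerable<string>, IAsyncEnumerable<Tick>>` and supplied wherever a transformation function is expected.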
API
XStream Static Methods
Method | Description | Type | Default |
---|---|---|---|
CreateAsync | Creates a stream processor with custom options | XStreamOptions<TOutput> | - |
CreateAsync(stream, cancellationToken) | Creates a stream processor with default SSE handling | Stream | - |
XStreamOptions
Property | Description | Type | Default |
---|---|---|---|
ReadableStream | The input binary stream | Stream | - |
TransformFunction | Custom transformation function | Func<IAsyncEnumerable<string>, IAsyncEnumerable<TOutput>>? | null |
Encoding | Text encoding for decoding | Encoding | Encoding.UTF8 |
BufferSize | Buffer size for reading | int | 4096 |
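How XStream uses `Encoding` and `BufferSize` internally is not documented here. As one plausible reading, the sketch below shows how a binary stream can be read in fixed-size buffers and decoded into text chunks without corrupting multi-byte characters that straddle a buffer boundary. `DecodeSketch.ReadChunksAsync` is a hypothetical helper, not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class DecodeSketch
{
    // Reads the stream bufferSize bytes at a time and yields decoded text chunks.
    // Defaults mirror the documented ones (UTF-8, 4096).
    public static async IAsyncEnumerable<string> ReadChunksAsync(
        Stream stream,
        Encoding? encoding = null,
        int bufferSize = 4096,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var enc = encoding ?? Encoding.UTF8;

        // A stateful Decoder keeps incomplete byte sequences between calls,
        // so a character split across two reads is still decoded correctly.
        var decoder = enc.GetDecoder();
        var bytes = new byte[bufferSize];
        var chars = new char[enc.GetMaxCharCount(bufferSize)];

        int read;
        while ((read = await stream.ReadAsync(bytes.AsMemory(0, bufferSize), cancellationToken)) > 0)
        {
            var count = decoder.GetChars(bytes, 0, read, chars, 0, flush: false);
            if (count > 0) yield return new string(chars, 0, count);
        }
    }
}
```

A smaller buffer yields more, shorter chunks; a larger one trades memory for fewer reads. The concatenation of all chunks is the same either way.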
SSEOutput
Property | Description | Type | Default |
---|---|---|---|
Data | The data field of the event | string? | - |
Event | The event type | string? | - |
Id | The event ID | string? | - |
Retry | The retry time in milliseconds | string? | - |
Extension Methods
Method | Description | Type |
---|---|---|
ToListAsync | Collects all stream items into a list | Task<List<T>> |
WhereAsync | Filters the stream based on a predicate | IAsyncEnumerable<T> |
SelectAsync | Transforms each item in the stream | IAsyncEnumerable<TResult> |
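The table lists only return types, so the exact signatures are not known from this page. The sketch below shows how operators with these names and return types are commonly written over `IAsyncEnumerable<T>`; the parameter lists and the `AsyncStreamSketch` class are assumptions for illustration, not the library's code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamSketch
{
    // Lazily yields only the items that satisfy the predicate.
    public static async IAsyncEnumerable<T> WhereAsync<T>(
        this IAsyncEnumerable<T> source, Func<T, bool> predicate)
    {
        await foreach (var item in source)
            if (predicate(item)) yield return item;
    }

    // Lazily projects each item to a new form.
    public static async IAsyncEnumerable<TResult> SelectAsync<T, TResult>(
        this IAsyncEnumerable<T> source, Func<T, TResult> selector)
    {
        await foreach (var item in source)
            yield return selector(item);
    }

    // Drains the stream into memory; only suitable for finite streams.
    public static async Task<List<T>> ToListAsync<T>(
        this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default)
    {
        var list = new List<T>();
        await foreach (var item in source.WithCancellation(cancellationToken))
            list.Add(item);
        return list;
    }
}
```

`WhereAsync` and `SelectAsync` are lazy: nothing is read from the source until the result is enumerated. `ToListAsync` is the step that actually pulls every item, so it should not be applied to a stream that never ends.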
Design Considerations
- Uses modern C# async enumerable patterns for efficient streaming
- Provides built-in support for SSE with extensibility for other protocols
- Implements proper error handling and resource cleanup
- Supports cancellation for long-running operations
FAQ
When should I use a custom transformation?
Use a custom transformation when:
- You are working with non-SSE protocols
- You need to modify the data format
- You want to implement custom parsing logic
How do I handle errors?
XStream provides several error handling mechanisms:
- It throws specific exceptions for common error cases
- Stream processing can be wrapped in try-catch blocks
- Long-running operations can be cancelled through a cancellation token
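The mechanisms above can be combined as in the following sketch, which consumes an async stream under a timeout and handles both cancellation and I/O failure. This page does not name the specific exception types XStream throws, so `IOException` stands in as an assumption; `ConsumeSketch` and `SlowSource` are hypothetical and simulate a stream rather than calling the library.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumeSketch
{
    // Simulated endless source; stops only when the token is cancelled.
    public static async IAsyncEnumerable<int> SlowSource(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; ; i++)
        {
            await Task.Delay(20, cancellationToken);
            yield return i;
        }
    }

    // Consumes a stream with a timeout and reports how processing ended.
    public static async Task<string> ConsumeAsync<T>(IAsyncEnumerable<T> source, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        var seen = 0;
        try
        {
            // WithCancellation forwards the token to the source iterator.
            await foreach (var _ in source.WithCancellation(cts.Token))
                seen++;
            return $"completed after {seen} items";
        }
        catch (OperationCanceledException)
        {
            return $"cancelled after {seen} items";
        }
        catch (IOException ex) // assumed failure type for a broken connection
        {
            return $"failed: {ex.Message}";
        }
    }
}
```

Catching `OperationCanceledException` separately from other failures keeps an expected timeout or user cancellation from being reported as an error.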
Is it thread-safe?
XStream is designed for async operations but is not thread-safe for concurrent access to the same stream instance. Each stream should be consumed by a single consumer.