XStream

A powerful stream processing component that transforms binary streams to structured data, with built-in support for Server-Sent Events (SSE).

When To Use

  • When you need to process streaming data from APIs, especially Server-Sent Events (SSE)
  • When working with AI services that provide streaming responses
  • When you need to transform binary streams into structured data
  • When you want to handle real-time updates efficiently

Examples

Default Protocol (SSE)

The default protocol handler in XStream is designed for Server-Sent Events (SSE). It automatically parses SSE streams into structured data.

Custom Protocol

XStream supports custom protocols through transformation functions. You can process any streaming format by providing your own transformation logic.

Examples

SSE - https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events

XStream provides built-in support for SSE protocol streaming. You can use XStream.CreateAsync(stream) directly to handle SSE streams, where stream can be any instance implementing Stream, such as the response stream from HttpClient: await httpClient.GetStreamAsync(url).

In this example, we use MemoryStream to simulate an SSE stream. In real applications, you might use it like this:

using var httpClient = new HttpClient();
var stream = await httpClient.GetStreamAsync("https://api.example.com/events");
await foreach (var output in XStream.CreateAsync(stream))
{
    // Handle SSE events
    Console.WriteLine($"Event: {output.Event}, Data: {output.Data}");
}
expand code expand code

In this example, we will demonstrate parsing the SIP protocol, which is commonly used for P2P audio and video session initiation.

Pass in a transformStream stream transformer; this parameter accepts a new TransformStream(...) instance.

expand code expand code

API#

XStream Static Methods

Method Description Type Default
CreateAsync(options, cancellationToken) Creates a stream processor with custom options XStreamOptions<TOutput> -
CreateAsync(stream, cancellationToken) Creates a stream processor with default SSE handling Stream -

XStreamOptions

Property Description Type Default
ReadableStream The input binary stream Stream -
TransformFunction Custom transformation function Func<IAsyncEnumerable<string>, IAsyncEnumerable<TOutput>>? null
Encoding Text encoding for decoding Encoding Encoding.UTF8
BufferSize Buffer size for reading int 4096

SSEOutput

Property Description Type Default
Data The data field of the event string? -
Event The event type string? -
Id The event ID string? -
Retry The retry time in milliseconds string? -

Extension Methods

Method Description Type
ToListAsync Collects all stream items into a list Task<List<T>>
WhereAsync Filters the stream based on a predicate IAsyncEnumerable<T>
SelectAsync Transforms each item in the stream IAsyncEnumerable<TResult>

Design Considerations

  • Uses modern C# async enumerable patterns for efficient streaming
  • Provides built-in support for SSE with extensibility for other protocols
  • Implements proper error handling and resource cleanup
  • Supports cancellation for long-running operations

FAQ

When should I use custom transformation?

Use custom transformation when:

  • Working with non-SSE protocols
  • Need to modify the data format
  • Want to implement custom parsing logic

How to handle errors?

XStream provides several error handling mechanisms:

  • Throws specific exceptions for common error cases
  • Supports try-catch blocks around stream processing
  • Allows cancellation of long-running operations

Is it thread-safe?

XStream is designed for async operations but is not thread-safe for concurrent access to the same stream instance. Each stream should be consumed by a single consumer.

ThoughtChain XRequest