ASP.NET Core: Pushing Data over a Live Connection

Today I am creating an application in ASP.NET Core 3.1.  The application needs to to continually push out updated information to any clients that are actively connected.  There are a few possible ways to do this.  I have decided to use PushStreamContent for this.

With this class I can have an open HTTP connection over which data can be arbitrarily pushed.  The stream itself is raw.  I could push out text, binary data, or anything else that I wish to serialize.  This means that any parsing and interpretation of data is my responsibility, but that is totally fine for this purpose.  But how do I use PushStreamContent to accomplish this?

I will start by creating a new ASP.NET Core project.

ASP.netCoreNewProject

For the ASP.NET Core Project type there are several paths to success.  The one that I am taking is not the only “correct” one.  I have chosen the template for “Web Application (Model-View-Controller)”.  A default web application is created and a basic HTML UI is put in place.

ASP.NETCoreProjectType

Now that the project is created, there are a few configuration items that we need to handle.  Startup.cs requires some updates since we are going to have WebAPI classes in this project.

ASP.NETCoreMVCProjectFiles

Within Startup.cs there is a method named ConfigureServices.

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllersWithViews();
}

In this method an additional line is needed to support WebApiConventions.

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllersWithViews();
    services.AddMvc().AddWebApiConventions();
}

For the sake of development the CORS headers for the application also need some updating.  Further down in Startup.cs is a method named Configure.

app.UseCors(builder =>  builder.AllowAnyHeader().AllowAnyOrigin().AllowAnyMethod());

If you are using Visual Studio as an editor you might see that AddApiWebConventions() has a red underline and an error about that method being undefined.  To resolve this there is a NuGet packet to add to the project.  Right-click on Dependencies, select “NuGet Package Manager.” Click on the “Browse” tab and search for a package named Microsoft.AspNetCore.Mvc.WebApiCompactShim.

Microsoft.aspNetCore.Mvc.WebApiCompactShim

After that is added the basic configuration is complete.  Now to add code.

Right-click on the Controllers folder and select “Add New”, “Controller.”  For the controller type select “API Controller – Empty.”  For the controller name I have selected “StreamUpdateController.”  There are three members that will need to be on the controller.

  • Get(HttpRequestMessage) – the way that a client will subscribe to the updates.  They come through an HTTP Get.
  • OnStreamAvailable(Strea, HttpContent, TransportContext) – this is a static method that is called when the response stream is ready to receive data.  Here it only needs to add a client to the collection of subscribers.
  • Clients – a collection of the currently attached clients subscribing to the message.

For the collection of clients an auto-generated property is sufficient.  A single line of code takes care of its implementation.

private static ConcurrentBag Clients { get; } = new ConcurrentBag();

The Get() method adds the new connection to the collection of connections and sets the response code and mime type.  The mime type used here is text/event-stream.

public HttpResponseMessage Get(HttpRequestMessage request)
{
    const String RESPONSE_TYPE = "text/event-stream";
    var response = new HttpResponseMessage(HttpStatusCode.Accepted)
    {
        Content = new PushStreamContent((a, b, c) =>
        { OnStreamAvailable(a, b, c); }, RESPONSE_TYPE)
    };
    response.Content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue(RESPONSE_TYPE);
    return response;
}

The static method, OnStreamAvailable, only has a couple of lines in which it adds the stream to the collection with a StreamWriter wrapped around it.

private void OnStreamAvailable(Stream stream, HttpContent content,
    TransportContext context)
{
    var client = new StreamWriter(stream);
    clients.Add(client);
}

As of yet there is not anything being written to the streams.  To keep this sample simple all clients will receive the same data to their streams.  Everything is in place to start writing data out to clients.  I will use a static timer deciding when to write something to the stream.  Every time its period has elapsed all clients connected to the stream will receive a message.

        private static System.Timers.Timer writeTimer = new System.Timers.Timer() { Interval = 1000, AutoReset = true };

        static StreamUpdateController()
        {
            writeTimer.Start();
            writeTimer.Elapsed += TimerElapsed;
            writeTimer.Start();
        }

The handler for the timer’s elapsed event will get the current date and print it to all client streams.

static async void TimerElapsed(object sender, ElapsedEventArgs args)
{
    var dataToWrite = DateTime.Now.ToString();
    foreach(var clientStream in Clients)
    {
        await clientStream.WriteLineAsync(dataToWrite);
    }
}

You can now test the stream out.  If you run the project and navigate to https://localhost:yourPort/api/StreamUpdate you can see the data being written…

…well, kind of.  The browser window is blank.  Something clearly has gone wrong, because there is no data here.  Maybe there was some type of error and we are not getting an error message.  Or perhaps the problems was caused by… wait a second, there is information on the screen.  What took so long?

The “problem” here is that writes are buffered.  This normally is not a problem and can actually contribute to performance improvements.  But we need for that buffer to be transmitted when a complete message is within it.  To resolve this the stream can be requested to clear its buffer. Stream.Flush() and Stream.FlushAsync() will do this.  The updated TimerElapsed method now looks like the following.

static async void TimerElapsed(object sender, ElapsedEventArgs args)
{
    var dataToWrite = DateTime.Now.ToString();
    foreach(var clientStream in Clients)
    {
        await clientStream.WriteLineAsync(dataToWrite);
        await clientStream.FlushAsync();

    }
}

Run the program again and open the URL to now see the messages rendered to the screen every second.  But how does a client receive this?

While it is easy for .Net programs to interact with this, I wanted to consume the data from HTML/JavaScript.  Provided that you use a certain data format there is an object named EventSource that makes interacting with the output from this stream easy.  I have not used that format.  The option available to me is to use XmlHttpRequest.  If you have used XmlHttpRequest before, then you know that after a request is complete an event is raised making the completed data available to you.  That does not help for getting the chunks of available data.  The object has another event of help, onprogress.

When onprogress is fired the responseText member has the accumulated data.  The length field indicates how long the accumulated data is.  Every time the progress event is raised the new characters added to the string can be examined to grab the next chunk of data.

function xmlListen() {
    var xhr = new XMLHttpRequest();
    var last_index = 0;
    xhr.open("GET", "/api/Stream/", true);
    //xhr.setRequestHeader("Content-Type", "text/event-stream");
    xhr.onload = function (e) {
        console.log(`readystate ${xhr.readyState}`);
        if (xhr.readyState === 4) {
            if (xhr.status >=200 && xhr.status <= 299) {                                  console.log(xhr.responseText);             } else {                 console.error(xhr.statusText);             }         }     };     xhr.onprogress = (p) => {

        var curr_index = xhr.responseText.length;
        if (last_index == curr_index) return;
        var s = xhr.responseText.substring(last_index, curr_index);
        last_index = curr_index;
        console.log("PROGRESS:", s);
    }
    xhr.onerror = function (e) {
        console.error(xhr.statusText);
    };
    xhr.send(null);
}

This works.  But I do have some questions about this implementation.  The XmlHttpRequest‘s responseText object appears to be just growing without end.  For smaller data sizes this might not be a big deal, but since I work on some HTML applications that may run for over 24 hours it could lead to unnecessarily increased memory pressure.  That is not desirable.  Let us go back to EventSource

The data being written is not in the right format for EventSource.  To make the adjustment, the data must be proceeded by the string “data: ” and followed by two new line characters.  That is it.  A downside of conforming to something compatible with EventSource is that all data must be expressible as text.

static async void TimerElapsed(object sender, ElapsedEventArgs args)
{
    var dataToWrite = $"data: {DateTime.Now.ToString()}\n\n";
    foreach(var clientStream in Clients)
    {
        await clientStream.WriteLineAsync(dataToWrite);
        await clientStream.FlushAsync();
    }
}

On the client side the following will create an EventSource that receives those messages.

function startEventSource() {
    var source = new EventSource('/api/Push/');
    source.onmessage = (message) => {
        console.log(message.id, message.data);
    }
    source.onerror = (e) => {
        console.error(e);
    }
    source.onopen = () => {
        console.log('opened');
    }
}

I would still like to be able to stream data less constrained through the same service.  To do this instead of having a collection of StreamWriter objects I have made a class to hold the streams along with another attribute that indicates the format in which data should be.  A client can specify a format through a query parameter.

enum StreamFormat
{
    Text,
    Binary
}
class Client
{
    public Client(Stream s, StreamFormat f)
    {
        this.Stream = s;
        this.Writer = new StreamWriter(s);
        this.Format = f;
    }

    public Stream Stream { get;  }
    public StreamWriter Writer { get; }
    public StreamFormat Format { get; }

}

public HttpResponseMessage Get(HttpRequestMessage request)
{
    var format = request.RequestUri.ParseQueryString()["format"] ?? "text";
    const String RESPONSE_TYPE = "text/event-stream";
    HttpResponseMessage response;
    if (format.Equals("binary"))
    {
        response = new HttpResponseMessage(HttpStatusCode.Accepted)
        {
            Content = new PushStreamContent((a, b, c) =>
            { OnBinaryStreamAvailable(a, b, c); }, RESPONSE_TYPE)
        };
    }
    else
    {
        response = new HttpResponseMessage(HttpStatusCode.Accepted)
        {
            Content = new PushStreamContent((a, b, c) =>
            { OnStreamAvailable(a, b, c); }, RESPONSE_TYPE)
        };
    }           
    return response;
}

static void OnStreamAvailable(Stream stream, HttpContent content, TransportContext context)
{                        
    Clients.Add(new Client(stream, StreamFormat.Text));
}

static void OnBinaryStreamAvailable(Stream stream, HttpContent content, TransportContext context)
{
    Clients.Add(new Client(stream, StreamFormat.Binary));
}

        static async void TimerElapsed(object sender, ElapsedEventArgs args)
        {
            var data = new byte[] { 0x01, 0x02, 0x03, 0x04, 0x10, 0x40 };

            List unsubscribeList = new List();
            foreach(var client in Clients)
            {
                try
                {
                    if (client.Format == StreamFormat.Binary)
                        await client.Stream.WriteAsync(data, 0, data.Length);
                    else 
                        await client.Writer.WriteLineAsync($"data: {ByteArrayToString(data)}\n\n");
                    await client.Writer.FlushAsync();
                } catch(Exception exc)
                {
                    unsubscribeList.Add(client);
                }
            }
            Clients.RemoveAll((i) => unsubscribeList.Contains(i));
        }
twitterLogofacebookLogoyoutubeLogoInstagram Logo

Linked In

 

 

 


Posts may contain products with affiliate links. When you make purchases using these links, we receive a small commission at no extra cost to you. Thank you for your support.

One thought on “ASP.NET Core: Pushing Data over a Live Connection

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.