Asynchronous execution queue in C#

Apr 25, 2025 - 23:16

I'm trying to create a "Task Queue" that has the following behaviour:

  • Work is added via a Func delegate, so that the task is only started when the item is popped off the queue (not when it is added to the queue).
  • Only a single item in the queue should be executing at any one time.
  • If a new item is added, anything in the queue and the currently executing item (if there is one) should be cancelled, so that the new item can be started ASAP.
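To make the first and third requirements concrete, here is a minimal console sketch, separate from the queue itself. It assumes the work item has the shape Func<CancellationToken, Task>; the class name DeferredStartDemo and its members are hypothetical and only serve the illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class DeferredStartDemo
{
    public static async Task<(bool StartedBeforeInvoke, bool StartedAfterInvoke, bool Cancelled)> RunAsync()
    {
        bool started = false;
        using var cts = new CancellationTokenSource();

        // Wrapping the work in a delegate defers it:
        // nothing runs until the delegate is invoked.
        Func<CancellationToken, Task> work = async token =>
        {
            started = true;
            // Wait until the token is cancelled.
            await Task.Delay(Timeout.Infinite, token);
        };

        bool startedBeforeInvoke = started;

        // Invoking the delegate runs the body synchronously up to its first await.
        Task running = work(cts.Token);
        bool startedAfterInvoke = started;

        // Cancelling the token is how a newer item would supersede this one.
        cts.Cancel();

        bool cancelled = false;
        try
        {
            await running;
        }
        catch (OperationCanceledException)
        {
            cancelled = true;
        }

        return (startedBeforeInvoke, startedAfterInvoke, cancelled);
    }
}
```

Passing an already created Task instead of a delegate would not give this behaviour, because an async method starts running as soon as it is called.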

This is what I have come up with, using Channels:

public class Execution
{
    private Channel<Func<Task>>? _channel;
    private CancellationTokenSource? _cancellationToken;
    private CancellationTokenSource? _lastWorkItemCancellationToken;

    public Task Completion { get; private set; } = Task.CompletedTask;

    public Task Push(Func<CancellationToken, Task> func)
    {
        if (_channel == null) // lazily create the channel
            StartReader();

        _lastWorkItemCancellationToken?.Cancel();

        var tcs = new TaskCompletionSource();

        _lastWorkItemCancellationToken = new CancellationTokenSource();
        CancellationTokenSource cancelCopy = _lastWorkItemCancellationToken;

        _channel!.Writer.TryWrite(async () =>
        {
            try
            {
                await func(cancelCopy.Token).ConfigureAwait(false);
                tcs.TrySetResult();
            }
            catch (OperationCanceledException ex)
            {
                tcs.TrySetCanceled(ex.CancellationToken);
                Debug.WriteLine("Cancelled");
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        });

        return tcs.Task;
    }

    private void StartReader()
    {
        _channel = Channel.CreateUnbounded<Func<Task>>();
        _cancellationToken = new CancellationTokenSource();

        Completion = Task.Run(async () =>
        {
            try
            {
                await foreach (Func<Task> func in _channel.Reader
                    .ReadAllAsync(_cancellationToken.Token))
                {
                    await func().ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when Complete() cancels the reader.
            }

            Debug.WriteLine("Ending Consumer");
        });
    }

    internal void Complete()
    {
        _cancellationToken?.Cancel();
        _channel?.Writer.TryComplete();
    }
}

Another requirement is that the next item in the queue must not be started until I have updated the UI with the current item's result. This is an annoying constraint, but it is necessary for my use case. To achieve this, I am using ContinueWith so that the UI update runs as part of the currently executing item's work.

The following shows a test app in WPF:

public partial class MainWindow : Window
{
    private readonly Execution _execution = new();
    private int _count = 0;

    public MainWindow()
    {
        InitializeComponent();
    }

    protected override void OnClosing(CancelEventArgs e)
    {
        base.OnClosing(e);

        _execution.Complete();
        _execution.Completion.Wait(8000);
    }

    private void OnAddSingleButtonClick(object sender, RoutedEventArgs e)
    {
        _ = AddWork(_count++);
    }

    private void OnAddMultipleButtonClick(object sender, RoutedEventArgs e)
    {
        for (int i = 0; i < 10; ++i)
        {
            _ = AddWork(_count++);
        }
    }

    private void OnCompleteButtonClick(object sender, RoutedEventArgs e)
    {
        _execution.Complete();
    }

    private async Task AddWork(int i)
    {
        try
        {
            var ts = TaskScheduler.FromCurrentSynchronizationContext();
            await _execution.Push(async (cancel) =>
            {
                await SimulateWorkAsync(i, cancel)
                    .ContinueWith((x) =>
                {
                    UpdateUI(x.Result);
                }, cancel, TaskContinuationOptions.None, ts);
            });
        }
        catch (OperationCanceledException)
        {
            // Expected when this item is cancelled by a newer one.
        }
        catch (Exception e)
        {
            Content = e.Message;
        }
    }

    private async Task<int> SimulateWorkAsync(int value, CancellationToken cancellation)
    {
        await Task.Delay(2000, cancellation);
        return value + 1;
    }

    private void UpdateUI(int value)
    {
        Thread.Sleep(1000);
        Result.Text = value.ToString();
    }
}

I would appreciate feedback on this design. Is there also a way to achieve what I want without ContinueWith, which feels a bit messy? Ideally, I'd like to use only await.
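For reference, one await-only shape that might work is sketched below. It is not a tested replacement for the code above. It assumes the UI thread's SynchronizationContext is captured before Push is called, and it introduces a hypothetical extension method PostAsync, which is not part of the framework. The demo uses a plain SynchronizationContext, which posts to the thread pool, as a stand-in for the WPF context so that it runs in a console.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SynchronizationContextExtensions
{
    // Hypothetical helper: runs 'action' on 'context' and returns a Task
    // that completes once the action has run (or was skipped due to cancellation).
    public static Task PostAsync(this SynchronizationContext context, Action action,
        CancellationToken token = default)
    {
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        context.Post(_ =>
        {
            if (token.IsCancellationRequested)
            {
                tcs.TrySetCanceled(token);
                return;
            }
            try
            {
                action();
                tcs.TrySetResult();
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        }, null);
        return tcs.Task;
    }
}

public static class AwaitOnlyDemo
{
    public static async Task<int> RunAsync()
    {
        // Stand-in for the UI context; in the WPF app this would be
        // SynchronizationContext.Current captured on the UI thread.
        var context = new SynchronizationContext();
        int shown = 0;

        // Stand-in for SimulateWorkAsync.
        int result = await Task.Run(() => 41 + 1).ConfigureAwait(false);

        // Stand-in for UpdateUI: the work item does not finish until this has run.
        await context.PostAsync(() => shown = result).ConfigureAwait(false);
        return shown;
    }
}
```

In AddWork, the lambda passed to Push would then await SimulateWorkAsync and afterwards await the captured context's PostAsync with UpdateUI, so the queue still does not move on until the UI update has finished. In WPF specifically, Dispatcher.InvokeAsync returns an awaitable DispatcherOperation and may serve the same purpose without a custom helper.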