Imagine I have 100 tasks to run, but my machine has only 16 cores. I want each core to run one task at a time, in parallel, and as soon as a core becomes idle, a new task should be assigned to it.

That is very easy to do in C# with Task.

But before starting, we are going to create a thread-safe queue to hold our tasks.

    /// <summary>
    /// A minimal thread-safe FIFO queue. Every operation takes the same private lock
    /// around an inner <see cref="Queue{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the queued items.</typeparam>
    public class SafeQueue<T>
    {
        private readonly Queue<T> queue = new Queue<T>();
        private readonly object loc = new object();

        /// <summary>Appends <paramref name="item"/> to the end of the queue.</summary>
        public void Enqueue(T item)
        {
            lock (loc)
            {
                queue.Enqueue(item);
            }
        }

        /// <summary>Removes and returns the item at the front of the queue.</summary>
        /// <exception cref="InvalidOperationException">The queue is empty.</exception>
        public T Dequeue()
        {
            lock (loc)
            {
                return queue.Dequeue();
            }
        }

        /// <summary>
        /// Removes the item at the front of the queue if there is one.
        /// Unlike calling <see cref="Any"/> followed by <see cref="Dequeue"/>, the check and
        /// the removal happen under a single lock acquisition.
        /// </summary>
        /// <param name="item">The removed item, or the default value of <typeparamref name="T"/> when the queue is empty.</param>
        /// <returns><c>true</c> if an item was removed; otherwise <c>false</c>.</returns>
        public bool TryDequeue(out T item)
        {
            lock (loc)
            {
                if (queue.Count == 0)
                {
                    item = default;
                    return false;
                }

                item = queue.Dequeue();
                return true;
            }
        }

        /// <summary>Returns <c>true</c> when the queue holds at least one item.</summary>
        public bool Any()
        {
            lock (loc)
            {
                return queue.Count > 0;
            }
        }

        /// <summary>Returns the number of items currently in the queue.</summary>
        public int Count()
        {
            lock (loc)
            {
                return queue.Count;
            }
        }
    }

Now that we have the queue, we can build a new, independent service that adds tasks to it.

    /// <summary>
    /// Runs queued task factories while keeping a bounded number of tasks in flight.
    /// </summary>
    public class CannonQueue
    {
        private readonly SafeQueue<Func<Task>> _pendingTaskFactories = new SafeQueue<Func<Task>>();
        private Task _engine = Task.CompletedTask;

        /// <summary>
        /// Enqueues <paramref name="taskFactory"/> and starts an engine loop to process the queue.
        /// </summary>
        /// <param name="taskFactory">Factory that creates (and thereby starts) the task to run.</param>
        public void QueueNew(Func<Task> taskFactory)
        {
            _pendingTaskFactories.Enqueue(taskFactory);
            // Every call starts a new engine loop and overwrites the previous _engine reference.
            _engine = RunTasksInQueue();
        }

        /// <summary>
        /// Dequeues factories and runs their tasks, keeping at most
        /// <paramref name="maxDegreeOfParallelism"/> tasks in flight, until the queue is empty
        /// and every task started by this loop has completed.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">Maximum number of tasks this loop keeps running at once.</param>
        private async Task RunTasksInQueue(int maxDegreeOfParallelism = 8)
        {
            var tasksInFlight = new List<Task>(maxDegreeOfParallelism);

            // Keep looping while tasks are still running, not only while the queue has items;
            // otherwise the loop would return with started tasks left unawaited.
            while (_pendingTaskFactories.Any() || tasksInFlight.Count > 0)
            {
                while (tasksInFlight.Count < maxDegreeOfParallelism && _pendingTaskFactories.Any())
                {
                    Func<Task> taskFactory = _pendingTaskFactories.Dequeue();
                    tasksInFlight.Add(taskFactory());
                }

                if (tasksInFlight.Count == 0)
                {
                    // Another engine loop emptied the queue between the two Any() checks.
                    // Task.WhenAny rejects an empty collection, so re-evaluate the loop condition.
                    continue;
                }

                Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
                // Awaiting the finished task rethrows its exception, if it failed.
                await completedTask.ConfigureAwait(false);
                tasksInFlight.Remove(completedTask);
            }
        }
    }

But that code starts the engine every time a task is added. What if we need to add more tasks dynamically to the task pool while the engine is already running?

Consider restarting the engine when we add a task, but only if the previous run has already completed.

Modify the QueueNew method.

        /// <summary>
        /// Enqueues <paramref name="taskFactory"/> and, when requested, starts the engine
        /// if the previous engine run has already completed.
        /// </summary>
        /// <param name="taskFactory">Factory that creates the task to run.</param>
        /// <param name="startTheEngine">When <c>false</c>, the factory is only enqueued.</param>
        public void QueueNew(Func<Task> taskFactory, bool startTheEngine = true)
        {
            _pendingTaskFactories.Enqueue(taskFactory);

            if (!startTheEngine)
            {
                return;
            }

            // The lock makes the "is it finished? then restart" check atomic across callers.
            lock (loc)
            {
                if (!_engine.IsCompleted)
                {
                    return;
                }

                _logger.LogDebug("Engine is sleeping. Trying to wake it up.");
                _engine = RunTasksInQueue();
            }
        }

Now it runs well.

You may also need dependency injection: if your task needs to access dependencies such as a database or an HTTP client, you need to keep those dependencies alive for as long as the task runs.

So we can add a new QueueWithDependency method.

The final code will be:

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Shared
{
    /// <summary>
    /// A background task queue that runs queued task factories with a bounded number of
    /// tasks in flight. The pending queue and the current engine run are instance state.
    /// </summary>
    public class CannonQueue
    {
        private readonly SafeQueue<Func<Task>> _pendingTaskFactories = new SafeQueue<Func<Task>>();
        private readonly ILogger<CannonQueue> _logger;
        private readonly object loc = new object();
        private Task _engine = Task.CompletedTask;

        /// <summary>Creates the queue.</summary>
        /// <param name="logger">Logger used for engine diagnostics.</param>
        public CannonQueue(ILogger<CannonQueue> logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// Enqueues <paramref name="taskFactory"/> and, when requested, starts the engine
        /// if the previous engine run has already completed.
        /// </summary>
        /// <param name="taskFactory">Factory that creates the task to run.</param>
        /// <param name="startTheEngine">When <c>false</c>, the factory is only enqueued.</param>
        /// <exception cref="ArgumentNullException"><paramref name="taskFactory"/> is null.</exception>
        public void QueueNew(Func<Task> taskFactory, bool startTheEngine = true)
        {
            // Fail at the call site instead of crashing the engine later when it invokes the factory.
            if (taskFactory == null)
            {
                throw new ArgumentNullException(nameof(taskFactory));
            }

            _pendingTaskFactories.Enqueue(taskFactory);
            if (!startTheEngine)
            {
                return;
            }

            // The lock makes the "is it finished? then restart" check atomic across callers.
            lock (loc)
            {
                if (_engine.IsCompleted)
                {
                    this._logger.LogDebug("Engine is sleeping. Trying to wake it up.");
                    _engine = RunTasksInQueue();
                }
            }
        }

        /// <summary>
        /// Dequeues factories and runs their tasks, keeping at most
        /// <paramref name="maxDegreeOfParallelism"/> tasks in flight, until the queue is empty
        /// and every task started by this call has completed.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">Maximum number of tasks kept running at once; must be at least 1.</param>
        /// <returns>A task that completes when this run is done. It faults if a job throws.</returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxDegreeOfParallelism"/> is less than 1.</exception>
        public async Task RunTasksInQueue(int maxDegreeOfParallelism = 8)
        {
            if (maxDegreeOfParallelism < 1)
            {
                throw new ArgumentOutOfRangeException(
                    nameof(maxDegreeOfParallelism),
                    maxDegreeOfParallelism,
                    "At least one task must be allowed to run at a time.");
            }

            var tasksInFlight = new List<Task>(maxDegreeOfParallelism);
            while (_pendingTaskFactories.Any() || tasksInFlight.Count > 0)
            {
                while (tasksInFlight.Count < maxDegreeOfParallelism && _pendingTaskFactories.Any())
                {
                    Func<Task> taskFactory = _pendingTaskFactories.Dequeue();
                    tasksInFlight.Add(taskFactory());
                    this._logger.LogDebug($"Engine selected one job to run. Currently there are still {_pendingTaskFactories.Count()} jobs remaining. {tasksInFlight.Count} jobs running.");
                }

                if (tasksInFlight.Count == 0)
                {
                    // Another caller of this method emptied the queue between the two Any() checks.
                    // Task.WhenAny rejects an empty collection, so re-evaluate the loop condition.
                    continue;
                }

                Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);

                // Remove the finished task before logging so the reported running count is accurate.
                tasksInFlight.Remove(completedTask);

                // Awaiting the finished task rethrows its exception, if it failed.
                await completedTask.ConfigureAwait(false);
                this._logger.LogInformation($"Engine finished one job. Currently there are still {_pendingTaskFactories.Count()} jobs remaining. {tasksInFlight.Count} jobs running.");
            }
        }
    }
}

Don't forget to register that as a singleton dependency!

// CannonQueue keeps its pending queue and current engine task in instance fields,
// so all callers must share one instance to feed the same queue.
services.AddSingleton<CannonQueue>();

When you are using that service, you can:

// Enqueue one download job per photo; QueueNew wakes the engine when it is not already running.
foreach (var photo in photos)
{
    cannonQueue.QueueNew(async () => await Download(photo));
}

New download tasks will be added to the queue, which runs them in parallel without blocking the current thread.

But if you want to await the queue to complete, you can:

// Only enqueue the download jobs here; do not start the task pool yet.
foreach (var photo in photos)
{
    cannonQueue.QueueNew(
        taskFactory: async () => await Download(photo),
        startTheEngine: false);
}

// Finally, run the task pool ourselves with up to 30 tasks at the same time and wait for it to finish.
await cannonQueue.RunTasksInQueue(maxDegreeOfParallelism: 30);
	

And also sometimes you might need to use the task queue as a 'fire-and-forget' service. In this case your task might have some dependency like Entity Framework.

To allow queuing a task with a dependency, give CannonQueue a `_scopeFactory` field (an `IServiceScopeFactory` injected through the constructor) and add the following method:

/// <summary>
/// Queues a job that needs a dependency of type <typeparamref name="T"/>. When the job runs,
/// a new service scope is created, the dependency is resolved from it and passed to
/// <paramref name="bullet"/>, and the scope is disposed afterwards.
/// Any exception (including a failure to create the scope or resolve the dependency)
/// is logged instead of being propagated to the engine.
/// </summary>
/// <typeparam name="T">Service type to resolve from the scope.</typeparam>
/// <param name="bullet">The work to run with the resolved dependency.</param>
public void QueueWithDependency<T>(Func<T, Task> bullet) where T : notnull
{
    QueueNew(async () =>
    {
        try
        {
            // Create the scope and resolve inside the try block so that a missing registration
            // is logged like any other job failure rather than faulting the engine.
            using var scope = _scopeFactory.CreateScope();
            var dependency = scope.ServiceProvider.GetRequiredService<T>();
            await bullet(dependency);
        }
        catch (Exception e)
        {
            _logger.LogError(e, $"An error occurred with Cannon. Dependency: {typeof(T).Name}.");
        }
    });
}

To use that:


        /// <summary>
        /// Validates the access token, queues the error log entry to be saved in the background,
        /// and returns a success response without waiting for the save.
        /// </summary>
        [HttpPost]
        public IActionResult Log(LogAddressModel model)
        {
            var appId = _tokenManager.ValidateAccessToken(model.AccessToken);

            // The lambda runs later on the queue with its own ObserverDbContext instance.
            _cannon.QueueWithDependency<ObserverDbContext>(async db =>
            {
                db.ErrorLogs.Add(new ErrorLog
                {
                    AppId = appId,
                    Message = model.Message,
                    StackTrace = model.StackTrace,
                    EventLevel = model.EventLevel,
                    Path = model.Path
                });
                await db.SaveChangesAsync();
            });

            return this.Protocol(ErrorType.Success, "Successfully logged your event.");
        }