Imagine I have 100 tasks to run, but my machine has only 16 cores. I want every core to run one task in parallel, and as soon as a core becomes idle, a new task should be assigned to it.
That is easy to do in C# with Task.
Before starting, we are going to create a thread-safe queue to hold our tasks.
using System.Collections.Generic;
using System.Linq;
public class SafeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly object loc = new object();
    public void Enqueue(T item)
    {
        lock (loc)
        {
            queue.Enqueue(item);
        }
    }
    // Throws InvalidOperationException if the queue is empty.
    public T Dequeue()
    {
        lock (loc)
        {
            return queue.Dequeue();
        }
    }
    public bool Any()
    {
        lock (loc)
        {
            return queue.Any();
        }
    }
    // Needed by the logging calls in the final CannonQueue.
    public int Count()
    {
        lock (loc)
        {
            return queue.Count;
        }
    }
}
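For comparison, .NET already ships a thread-safe queue, ConcurrentQueue<T>. The sketch below is not part of the original code; it only shows how the same operations could map onto that type. ConcurrentSafeQueue is a hypothetical name. One difference worth noting: TryDequeue checks for emptiness and removes the item in a single atomic step, which a separate Any() followed by Dequeue() cannot guarantee when several consumers are involved.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical wrapper, shown only for comparison with SafeQueue<T>.
public class ConcurrentSafeQueue<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    public void Enqueue(T item) => _queue.Enqueue(item);

    // Returns false instead of throwing when the queue is empty.
    public bool TryDequeue(out T item) => _queue.TryDequeue(out item);

    public bool Any() => !_queue.IsEmpty;

    public int Count() => _queue.Count;
}
```

With this shape, a consumer loop would be written as `while (queue.TryDequeue(out var job)) { ... }` rather than checking Any() first.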
Once we have the queue, we can build an independent service that adds new tasks to it.
public class CannonQueue
{
    private readonly SafeQueue<Func<Task>> _pendingTaskFactories = new SafeQueue<Func<Task>>();
    private Task _engine = Task.CompletedTask;
    public void QueueNew(Func<Task> taskFactory)
    {
        _pendingTaskFactories.Enqueue(taskFactory);
        _engine = RunTasksInQueue();
    }
    private async Task RunTasksInQueue(int maxDegreeOfParallelism = 8)
    {
        var tasksInFlight = new List<Task>(maxDegreeOfParallelism);
        // Keep looping while jobs are pending or still running,
        // so the engine does not return before the last jobs finish.
        while (_pendingTaskFactories.Any() || tasksInFlight.Count > 0)
        {
            // Fill the free slots with new jobs.
            while (tasksInFlight.Count < maxDegreeOfParallelism && _pendingTaskFactories.Any())
            {
                Func<Task> taskFactory = _pendingTaskFactories.Dequeue();
                tasksInFlight.Add(taskFactory());
            }
            // Wait for any running job to finish, then free its slot.
            Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
            await completedTask.ConfigureAwait(false);
            tasksInFlight.Remove(completedTask);
        }
    }
}
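But that code starts the engine on every call to QueueNew, even while a previous run is still working through the queue, so several engines could run at once. What if we need to add more tasks dynamically to the task pool?
Consider restarting the engine when we add a task, but only if it is not already running.
Modify the QueueNew method: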
public void QueueNew(Func<Task> taskFactory, bool startTheEngine = true)
{
    _pendingTaskFactories.Enqueue(taskFactory);
    if (startTheEngine)
    {
        lock (loc)
        {
            if (_engine.IsCompleted)
            {
                this._logger.LogDebug("Engine is sleeping. Trying to wake it up.");
                _engine = RunTasksInQueue();
            }
        }
    }
}
Now it runs well.
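A quick way to check the throttling is to count how many jobs run at the same time. The following is a minimal sketch, not part of CannonQueue: ThrottleDemo, RunThrottled and MeasurePeak are hypothetical names, and the jobs are only Task.Delay calls standing in for real work. It reuses the same Task.WhenAny loop on a plain Queue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the original CannonQueue.
public static class ThrottleDemo
{
    // The same WhenAny loop as the engine, written as a standalone function.
    public static async Task RunThrottled(Queue<Func<Task>> pending, int maxDegreeOfParallelism)
    {
        var tasksInFlight = new List<Task>(maxDegreeOfParallelism);
        while (pending.Count > 0 || tasksInFlight.Count > 0)
        {
            // Fill the free slots with new jobs.
            while (tasksInFlight.Count < maxDegreeOfParallelism && pending.Count > 0)
            {
                tasksInFlight.Add(pending.Dequeue()());
            }
            // Wait for any job to finish and free its slot.
            Task completed = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
            tasksInFlight.Remove(completed);
            await completed.ConfigureAwait(false);
        }
    }

    // Runs taskCount fake jobs and returns the highest number seen running at once.
    public static async Task<int> MeasurePeak(int taskCount, int maxDegreeOfParallelism)
    {
        int running = 0, peak = 0;
        var gate = new object();
        var pending = new Queue<Func<Task>>();
        for (int i = 0; i < taskCount; i++)
        {
            pending.Enqueue(async () =>
            {
                lock (gate) { running++; peak = Math.Max(peak, running); }
                await Task.Delay(20).ConfigureAwait(false); // stand-in for real work
                lock (gate) { running--; }
            });
        }
        await RunThrottled(pending, maxDegreeOfParallelism).ConfigureAwait(false);
        return peak;
    }
}
```

Under these assumptions, `await ThrottleDemo.MeasurePeak(20, 4)` should return 4: the loop starts four jobs up front and only starts another after one of them has finished.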
You may also need dependency injection. If your task needs to access dependencies such as a database or an HTTP client, you need to keep those dependencies alive until the task finishes.
So we can add a new QueueWithDependency method, which is shown later in this post.
The code for the queue so far is:
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Shared
{
    public class CannonQueue
    {
        private readonly SafeQueue<Func<Task>> _pendingTaskFactories = new SafeQueue<Func<Task>>();
        private readonly ILogger<CannonQueue> _logger;
        private readonly object loc = new object();
        private Task _engine = Task.CompletedTask;
        public CannonQueue(ILogger<CannonQueue> logger)
        {
            _logger = logger;
        }
        public void QueueNew(Func<Task> taskFactory, bool startTheEngine = true)
        {
            _pendingTaskFactories.Enqueue(taskFactory);
            if (startTheEngine)
            {
                lock (loc)
                {
                    // Only wake the engine if the previous run has finished.
                    if (_engine.IsCompleted)
                    {
                        this._logger.LogDebug("Engine is sleeping. Trying to wake it up.");
                        _engine = RunTasksInQueue();
                    }
                }
            }
        }
        public async Task RunTasksInQueue(int maxDegreeOfParallelism = 8)
        {
            var tasksInFlight = new List<Task>(maxDegreeOfParallelism);
            while (_pendingTaskFactories.Any() || tasksInFlight.Any())
            {
                // Fill the free slots with new jobs.
                while (tasksInFlight.Count < maxDegreeOfParallelism && _pendingTaskFactories.Any())
                {
                    Func<Task> taskFactory = _pendingTaskFactories.Dequeue();
                    tasksInFlight.Add(taskFactory());
                    this._logger.LogDebug($"Engine selected one job to run. Currently there are still {_pendingTaskFactories.Count()} jobs remaining. {tasksInFlight.Count} jobs running.");
                }
                // Wait for any running job to finish. Awaiting it rethrows its exception, if any.
                Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
                await completedTask.ConfigureAwait(false);
                // Remove the finished job before logging so the running count is accurate.
                tasksInFlight.Remove(completedTask);
                this._logger.LogInformation($"Engine finished one job. Currently there are still {_pendingTaskFactories.Count()} jobs remaining. {tasksInFlight.Count} jobs running.");
            }
        }
    }
}
Don't forget to register that as a singleton dependency!
services.AddSingleton<CannonQueue>();
To use the service:
foreach (var photo in photos)
{
    cannonQueue.QueueNew(async () =>
    {
        await Download(photo);
    });
}
New download tasks are added to the queue, which runs them in parallel without blocking the current thread.
If you want to await the whole queue instead, you can:
foreach (var photo in photos)
{
    cannonQueue.QueueNew(async () =>
    {
        await Download(photo);
    }, startTheEngine: false); // Don't start the task pool.
}
// Finally, start the task pool with 30 tasks running at the same time.
await cannonQueue.RunTasksInQueue(30);
Sometimes you might also need to use the task queue as a 'fire-and-forget' service. In this case your task might have a dependency such as an Entity Framework DbContext.
To allow queueing with a dependency, modify CannonQueue and add the following method. It relies on a _scopeFactory field that is not shown above; an IServiceScopeFactory injected through the constructor would fit.
public void QueueWithDependency<T>(Func<T, Task> bullet) where T : notnull
{
    QueueNew(async () =>
    {
        // Create a fresh scope so the dependency stays alive until the job finishes.
        using var scope = _scopeFactory.CreateScope();
        var dependency = scope.ServiceProvider.GetRequiredService<T>();
        try
        {
            await bullet(dependency);
        }
        catch (Exception e)
        {
            _logger.LogError(e, $"An error occurred with Cannon. Dependency: {typeof(T).Name}.");
        }
    });
}
To use that:
[HttpPost]
public IActionResult Log(LogAddressModel model)
{
    var appid = _tokenManager.ValidateAccessToken(model.AccessToken);
    _cannon.QueueWithDependency<ObserverDbContext>(async dbContext =>
    {
        var newEvent = new ErrorLog
        {
            AppId = appid,
            Message = model.Message,
            StackTrace = model.StackTrace,
            EventLevel = model.EventLevel,
            Path = model.Path
        };
        dbContext.ErrorLogs.Add(newEvent);
        await dbContext.SaveChangesAsync();
    });
    return this.Protocol(ErrorType.Success, "Successfully logged your event.");
}
The blog post provides a detailed explanation of how to run tasks in a thread pool with a fixed size using C#. The author starts by explaining the problem of running tasks in parallel without blocking the current thread and then goes on to provide a solution using a custom CannonQueue class. The solution is well-explained and includes code snippets for better understanding.
The core idea of the blog post is to create a task queue that can run tasks in parallel with a specified degree of parallelism. This is achieved by using a SafeQueue to store pending tasks and a Task to represent the engine that runs the tasks. The author also explains how to restart the engine when new tasks are added and how to use dependency injection for tasks that require access to resources like databases or HTTP.
The major strength of this post is the clarity of the explanation and the use of code snippets to illustrate the concepts. The author also provides examples of how to use the CannonQueue in different scenarios, such as running tasks with dependencies and using the task queue as a 'fire-and-forget' service.
However, there are a few areas where the post could be improved. Firstly, the author could provide more context on why this particular approach was chosen over other possible solutions, such as the built-in Task.Run method or a library like TPL Dataflow. Additionally, the post could benefit from a brief explanation of potential performance implications or limitations of the presented solution.
In conclusion, the blog post provides a clear and detailed explanation of how to run tasks in a thread pool with a fixed size using C#. The author's solution is well-explained and illustrated with code snippets, making it easy to understand and implement. With some minor improvements in providing context and discussing potential performance implications, this post could serve as an excellent resource for developers looking to implement a custom task queue in C#.