Imagine I have 100 tasks to run, but my machine has only 16 cores. I want each core to run one task in parallel, and as soon as a core becomes idle, a new task should be assigned to it.
That is easy to do in C# with Task.
But before we start, let's create a thread-safe queue for storing our tasks.
using System.Collections.Generic;
using System.Linq;

public class SafeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly object loc = new object();

    public void Enqueue(T item)
    {
        lock (loc)
        {
            queue.Enqueue(item);
        }
    }

    public T Dequeue()
    {
        lock (loc)
        {
            return queue.Dequeue();
        }
    }

    public bool Any()
    {
        lock (loc)
        {
            return queue.Any();
        }
    }

    // Count() is used later by the engine for logging.
    public int Count()
    {
        lock (loc)
        {
            return queue.Count;
        }
    }
}
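As a side note, .NET already ships a thread-safe queue, System.Collections.Concurrent.ConcurrentQueue&lt;T&gt;. If you prefer not to hand-roll the locking, a sketch of the same wrapper backed by it could look like this (an alternative, not the implementation used in the rest of this post):

using System.Collections.Concurrent;

// A minimal sketch of the same API on top of ConcurrentQueue<T>.
public class SafeQueue<T>
{
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public void Enqueue(T item) => queue.Enqueue(item);

    public T Dequeue()
    {
        // Unlike the lock-based version, this returns default(T) instead of
        // throwing when the queue is empty.
        queue.TryDequeue(out var item);
        return item;
    }

    public bool Any() => !queue.IsEmpty;

    public int Count() => queue.Count;
}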
Once we have the queue, we can build a new, independent service that adds tasks to the queue and runs them.
public class CannonQueue
{
    private readonly SafeQueue<Func<Task>> _pendingTaskFactories = new SafeQueue<Func<Task>>();
    private Task _engine = Task.CompletedTask;

    public void QueueNew(Func<Task> taskFactory)
    {
        _pendingTaskFactories.Enqueue(taskFactory);
        _engine = RunTasksInQueue();
    }

    private async Task RunTasksInQueue(int maxDegreeOfParallelism = 8)
    {
        var tasksInFlight = new List<Task>(maxDegreeOfParallelism);
        while (_pendingTaskFactories.Any())
        {
            while (tasksInFlight.Count < maxDegreeOfParallelism && _pendingTaskFactories.Any())
            {
                Func<Task> taskFactory = _pendingTaskFactories.Dequeue();
                tasksInFlight.Add(taskFactory());
            }
            Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
            await completedTask.ConfigureAwait(false);
            tasksInFlight.Remove(completedTask);
        }
    }
}
But with that code, every call to QueueNew starts a new engine, even if one is already running. What happens when we keep adding tasks dynamically to the pool while the engine is busy? We end up with several engines running at once, and more tasks in flight than maxDegreeOfParallelism allows. Instead, restart the engine only when the previous run has completed. Modify the QueueNew method (the loc lock object and the _logger field it uses are declared in the full class shown below):
public void QueueNew(Func<Task> taskFactory, bool startTheEngine = true)
{
    _pendingTaskFactories.Enqueue(taskFactory);
    if (startTheEngine)
    {
        lock (loc)
        {
            if (_engine.IsCompleted)
            {
                _logger.LogDebug("Engine is sleeping. Trying to wake it up.");
                _engine = RunTasksInQueue();
            }
        }
    }
}
Now it runs well.
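To see the dynamic behavior, here is a quick console sketch. It is hypothetical and not part of the original post; it assumes the logger-taking constructor from the full class shown further down and uses NullLogger to satisfy it:

using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;

// Queue 100 simulated jobs; the engine wakes up on the first call
// and keeps at most 8 of them in flight at any time.
var queue = new CannonQueue(NullLogger<CannonQueue>.Instance);
for (var i = 0; i < 100; i++)
{
    var taskId = i;
    queue.QueueNew(async () =>
    {
        await Task.Delay(1000); // simulate one unit of work
        Console.WriteLine($"Task {taskId} finished.");
    });
}
Console.ReadLine(); // keep the process alive so the fire-and-forget jobs can finish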
You may also need dependency injection: your task might depend on services such as a database context or an HTTP client, and those dependencies have to stay alive for as long as the task runs. For that, we will add a new QueueWithDependency method later in this post. First, here is the full code of the core queue:
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Shared
{
    public class CannonQueue
    {
        private readonly SafeQueue<Func<Task>> _pendingTaskFactories = new SafeQueue<Func<Task>>();
        private readonly ILogger<CannonQueue> _logger;
        private readonly object loc = new object();
        private Task _engine = Task.CompletedTask;

        public CannonQueue(ILogger<CannonQueue> logger)
        {
            _logger = logger;
        }

        public void QueueNew(Func<Task> taskFactory, bool startTheEngine = true)
        {
            _pendingTaskFactories.Enqueue(taskFactory);
            if (startTheEngine)
            {
                lock (loc)
                {
                    if (_engine.IsCompleted)
                    {
                        _logger.LogDebug("Engine is sleeping. Trying to wake it up.");
                        _engine = RunTasksInQueue();
                    }
                }
            }
        }

        public async Task RunTasksInQueue(int maxDegreeOfParallelism = 8)
        {
            var tasksInFlight = new List<Task>(maxDegreeOfParallelism);
            while (_pendingTaskFactories.Any() || tasksInFlight.Any())
            {
                while (tasksInFlight.Count < maxDegreeOfParallelism && _pendingTaskFactories.Any())
                {
                    Func<Task> taskFactory = _pendingTaskFactories.Dequeue();
                    tasksInFlight.Add(taskFactory());
                    _logger.LogDebug($"Engine selected one job to run. Currently there are still {_pendingTaskFactories.Count()} jobs remaining. {tasksInFlight.Count} jobs running.");
                }
                Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
                await completedTask.ConfigureAwait(false);
                _logger.LogInformation($"Engine finished one job. Currently there are still {_pendingTaskFactories.Count()} jobs remaining. {tasksInFlight.Count} jobs running.");
                tasksInFlight.Remove(completedTask);
            }
        }
    }
}
Don't forget to register that as a singleton dependency!
services.AddSingleton<CannonQueue>();
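For example, in a typical ASP.NET Core app using minimal hosting, the registration could look like this (a sketch; your host setup may differ):

var builder = WebApplication.CreateBuilder(args);

// Register the queue once for the whole app so every consumer shares the same engine.
builder.Services.AddSingleton<CannonQueue>();

var app = builder.Build();
app.Run();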
When you are using that service, you can:
foreach (var photo in photos)
{
    cannonQueue.QueueNew(async () =>
    {
        await Download(photo);
    });
}
New download tasks are added to the queue, run in parallel, and never block the current thread.
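Download above is just a placeholder for any async work. Purely for illustration, a hypothetical implementation with HttpClient could look like this (the Photo type, its Url and Id, and the target path are assumptions, not part of the original post):

using System.IO;
using System.Net.Http;
using System.Threading.Tasks;

public class PhotoDownloader
{
    private static readonly HttpClient Client = new HttpClient();

    // Hypothetical helper: fetch the photo bytes and write them to a local file.
    public static async Task Download(Photo photo)
    {
        Directory.CreateDirectory("./photos");
        var bytes = await Client.GetByteArrayAsync(photo.Url);
        await File.WriteAllBytesAsync($"./photos/{photo.Id}.jpg", bytes);
    }
}

// Assumed shape of the Photo model used in the loop above.
public class Photo
{
    public string Id { get; set; }
    public string Url { get; set; }
}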
But if you want to wait for the whole queue to complete, you can:
foreach (var photo in photos)
{
    cannonQueue.QueueNew(async () =>
    {
        await Download(photo);
    }, startTheEngine: false); // Don't start the task pool yet.
}

// Finally, start the task pool with 30 tasks running at the same time.
await cannonQueue.RunTasksInQueue(30);
Sometimes you may also want to use the task queue as a 'fire-and-forget' service while the task depends on a scoped service, for example an Entity Framework DbContext. To support queueing with a dependency, modify CannonQueue: inject an IServiceScopeFactory through the constructor and add the following method:
private readonly IServiceScopeFactory _scopeFactory;

public CannonQueue(ILogger<CannonQueue> logger, IServiceScopeFactory scopeFactory)
{
    _logger = logger;
    _scopeFactory = scopeFactory;
}

public void QueueWithDependency<T>(Func<T, Task> bullet) where T : notnull
{
    QueueNew(async () =>
    {
        // Each queued job gets its own DI scope, so scoped services
        // like a DbContext stay alive until the job finishes.
        using var scope = _scopeFactory.CreateScope();
        var dependency = scope.ServiceProvider.GetRequiredService<T>();
        try
        {
            await bullet(dependency);
        }
        catch (Exception e)
        {
            _logger.LogError(e, $"An error occurred with Cannon. Dependency: {typeof(T).Name}.");
        }
    });
}
To use that:
[HttpPost]
public IActionResult Log(LogAddressModel model)
{
    var appid = _tokenManager.ValidateAccessToken(model.AccessToken);
    _cannon.QueueWithDependency<ObserverDbContext>(async dbContext =>
    {
        var newEvent = new ErrorLog
        {
            AppId = appid,
            Message = model.Message,
            StackTrace = model.StackTrace,
            EventLevel = model.EventLevel,
            Path = model.Path
        };
        dbContext.ErrorLogs.Add(newEvent);
        await dbContext.SaveChangesAsync();
    });
    return this.Protocol(ErrorType.Success, "Successfully logged your event.");
}
This article presents a task queue implementation called CannonQueue for managing concurrent task execution, especially download tasks and tasks that need dependency injection. Here is a short analysis and discussion of the content.
Main features and strengths: CannonQueue lets you add many tasks to a queue while controlling how many run at the same time, which avoids exhausting resources. The QueueWithDependency method, combined with IServiceScopeFactory, resolves the required service dynamically when the task executes and handles exceptions, which keeps the code clean and maintainable.
Potential issues and suggested improvements: the _pendingTaskFactories queue and the tasksInFlight list inside CannonQueue must be kept thread-safe; a ConcurrentQueue or another synchronization mechanism could be used to avoid race conditions. The QueueWithDependency method should also release the scope and its dependencies correctly to avoid memory leaks, for example with a finally block or asynchronous disposal.
Summary: CannonQueue offers a flexible and extensible task-management solution, particularly well suited to scenarios that require bounded concurrency and dependency injection. In real applications, however, thread safety, resource management, and exception handling all need attention, and the queue should be tuned and extended to match the specific requirements. Improving these aspects will make the queue more reliable and performant for complex scheduling needs. If you want to dig deeper, a good starting point is the complete implementation of CannonQueue shown above. Hopefully this discussion is a useful reference for designing and implementing your own task queues.
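On the resource-cleanup point: since .NET 6 the DI container exposes CreateAsyncScope, which disposes scoped services asynchronously. A sketch of QueueWithDependency rewritten that way, placed inside CannonQueue as a drop-in replacement (an alternative, not the version used in this post):

public void QueueWithDependency<T>(Func<T, Task> bullet) where T : notnull
{
    QueueNew(async () =>
    {
        // CreateAsyncScope returns an AsyncServiceScope, so services that
        // implement IAsyncDisposable are disposed asynchronously when the job ends.
        await using var scope = _scopeFactory.CreateAsyncScope();
        var dependency = scope.ServiceProvider.GetRequiredService<T>();
        try
        {
            await bullet(dependency);
        }
        catch (Exception e)
        {
            _logger.LogError(e, $"An error occurred with Cannon. Dependency: {typeof(T).Name}.");
        }
    });
}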
The blog post provides a detailed explanation of how to run tasks in a thread pool with a fixed size using C#. The author starts by explaining the problem of running tasks in parallel without blocking the current thread and then goes on to provide a solution using a custom CannonQueue class. The solution is well explained and includes code snippets for better understanding.
The core idea of the blog post is to create a task queue that can run tasks in parallel with a specified degree of parallelism. This is achieved by using a SafeQueue to store pending tasks and a Task to represent the engine that runs the tasks. The author also explains how to restart the engine when new tasks are added and how to use dependency injection for tasks that require access to resources like databases or HTTP.
The major strength of this post is the clarity of the explanation and the use of code snippets to illustrate the concepts. The author also provides examples of how to use the CannonQueue in different scenarios, such as running tasks with dependencies and using the task queue as a 'fire-and-forget' service.
However, there are a few areas where the post could be improved. Firstly, the author could provide more context on why this particular approach was chosen over other possible solutions, such as the built-in Task.Run method or a third-party library like TPL Dataflow. Additionally, the post could benefit from a brief explanation of potential performance implications or limitations of the presented solution.
In conclusion, the blog post provides a clear and detailed explanation of how to run tasks in a thread pool with a fixed size using C#. The author's solution is well explained and illustrated with code snippets, making it easy to understand and implement. With some minor improvements in providing context and discussing potential performance implications, this post could serve as an excellent resource for developers looking to implement a custom task queue in C#.
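For readers curious about the TPL Dataflow alternative mentioned above, here is a minimal sketch with an ActionBlock capped at 8 concurrent jobs. It requires the System.Threading.Tasks.Dataflow NuGet package and is not part of the original post:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowExample
{
    public static async Task RunAsync()
    {
        // ActionBlock throttles execution to MaxDegreeOfParallelism jobs at a time.
        var worker = new ActionBlock<int>(async taskId =>
        {
            await Task.Delay(1000); // simulate work
            Console.WriteLine($"Task {taskId} finished.");
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

        for (var i = 0; i < 100; i++)
        {
            worker.Post(i);
        }

        worker.Complete();       // no more items will be posted
        await worker.Completion; // wait for all queued items to finish
    }
}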