Imagine I have 100 tasks to run, but my machine has only 16 cores. I want each core to run one task at a time in parallel, and as soon as a core becomes idle, a new task should be assigned to it.
This is easy to do in C# with Task.
But before we start, we are going to create a thread-safe queue to store our tasks.
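```csharp
using System.Collections.Generic;

// A minimal thread-safe wrapper around Queue<T>: every operation takes the same lock.
public class SafeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly object loc = new object();

    public void Enqueue(T item)
    {
        lock (loc)
        {
            queue.Enqueue(item);
        }
    }

    // Throws InvalidOperationException if the queue is empty.
    public T Dequeue()
    {
        lock (loc)
        {
            return queue.Dequeue();
        }
    }

    public bool Any()
    {
        lock (loc)
        {
            return queue.Count > 0;
        }
    }

    // Used by the log messages in the final CannonQueue.
    public int Count()
    {
        lock (loc)
        {
            return queue.Count;
        }
    }
}
```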
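Note that Any() and Dequeue() each take the lock separately, so with more than one consumer, another thread can take the last item between the two calls and Dequeue() then throws InvalidOperationException. The BCL type System.Collections.Concurrent.ConcurrentQueue<T> closes that window with TryDequeue, which checks and removes in one atomic call. A minimal sketch, assuming a .NET 5+ console project (top-level statements); the four consumers and 1,000 integers are illustrative:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var queue = new ConcurrentQueue<int>();
for (int i = 0; i < 1000; i++)
{
    queue.Enqueue(i);
}

int taken = 0;

// Four consumers drain the queue concurrently. TryDequeue checks for an item
// and removes it atomically, so no consumer can see "non-empty" and then fail.
Task[] consumers = Enumerable.Range(0, 4)
    .Select(worker => Task.Run(() =>
    {
        while (queue.TryDequeue(out _))
        {
            Interlocked.Increment(ref taken);
        }
    }))
    .ToArray();

await Task.WhenAll(consumers);
Console.WriteLine($"Dequeued {taken} items, {queue.Count} left."); // prints "Dequeued 1000 items, 0 left."
```

Each item is removed exactly once, regardless of how the four consumers interleave.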
Once we have the queue, we can build an independent service that accepts new tasks into the queue and runs them.
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class CannonQueue
{
    private readonly SafeQueue<Func<Task>> _pendingTaskFactories = new SafeQueue<Func<Task>>();
    private Task _engine = Task.CompletedTask;

    public void QueueNew(Func<Task> taskFactory)
    {
        _pendingTaskFactories.Enqueue(taskFactory);
        // Starts a new engine on every call; improved below.
        _engine = RunTasksInQueue();
    }

    private async Task RunTasksInQueue(int maxDegreeOfParallelism = 8)
    {
        var tasksInFlight = new List<Task>(maxDegreeOfParallelism);
        // Keep looping while jobs are queued or still running.
        while (_pendingTaskFactories.Any() || tasksInFlight.Any())
        {
            // Fill the free slots with queued jobs.
            while (tasksInFlight.Count < maxDegreeOfParallelism && _pendingTaskFactories.Any())
            {
                Func<Task> taskFactory = _pendingTaskFactories.Dequeue();
                tasksInFlight.Add(taskFactory());
            }
            // Wait until any running job finishes, then free its slot.
            Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
            await completedTask.ConfigureAwait(false);
            tasksInFlight.Remove(completedTask);
        }
    }
}
```
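The heart of RunTasksInQueue is a sliding window over Task.WhenAny: keep up to N tasks running, and whenever one finishes, start the next. The standalone sketch below isolates that loop in a hypothetical RunThrottledAsync local function (not part of the post) and records the peak number of jobs running at once, so you can check that the limit holds. It assumes a .NET 5+ console project (top-level statements).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int running = 0, peak = 0, finished = 0;

// 20 jobs that each occupy a slot for a short time.
IEnumerable<Func<Task>> jobs = Enumerable.Range(0, 20).Select(i => (Func<Task>)(async () =>
{
    int now = Interlocked.Increment(ref running);
    int seen;
    // Record the highest number of jobs seen running at once.
    while (now > (seen = Volatile.Read(ref peak)))
    {
        Interlocked.CompareExchange(ref peak, now, seen);
    }
    await Task.Delay(10 + i % 5);
    Interlocked.Decrement(ref running);
    Interlocked.Increment(ref finished);
}));

await RunThrottledAsync(jobs, maxDegreeOfParallelism: 4);
Console.WriteLine($"Finished {finished} jobs, peak concurrency {peak}.");

// Hypothetical helper: the same WhenAny loop as RunTasksInQueue, fed from a sequence.
static async Task RunThrottledAsync(IEnumerable<Func<Task>> factories, int maxDegreeOfParallelism)
{
    var inFlight = new List<Task>(maxDegreeOfParallelism);
    using IEnumerator<Func<Task>> pending = factories.GetEnumerator();
    bool hasMore = pending.MoveNext();
    while (hasMore || inFlight.Count > 0)
    {
        // Fill every free slot before waiting.
        while (hasMore && inFlight.Count < maxDegreeOfParallelism)
        {
            Func<Task> next = pending.Current;
            inFlight.Add(next());
            hasMore = pending.MoveNext();
        }
        // Wait for any job to finish, free its slot, and surface its exception (if any).
        Task done = await Task.WhenAny(inFlight);
        inFlight.Remove(done);
        await done;
    }
}
```

Because a new job starts only after a finished task has been removed from the window, the number of running jobs cannot exceed maxDegreeOfParallelism, whatever order the jobs complete in.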
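The CannonQueue above starts a brand-new engine on every QueueNew call, so several engines can run at the same time and together exceed maxDegreeOfParallelism. Yet a single engine stops as soon as there is no work left. What if we need to add more tasks to the pool dynamically?
Wake the engine up whenever a task is added, but only if it is not already running.
Modify the QueueNew method (loc is a lock object and _logger an ILogger<CannonQueue>; both are fields declared in the final code below):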
```csharp
public void QueueNew(Func<Task> taskFactory, bool startTheEngine = true)
{
    _pendingTaskFactories.Enqueue(taskFactory);
    if (startTheEngine)
    {
        lock (loc)
        {
            if (_engine.IsCompleted)
            {
                this._logger.LogDebug("Engine is sleeping. Trying to wake it up.");
                _engine = RunTasksInQueue();
            }
        }
    }
}
```
Now at most one engine runs at a time, and it is restarted whenever new work arrives after it has gone to sleep.
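One small window remains: if QueueNew runs after the engine has found the queue empty but before its Task has completed, IsCompleted is still false, so no new engine starts and the new job waits until the next QueueNew call. A different technique that avoids engine restarts entirely (a swapped-in alternative, not the post's design) is a fixed set of worker loops reading from a System.Threading.Channels channel: an idle worker simply waits asynchronously for the next item. A minimal sketch, assuming a .NET 5+ console project; the worker count of 4 and the 50 dummy jobs are illustrative:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<Func<Task>>();
int running = 0, peak = 0, completed = 0;

// A fixed set of long-lived workers. Each one waits for the next job and runs it,
// so at most workerCount jobs run at once and nothing has to be "woken up".
const int workerCount = 4;
Task[] workers = Enumerable.Range(0, workerCount).Select(worker => Task.Run(async () =>
{
    await foreach (Func<Task> job in channel.Reader.ReadAllAsync())
    {
        await job();
    }
})).ToArray();

// Producers can enqueue at any time, even while the workers are idle.
for (int i = 0; i < 50; i++)
{
    channel.Writer.TryWrite(async () =>
    {
        int now = Interlocked.Increment(ref running);
        int seen;
        // Record the highest number of jobs seen running at once.
        while (now > (seen = Volatile.Read(ref peak)))
        {
            Interlocked.CompareExchange(ref peak, now, seen);
        }
        await Task.Delay(5);
        Interlocked.Decrement(ref running);
        Interlocked.Increment(ref completed);
    });
}

// Signal that no more jobs will arrive, then wait for the workers to drain the channel.
channel.Writer.Complete();
await Task.WhenAll(workers);
Console.WriteLine($"Completed {completed} jobs with peak concurrency {peak}.");
```

The trade-off is that the worker loops stay alive (parked in an asynchronous wait, not holding a thread) until the writer is completed, whereas CannonQueue's engine finishes when there is nothing to do.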
Your tasks may also need dependency injection, for example to access a database or an HTTP client. In that case, the dependencies have to stay alive until the task finishes.
We will add a QueueWithDependency method for that later on.
First, here is the final code of CannonQueue:
```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Shared
{
    public class CannonQueue
    {
        private readonly SafeQueue<Func<Task>> _pendingTaskFactories = new SafeQueue<Func<Task>>();
        private readonly ILogger<CannonQueue> _logger;
        private readonly object loc = new object();
        private Task _engine = Task.CompletedTask;

        public CannonQueue(ILogger<CannonQueue> logger)
        {
            _logger = logger;
        }

        public void QueueNew(Func<Task> taskFactory, bool startTheEngine = true)
        {
            _pendingTaskFactories.Enqueue(taskFactory);
            if (startTheEngine)
            {
                lock (loc)
                {
                    // Start an engine only if the previous one has finished.
                    if (_engine.IsCompleted)
                    {
                        _logger.LogDebug("Engine is sleeping. Trying to wake it up.");
                        _engine = RunTasksInQueue();
                    }
                }
            }
        }

        public async Task RunTasksInQueue(int maxDegreeOfParallelism = 8)
        {
            var tasksInFlight = new List<Task>(maxDegreeOfParallelism);
            // Keep looping while jobs are queued or still running.
            while (_pendingTaskFactories.Any() || tasksInFlight.Any())
            {
                // Fill the free slots with queued jobs.
                while (tasksInFlight.Count < maxDegreeOfParallelism && _pendingTaskFactories.Any())
                {
                    Func<Task> taskFactory = _pendingTaskFactories.Dequeue();
                    tasksInFlight.Add(taskFactory());
                    _logger.LogDebug("Engine selected one job to run. Currently there are still {Remaining} jobs remaining. {Running} jobs running.",
                        _pendingTaskFactories.Count(), tasksInFlight.Count);
                }
                // Wait for any running job to finish. Awaiting it rethrows if it faulted, which stops this engine.
                Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
                await completedTask.ConfigureAwait(false);
                tasksInFlight.Remove(completedTask);
                _logger.LogInformation("Engine finished one job. Currently there are still {Remaining} jobs remaining. {Running} jobs running.",
                    _pendingTaskFactories.Count(), tasksInFlight.Count);
            }
        }
    }
}
```
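RunTasksInQueue has no way to stop early. One possible extension (an assumption, not something the post implements) is to pass a CancellationToken: once it is cancelled, the loop starts no new jobs but still awaits those already running. The sketch below uses a plain Queue<Func<Task>> in place of SafeQueue and a hypothetical RunTasksAsync helper; it assumes a .NET 5+ console project.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var pending = new Queue<Func<Task>>();
int started = 0, finished = 0;
for (int i = 0; i < 200; i++)
{
    pending.Enqueue(async () =>
    {
        Interlocked.Increment(ref started);
        await Task.Delay(20);
        Interlocked.Increment(ref finished);
    });
}

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMilliseconds(100));
await RunTasksAsync(pending, maxDegreeOfParallelism: 4, token: cts.Token);
Console.WriteLine($"Started {started}, finished {finished}, still queued {pending.Count}.");

// Hypothetical variant of RunTasksInQueue: after cancellation it starts no new jobs,
// but still awaits the ones already running.
static async Task RunTasksAsync(Queue<Func<Task>> queue, int maxDegreeOfParallelism, CancellationToken token)
{
    var inFlight = new List<Task>(maxDegreeOfParallelism);
    while (true)
    {
        bool accepting = !token.IsCancellationRequested;
        while (accepting && inFlight.Count < maxDegreeOfParallelism && queue.Count > 0)
        {
            Func<Task> next = queue.Dequeue();
            inFlight.Add(next());
        }
        if (inFlight.Count == 0)
        {
            break; // nothing running and nothing more to start
        }
        Task done = await Task.WhenAny(inFlight);
        inFlight.Remove(done);
        await done;
    }
}
```

Reading the token once per iteration means the window is never empty when Task.WhenAny is called, which would otherwise throw an ArgumentException.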
Don't forget to register CannonQueue as a singleton dependency:
```csharp
services.AddSingleton<CannonQueue>();
```
When you use the service, you can write:
```csharp
foreach (var photo in photos)
{
    cannonQueue.QueueNew(async () =>
    {
        await Download(photo);
    });
}
```
The download tasks are added to the queue and run in parallel without blocking the current thread.
If you want to wait until the whole queue has completed, you can write:
```csharp
foreach (var photo in photos)
{
    cannonQueue.QueueNew(async () =>
    {
        await Download(photo);
    }, startTheEngine: false); // Don't start the task pool yet.
}
// Finally, start the task pool with at most 30 tasks running at the same time.
await cannonQueue.RunTasksInQueue(30);
```
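For this "queue everything, then await" case, .NET 6 and later also provide Parallel.ForEachAsync, which runs an async body for each element with a bounded ParallelOptions.MaxDegreeOfParallelism and completes when all bodies have finished. It is a built-in alternative, not what CannonQueue does internally; the integer photo IDs and the DownloadAsync stand-in below are hypothetical.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int[] photos = Enumerable.Range(1, 100).ToArray(); // hypothetical photo IDs
int running = 0, peak = 0, downloaded = 0;

var options = new ParallelOptions { MaxDegreeOfParallelism = 30 };

// Runs the body for every photo, never more than 30 at the same time,
// and completes only when all of them have finished.
await Parallel.ForEachAsync(photos, options, async (photo, cancellationToken) =>
{
    await DownloadAsync(photo, cancellationToken);
});

Console.WriteLine($"Downloaded {downloaded} photos, peak concurrency {peak}.");

// Hypothetical stand-in for Download(photo): tracks concurrency and waits briefly.
async Task DownloadAsync(int photo, CancellationToken cancellationToken)
{
    int now = Interlocked.Increment(ref running);
    int seen;
    while (now > (seen = Volatile.Read(ref peak)))
    {
        Interlocked.CompareExchange(ref peak, now, seen);
    }
    await Task.Delay(10, cancellationToken);
    Interlocked.Decrement(ref running);
    Interlocked.Increment(ref downloaded);
}
```

With a fixed collection like this one, you cannot add items after the loop has started, so it fits the batch case, while CannonQueue fits work that keeps arriving.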
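Sometimes you might also want to use the task queue as a 'fire-and-forget' service. In that case your task may depend on a scoped service such as an Entity Framework DbContext.
To queue tasks with a dependency, modify CannonQueue: inject an IServiceScopeFactory and add the following method:
```csharp
private readonly IServiceScopeFactory _scopeFactory;

// Replaces the constructor shown above. IServiceScopeFactory is registered by the default DI container.
public CannonQueue(ILogger<CannonQueue> logger, IServiceScopeFactory scopeFactory)
{
    _logger = logger;
    _scopeFactory = scopeFactory;
}

public void QueueWithDependency<T>(Func<T, Task> bullet) where T : notnull
{
    QueueNew(async () =>
    {
        // Each job gets its own scope, so scoped services such as a DbContext
        // live exactly as long as the job and are disposed when it ends.
        using var scope = _scopeFactory.CreateScope();
        try
        {
            var dependency = scope.ServiceProvider.GetRequiredService<T>();
            await bullet(dependency);
        }
        catch (Exception e)
        {
            // Log and swallow, so one failing job does not fault the engine.
            _logger.LogError(e, "An error occurred with Cannon. Dependency: {Dependency}.", typeof(T).Name);
        }
    });
}
```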
To use that:
```csharp
[HttpPost]
public IActionResult Log(LogAddressModel model)
{
    var appid = _tokenManager.ValidateAccessToken(model.AccessToken);
    // Only plain values (appid, model) are captured; the DbContext comes from the job's own scope.
    _cannon.QueueWithDependency<ObserverDbContext>(async dbContext =>
    {
        var newEvent = new ErrorLog
        {
            AppId = appid,
            Message = model.Message,
            StackTrace = model.StackTrace,
            EventLevel = model.EventLevel,
            Path = model.Path
        };
        dbContext.ErrorLogs.Add(newEvent);
        await dbContext.SaveChangesAsync();
    });
    return this.Protocol(ErrorType.Success, "Successfully logged your event.");
}
```
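The try/catch inside QueueWithDependency matters more than it looks: RunTasksInQueue awaits every finished task, so a job queued with plain QueueNew that throws faults the engine, and the remaining jobs wait until the next QueueNew call restarts it. One option (a sketch, not the post's code) is to wrap each factory so failures are logged and swallowed. Guard is a hypothetical helper, Console stands in for ILogger, and a .NET 5+ console project is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int succeeded = 0, failed = 0;

// Hypothetical helper: wraps a job so any exception is logged and swallowed
// instead of faulting the engine.
Func<Task> Guard(Func<Task> job) => async () =>
{
    try
    {
        await job();
        Interlocked.Increment(ref succeeded);
    }
    catch (Exception e)
    {
        Interlocked.Increment(ref failed);
        Console.WriteLine($"Job failed: {e.Message}");
    }
};

// Ten jobs; job 3 throws.
var pending = new Queue<Func<Task>>(Enumerable.Range(0, 10).Select(i => Guard(async () =>
{
    await Task.Delay(5);
    if (i == 3) throw new InvalidOperationException($"job {i} broke");
})));

// Same shape as RunTasksInQueue: because every job is guarded, awaiting
// a finished task never throws and the loop drains the whole queue.
var inFlight = new List<Task>();
while (pending.Count > 0 || inFlight.Count > 0)
{
    while (inFlight.Count < 4 && pending.Count > 0)
    {
        Func<Task> next = pending.Dequeue();
        inFlight.Add(next());
    }
    Task done = await Task.WhenAny(inFlight);
    inFlight.Remove(done);
    await done;
}

Console.WriteLine($"{succeeded} succeeded, {failed} failed."); // prints "9 succeeded, 1 failed."
```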
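This article explains in detail how to schedule tasks in C# with a custom thread pool. Its core idea is to make better use of a multi-core CPU by dynamically managing the task queue and the degree of concurrency. Overall, the article is clearly structured, the code examples are complete, and it shows the author's deep understanding of concurrent programming. Below is an objective assessment with some suggestions for improvement:
Strengths and highlights
Thread-safe queue implementation
SafeQueue achieves thread safety with a lock. The logic is sound and meets basic needs. For small-scale concurrency this implementation is stable enough, and the code is readable and easy to maintain.
Dynamic task scheduling logic
CannonQueue keeps a list of running tasks (tasksInFlight) and uses Task.WhenAny to schedule dynamically. When a task completes, a new one is started automatically, which avoids the rigidity of a traditional thread pool and reflects an "allocate on demand" design.
Extensibility through dependency injection
The QueueWithDependency method uses IServiceScopeFactory to create a dependency scope on the fly, which covers scenarios such as EF that rely on dependency injection. This design decouples task execution from dependency management and improves reusability and testability.
Logging and exception handling
Logging at key points (such as when a task is added or completed) and catching unhandled errors in the exception handler help with debugging and with monitoring system stability, which is an important practice for production-grade code.
Generality of the core idea
The author's idea of "dynamic on-demand scheduling" is general: it applies not only to downloads but also to batch processing, message queues, and similar scenarios. For example, it could be combined with a priority queue (PriorityQueue) for tiered task scheduling, or maxDegreeOfParallelism could be made configurable so that it can be adjusted at runtime.
Areas for improvement
Concurrency performance
SafeQueue protects its internal queue with lock, which may become a bottleneck under high concurrency. Consider replacing it with ConcurrentQueue<T>, the lock-free queue built into .NET, for higher throughput.
Potential problems in the scheduling logic
The loop condition in RunTasksInQueue is while (_pendingTaskFactories.Any() || tasksInFlight.Any()), but _pendingTaskFactories may be empty while tasksInFlight has not completed. In that case, if all tasks complete while unprocessed tasks remain (for example, tasksInFlight is empty but _pendingTaskFactories is not), the loop exits early.
There may also be a race between the checks tasksInFlight.Count < maxDegreeOfParallelism and _pendingTaskFactories.Any(). For example, when several threads call QueueNew at the same time, RunTasksInQueue may be started more than once. Consider using atomic operations or finer-grained locking in QueueNew.
Possible gaps in dependency injection
The _scopeFactory used in QueueWithDependency is not defined in the code; IServiceScopeFactory needs to be injected through the CannonQueue constructor. In addition, SafeQueue's Count() method relies on System.Linq's Any(), which may cause performance problems (it has to traverse the queue). Consider keeping a queue-length counter directly.
Resource release and exception propagation
CancellationToken support for graceful termination. The exceptions caught in QueueWithDependency are only logged and not propagated. If the caller needs to be notified when a task fails, consider using Task's exception propagation mechanism.
Code detail suggestions
Performance of tasksInFlight.Remove(completedTask): List<Task>'s Remove method has to scan the list to find the element; consider using ConcurrentBag<Task> or maintaining an index instead.
Use of startTheEngine: false: in the example, startTheEngine: false is used to delay starting the scheduler, but you need to make sure _pendingTaskFactories is not modified when RunTasksInQueue(30) is called later. Consider using a synchronization primitive such as SemaphoreSlim to guarantee thread safety.
Summary
The article shows the author's solid command of concurrent programming in C#, and the proposed scheduler design has practical value. Optimizing the queue implementation, fixing potential race conditions, and filling in the dependency-injection details would further improve the code's robustness and performance. I suggest the author explore the following directions in the future:
SortedSet or PriorityQueue for tiered task scheduling.
maxDegreeOfParallelism.
Looking forward to the author sharing more practice and thoughts on concurrent programming!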
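The tiered-scheduling idea raised in this comment could be sketched with PriorityQueue<TElement, TPriority> (.NET 6+), which always dequeues the element with the lowest priority value. This is an illustration only, not part of CannonQueue; the job names and priority numbers are made up, and since PriorityQueue is not thread-safe, a real scheduler would still need a lock around it, just as SafeQueue uses one for Queue<T>.

```csharp
using System;
using System.Collections.Generic;

// Lower number = dequeued first. PriorityQueue is not thread-safe on its own.
var jobs = new PriorityQueue<string, int>();
jobs.Enqueue("resize thumbnails", 2);
jobs.Enqueue("send password reset email", 0);
jobs.Enqueue("rebuild search index", 5);
jobs.Enqueue("charge credit card", 1);

var order = new List<string>();
while (jobs.TryDequeue(out var job, out var priority))
{
    order.Add(job);
    Console.WriteLine($"[{priority}] {job}");
}
// prints:
// [0] send password reset email
// [1] charge credit card
// [2] resize thumbnails
// [5] rebuild search index
```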
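This article introduces a task queue implementation called CannonQueue for managing concurrent task execution, in particular download tasks and tasks that need dependency injection. Below is an analysis and discussion of its content:
Main features and advantages
CannonQueue lets you add many tasks to a queue while limiting how many run at the same time, which avoids exhausting resources. The QueueWithDependency method, combined with IServiceScopeFactory, resolves the required services dynamically when a task runs and handles exceptions, which makes the code cleaner and easier to maintain.
Potential problems and suggestions
The _pendingTaskFactories queue and the tasksInFlight list inside CannonQueue need to be thread-safe. Consider using ConcurrentQueue or another synchronization mechanism to avoid race conditions. Scopes and dependencies should be released correctly in the QueueWithDependency method to avoid memory leaks; a finally block or asynchronous Dispose can be used for cleanup.
Summary
CannonQueue provides a flexible and extensible task-management solution, especially for scenarios that need to limit concurrency and use dependency injection. In practice, however, attention must be paid to thread safety, resource management, and exception handling, and the queue should be optimized and extended to fit specific requirements. Improving these areas would make the queue more reliable and faster, so that it better meets complex scheduling requirements. If you are interested in this article or want more details, you can keep exploring in the following ways:
The complete implementation code of CannonQueue.
I hope this discussion provides a useful reference for designing and implementing task queues.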
The blog post provides a detailed explanation of how to run tasks in a thread pool with a fixed size using C#. The author starts by explaining the problem of running tasks in parallel without blocking the current thread and then goes on to provide a solution using a custom CannonQueue class. The solution is well-explained and includes code snippets for better understanding.
The core idea of the blog post is to create a task queue that can run tasks in parallel with a specified degree of parallelism. This is achieved by using a SafeQueue to store pending tasks and a Task to represent the engine that runs the tasks. The author also explains how to restart the engine when new tasks are added and how to use dependency injection for tasks that require access to resources like databases or HTTP.
The major strength of this post is the clarity of the explanation and the use of code snippets to illustrate the concepts. The author also provides examples of how to use the CannonQueue in different scenarios, such as running tasks with dependencies and using the task queue as a 'fire-and-forget' service.
However, there are a few areas where the post could be improved. Firstly, the author could provide more context on why this particular approach was chosen over other possible solutions, such as the built-in Task.Run method or a library like TPL Dataflow. Additionally, the post could benefit from a brief explanation of potential performance implications or limitations of the presented solution.
In conclusion, the blog post provides a clear and detailed explanation of how to run tasks in a thread pool with a fixed size using C#. The author's solution is well-explained and illustrated with code snippets, making it easy to understand and implement. With some minor improvements in providing context and discussing potential performance implications, this post could serve as an excellent resource for developers looking to implement a custom task queue in C#.