.NET developers are well served by easy-to-use multithreading libraries within the .NET Framework. This is especially true in recent versions, most notably with the addition of async/await in C# 5.0. However, in some exceptional cases it is necessary to write bespoke logic to handle task scheduling.
Rather than writing our own logic from scratch, we can utilise the existing infrastructure for scheduling tasks. This approach will allow us to continue to make use of C# and .NET task functionality, such as async/await, but with our custom code handling the scheduling work in the background. This may sound complex, but it’s actually pretty straightforward to implement and makes the scheduler far more convenient to use.
Why bother?
In most cases, it won’t be necessary to implement your own scheduler. It’s normally far more practical to use functionality already built into .NET than to write your own bespoke implementation to achieve the same thing. However, in some cases custom scheduling logic is needed, especially for handling semi-critical tasks and complex task prioritisation. In these cases and others, the built-in scheduling logic simply won’t do, making it necessary to write custom code to do the job.
This can also be considered a useful educational exercise for those with some time on their hands, since it can teach you a lot about the internal workings of a scheduler. In this vein, we will cover writing a basic scheduler, leaving any more complex bespoke code required for specific cases as an exercise for the reader.
The Thread Pool
By far the simplest way of running tasks in parallel is with a thread pool. This is just a group of threads that continually pop and execute tasks from a single queue. Before we can implement the thread pool, however, we need a suitable queue class to store tasks that are waiting to be run on the pool.
The Blocking Queue
The System.Collections.Generic.Queue<T> class is our first thought here; however, it lacks some of the functionality we need.
Firstly, it is not thread-safe. This is a major problem, as we will potentially be pushing and popping on many different threads.
Secondly, the queue has no ‘blocking’ functionality. Blocking is extremely useful, as it allows our worker threads to wait (or block) if the thread pool’s queue is empty, meaning the threads will sleep until something is enqueued.[1]
The first of these issues is easily solved with a lock. (The non-generic System.Collections.Queue offers a synchronised wrapper through Queue.Synchronized(), but the generic Queue<T> has no equivalent.) The second issue is not solved as easily, requiring a custom wrapper to be implemented. Fortunately, with the help of System.Threading, this becomes quite trivial.
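To make the missing ‘blocking’ behaviour concrete, here is a minimal sketch of what the plain Queue<T> does when it is empty: it throws straight away rather than waiting for an item. The EmptyQueueDemo class and its method name are made up for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class EmptyQueueDemo
{
    // Returns true if dequeuing from an empty Queue<T> throws instead of waiting.
    public static bool DequeueThrowsWhenEmpty()
    {
        var queue = new Queue<int>();
        try
        {
            queue.Dequeue(); // no items available: throws rather than blocking
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

A worker thread built on this behaviour would have to poll the queue in a loop, which is exactly what a blocking wrapper lets us avoid.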
Let’s start by writing a boilerplate wrapper around the Queue<T> class:
public sealed class BlockingQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();

    public void Enqueue(T item)
    {
        _queue.Enqueue(item);
    }

    public T Dequeue()
    {
        return _queue.Dequeue();
    }
}
This code merely wraps the Enqueue() and Dequeue() methods of the Queue class without adding any functionality to them.
The first issue we can address is the lack of thread safety. Simply adding a lock object and appropriate code blocks does the trick here:
public sealed class BlockingQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();

    public void Enqueue(T item)
    {
        lock (_lock)
        {
            _queue.Enqueue(item);
        }
    }

    public T Dequeue()
    {
        lock (_lock)
        {
            return _queue.Dequeue();
        }
    }
}
This will ensure that no two threads can be enqueuing or dequeuing at the same time. If they tried, only one would be allowed to ‘acquire’ the lock (and thus be able to enter a lock region) and the other(s) would have to wait.[2]
Next, we must solve the more complex problem of blocking any dequeue operations when the queue is empty. Since we may have many dequeue operations and only want to allow at most as many dequeues as we have items in the queue, we can use a Semaphore to keep track of the items as resources consumed by the threads. This way, threads are only allowed to continue once there are enough resources in the pool (in this case items in the collection) to satisfy their requirements.
The semaphore handles all the complicated logic behind this operation, so that all we have to do is release resources after items are enqueued and consume them before dequeuing.
public sealed class BlockingQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    // Counts the items available to dequeue; starts at zero because the queue is empty
    private readonly Semaphore _pool = new Semaphore(0, int.MaxValue);
    private readonly object _lock = new object();

    public void Enqueue(T item)
    {
        lock (_lock)
        {
            _queue.Enqueue(item);
            // Release one resource so that a single waiting thread may dequeue
            _pool.Release();
        }
    }

    public T Dequeue()
    {
        // Blocks until at least one item has been enqueued
        _pool.WaitOne();
        lock (_lock)
        {
            return _queue.Dequeue();
        }
    }
}
Strictly speaking, the Semaphore used here should be disposed, meaning this class should implement IDisposable, but this complicates the example code, so it is left as an exercise for the reader.
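The thread pool later in this article calls ToArray() and Dispose() on the queue, neither of which the class above defines. One possible completion is sketched below; the ToArray() and Dispose() members are assumptions inferred from how the queue is used later, not the author's original code, and BlockingQueueDemo is a made-up helper showing a consumer thread blocking until items arrive.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical completion of BlockingQueue<T>: ToArray() and Dispose() are assumed members.
public sealed class BlockingQueue<T> : IDisposable
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly Semaphore _pool = new Semaphore(0, int.MaxValue);
    private readonly object _lock = new object();

    public void Enqueue(T item)
    {
        lock (_lock)
        {
            _queue.Enqueue(item);
            _pool.Release(); // throws ObjectDisposedException once the queue is disposed
        }
    }

    public T Dequeue()
    {
        _pool.WaitOne(); // blocks while the queue is empty
        lock (_lock)
        {
            return _queue.Dequeue();
        }
    }

    // Snapshot of the items currently waiting, taken under the lock.
    public T[] ToArray()
    {
        lock (_lock)
        {
            return _queue.ToArray();
        }
    }

    // Releases the semaphore's handle; callers should stop using the queue first.
    public void Dispose() => _pool.Dispose();
}

public static class BlockingQueueDemo
{
    // Starts a consumer that blocks on the empty queue, then feeds it three items.
    public static int SumOnConsumerThread()
    {
        using (var queue = new BlockingQueue<int>())
        {
            int sum = 0;
            var consumer = new Thread(() =>
            {
                for (int i = 0; i < 3; i++)
                    sum += queue.Dequeue();
            });
            consumer.Start();
            queue.Enqueue(1);
            queue.Enqueue(2);
            queue.Enqueue(3);
            consumer.Join(); // the consumer exits after its third dequeue
            return sum;
        }
    }
}
```

Because Enqueue() touches the semaphore, calling it after Dispose() surfaces an ObjectDisposedException, which is the case the thread pool's QueueTask() method catches.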
Implementing the Thread Pool
Our next job is to implement our thread pool. We will derive functionality from the TaskScheduler class to maintain compatibility with .NET’s task scheduling infrastructure. We will also implement IDisposable to stop the thread pool when it is no longer required.
The key methods to look at are the QueueTask() method, used for queuing up a task to run on the thread pool, and the ExecuteTasks() method, which is run on all of the worker threads to continuously dequeue tasks from the thread pool and execute them. The Run() method enables easy queuing of tasks onto the thread pool.
When it is time to dispose of the thread pool, several null tasks are queued (as many as there are worker threads) and the threads are waited on until they finish. This ensures that currently queued tasks are allowed to run before the thread pool is disposed.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Note: this class relies on BlockingQueue<T> providing ToArray() and Dispose().
public class TaskThreadPool : TaskScheduler, IDisposable
{
    private readonly BlockingQueue<Task> _queue = new BlockingQueue<Task>();
    private Thread[] _threads;
    private bool _disposed;
    private readonly object _lock = new object();

    public int ThreadCount { get; }

    public TaskThreadPool(int threadCount, bool isBackground = false)
    {
        if (threadCount < 1)
            throw new ArgumentOutOfRangeException(nameof(threadCount), "Must be at least 1");

        ThreadCount = threadCount;
        _threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            _threads[i] = new Thread(ExecuteTasks)
            {
                IsBackground = isBackground
            };
            _threads[i].Start();
        }
    }

    public Task Run(Action action) => Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, this);

    // Worker loop: dequeue and execute tasks until a null task signals shutdown
    private void ExecuteTasks()
    {
        while (true)
        {
            var task = _queue.Dequeue();
            if (task == null)
                return;
            TryExecuteTask(task);
        }
    }

    protected override IEnumerable<Task> GetScheduledTasks() => _queue.ToArray();

    protected override void QueueTask(Task task)
    {
        if (_disposed)
            throw new ObjectDisposedException(typeof(TaskThreadPool).FullName);
        if (task == null)
            throw new ArgumentNullException(nameof(task));
        try
        {
            _queue.Enqueue(task);
        }
        catch (ObjectDisposedException e)
        {
            throw new ObjectDisposedException(typeof(TaskThreadPool).FullName, e);
        }
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        if (_disposed)
            throw new ObjectDisposedException(typeof(TaskThreadPool).FullName);
        return !taskWasPreviouslyQueued && TryExecuteTask(task);
    }

    public void Dispose()
    {
        lock (_lock)
        {
            if (_disposed)
                return;
            _disposed = true;
        }

        // Queue one null task per worker so that each thread exits its loop
        for (int i = 0; i < _threads.Length; i++)
            _queue.Enqueue(null);
        foreach (var thread in _threads)
            thread.Join();
        _threads = null;
        _queue.Dispose();
    }
}
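To see a custom scheduler in use without depending on the classes above, here is a compact, self-contained sketch. It swaps the hand-written blocking queue for BlockingCollection<Task> from System.Collections.Concurrent and uses a single worker thread; MiniScheduler, MiniSchedulerDemo and the thread name are made up for illustration and are not part of the article's implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Illustrative stand-in for the thread pool, built on BlockingCollection<Task>.
public sealed class MiniScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    private readonly Thread _worker;

    public MiniScheduler()
    {
        _worker = new Thread(() =>
        {
            // Blocks while the collection is empty; ends after CompleteAdding() once drained.
            foreach (var task in _tasks.GetConsumingEnumerable())
                TryExecuteTask(task);
        });
        _worker.IsBackground = true;
        _worker.Name = "mini-scheduler";
        _worker.Start();
    }

    protected override void QueueTask(Task task) => _tasks.Add(task);

    // Never run inline, so every task executes on the worker thread.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks() => _tasks.ToArray();

    public void Dispose()
    {
        _tasks.CompleteAdding();
        _worker.Join();
        _tasks.Dispose();
    }
}

public static class MiniSchedulerDemo
{
    // Returns the name of the thread that executed the task.
    public static string RunOnScheduler()
    {
        using (var scheduler = new MiniScheduler())
        {
            var task = Task.Factory.StartNew(
                () => Thread.CurrentThread.Name,
                CancellationToken.None,
                TaskCreationOptions.None,
                scheduler);
            return task.Result;
        }
    }
}
```

Passing the scheduler as the last argument to Task.Factory.StartNew is the same mechanism the Run() method above uses, so the task body should observe the worker thread's name rather than that of a default thread pool thread.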
Async/await and Callbacks
Right now, we have a thread pool that we can queue up and execute tasks on. The thread pool integrates with .NET, so that we can use standard methods like Task.Factory.StartNew to run things on the pool. That’s great, but what about async/await? How do we get our thread pool to work with tasks run by async methods?
To get .NET working properly with async callbacks, we have to tell it how to handle them by providing it with a SynchronizationContext.
The Synchronisation Context
Within .NET, the Task Parallel Library (TPL) handles how callbacks are scheduled. A synchronisation context can be used to tell the TPL how to do this. It requires Send() and Post() methods, which are called when a new callback needs to be scheduled.
The Send() method is used to execute a callback synchronously. The method should block until the callback has finished executing.
The Post() method, on the other hand, does not block. It’s more of a ‘fire and forget’ method. We don’t need to worry much right now about how these methods are used as long as they are implemented properly, since the TPL calls them when appropriate.
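To get a feel for when Post() is called, the sketch below installs a small context that counts Post() calls and hands each callback to the default .NET thread pool, then awaits Task.Yield() once. CountingContext and CountingContextDemo are made-up names for illustration and are not part of the implementation that follows.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical context: counts Post() calls and forwards callbacks to the default thread pool.
public sealed class CountingContext : SynchronizationContext
{
    private int _postCount;

    public int PostCount => Volatile.Read(ref _postCount);

    public override void Post(SendOrPostCallback d, object state)
    {
        Interlocked.Increment(ref _postCount);
        ThreadPool.QueueUserWorkItem(_ => d(state));
    }
}

public static class CountingContextDemo
{
    // Runs one async method under the counting context and reports how often Post() was called.
    public static int CountPostsForOneYield()
    {
        var previous = SynchronizationContext.Current;
        var context = new CountingContext();
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            // The await inside YieldOnce() captures the current context for its continuation.
            YieldOnce().Wait();
            return context.PostCount;
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    private static async Task YieldOnce()
    {
        await Task.Yield(); // the remainder of the method is posted to the captured context
    }
}
```

The continuation after the await is what arrives at Post(), which is why a context that queues callbacks also needs something to dequeue and run them.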
Our implementation also provides a Receive() method for a thread to dequeue callbacks to execute.
Just as the thread pool queues null tasks to stop its workers, we also have an Unblock() method, which queues a null item. Receive() responds to a null item by throwing an InterruptedException, which should be handled as the signal for the thread(s) receiving callbacks to wrap up.
The synchronisation context uses SendOrPostCallbackItem objects to represent callbacks. These hold some information needed to execute the callback, such as the callback delegate itself and a state object to pass it, along with a handy optional Manual Reset Event[3] which is triggered when the callback completes (can be null if not required).
using System;
using System.Threading;

public sealed class QueueSyncContext : SynchronizationContext, IDisposable
{
    private readonly BlockingQueue<SendOrPostCallbackItem> _messageQueue = new BlockingQueue<SendOrPostCallbackItem>();

    public override SynchronizationContext CreateCopy() => this;

    // Fire and forget: queue the callback and return immediately
    public override void Post(SendOrPostCallback d, object state)
    {
        _messageQueue.Enqueue(new SendOrPostCallbackItem(ExecutionType.Post, d, state, null));
    }

    // Queue the callback, then block until a receiving thread has executed it
    public override void Send(SendOrPostCallback d, object state)
    {
        using (var handle = new ManualResetEventSlim())
        {
            var callbackItem = new SendOrPostCallbackItem(ExecutionType.Send, d, state, handle);
            _messageQueue.Enqueue(callbackItem);
            handle.Wait();
            if (callbackItem.Exception != null)
                throw callbackItem.Exception;
        }
    }

    public SendOrPostCallbackItem Receive()
    {
        var message = _messageQueue.Dequeue();
        if (message == null)
            throw new InterruptedException("Message queue was unblocked.");
        return message;
    }

    public void Unblock() => _messageQueue.Enqueue(null);

    public void Unblock(int count)
    {
        for (; count > 0; count--)
            _messageQueue.Enqueue(null);
    }

    public void Dispose() => _messageQueue.Dispose();
}

public class SendOrPostCallbackItem
{
    public SendOrPostCallbackItem(ExecutionType executionType, SendOrPostCallback callback, object state, ManualResetEventSlim signalComplete)
    {
        ExecutionType = executionType;
        Callback = callback;
        State = state;
        SignalComplete = signalComplete;
    }

    private ExecutionType ExecutionType { get; }
    private SendOrPostCallback Callback { get; }
    private object State { get; }
    private ManualResetEventSlim SignalComplete { get; }
    public Exception Exception { get; private set; }

    public void Execute()
    {
        switch (ExecutionType)
        {
            case ExecutionType.Post:
                Callback(State);
                break;
            case ExecutionType.Send:
                try
                {
                    Callback(State);
                }
                catch (Exception e)
                {
                    // Stored so that Send() can rethrow it on the calling thread
                    Exception = e;
                }
                SignalComplete.Set();
                break;
            default:
                throw new ArgumentException($"{nameof(ExecutionType)} is not a valid value.");
        }
    }
}
We can set this as the current synchronisation context with SynchronizationContext.SetSynchronizationContext(syncContext).
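The listing above refers to two types it never defines, ExecutionType and InterruptedException, and neither is part of .NET. A minimal sketch of what they might look like follows, assuming a two-valued enum and a plain custom exception; the original definitions are not shown, so treat both as guesses.

```csharp
using System;

// Assumed definition: distinguishes fire-and-forget callbacks from blocking ones.
public enum ExecutionType
{
    Post,
    Send
}

// Assumed definition: thrown by Receive() when a null item unblocks the queue.
public sealed class InterruptedException : Exception
{
    public InterruptedException(string message) : base(message)
    {
    }
}
```

.NET does ship a ThreadInterruptedException in System.Threading, but that one is raised by Thread.Interrupt() rather than by user code, so a dedicated exception type keeps the two cases apart.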
A Practical Callback Wrapper
We now have a synchronisation context which queues up callback items; however, we still haven’t handled the callback items that are being queued. The following class wraps the synchronisation context and starts worker threads, each of which runs the ExecuteCallbacks() method to receive and execute callbacks.
public class SchedulingSyncContext : IDisposable
{
    private Thread[] _threads;
    public SynchronizationContext SynchronizationContext => _disposed ? throw new ObjectDisposedException(typeof(SchedulingSyncContext).FullName) : _queueSyncContext;
    private QueueSyncContext _queueSyncContext;
    private bool _disposed;
    private readonly object _lock = new object();

    public SchedulingSyncContext(int threadCount = 1, bool isBackground = false)
    {
        if (threadCount < 1)
            throw new ArgumentOutOfRangeException(nameof(threadCount));

        _queueSyncContext = new QueueSyncContext();
        _threads = new Thread[threadCount];
        for (int i = 0; i < _threads.Length; i++)
        {
            _threads[i] = new Thread(ExecuteCallbacks)
            {
                IsBackground = isBackground
            };
            _threads[i].Start();
        }
    }

    private void ExecuteCallbacks()
    {
        while (true)
        {
            SendOrPostCallbackItem callback;
            try
            {
                callback = _queueSyncContext.Receive();
            }
            catch (InterruptedException)
            {
                return;
            }
            callback.Execute();
        }
    }

    public void Dispose()
    {
        lock (_lock)
        {
            if (_disposed)
                return;
            _disposed = true;
        }

        _queueSyncContext.Unblock(_threads.Length);
        foreach (var thread in _threads)
            thread.Join();
        _queueSyncContext.Dispose();
    }
}
Tying It All Together
Now that we have all our code laid out, it’s time to test it.
Here’s an xUnit test to demonstrate how this code can be used:
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

public class SchedulerTests
{
    private readonly ITestOutputHelper _testOutputHelper;

    public SchedulerTests(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    public void SimpleTest()
    {
        var scheduler = new SchedulingSyncContext();
        SynchronizationContext.SetSynchronizationContext(scheduler.SynchronizationContext);
        var builder = new StringBuilder();

        WriteTest();
        // Crude wait for the async work to finish before reading the output
        Thread.Sleep(1000);
        _testOutputHelper.WriteLine(builder.ToString());
        scheduler.Dispose();

        // StringBuilder is not thread-safe, so appends from different threads are serialised
        void Append(string line)
        {
            lock (builder)
                builder.AppendLine(line);
        }

        async void WriteTest()
        {
            var t1 = Write1();
            var t2 = Write2();
            Append("WriteTest");
            await Task.WhenAll(t1, t2);
            await Write3();
        }

        async Task Write1()
        {
            // Yield the thread (runs the remainder of the async code as a callback on the SchedulingSyncContext thread)
            await Task.Yield();
            Append("Test1");
        }

        async Task Write2()
        {
            // Wait for 10 milliseconds before continuing (uses callbacks)
            await Task.Delay(10);
            Append("Test2");
        }

        async Task Write3()
        {
            // Run these two tasks on the default .NET thread pool (could come out in any order)
            var tA = Task.Run(() => Append("Write3 - 1"));
            var tB = Task.Run(() => Append("Write3 - 2"));
            // Wait for them to finish before continuing (uses callbacks)
            await Task.WhenAll(tA, tB);
        }
    }
}
Note that some of the lines that run on the thread pool may come out in a different order. Here’s an example output:
Test1
WriteTest
Test2
Write3 - 2
Write3 - 1
Conclusion
That’s it: we now have a custom .NET scheduler. It’s pretty simple, but can be adapted to do far more. For example, you could modify the blocking queue to be a priority queue, or a LIFO stack instead.
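As a rough idea of the LIFO variant, the sketch below applies the same lock-plus-semaphore pattern to a Stack<T>, so the most recently pushed item is handed out first. BlockingStack<T> is a made-up name, and this is only one way such an adaptation might look.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical LIFO counterpart to the blocking queue.
public sealed class BlockingStack<T> : IDisposable
{
    private readonly Stack<T> _stack = new Stack<T>();
    private readonly Semaphore _pool = new Semaphore(0, int.MaxValue);
    private readonly object _lock = new object();

    public void Push(T item)
    {
        lock (_lock)
        {
            _stack.Push(item);
            _pool.Release(); // one more item is available to pop
        }
    }

    public T Pop()
    {
        _pool.WaitOne(); // blocks while the stack is empty
        lock (_lock)
        {
            return _stack.Pop();
        }
    }

    public void Dispose() => _pool.Dispose();
}
```

Swapping this in for the queue would make the workers favour the newest tasks, which means older tasks can be starved while new ones keep arriving.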
To reiterate, I don’t recommend using a custom scheduler in production code just for the fun of it. It’s much better to use the built-in tooling where possible, rather than reinventing the wheel. However, for some purposes, a bespoke scheduler does make sense.
Even if you don’t find a use for this, I hope it serves to demystify the internal workings of the TPL, especially async/await, which can be quite useful when dealing with complicated async code.
Best of luck!
[1] Another word for pushing onto a queue. .NET uses Enqueue() and Dequeue() methods for FIFO queues, as opposed to the Push() and Pop() methods used for LIFO stacks.
[2] Note that a thread waiting on a contended lock may spin briefly before it is put to sleep. This results in good performance for short-lived locks, but where a lock is expected to be held for longer periods, another synchronisation primitive may be a better fit.
[3] This object allows threads to block until it is triggered, which is especially handy for our Send() method, which needs to block until the callback completes.