Doug Binks - 05 Sep 2015
This is the second in a series of articles detailing the inner workings and evolution of the permissively open source multithreading task scheduler enkiTS for C and C++ (including C++11). In the first article of this series I covered the external interfaces and their implementation. This post will cover the task threading function, running tasks, and waiting for tasks.
Figure 1: Screenshot of Avoyd being profiled with microprofile and ImGui integration available in enkiTSExamples. Solid bars above named tasks show when threads are active - the wait functionality allows the core to idle or other threads to run.
The task scheduler creates several threads, by default the number of cores minus one (to account for the main thread), each of which runs a static member function TaskScheduler::TaskingThreadFunction.
THREADFUNC_DECL TaskScheduler::TaskingThreadFunction( void* pArgs )
{
    ThreadArgs args    = *(ThreadArgs*)pArgs;
    uint32_t threadNum = args.threadNum;
    TaskScheduler* pTS = args.pTaskScheduler;
    gtl_threadNum      = threadNum;

    AtomicAdd( &pTS->m_NumThreadsRunning, 1 );
    SafeCallback( pTS->m_ProfilerCallbacks.threadStart, threadNum );
The task function starts by unpacking its arguments: the thread number and a pointer to the task scheduler. The thread number is stored in gtl_threadNum, a global thread local variable. Storing the thread number in a thread local variable allows user functions to call the task scheduler without needing to pass it in, which cuts down on sources of error and simplifies the interface.
I then atomically increment the running thread count and call any user provided profiler callback to let user code know a thread has been created.
    uint32_t spinCount = 0;
    uint32_t hintPipeToCheck_io = threadNum + 1;  // does not need to be clamped.
    while( pTS->m_bRunning )
    {
        if( !pTS->TryRunTask( threadNum, hintPipeToCheck_io ) )
        {
            // no tasks, will spin then wait
            ++spinCount;
            if( spinCount > SPIN_COUNT )
            {
                pTS->WaitForTasks( threadNum );
            }
        }
        else
        {
            spinCount = 0;
        }
    }
The main action of the task thread function is to call TryRunTask in a loop, and if none are available call WaitForTasks. Both of these functions are described below.
I first initialize a spin count, and a hint index which will be described later. Then I loop until the task scheduler sets a running boolean variable to false - this is declared as volatile so the compiler knows not to optimize the read from memory out of the loop (an std::atomic type is used in the C++11 branch for the same purpose). Note that the exact order of the memory read with respect to others isn't critical here, as I just want to be able to exit the loop at some point.
Next I call TryRunTask. This function needs the current thread number and a hint for which pipe should be checked (as an in-out non const reference). TryRunTask will return true if it was able to find and run a task, false if not. If no task is found I increment a spin counter, otherwise I set the spin counter to zero. If I've passed the SPIN_COUNT limit without any task having run, I call WaitForTasks to prevent unneeded spinning in this loop.
    AtomicAdd( &pTS->m_NumThreadsRunning, -1 );
    SafeCallback( pTS->m_ProfilerCallbacks.threadStop, threadNum );
    return 0;
}
Finally I close the function by atomically decrementing the running thread count, calling the profiler callback if present, and returning from the function, which exits the thread.
The function TaskScheduler::TryRunTask contains the algorithm for finding a task and running it.
bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t& hintPipeToCheck_io_ )
{
    // check for tasks
    SubTaskSet subTask;
    bool bHaveTask = m_pPipesPerThread[ threadNum ].WriterTryReadFront( &subTask );
I start by checking for subtasks from the front of the current thread's task pipe. Subtasks (described in detail in AddTaskSetToPipe) are the unit of execution of the task scheduler, and they include a task pointer and a range used for data parallel processing. The front of the pipe is checked by WriterTryReadFront(), which returns whether a task has been found and initializes the subTask variable. If a subtask is found, it is removed from the pipe.

Since subtasks are added to the front of a thread's task pipe, this means I first try to run the most recently added task. This approach is good for cache locality, as the recently written task data is likely still hot in cache.
    uint32_t threadToCheck = hintPipeToCheck_io_;
    uint32_t checkCount = 0;
    while( !bHaveTask && checkCount < m_NumThreads )
    {
        threadToCheck = ( hintPipeToCheck_io_ + checkCount ) % m_NumThreads;
        if( threadToCheck != threadNum )
        {
            bHaveTask = m_pPipesPerThread[ threadToCheck ].ReaderTryReadBack( &subTask );
        }
        ++checkCount;
    }
Next I check the other threads' task pipes, looping through from the last known 'hot' pipe using the hint parameter. This hint significantly boosts performance in the typical situation where one subtask generates a number of other subtasks. I use a loop rather than a random choice, as this is simpler.
By reading from the back of other thread pipes I leave control of the front of a pipe to the pipe's owning thread, improving performance for writing.
    if( bHaveTask )
    {
        // update hint, will preserve value unless actually got task from another thread.
        hintPipeToCheck_io_ = threadToCheck;

        // the task has already been divided up by AddTaskSetToPipe, so just run it
        subTask.pTask->ExecuteRange( subTask.partition, threadNum );
        AtomicAdd( &subTask.pTask->m_RunningCount, -1 );
    }
    return bHaveTask;
}
If I have a subtask, I update the pipe hint and execute the task using the subtask range then atomically decrement the running count. The atomic decrement functions used generate a full memory barrier to ensure that the count is not decremented prior to the subtask being executed. The function then exits returning whether a task was executed.
The task waiting function uses OS synchronization functions to put the thread to sleep until a new task event is generated. The task scheduler would function without waiting, with potentially slightly higher performance in some cases, but at a high cost to both CPU power wasted and the performance of other processes and threads. Waiting can also improve performance, as the OS can potentially schedule other work when a tasking thread is idle rather than interrupting randomly.
Figure 2: This microprofile of Avoyd shows the same time period as Figure 1 but with only the thread status being displayed (context switch tracking on). By using OS synchronization methods to wait for a new task event the scheduler can free up cores and permit other threads to run or the core to idle.
void TaskScheduler::WaitForTasks( uint32_t threadNum )
{
    bool bHaveTasks = false;
    for( uint32_t thread = 0; thread < m_NumThreads; ++thread )
    {
        if( !m_pPipesPerThread[ thread ].IsPipeEmpty() )
        {
            bHaveTasks = true;
            break;
        }
    }
    if( !bHaveTasks )
    {
        SafeCallback( m_ProfilerCallbacks.waitStart, threadNum );
        AtomicAdd( &m_NumThreadsActive, -1 );
        EventWait( m_NewTaskEvent, EVENTWAIT_INFINITE );
        AtomicAdd( &m_NumThreadsActive, +1 );
        SafeCallback( m_ProfilerCallbacks.waitStop, threadNum );
    }
}
This function is simple enough to list in one go. First all the thread task pipes are checked to see if they are empty. If any pipe is non-empty I exit; otherwise I decrement the active thread count and wait on the new task event. On wake up the active thread count is incremented. User profiler callbacks surround the wait (this is how I mark the wait functions in a profiler such as microprofile).
As noted in the previous post, I'm not using a lock with my synchronization, which means that the pipe check and the active thread count aren't set atomically along with the event. This could result in a thread waiting when it should be awake, if a new task event fires between the check and the wait. Since the code will still function with one task thread inactive (the main thread doesn't wait, so it can still perform work), this trades usual-case performance for a very rare (so far not observed) potential decrease in worst-case performance.
The next most important part of the task scheduler internals to discuss is the multi-reader, single-writer pipe. Since this is fairly complex, both in terms of lines of code and the approach, I'll leave it for the next post. Following on from that I hope to take a look at performance and comparisons with other tasking systems, as I'll likely leave explaining the task scheduler initialization code as homework for the reader.