//=========- unittests/Support/ThreadPool.cpp - ThreadPool.h tests --=========//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
8
9#include "llvm/Support/ThreadPool.h"
10
11#include "llvm/ADT/STLExtras.h"
12#include "llvm/ADT/SetVector.h"
13#include "llvm/ADT/SmallVector.h"
14#include "llvm/Support/CommandLine.h"
15#include "llvm/Support/Program.h"
16#include "llvm/Support/TargetSelect.h"
17#include "llvm/Support/Threading.h"
18#include "llvm/TargetParser/Host.h"
19#include "llvm/TargetParser/Triple.h"
20
21#ifdef _WIN32
22#include "llvm/Support/Windows/WindowsSupport.h"
23#endif
24
25#include <chrono>
26#include <thread>
27
28#include "gtest/gtest.h"
29
30namespace testing {
31namespace internal {
32// Specialize gtest construct to provide friendlier name in the output.
33#if LLVM_ENABLE_THREADS
34template <> std::string GetTypeName<llvm::StdThreadPool>() {
35 return "llvm::StdThreadPool";
36}
37#endif
38template <> std::string GetTypeName<llvm::SingleThreadExecutor>() {
39 return "llvm::SingleThreadExecutor";
40}
41} // namespace internal
42} // namespace testing
43
44using namespace llvm;
45
46// Fixture for the unittests, allowing to *temporarily* disable the unittests
47// on a particular platform
48template <typename ThreadPoolImpl> class ThreadPoolTest : public testing::Test {
49 Triple Host;
50 SmallVector<Triple::ArchType, 4> UnsupportedArchs;
51 SmallVector<Triple::OSType, 4> UnsupportedOSs;
52 SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
53
54protected:
55 // This is intended for platform as a temporary "XFAIL"
56 bool isUnsupportedOSOrEnvironment() {
57 Triple Host(Triple::normalize(sys::getProcessTriple()));
58
59 if (find(Range&: UnsupportedEnvironments, Val: Host.getEnvironment()) !=
60 UnsupportedEnvironments.end())
61 return true;
62
63 if (is_contained(Range&: UnsupportedOSs, Element: Host.getOS()))
64 return true;
65
66 if (is_contained(Range&: UnsupportedArchs, Element: Host.getArch()))
67 return true;
68
69 return false;
70 }
71
72 ThreadPoolTest() {
73 // Add unsupported configuration here, example:
74 // UnsupportedArchs.push_back(Triple::x86_64);
75
76 // See https://llvm.org/bugs/show_bug.cgi?id=25829
77 UnsupportedArchs.push_back(Elt: Triple::ppc64le);
78 UnsupportedArchs.push_back(Elt: Triple::ppc64);
79 }
80
81 /// Make sure this thread not progress faster than the main thread.
82 void waitForMainThread() { waitForPhase(Phase: 1); }
83
84 /// Set the readiness of the main thread.
85 void setMainThreadReady() { setPhase(1); }
86
87 /// Wait until given phase is set using setPhase(); first "main" phase is 1.
88 /// See also PhaseResetHelper below.
89 void waitForPhase(int Phase) {
90 std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
91 CurrentPhaseCondition.wait(
92 LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
93 }
94 /// If a thread waits on another phase, the test could bail out on a failed
95 /// assertion and ThreadPool destructor would wait() on all threads, which
96 /// would deadlock on the task waiting. Create this helper to automatically
97 /// reset the phase and unblock such threads.
98 struct PhaseResetHelper {
99 PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
100 ~PhaseResetHelper() { test->setPhase(-1); }
101 ThreadPoolTest *test;
102 };
103
104 /// Advance to the given phase.
105 void setPhase(int Phase) {
106 {
107 std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
108 assert(Phase == CurrentPhase + 1 || Phase < 0);
109 CurrentPhase = Phase;
110 }
111 CurrentPhaseCondition.notify_all();
112 }
113
114 void SetUp() override { CurrentPhase = 0; }
115
116 SmallVector<llvm::BitVector, 0> RunOnAllSockets(ThreadPoolStrategy S);
117
118 std::condition_variable CurrentPhaseCondition;
119 std::mutex CurrentPhaseMutex;
120 int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
121};
122
123using ThreadPoolImpls = ::testing::Types<
124#if LLVM_ENABLE_THREADS
125 StdThreadPool,
126#endif
127 SingleThreadExecutor>;
128
129TYPED_TEST_SUITE(ThreadPoolTest, ThreadPoolImpls, );
130
// Skips the current test when the fixture reports the host as unsupported.
// Must be used inside a test body (it calls a fixture member through `this`).
// The do/while(0) wrapper deliberately has no trailing semicolon: the caller
// supplies it, so `CHECK_UNSUPPORTED();` is exactly one statement and stays
// valid in front of an `else`.
#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (this->isUnsupportedOSOrEnvironment())                                  \
      GTEST_SKIP();                                                            \
  } while (0)
136
137TYPED_TEST(ThreadPoolTest, AsyncBarrier) {
138 CHECK_UNSUPPORTED();
139 // test that async & barrier work together properly.
140
141 std::atomic_int checked_in{0};
142
143 DefaultThreadPool Pool;
144 for (size_t i = 0; i < 5; ++i) {
145 Pool.async([this, &checked_in] {
146 this->waitForMainThread();
147 ++checked_in;
148 });
149 }
150 ASSERT_EQ(0, checked_in);
151 this->setMainThreadReady();
152 Pool.wait();
153 ASSERT_EQ(5, checked_in);
154}
155
/// Adds \p i to the shared counter \p checked_in. Used below as a plain
/// function taking several arguments that is handed to ThreadPool::async().
static void TestFunc(std::atomic_int &checked_in, int i) { checked_in += i; }
157
158TYPED_TEST(ThreadPoolTest, AsyncBarrierArgs) {
159 CHECK_UNSUPPORTED();
160 // Test that async works with a function requiring multiple parameters.
161 std::atomic_int checked_in{0};
162
163 DefaultThreadPool Pool;
164 for (size_t i = 0; i < 5; ++i) {
165 Pool.async(F&: TestFunc, ArgList: std::ref(t&: checked_in), ArgList&: i);
166 }
167 Pool.wait();
168 ASSERT_EQ(10, checked_in);
169}
170
171TYPED_TEST(ThreadPoolTest, Async) {
172 CHECK_UNSUPPORTED();
173 DefaultThreadPool Pool;
174 std::atomic_int i{0};
175 Pool.async([this, &i] {
176 this->waitForMainThread();
177 ++i;
178 });
179 Pool.async([&i] { ++i; });
180 ASSERT_NE(2, i.load());
181 this->setMainThreadReady();
182 Pool.wait();
183 ASSERT_EQ(2, i.load());
184}
185
186TYPED_TEST(ThreadPoolTest, GetFuture) {
187 CHECK_UNSUPPORTED();
188 DefaultThreadPool Pool(hardware_concurrency(ThreadCount: 2));
189 std::atomic_int i{0};
190 Pool.async([this, &i] {
191 this->waitForMainThread();
192 ++i;
193 });
194 // Force the future using get()
195 Pool.async([&i] { ++i; }).get();
196 ASSERT_NE(2, i.load());
197 this->setMainThreadReady();
198 Pool.wait();
199 ASSERT_EQ(2, i.load());
200}
201
202TYPED_TEST(ThreadPoolTest, GetFutureWithResult) {
203 CHECK_UNSUPPORTED();
204 DefaultThreadPool Pool(hardware_concurrency(ThreadCount: 2));
205 auto F1 = Pool.async([] { return 1; });
206 auto F2 = Pool.async([] { return 2; });
207
208 this->setMainThreadReady();
209 Pool.wait();
210 ASSERT_EQ(1, F1.get());
211 ASSERT_EQ(2, F2.get());
212}
213
214TYPED_TEST(ThreadPoolTest, GetFutureWithResultAndArgs) {
215 CHECK_UNSUPPORTED();
216 DefaultThreadPool Pool(hardware_concurrency(ThreadCount: 2));
217 auto Fn = [](int x) { return x; };
218 auto F1 = Pool.async(Fn, 1);
219 auto F2 = Pool.async(Fn, 2);
220
221 this->setMainThreadReady();
222 Pool.wait();
223 ASSERT_EQ(1, F1.get());
224 ASSERT_EQ(2, F2.get());
225}
226
227TYPED_TEST(ThreadPoolTest, PoolDestruction) {
228 CHECK_UNSUPPORTED();
229 // Test that we are waiting on destruction
230 std::atomic_int checked_in{0};
231 {
232 DefaultThreadPool Pool;
233 for (size_t i = 0; i < 5; ++i) {
234 Pool.async([this, &checked_in] {
235 this->waitForMainThread();
236 ++checked_in;
237 });
238 }
239 ASSERT_EQ(0, checked_in);
240 this->setMainThreadReady();
241 }
242 ASSERT_EQ(5, checked_in);
243}
244
245// Check running tasks in different groups.
246TYPED_TEST(ThreadPoolTest, Groups) {
247 CHECK_UNSUPPORTED();
248 // Need at least two threads, as the task in group2
249 // might block a thread until all tasks in group1 finish.
250 ThreadPoolStrategy S = hardware_concurrency(ThreadCount: 2);
251 if (S.compute_thread_count() < 2)
252 GTEST_SKIP();
253 DefaultThreadPool Pool(S);
254 typename TestFixture::PhaseResetHelper Helper(this);
255 ThreadPoolTaskGroup Group1(Pool);
256 ThreadPoolTaskGroup Group2(Pool);
257
258 // Check that waiting for an empty group is a no-op.
259 Group1.wait();
260
261 std::atomic_int checked_in1{0};
262 std::atomic_int checked_in2{0};
263
264 for (size_t i = 0; i < 5; ++i) {
265 Group1.async([this, &checked_in1] {
266 this->waitForMainThread();
267 ++checked_in1;
268 });
269 }
270 Group2.async([this, &checked_in2] {
271 this->waitForPhase(2);
272 ++checked_in2;
273 });
274 ASSERT_EQ(0, checked_in1);
275 ASSERT_EQ(0, checked_in2);
276 // Start first group and wait for it.
277 this->setMainThreadReady();
278 Group1.wait();
279 ASSERT_EQ(5, checked_in1);
280 // Second group has not yet finished, start it and wait for it.
281 ASSERT_EQ(0, checked_in2);
282 this->setPhase(2);
283 Group2.wait();
284 ASSERT_EQ(5, checked_in1);
285 ASSERT_EQ(1, checked_in2);
286}
287
288// Check recursive tasks.
289TYPED_TEST(ThreadPoolTest, RecursiveGroups) {
290 CHECK_UNSUPPORTED();
291 DefaultThreadPool Pool;
292 ThreadPoolTaskGroup Group(Pool);
293
294 std::atomic_int checked_in1{0};
295
296 for (size_t i = 0; i < 5; ++i) {
297 Group.async([this, &Pool, &checked_in1] {
298 this->waitForMainThread();
299
300 ThreadPoolTaskGroup LocalGroup(Pool);
301
302 // Check that waiting for an empty group is a no-op.
303 LocalGroup.wait();
304
305 std::atomic_int checked_in2{0};
306 for (size_t i = 0; i < 5; ++i) {
307 LocalGroup.async([&checked_in2] { ++checked_in2; });
308 }
309 LocalGroup.wait();
310 ASSERT_EQ(5, checked_in2);
311
312 ++checked_in1;
313 });
314 }
315 ASSERT_EQ(0, checked_in1);
316 this->setMainThreadReady();
317 Group.wait();
318 ASSERT_EQ(5, checked_in1);
319}
320
321TYPED_TEST(ThreadPoolTest, RecursiveWaitDeadlock) {
322 CHECK_UNSUPPORTED();
323 ThreadPoolStrategy S = hardware_concurrency(ThreadCount: 2);
324 if (S.compute_thread_count() < 2)
325 GTEST_SKIP();
326 DefaultThreadPool Pool(S);
327 typename TestFixture::PhaseResetHelper Helper(this);
328 ThreadPoolTaskGroup Group(Pool);
329
330 // Test that a thread calling wait() for a group and is waiting for more tasks
331 // returns when the last task finishes in a different thread while the waiting
332 // thread was waiting for more tasks to process while waiting.
333
334 // Task A runs in the first thread. It finishes and leaves
335 // the background thread waiting for more tasks.
336 Group.async([this] {
337 this->waitForMainThread();
338 this->setPhase(2);
339 });
340 // Task B is run in a second thread, it launches yet another
341 // task C in a different group, which will be handled by the waiting
342 // thread started above.
343 Group.async([this, &Pool] {
344 this->waitForPhase(2);
345 ThreadPoolTaskGroup LocalGroup(Pool);
346 LocalGroup.async([this] {
347 this->waitForPhase(3);
348 // Give the other thread enough time to check that there's no task
349 // to process and suspend waiting for a notification. This is indeed racy,
350 // but probably the best that can be done.
351 std::this_thread::sleep_for(rtime: std::chrono::milliseconds(10));
352 });
353 // And task B only now will wait for the tasks in the group (=task C)
354 // to finish. This test checks that it does not deadlock. If the
355 // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
356 // this task B would be stuck waiting for tasks to arrive.
357 this->setPhase(3);
358 LocalGroup.wait();
359 });
360 this->setMainThreadReady();
361 Group.wait();
362}
363
364#if LLVM_ENABLE_THREADS == 1
365
366// FIXME: Skip some tests below on non-Windows because multi-socket systems
367// were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
368// isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc).
369#ifdef _WIN32
370
371template <typename ThreadPoolImpl>
372SmallVector<llvm::BitVector, 0>
373ThreadPoolTest<ThreadPoolImpl>::RunOnAllSockets(ThreadPoolStrategy S) {
374 llvm::SetVector<llvm::BitVector> ThreadsUsed;
375 std::mutex Lock;
376 {
377 std::condition_variable AllThreads;
378 std::mutex AllThreadsLock;
379 unsigned Active = 0;
380
381 DefaultThreadPool Pool(S);
382 for (size_t I = 0; I < S.compute_thread_count(); ++I) {
383 Pool.async([&] {
384 {
385 std::lock_guard<std::mutex> Guard(AllThreadsLock);
386 ++Active;
387 AllThreads.notify_one();
388 }
389 this->waitForMainThread();
390 std::lock_guard<std::mutex> Guard(Lock);
391 auto Mask = llvm::get_thread_affinity_mask();
392 ThreadsUsed.insert(Mask);
393 });
394 }
395 EXPECT_EQ(true, ThreadsUsed.empty());
396 {
397 std::unique_lock<std::mutex> Guard(AllThreadsLock);
398 AllThreads.wait(Guard,
399 [&]() { return Active == S.compute_thread_count(); });
400 }
401 this->setMainThreadReady();
402 }
403 return ThreadsUsed.takeVector();
404}
405
406TYPED_TEST(ThreadPoolTest, AllThreads_UseAllRessources) {
407 CHECK_UNSUPPORTED();
408 // After Windows 11, the OS is free to deploy the threads on any CPU socket.
409 // We cannot relibly ensure that all thread affinity mask are covered,
410 // therefore this test should not run.
411 if (llvm::RunningWindows11OrGreater())
412 GTEST_SKIP();
413 auto ThreadsUsed = this->RunOnAllSockets({});
414 ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
415}
416
417TYPED_TEST(ThreadPoolTest, AllThreads_OneThreadPerCore) {
418 CHECK_UNSUPPORTED();
419 // After Windows 11, the OS is free to deploy the threads on any CPU socket.
420 // We cannot relibly ensure that all thread affinity mask are covered,
421 // therefore this test should not run.
422 if (llvm::RunningWindows11OrGreater())
423 GTEST_SKIP();
424 auto ThreadsUsed =
425 this->RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
426 ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
427}
428
429// From TestMain.cpp.
430extern const char *TestMainArgv0;
431
432// Just a reachable symbol to ease resolving of the executable's path.
433static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1");
434
435#ifdef _WIN32
436#define setenv(name, var, ignore) _putenv_s(name, var)
437#endif
438
439TYPED_TEST(ThreadPoolTest, AffinityMask) {
440 CHECK_UNSUPPORTED();
441
442 // Skip this test if less than 4 threads are available.
443 if (llvm::hardware_concurrency().compute_thread_count() < 4)
444 GTEST_SKIP();
445
446 using namespace llvm::sys;
447 if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
448 auto ThreadsUsed = this->RunOnAllSockets({});
449 // Ensure the threads only ran on CPUs 0-3.
450 // NOTE: Don't use ASSERT* here because this runs in a subprocess,
451 // and will show up as un-executed in the parent.
452 assert(llvm::all_of(ThreadsUsed,
453 [](auto &T) { return T.getData().front() < 16UL; }) &&
454 "Threads ran on more CPUs than expected! The affinity mask does not "
455 "seem to work.");
456 GTEST_SKIP();
457 }
458 std::string Executable =
459 sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1);
460 StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"};
461
462 // Add environment variable to the environment of the child process.
463 int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
464 ASSERT_EQ(Res, 0);
465
466 std::string Error;
467 bool ExecutionFailed;
468 BitVector Affinity;
469 Affinity.resize(4);
470 Affinity.set(0, 4); // Use CPUs 0,1,2,3.
471 int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error,
472 &ExecutionFailed, nullptr, &Affinity);
473 ASSERT_EQ(0, Ret);
474}
475
476#endif // #ifdef _WIN32
477#endif // #if LLVM_ENABLE_THREADS == 1
478

// Source: llvm/unittests/Support/ThreadPool.cpp