1 | //========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========// |
2 | // |
3 | // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. |
4 | // See https://llvm.org/LICENSE.txt for license information. |
5 | // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | |
9 | #include "llvm/Support/ThreadPool.h" |
10 | |
11 | #include "llvm/ADT/STLExtras.h" |
12 | #include "llvm/ADT/SetVector.h" |
13 | #include "llvm/ADT/SmallVector.h" |
14 | #include "llvm/Support/CommandLine.h" |
15 | #include "llvm/Support/Program.h" |
16 | #include "llvm/Support/TargetSelect.h" |
17 | #include "llvm/Support/Threading.h" |
18 | #include "llvm/TargetParser/Host.h" |
19 | #include "llvm/TargetParser/Triple.h" |
20 | |
21 | #ifdef _WIN32 |
22 | #include "llvm/Support/Windows/WindowsSupport.h" |
23 | #endif |
24 | |
25 | #include <chrono> |
26 | #include <thread> |
27 | |
28 | #include "gtest/gtest.h" |
29 | |
30 | namespace testing { |
31 | namespace internal { |
32 | // Specialize gtest construct to provide friendlier name in the output. |
33 | #if LLVM_ENABLE_THREADS |
34 | template <> std::string GetTypeName<llvm::StdThreadPool>() { |
35 | return "llvm::StdThreadPool" ; |
36 | } |
37 | #endif |
38 | template <> std::string GetTypeName<llvm::SingleThreadExecutor>() { |
39 | return "llvm::SingleThreadExecutor" ; |
40 | } |
41 | } // namespace internal |
42 | } // namespace testing |
43 | |
44 | using namespace llvm; |
45 | |
46 | // Fixture for the unittests, allowing to *temporarily* disable the unittests |
47 | // on a particular platform |
48 | template <typename ThreadPoolImpl> class ThreadPoolTest : public testing::Test { |
49 | Triple Host; |
50 | SmallVector<Triple::ArchType, 4> UnsupportedArchs; |
51 | SmallVector<Triple::OSType, 4> UnsupportedOSs; |
52 | SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments; |
53 | |
54 | protected: |
55 | // This is intended for platform as a temporary "XFAIL" |
56 | bool isUnsupportedOSOrEnvironment() { |
57 | Triple Host(Triple::normalize(sys::getProcessTriple())); |
58 | |
59 | if (find(Range&: UnsupportedEnvironments, Val: Host.getEnvironment()) != |
60 | UnsupportedEnvironments.end()) |
61 | return true; |
62 | |
63 | if (is_contained(Range&: UnsupportedOSs, Element: Host.getOS())) |
64 | return true; |
65 | |
66 | if (is_contained(Range&: UnsupportedArchs, Element: Host.getArch())) |
67 | return true; |
68 | |
69 | return false; |
70 | } |
71 | |
72 | ThreadPoolTest() { |
73 | // Add unsupported configuration here, example: |
74 | // UnsupportedArchs.push_back(Triple::x86_64); |
75 | |
76 | // See https://llvm.org/bugs/show_bug.cgi?id=25829 |
77 | UnsupportedArchs.push_back(Elt: Triple::ppc64le); |
78 | UnsupportedArchs.push_back(Elt: Triple::ppc64); |
79 | } |
80 | |
81 | /// Make sure this thread not progress faster than the main thread. |
82 | void waitForMainThread() { waitForPhase(Phase: 1); } |
83 | |
84 | /// Set the readiness of the main thread. |
85 | void setMainThreadReady() { setPhase(1); } |
86 | |
87 | /// Wait until given phase is set using setPhase(); first "main" phase is 1. |
88 | /// See also PhaseResetHelper below. |
89 | void waitForPhase(int Phase) { |
90 | std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex); |
91 | CurrentPhaseCondition.wait( |
92 | LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; }); |
93 | } |
94 | /// If a thread waits on another phase, the test could bail out on a failed |
95 | /// assertion and ThreadPool destructor would wait() on all threads, which |
96 | /// would deadlock on the task waiting. Create this helper to automatically |
97 | /// reset the phase and unblock such threads. |
98 | struct PhaseResetHelper { |
99 | PhaseResetHelper(ThreadPoolTest *test) : test(test) {} |
100 | ~PhaseResetHelper() { test->setPhase(-1); } |
101 | ThreadPoolTest *test; |
102 | }; |
103 | |
104 | /// Advance to the given phase. |
105 | void setPhase(int Phase) { |
106 | { |
107 | std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex); |
108 | assert(Phase == CurrentPhase + 1 || Phase < 0); |
109 | CurrentPhase = Phase; |
110 | } |
111 | CurrentPhaseCondition.notify_all(); |
112 | } |
113 | |
114 | void SetUp() override { CurrentPhase = 0; } |
115 | |
116 | SmallVector<llvm::BitVector, 0> RunOnAllSockets(ThreadPoolStrategy S); |
117 | |
118 | std::condition_variable CurrentPhaseCondition; |
119 | std::mutex CurrentPhaseMutex; |
120 | int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom |
121 | }; |
122 | |
123 | using ThreadPoolImpls = ::testing::Types< |
124 | #if LLVM_ENABLE_THREADS |
125 | StdThreadPool, |
126 | #endif |
127 | SingleThreadExecutor>; |
128 | |
129 | TYPED_TEST_SUITE(ThreadPoolTest, ThreadPoolImpls, ); |
130 | |
// Skips the current test when the fixture reports the host as unsupported.
// Relies on `this`, so it must be used inside a test body of the fixture, and
// is written as a statement: `CHECK_UNSUPPORTED();`.
//
// The definition deliberately has no trailing semicolon: the caller's `;`
// completes the do/while, so the macro expands to exactly one statement and
// stays safe inside an unbraced if/else.
#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (this->isUnsupportedOSOrEnvironment())                                  \
      GTEST_SKIP();                                                            \
  } while (0)
136 | |
137 | TYPED_TEST(ThreadPoolTest, AsyncBarrier) { |
138 | CHECK_UNSUPPORTED(); |
139 | // test that async & barrier work together properly. |
140 | |
141 | std::atomic_int checked_in{0}; |
142 | |
143 | DefaultThreadPool Pool; |
144 | for (size_t i = 0; i < 5; ++i) { |
145 | Pool.async([this, &checked_in] { |
146 | this->waitForMainThread(); |
147 | ++checked_in; |
148 | }); |
149 | } |
150 | ASSERT_EQ(0, checked_in); |
151 | this->setMainThreadReady(); |
152 | Pool.wait(); |
153 | ASSERT_EQ(5, checked_in); |
154 | } |
155 | |
/// Adds \p i to the shared counter \p checked_in. Used as a free-function task
/// that requires multiple parameters.
static void TestFunc(std::atomic_int &checked_in, int i) {
  checked_in.fetch_add(i);
}
157 | |
158 | TYPED_TEST(ThreadPoolTest, AsyncBarrierArgs) { |
159 | CHECK_UNSUPPORTED(); |
160 | // Test that async works with a function requiring multiple parameters. |
161 | std::atomic_int checked_in{0}; |
162 | |
163 | DefaultThreadPool Pool; |
164 | for (size_t i = 0; i < 5; ++i) { |
165 | Pool.async(F&: TestFunc, ArgList: std::ref(t&: checked_in), ArgList&: i); |
166 | } |
167 | Pool.wait(); |
168 | ASSERT_EQ(10, checked_in); |
169 | } |
170 | |
171 | TYPED_TEST(ThreadPoolTest, Async) { |
172 | CHECK_UNSUPPORTED(); |
173 | DefaultThreadPool Pool; |
174 | std::atomic_int i{0}; |
175 | Pool.async([this, &i] { |
176 | this->waitForMainThread(); |
177 | ++i; |
178 | }); |
179 | Pool.async([&i] { ++i; }); |
180 | ASSERT_NE(2, i.load()); |
181 | this->setMainThreadReady(); |
182 | Pool.wait(); |
183 | ASSERT_EQ(2, i.load()); |
184 | } |
185 | |
186 | TYPED_TEST(ThreadPoolTest, GetFuture) { |
187 | CHECK_UNSUPPORTED(); |
188 | DefaultThreadPool Pool(hardware_concurrency(ThreadCount: 2)); |
189 | std::atomic_int i{0}; |
190 | Pool.async([this, &i] { |
191 | this->waitForMainThread(); |
192 | ++i; |
193 | }); |
194 | // Force the future using get() |
195 | Pool.async([&i] { ++i; }).get(); |
196 | ASSERT_NE(2, i.load()); |
197 | this->setMainThreadReady(); |
198 | Pool.wait(); |
199 | ASSERT_EQ(2, i.load()); |
200 | } |
201 | |
202 | TYPED_TEST(ThreadPoolTest, GetFutureWithResult) { |
203 | CHECK_UNSUPPORTED(); |
204 | DefaultThreadPool Pool(hardware_concurrency(ThreadCount: 2)); |
205 | auto F1 = Pool.async([] { return 1; }); |
206 | auto F2 = Pool.async([] { return 2; }); |
207 | |
208 | this->setMainThreadReady(); |
209 | Pool.wait(); |
210 | ASSERT_EQ(1, F1.get()); |
211 | ASSERT_EQ(2, F2.get()); |
212 | } |
213 | |
214 | TYPED_TEST(ThreadPoolTest, GetFutureWithResultAndArgs) { |
215 | CHECK_UNSUPPORTED(); |
216 | DefaultThreadPool Pool(hardware_concurrency(ThreadCount: 2)); |
217 | auto Fn = [](int x) { return x; }; |
218 | auto F1 = Pool.async(Fn, 1); |
219 | auto F2 = Pool.async(Fn, 2); |
220 | |
221 | this->setMainThreadReady(); |
222 | Pool.wait(); |
223 | ASSERT_EQ(1, F1.get()); |
224 | ASSERT_EQ(2, F2.get()); |
225 | } |
226 | |
227 | TYPED_TEST(ThreadPoolTest, PoolDestruction) { |
228 | CHECK_UNSUPPORTED(); |
229 | // Test that we are waiting on destruction |
230 | std::atomic_int checked_in{0}; |
231 | { |
232 | DefaultThreadPool Pool; |
233 | for (size_t i = 0; i < 5; ++i) { |
234 | Pool.async([this, &checked_in] { |
235 | this->waitForMainThread(); |
236 | ++checked_in; |
237 | }); |
238 | } |
239 | ASSERT_EQ(0, checked_in); |
240 | this->setMainThreadReady(); |
241 | } |
242 | ASSERT_EQ(5, checked_in); |
243 | } |
244 | |
245 | // Check running tasks in different groups. |
246 | TYPED_TEST(ThreadPoolTest, Groups) { |
247 | CHECK_UNSUPPORTED(); |
248 | // Need at least two threads, as the task in group2 |
249 | // might block a thread until all tasks in group1 finish. |
250 | ThreadPoolStrategy S = hardware_concurrency(ThreadCount: 2); |
251 | if (S.compute_thread_count() < 2) |
252 | GTEST_SKIP(); |
253 | DefaultThreadPool Pool(S); |
254 | typename TestFixture::PhaseResetHelper Helper(this); |
255 | ThreadPoolTaskGroup Group1(Pool); |
256 | ThreadPoolTaskGroup Group2(Pool); |
257 | |
258 | // Check that waiting for an empty group is a no-op. |
259 | Group1.wait(); |
260 | |
261 | std::atomic_int checked_in1{0}; |
262 | std::atomic_int checked_in2{0}; |
263 | |
264 | for (size_t i = 0; i < 5; ++i) { |
265 | Group1.async([this, &checked_in1] { |
266 | this->waitForMainThread(); |
267 | ++checked_in1; |
268 | }); |
269 | } |
270 | Group2.async([this, &checked_in2] { |
271 | this->waitForPhase(2); |
272 | ++checked_in2; |
273 | }); |
274 | ASSERT_EQ(0, checked_in1); |
275 | ASSERT_EQ(0, checked_in2); |
276 | // Start first group and wait for it. |
277 | this->setMainThreadReady(); |
278 | Group1.wait(); |
279 | ASSERT_EQ(5, checked_in1); |
280 | // Second group has not yet finished, start it and wait for it. |
281 | ASSERT_EQ(0, checked_in2); |
282 | this->setPhase(2); |
283 | Group2.wait(); |
284 | ASSERT_EQ(5, checked_in1); |
285 | ASSERT_EQ(1, checked_in2); |
286 | } |
287 | |
288 | // Check recursive tasks. |
289 | TYPED_TEST(ThreadPoolTest, RecursiveGroups) { |
290 | CHECK_UNSUPPORTED(); |
291 | DefaultThreadPool Pool; |
292 | ThreadPoolTaskGroup Group(Pool); |
293 | |
294 | std::atomic_int checked_in1{0}; |
295 | |
296 | for (size_t i = 0; i < 5; ++i) { |
297 | Group.async([this, &Pool, &checked_in1] { |
298 | this->waitForMainThread(); |
299 | |
300 | ThreadPoolTaskGroup LocalGroup(Pool); |
301 | |
302 | // Check that waiting for an empty group is a no-op. |
303 | LocalGroup.wait(); |
304 | |
305 | std::atomic_int checked_in2{0}; |
306 | for (size_t i = 0; i < 5; ++i) { |
307 | LocalGroup.async([&checked_in2] { ++checked_in2; }); |
308 | } |
309 | LocalGroup.wait(); |
310 | ASSERT_EQ(5, checked_in2); |
311 | |
312 | ++checked_in1; |
313 | }); |
314 | } |
315 | ASSERT_EQ(0, checked_in1); |
316 | this->setMainThreadReady(); |
317 | Group.wait(); |
318 | ASSERT_EQ(5, checked_in1); |
319 | } |
320 | |
321 | TYPED_TEST(ThreadPoolTest, RecursiveWaitDeadlock) { |
322 | CHECK_UNSUPPORTED(); |
323 | ThreadPoolStrategy S = hardware_concurrency(ThreadCount: 2); |
324 | if (S.compute_thread_count() < 2) |
325 | GTEST_SKIP(); |
326 | DefaultThreadPool Pool(S); |
327 | typename TestFixture::PhaseResetHelper Helper(this); |
328 | ThreadPoolTaskGroup Group(Pool); |
329 | |
330 | // Test that a thread calling wait() for a group and is waiting for more tasks |
331 | // returns when the last task finishes in a different thread while the waiting |
332 | // thread was waiting for more tasks to process while waiting. |
333 | |
334 | // Task A runs in the first thread. It finishes and leaves |
335 | // the background thread waiting for more tasks. |
336 | Group.async([this] { |
337 | this->waitForMainThread(); |
338 | this->setPhase(2); |
339 | }); |
340 | // Task B is run in a second thread, it launches yet another |
341 | // task C in a different group, which will be handled by the waiting |
342 | // thread started above. |
343 | Group.async([this, &Pool] { |
344 | this->waitForPhase(2); |
345 | ThreadPoolTaskGroup LocalGroup(Pool); |
346 | LocalGroup.async([this] { |
347 | this->waitForPhase(3); |
348 | // Give the other thread enough time to check that there's no task |
349 | // to process and suspend waiting for a notification. This is indeed racy, |
350 | // but probably the best that can be done. |
351 | std::this_thread::sleep_for(rtime: std::chrono::milliseconds(10)); |
352 | }); |
353 | // And task B only now will wait for the tasks in the group (=task C) |
354 | // to finish. This test checks that it does not deadlock. If the |
355 | // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place, |
356 | // this task B would be stuck waiting for tasks to arrive. |
357 | this->setPhase(3); |
358 | LocalGroup.wait(); |
359 | }); |
360 | this->setMainThreadReady(); |
361 | Group.wait(); |
362 | } |
363 | |
364 | #if LLVM_ENABLE_THREADS == 1 |
365 | |
366 | // FIXME: Skip some tests below on non-Windows because multi-socket systems |
367 | // were not fully tested on Unix yet, and llvm::get_thread_affinity_mask() |
368 | // isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc). |
369 | #ifdef _WIN32 |
370 | |
371 | template <typename ThreadPoolImpl> |
372 | SmallVector<llvm::BitVector, 0> |
373 | ThreadPoolTest<ThreadPoolImpl>::RunOnAllSockets(ThreadPoolStrategy S) { |
374 | llvm::SetVector<llvm::BitVector> ThreadsUsed; |
375 | std::mutex Lock; |
376 | { |
377 | std::condition_variable AllThreads; |
378 | std::mutex AllThreadsLock; |
379 | unsigned Active = 0; |
380 | |
381 | DefaultThreadPool Pool(S); |
382 | for (size_t I = 0; I < S.compute_thread_count(); ++I) { |
383 | Pool.async([&] { |
384 | { |
385 | std::lock_guard<std::mutex> Guard(AllThreadsLock); |
386 | ++Active; |
387 | AllThreads.notify_one(); |
388 | } |
389 | this->waitForMainThread(); |
390 | std::lock_guard<std::mutex> Guard(Lock); |
391 | auto Mask = llvm::get_thread_affinity_mask(); |
392 | ThreadsUsed.insert(Mask); |
393 | }); |
394 | } |
395 | EXPECT_EQ(true, ThreadsUsed.empty()); |
396 | { |
397 | std::unique_lock<std::mutex> Guard(AllThreadsLock); |
398 | AllThreads.wait(Guard, |
399 | [&]() { return Active == S.compute_thread_count(); }); |
400 | } |
401 | this->setMainThreadReady(); |
402 | } |
403 | return ThreadsUsed.takeVector(); |
404 | } |
405 | |
406 | TYPED_TEST(ThreadPoolTest, AllThreads_UseAllRessources) { |
407 | CHECK_UNSUPPORTED(); |
408 | // After Windows 11, the OS is free to deploy the threads on any CPU socket. |
409 | // We cannot relibly ensure that all thread affinity mask are covered, |
410 | // therefore this test should not run. |
411 | if (llvm::RunningWindows11OrGreater()) |
412 | GTEST_SKIP(); |
413 | auto ThreadsUsed = this->RunOnAllSockets({}); |
414 | ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size()); |
415 | } |
416 | |
417 | TYPED_TEST(ThreadPoolTest, AllThreads_OneThreadPerCore) { |
418 | CHECK_UNSUPPORTED(); |
419 | // After Windows 11, the OS is free to deploy the threads on any CPU socket. |
420 | // We cannot relibly ensure that all thread affinity mask are covered, |
421 | // therefore this test should not run. |
422 | if (llvm::RunningWindows11OrGreater()) |
423 | GTEST_SKIP(); |
424 | auto ThreadsUsed = |
425 | this->RunOnAllSockets(llvm::heavyweight_hardware_concurrency()); |
426 | ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size()); |
427 | } |
428 | |
429 | // From TestMain.cpp. |
430 | extern const char *TestMainArgv0; |
431 | |
432 | // Just a reachable symbol to ease resolving of the executable's path. |
433 | static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1" ); |
434 | |
435 | #ifdef _WIN32 |
436 | #define setenv(name, var, ignore) _putenv_s(name, var) |
437 | #endif |
438 | |
439 | TYPED_TEST(ThreadPoolTest, AffinityMask) { |
440 | CHECK_UNSUPPORTED(); |
441 | |
442 | // Skip this test if less than 4 threads are available. |
443 | if (llvm::hardware_concurrency().compute_thread_count() < 4) |
444 | GTEST_SKIP(); |
445 | |
446 | using namespace llvm::sys; |
447 | if (getenv("LLVM_THREADPOOL_AFFINITYMASK" )) { |
448 | auto ThreadsUsed = this->RunOnAllSockets({}); |
449 | // Ensure the threads only ran on CPUs 0-3. |
450 | // NOTE: Don't use ASSERT* here because this runs in a subprocess, |
451 | // and will show up as un-executed in the parent. |
452 | assert(llvm::all_of(ThreadsUsed, |
453 | [](auto &T) { return T.getData().front() < 16UL; }) && |
454 | "Threads ran on more CPUs than expected! The affinity mask does not " |
455 | "seem to work." ); |
456 | GTEST_SKIP(); |
457 | } |
458 | std::string Executable = |
459 | sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1); |
460 | StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask" }; |
461 | |
462 | // Add environment variable to the environment of the child process. |
463 | int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK" , "1" , false); |
464 | ASSERT_EQ(Res, 0); |
465 | |
466 | std::string Error; |
467 | bool ExecutionFailed; |
468 | BitVector Affinity; |
469 | Affinity.resize(4); |
470 | Affinity.set(0, 4); // Use CPUs 0,1,2,3. |
471 | int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error, |
472 | &ExecutionFailed, nullptr, &Affinity); |
473 | ASSERT_EQ(0, Ret); |
474 | } |
475 | |
476 | #endif // #ifdef _WIN32 |
477 | #endif // #if LLVM_ENABLE_THREADS == 1 |
478 | |