1 | //==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==// |
2 | // |
3 | // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. |
4 | // See https://llvm.org/LICENSE.txt for license information. |
5 | // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | // |
9 | // This file implements a crude C++11 based thread pool. |
10 | // |
11 | //===----------------------------------------------------------------------===// |
12 | |
13 | #include "llvm/Support/ThreadPool.h" |
14 | |
15 | #include "llvm/Config/llvm-config.h" |
16 | |
17 | #if LLVM_ENABLE_THREADS |
18 | #include "llvm/Support/FormatVariadic.h" |
19 | #include "llvm/Support/Threading.h" |
20 | #else |
21 | #include "llvm/Support/raw_ostream.h" |
22 | #endif |
23 | |
24 | using namespace llvm; |
25 | |
26 | #if LLVM_ENABLE_THREADS |
27 | |
28 | // A note on thread groups: Tasks are by default in no group (represented |
29 | // by nullptr ThreadPoolTaskGroup pointer in the Tasks queue) and functionality |
30 | // here normally works on all tasks regardless of their group (functions |
31 | // in that case receive nullptr ThreadPoolTaskGroup pointer as argument). |
32 | // A task in a group has a pointer to that ThreadPoolTaskGroup in the Tasks |
33 | // queue, and functions called to work only on tasks from one group take that |
34 | // pointer. |
35 | |
36 | ThreadPool::ThreadPool(ThreadPoolStrategy S) |
37 | : Strategy(S), MaxThreadCount(S.compute_thread_count()) {} |
38 | |
39 | void ThreadPool::grow(int requested) { |
40 | llvm::sys::ScopedWriter LockGuard(ThreadsLock); |
41 | if (Threads.size() >= MaxThreadCount) |
42 | return; // Already hit the max thread pool size. |
43 | int newThreadCount = std::min<int>(a: requested, b: MaxThreadCount); |
44 | while (static_cast<int>(Threads.size()) < newThreadCount) { |
45 | int ThreadID = Threads.size(); |
46 | Threads.emplace_back(args: [this, ThreadID] { |
47 | set_thread_name(formatv(Fmt: "llvm-worker-{0}" , Vals: ThreadID)); |
48 | Strategy.apply_thread_strategy(ThreadPoolNum: ThreadID); |
49 | processTasks(WaitingForGroup: nullptr); |
50 | }); |
51 | } |
52 | } |
53 | |
54 | #ifndef NDEBUG |
55 | // The group of the tasks run by the current thread. |
56 | static LLVM_THREAD_LOCAL std::vector<ThreadPoolTaskGroup *> |
57 | *CurrentThreadTaskGroups = nullptr; |
58 | #endif |
59 | |
60 | // WaitingForGroup == nullptr means all tasks regardless of their group. |
61 | void ThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) { |
62 | while (true) { |
63 | std::function<void()> Task; |
64 | ThreadPoolTaskGroup *GroupOfTask; |
65 | { |
66 | std::unique_lock<std::mutex> LockGuard(QueueLock); |
67 | bool workCompletedForGroup = false; // Result of workCompletedUnlocked() |
68 | // Wait for tasks to be pushed in the queue |
69 | QueueCondition.wait(lock&: LockGuard, p: [&] { |
70 | return !EnableFlag || !Tasks.empty() || |
71 | (WaitingForGroup != nullptr && |
72 | (workCompletedForGroup = |
73 | workCompletedUnlocked(Group: WaitingForGroup))); |
74 | }); |
75 | // Exit condition |
76 | if (!EnableFlag && Tasks.empty()) |
77 | return; |
78 | if (WaitingForGroup != nullptr && workCompletedForGroup) |
79 | return; |
80 | // Yeah, we have a task, grab it and release the lock on the queue |
81 | |
82 | // We first need to signal that we are active before popping the queue |
83 | // in order for wait() to properly detect that even if the queue is |
84 | // empty, there is still a task in flight. |
85 | ++ActiveThreads; |
86 | Task = std::move(Tasks.front().first); |
87 | GroupOfTask = Tasks.front().second; |
88 | // Need to count active threads in each group separately, ActiveThreads |
89 | // would never be 0 if waiting for another group inside a wait. |
90 | if (GroupOfTask != nullptr) |
91 | ++ActiveGroups[GroupOfTask]; // Increment or set to 1 if new item |
92 | Tasks.pop_front(); |
93 | } |
94 | #ifndef NDEBUG |
95 | if (CurrentThreadTaskGroups == nullptr) |
96 | CurrentThreadTaskGroups = new std::vector<ThreadPoolTaskGroup *>; |
97 | CurrentThreadTaskGroups->push_back(x: GroupOfTask); |
98 | #endif |
99 | |
100 | // Run the task we just grabbed |
101 | Task(); |
102 | |
103 | #ifndef NDEBUG |
104 | CurrentThreadTaskGroups->pop_back(); |
105 | if (CurrentThreadTaskGroups->empty()) { |
106 | delete CurrentThreadTaskGroups; |
107 | CurrentThreadTaskGroups = nullptr; |
108 | } |
109 | #endif |
110 | |
111 | bool Notify; |
112 | bool NotifyGroup; |
113 | { |
114 | // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait() |
115 | std::lock_guard<std::mutex> LockGuard(QueueLock); |
116 | --ActiveThreads; |
117 | if (GroupOfTask != nullptr) { |
118 | auto A = ActiveGroups.find(Val: GroupOfTask); |
119 | if (--(A->second) == 0) |
120 | ActiveGroups.erase(I: A); |
121 | } |
122 | Notify = workCompletedUnlocked(Group: GroupOfTask); |
123 | NotifyGroup = GroupOfTask != nullptr && Notify; |
124 | } |
125 | // Notify task completion if this is the last active thread, in case |
126 | // someone waits on ThreadPool::wait(). |
127 | if (Notify) |
128 | CompletionCondition.notify_all(); |
129 | // If this was a task in a group, notify also threads waiting for tasks |
130 | // in this function on QueueCondition, to make a recursive wait() return |
131 | // after the group it's been waiting for has finished. |
132 | if (NotifyGroup) |
133 | QueueCondition.notify_all(); |
134 | } |
135 | } |
136 | |
137 | bool ThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const { |
138 | if (Group == nullptr) |
139 | return !ActiveThreads && Tasks.empty(); |
140 | return ActiveGroups.count(Val: Group) == 0 && |
141 | !llvm::any_of(Range: Tasks, |
142 | P: [Group](const auto &T) { return T.second == Group; }); |
143 | } |
144 | |
145 | void ThreadPool::wait() { |
146 | assert(!isWorkerThread()); // Would deadlock waiting for itself. |
147 | // Wait for all threads to complete and the queue to be empty |
148 | std::unique_lock<std::mutex> LockGuard(QueueLock); |
149 | CompletionCondition.wait(lock&: LockGuard, |
150 | p: [&] { return workCompletedUnlocked(Group: nullptr); }); |
151 | } |
152 | |
153 | void ThreadPool::wait(ThreadPoolTaskGroup &Group) { |
154 | // Wait for all threads in the group to complete. |
155 | if (!isWorkerThread()) { |
156 | std::unique_lock<std::mutex> LockGuard(QueueLock); |
157 | CompletionCondition.wait(lock&: LockGuard, |
158 | p: [&] { return workCompletedUnlocked(Group: &Group); }); |
159 | return; |
160 | } |
161 | // Make sure to not deadlock waiting for oneself. |
162 | assert(CurrentThreadTaskGroups == nullptr || |
163 | !llvm::is_contained(*CurrentThreadTaskGroups, &Group)); |
164 | // Handle the case of recursive call from another task in a different group, |
165 | // in which case process tasks while waiting to keep the thread busy and avoid |
166 | // possible deadlock. |
167 | processTasks(WaitingForGroup: &Group); |
168 | } |
169 | |
170 | bool ThreadPool::isWorkerThread() const { |
171 | llvm::sys::ScopedReader LockGuard(ThreadsLock); |
172 | llvm::thread::id CurrentThreadId = llvm::this_thread::get_id(); |
173 | for (const llvm::thread &Thread : Threads) |
174 | if (CurrentThreadId == Thread.get_id()) |
175 | return true; |
176 | return false; |
177 | } |
178 | |
179 | // The destructor joins all threads, waiting for completion. |
180 | ThreadPool::~ThreadPool() { |
181 | { |
182 | std::unique_lock<std::mutex> LockGuard(QueueLock); |
183 | EnableFlag = false; |
184 | } |
185 | QueueCondition.notify_all(); |
186 | llvm::sys::ScopedReader LockGuard(ThreadsLock); |
187 | for (auto &Worker : Threads) |
188 | Worker.join(); |
189 | } |
190 | |
191 | #else // LLVM_ENABLE_THREADS Disabled |
192 | |
193 | // No threads are launched, issue a warning if ThreadCount is not 0 |
194 | ThreadPool::ThreadPool(ThreadPoolStrategy S) : MaxThreadCount(1) { |
195 | int ThreadCount = S.compute_thread_count(); |
196 | if (ThreadCount != 1) { |
197 | errs() << "Warning: request a ThreadPool with " << ThreadCount |
198 | << " threads, but LLVM_ENABLE_THREADS has been turned off\n" ; |
199 | } |
200 | } |
201 | |
202 | void ThreadPool::wait() { |
203 | // Sequential implementation running the tasks |
204 | while (!Tasks.empty()) { |
205 | auto Task = std::move(Tasks.front().first); |
206 | Tasks.pop_front(); |
207 | Task(); |
208 | } |
209 | } |
210 | |
211 | void ThreadPool::wait(ThreadPoolTaskGroup &) { |
212 | // Simply wait for all, this works even if recursive (the running task |
213 | // is already removed from the queue). |
214 | wait(); |
215 | } |
216 | |
217 | bool ThreadPool::isWorkerThread() const { |
218 | report_fatal_error("LLVM compiled without multithreading" ); |
219 | } |
220 | |
221 | ThreadPool::~ThreadPool() { wait(); } |
222 | |
223 | #endif |
224 | |