1// Copyright (C) 2022 The Qt Company Ltd.
2// Copyright (C) 2018 Intel Corporation.
3// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
4
5#include "qsemaphore.h"
6#include "qfutex_p.h"
7#include "qdeadlinetimer.h"
8#include "qdatetime.h"
9#include "qdebug.h"
10#include "qlocking_p.h"
11#include "qwaitcondition_p.h"
12
13#include <chrono>
14
15QT_BEGIN_NAMESPACE
16
17using namespace QtFutex;
18
19/*!
20 \class QSemaphore
21 \inmodule QtCore
22 \brief The QSemaphore class provides a general counting semaphore.
23
24 \threadsafe
25
26 \ingroup thread
27
28 A semaphore is a generalization of a mutex. While a mutex can
29 only be locked once, it's possible to acquire a semaphore
30 multiple times. Semaphores are typically used to protect a
31 certain number of identical resources.
32
33 Semaphores support two fundamental operations, acquire() and
34 release():
35
36 \list
37 \li acquire(\e{n}) tries to acquire \e n resources. If there aren't
38 that many resources available, the call will block until this
39 is the case.
40 \li release(\e{n}) releases \e n resources.
41 \endlist
42
43 There's also a tryAcquire() function that returns immediately if
44 it cannot acquire the resources, and an available() function that
45 returns the number of available resources at any time.
46
47 Example:
48
49 \snippet code/src_corelib_thread_qsemaphore.cpp 0
50
51 A typical application of semaphores is for controlling access to
52 a circular buffer shared by a producer thread and a consumer
53 thread. The \l{Semaphores Example} shows how
54 to use QSemaphore to solve that problem.
55
56 A non-computing example of a semaphore would be dining at a
57 restaurant. A semaphore is initialized with the number of chairs
58 in the restaurant. As people arrive, they want a seat. As seats
59 are filled, available() is decremented. As people leave, the
60 available() is incremented, allowing more people to enter. If a
61 party of 10 people want to be seated, but there are only 9 seats,
62 those 10 people will wait, but a party of 4 people would be
63 seated (taking the available seats to 5, making the party of 10
64 people wait longer).
65
66 \sa QSemaphoreReleaser, QMutex, QWaitCondition, QThread, {Semaphores Example}
67*/
68
69/*
70 QSemaphore futex operation
71
    QSemaphore stores a 32-bit integer with the counter of currently available
    tokens (value between 0 and INT_MAX). When a thread attempts to acquire n
    tokens and the counter is at least that large, we perform a
    compare-and-swap with the new count. If that succeeds, the acquisition
    worked; if not, we loop again because the counter changed. If there were
    not enough tokens, we'll perform a futex-wait.
78
    Before we do, we set the high bit in the futex to indicate that the
    semaphore is contended: that is, there's a thread waiting for more tokens.
    On release() for n tokens, we perform a fetch-and-add of n and then check
    if that high bit was set. If it was, then we clear that bit and perform a
    futex-wake on the semaphore to indicate the waiting threads can wake up and
    acquire tokens. Which ones get woken up is unspecified.
85
86 If the system has the ability to wake up a precise number of threads, has
87 Linux's FUTEX_WAKE_OP functionality, and is 64-bit, instead of using a
88 single bit indicating a contended semaphore, we'll store the number of
89 tokens *plus* total number of waiters in the high word. Additionally, all
90 multi-token waiters will be waiting on that high word. So when releasing n
91 tokens on those systems, we tell the kernel to wake up n single-token
92 threads and all of the multi-token ones. Which threads get woken up is
93 unspecified, but it's likely single-token threads will get woken up first.
94 */
95
96#if defined(FUTEX_OP) && QT_POINTER_SIZE > 4
97static constexpr bool futexHasWaiterCount = true;
98#else
99static constexpr bool futexHasWaiterCount = false;
100#endif
101
102static constexpr quintptr futexNeedsWakeAllBit = futexHasWaiterCount ?
103 (Q_UINT64_C(1) << (sizeof(quintptr) * CHAR_BIT - 1)) : 0x80000000U;
104
105static int futexAvailCounter(quintptr v)
106{
107 // the low 31 bits
108 if (futexHasWaiterCount) {
109 // the high bit of the low word isn't used
110 Q_ASSERT((v & 0x80000000U) == 0);
111
112 // so we can be a little faster
113 return int(unsigned(v));
114 }
115 return int(v & 0x7fffffffU);
116}
117
118static bool futexNeedsWake(quintptr v)
119{
120 // If we're counting waiters, the number of waiters plus value is stored in the
121 // low 31 bits of the high word (that is, bits 32-62). If we're not, then we only
122 // use futexNeedsWakeAllBit to indicate anyone is waiting.
123 if constexpr (futexHasWaiterCount)
124 return unsigned(quint64(v) >> 32) > unsigned(v);
125 return v >> 31;
126}
127
128static QBasicAtomicInteger<quint32> *futexLow32(QBasicAtomicInteger<quintptr> *ptr)
129{
130 auto result = reinterpret_cast<QBasicAtomicInteger<quint32> *>(ptr);
131#if Q_BYTE_ORDER == Q_BIG_ENDIAN && QT_POINTER_SIZE > 4
132 ++result;
133#endif
134 return result;
135}
136
137static QBasicAtomicInteger<quint32> *futexHigh32(QBasicAtomicInteger<quintptr> *ptr)
138{
139 Q_ASSERT(futexHasWaiterCount);
140 auto result = reinterpret_cast<QBasicAtomicInteger<quint32> *>(ptr);
141#if Q_BYTE_ORDER == Q_LITTLE_ENDIAN && QT_POINTER_SIZE > 4
142 ++result;
143#endif
144 return result;
145}
146
147template <bool IsTimed> bool
148futexSemaphoreTryAcquire_loop(QBasicAtomicInteger<quintptr> &u, quintptr curValue, quintptr nn,
149 QDeadlineTimer timer)
150{
151 qint64 remainingTime = IsTimed ? timer.remainingTimeNSecs() : -1;
152 int n = int(unsigned(nn));
153
154 // we're called after one testAndSet, so start by waiting first
155 for (;;) {
156 // indicate we're waiting
157 auto ptr = futexLow32(ptr: &u);
158 if (n > 1 || !futexHasWaiterCount) {
159 u.fetchAndOrRelaxed(valueToAdd: futexNeedsWakeAllBit);
160 curValue |= futexNeedsWakeAllBit;
161 if constexpr (futexHasWaiterCount) {
162 Q_ASSERT(n > 1);
163 ptr = futexHigh32(ptr: &u);
164 curValue = quint64(curValue) >> 32;
165 }
166 }
167
168 if (IsTimed && remainingTime > 0) {
169 bool timedout = !futexWait(futex&: *ptr, expectedValue: curValue, nstimeout: remainingTime);
170 if (timedout)
171 return false;
172 } else {
173 futexWait(futex&: *ptr, expectedValue: curValue);
174 }
175
176 curValue = u.loadAcquire();
177 if (IsTimed)
178 remainingTime = timer.remainingTimeNSecs();
179
180 // try to acquire
181 while (futexAvailCounter(v: curValue) >= n) {
182 quintptr newValue = curValue - nn;
183 if (u.testAndSetOrdered(expectedValue: curValue, newValue, currentValue&: curValue))
184 return true; // succeeded!
185 }
186
187 // not enough tokens available, put us to wait
188 if (remainingTime == 0)
189 return false;
190 }
191}
192
193static constexpr QDeadlineTimer::ForeverConstant Expired =
194 QDeadlineTimer::ForeverConstant(1);
195
196template <typename T> bool
197futexSemaphoreTryAcquire(QBasicAtomicInteger<quintptr> &u, int n, T timeout)
198{
199 constexpr bool IsTimed = std::is_same_v<QDeadlineTimer, T>;
200 // Try to acquire without waiting (we still loop because the testAndSet
201 // call can fail).
202 quintptr nn = unsigned(n);
203 if (futexHasWaiterCount)
204 nn |= quint64(nn) << 32; // token count replicated in high word
205
206 quintptr curValue = u.loadAcquire();
207 while (futexAvailCounter(v: curValue) >= n) {
208 // try to acquire
209 quintptr newValue = curValue - nn;
210 if (u.testAndSetOrdered(expectedValue: curValue, newValue, currentValue&: curValue))
211 return true; // succeeded!
212 }
213 if constexpr (IsTimed) {
214 if (timeout.hasExpired())
215 return false;
216 } else {
217 if (timeout == Expired)
218 return false;
219 }
220
221 // we need to wait
222 constexpr quintptr oneWaiter = quintptr(Q_UINT64_C(1) << 32); // zero on 32-bit
223 if constexpr (futexHasWaiterCount) {
224 // We don't use the fetched value from above so futexWait() fails if
225 // it changed after the testAndSetOrdered above.
226 quint32 waiterCount = (quint64(curValue) >> 32) & 0x7fffffffU;
227 if (waiterCount == 0x7fffffffU) {
228 qCritical() << "Waiter count overflow in QSemaphore";
229 return false;
230 }
231
232 // increase the waiter count
233 u.fetchAndAddRelaxed(valueToAdd: oneWaiter);
234 curValue += oneWaiter;
235
236 // Also adjust nn to subtract oneWaiter when we succeed in acquiring.
237 nn += oneWaiter;
238 }
239
240 if (futexSemaphoreTryAcquire_loop<IsTimed>(u, curValue, nn, timeout))
241 return true;
242
243 Q_ASSERT(IsTimed);
244
245 if (futexHasWaiterCount) {
246 // decrement the number of threads waiting
247 Q_ASSERT(futexHigh32(&u)->loadRelaxed() & 0x7fffffffU);
248 u.fetchAndSubRelaxed(valueToAdd: oneWaiter);
249 }
250 return false;
251}
252
253class QSemaphorePrivate {
254public:
255 explicit QSemaphorePrivate(int n) : avail(n) { }
256
257 QtPrivate::mutex mutex;
258 QtPrivate::condition_variable cond;
259
260 int avail;
261};
262
263/*!
264 Creates a new semaphore and initializes the number of resources
265 it guards to \a n (by default, 0).
266
267 \sa release(), available()
268*/
269QSemaphore::QSemaphore(int n)
270{
271 Q_ASSERT_X(n >= 0, "QSemaphore", "parameter 'n' must be non-negative");
272 if (futexAvailable()) {
273 quintptr nn = unsigned(n);
274 if (futexHasWaiterCount)
275 nn |= quint64(nn) << 32; // token count replicated in high word
276 u.storeRelaxed(newValue: nn);
277 } else {
278 d = new QSemaphorePrivate(n);
279 }
280}
281
282/*!
283 Destroys the semaphore.
284
285 \warning Destroying a semaphore that is in use may result in
286 undefined behavior.
287*/
288QSemaphore::~QSemaphore()
289{
290 if (!futexAvailable())
291 delete d;
292}
293
294/*!
295 Tries to acquire \c n resources guarded by the semaphore. If \a n
296 > available(), this call will block until enough resources are
297 available.
298
299 \sa release(), available(), tryAcquire()
300*/
301void QSemaphore::acquire(int n)
302{
303#if QT_VERSION >= QT_VERSION_CHECK(7, 0, 0)
304# warning "Move the Q_ASSERT to inline code, make QSemaphore have wide contract, " \
305 "and mark noexcept where futexes are in use."
306#else
307 Q_ASSERT_X(n >= 0, "QSemaphore::acquire", "parameter 'n' must be non-negative");
308#endif
309
310 if (futexAvailable()) {
311 futexSemaphoreTryAcquire(u&: u, n, timeout: QDeadlineTimer::Forever);
312 return;
313 }
314
315 const auto sufficientResourcesAvailable = [this, n] { return d->avail >= n; };
316
317 auto locker = qt_unique_lock(mutex&: d->mutex);
318 d->cond.wait(lock&: locker, p: sufficientResourcesAvailable);
319 d->avail -= n;
320}
321
322/*!
323 Releases \a n resources guarded by the semaphore.
324
325 This function can be used to "create" resources as well. For
326 example:
327
328 \snippet code/src_corelib_thread_qsemaphore.cpp 1
329
330 QSemaphoreReleaser is a \l{http://en.cppreference.com/w/cpp/language/raii}{RAII}
331 wrapper around this function.
332
333 \sa acquire(), available(), QSemaphoreReleaser
334*/
335void QSemaphore::release(int n)
336{
337 Q_ASSERT_X(n >= 0, "QSemaphore::release", "parameter 'n' must be non-negative");
338
339 if (futexAvailable()) {
340 quintptr nn = unsigned(n);
341 if (futexHasWaiterCount)
342 nn |= quint64(nn) << 32; // token count replicated in high word
343 quintptr prevValue = u.loadRelaxed();
344 quintptr newValue;
345 do { // loop just to ensure the operations are done atomically
346 newValue = prevValue + nn;
347 newValue &= (futexNeedsWakeAllBit - 1);
348 } while (!u.testAndSetRelease(expectedValue: prevValue, newValue, currentValue&: prevValue));
349 if (futexNeedsWake(v: prevValue)) {
350#ifdef FUTEX_OP
351 if (futexHasWaiterCount) {
352 /*
353 On 64-bit systems, the single-token waiters wait on the low half
354 and the multi-token waiters wait on the upper half. So we ask
355 the kernel to wake up n single-token waiters and all multi-token
356 waiters (if any), and clear the multi-token wait bit.
357
358 atomic {
359 int oldval = *upper;
360 *upper = oldval | 0;
361 futexWake(lower, n);
362 if (oldval != 0) // always true
363 futexWake(upper, INT_MAX);
364 }
365 */
366 quint32 op = FUTEX_OP_OR;
367 quint32 oparg = 0;
368 quint32 cmp = FUTEX_OP_CMP_NE;
369 quint32 cmparg = 0;
370 futexWakeOp(futex1&: *futexLow32(ptr: &u), wake1: n, INT_MAX, futex2&: *futexHigh32(ptr: &u), FUTEX_OP(op, oparg, cmp, cmparg));
371 return;
372 }
373#endif
374 // Unset the bit and wake everyone. There are two possibilities
375 // under which a thread can set the bit between the AND and the
376 // futexWake:
377 // 1) it did see the new counter value, but it wasn't enough for
378 // its acquisition anyway, so it has to wait;
379 // 2) it did not see the new counter value, in which case its
380 // futexWait will fail.
381 if (futexHasWaiterCount) {
382 futexWakeAll(futex&: *futexLow32(ptr: &u));
383 futexWakeAll(futex&: *futexHigh32(ptr: &u));
384 } else {
385 futexWakeAll(futex&: u);
386 }
387 }
388 return;
389 }
390
391 const auto locker = qt_scoped_lock(mutex&: d->mutex);
392 d->avail += n;
393 d->cond.notify_all();
394}
395
396/*!
397 Returns the number of resources currently available to the
398 semaphore. This number can never be negative.
399
400 \sa acquire(), release()
401*/
402int QSemaphore::available() const
403{
404 if (futexAvailable())
405 return futexAvailCounter(v: u.loadRelaxed());
406
407 const auto locker = qt_scoped_lock(mutex&: d->mutex);
408 return d->avail;
409}
410
411/*!
412 Tries to acquire \c n resources guarded by the semaphore and
413 returns \c true on success. If available() < \a n, this call
414 immediately returns \c false without acquiring any resources.
415
416 Example:
417
418 \snippet code/src_corelib_thread_qsemaphore.cpp 2
419
420 \sa acquire()
421*/
422bool QSemaphore::tryAcquire(int n)
423{
424 Q_ASSERT_X(n >= 0, "QSemaphore::tryAcquire", "parameter 'n' must be non-negative");
425
426 if (futexAvailable())
427 return futexSemaphoreTryAcquire(u&: u, n, timeout: Expired);
428
429 const auto locker = qt_scoped_lock(mutex&: d->mutex);
430 if (n > d->avail)
431 return false;
432 d->avail -= n;
433 return true;
434}
435
436/*!
437 \fn QSemaphore::tryAcquire(int n, int timeout)
438
439 Tries to acquire \c n resources guarded by the semaphore and
440 returns \c true on success. If available() < \a n, this call will
441 wait for at most \a timeout milliseconds for resources to become
442 available.
443
444 Note: Passing a negative number as the \a timeout is equivalent to
445 calling acquire(), i.e. this function will wait forever for
446 resources to become available if \a timeout is negative.
447
448 Example:
449
450 \snippet code/src_corelib_thread_qsemaphore.cpp 3
451
452 \sa acquire()
453*/
454
455/*!
456 \since 6.6
457
458 Tries to acquire \c n resources guarded by the semaphore and returns \c
459 true on success. If available() < \a n, this call will wait until \a timer
460 expires for resources to become available.
461
462 Example:
463
464 \snippet code/src_corelib_thread_qsemaphore.cpp tryAcquire-QDeadlineTimer
465
466 \sa acquire()
467*/
468bool QSemaphore::tryAcquire(int n, QDeadlineTimer timer)
469{
470 if (timer.isForever()) {
471 acquire(n);
472 return true;
473 }
474
475 if (timer.hasExpired())
476 return tryAcquire(n);
477
478 Q_ASSERT_X(n >= 0, "QSemaphore::tryAcquire", "parameter 'n' must be non-negative");
479
480 if (futexAvailable())
481 return futexSemaphoreTryAcquire(u&: u, n, timeout: timer);
482
483 using namespace std::chrono;
484 const auto sufficientResourcesAvailable = [this, n] { return d->avail >= n; };
485
486 auto locker = qt_unique_lock(mutex&: d->mutex);
487 if (!d->cond.wait_until(lock&: locker, atime: timer.deadline<steady_clock>(), p: sufficientResourcesAvailable))
488 return false;
489 d->avail -= n;
490 return true;
491}
492
493/*!
494 \fn template <typename Rep, typename Period> QSemaphore::tryAcquire(int n, std::chrono::duration<Rep, Period> timeout)
495 \overload
496 \since 6.3
497*/
498
499/*!
500 \fn bool QSemaphore::try_acquire()
501 \since 6.3
502
503 This function is provided for \c{std::counting_semaphore} compatibility.
504
505 It is equivalent to calling \c{tryAcquire(1)}, where the function returns
506 \c true on acquiring the resource successfully.
507
508 \sa tryAcquire(), try_acquire_for(), try_acquire_until()
509*/
510
511/*!
512 \fn template <typename Rep, typename Period> bool QSemaphore::try_acquire_for(const std::chrono::duration<Rep, Period> &timeout)
513 \since 6.3
514
515 This function is provided for \c{std::counting_semaphore} compatibility.
516
517 It is equivalent to calling \c{tryAcquire(1, timeout)}, where the call
518 times out on the given \a timeout value. The function returns \c true
519 on acquiring the resource successfully.
520
521 \sa tryAcquire(), try_acquire(), try_acquire_until()
522*/
523
524/*!
525 \fn template <typename Clock, typename Duration> bool QSemaphore::try_acquire_until(const std::chrono::time_point<Clock, Duration> &tp)
526 \since 6.3
527
528 This function is provided for \c{std::counting_semaphore} compatibility.
529
530 It is equivalent to calling \c{tryAcquire(1, tp - Clock::now())},
531 which means that the \a tp (time point) is recorded, ignoring the
532 adjustments to \c{Clock} while waiting. The function returns \c true
533 on acquiring the resource successfully.
534
535 \sa tryAcquire(), try_acquire(), try_acquire_for()
536*/
537
538/*!
539 \class QSemaphoreReleaser
540 \brief The QSemaphoreReleaser class provides exception-safe deferral of a QSemaphore::release() call.
541 \since 5.10
542 \ingroup thread
543 \inmodule QtCore
544
545 \reentrant
546
547 QSemaphoreReleaser can be used wherever you would otherwise use
548 QSemaphore::release(). Constructing a QSemaphoreReleaser defers the
549 release() call on the semaphore until the QSemaphoreReleaser is
550 destroyed (see
551 \l{http://en.cppreference.com/w/cpp/language/raii}{RAII pattern}).
552
553 You can use this to reliably release a semaphore to avoid dead-lock
554 in the face of exceptions or early returns:
555
556 \snippet code/src_corelib_thread_qsemaphore.cpp 4
557
558 If an early return is taken or an exception is thrown before the
559 \c{sem.release()} call is reached, the semaphore is not released,
560 possibly preventing the thread waiting in the corresponding
561 \c{sem.acquire()} call from ever continuing execution.
562
563 When using RAII instead:
564
565 \snippet code/src_corelib_thread_qsemaphore.cpp 5
566
567 this can no longer happen, because the compiler will make sure that
568 the QSemaphoreReleaser destructor is always called, and therefore
569 the semaphore is always released.
570
571 QSemaphoreReleaser is move-enabled and can therefore be returned
572 from functions to transfer responsibility for releasing a semaphore
573 out of a function or a scope:
574
575 \snippet code/src_corelib_thread_qsemaphore.cpp 6
576
577 A QSemaphoreReleaser can be canceled by a call to cancel(). A canceled
578 semaphore releaser will no longer call QSemaphore::release() in its
579 destructor.
580
581 \sa QMutexLocker
582*/
583
584/*!
585 \fn QSemaphoreReleaser::QSemaphoreReleaser()
586
587 Default constructor. Creates a QSemaphoreReleaser that does nothing.
588*/
589
590/*!
591 \fn QSemaphoreReleaser::QSemaphoreReleaser(QSemaphore &sem, int n)
592
593 Constructor. Stores the arguments and calls \a{sem}.release(\a{n})
594 in the destructor.
595*/
596
597/*!
598 \fn QSemaphoreReleaser::QSemaphoreReleaser(QSemaphore *sem, int n)
599
600 Constructor. Stores the arguments and calls \a{sem}->release(\a{n})
601 in the destructor.
602*/
603
604/*!
605 \fn QSemaphoreReleaser::QSemaphoreReleaser(QSemaphoreReleaser &&other)
606
607 Move constructor. Takes over responsibility to call QSemaphore::release()
608 from \a other, which in turn is canceled.
609
610 \sa cancel()
611*/
612
613/*!
614 \fn QSemaphoreReleaser::operator=(QSemaphoreReleaser &&other)
615
616 Move assignment operator. Takes over responsibility to call QSemaphore::release()
617 from \a other, which in turn is canceled.
618
619 If this semaphore releaser had the responsibility to call some QSemaphore::release()
620 itself, it performs the call before taking over from \a other.
621
622 \sa cancel()
623*/
624
625/*!
626 \fn QSemaphoreReleaser::~QSemaphoreReleaser()
627
628 Unless canceled, calls QSemaphore::release() with the arguments provided
629 to the constructor, or by the last move assignment.
630*/
631
632/*!
633 \fn QSemaphoreReleaser::swap(QSemaphoreReleaser &other)
634
635 Exchanges the responsibilities of \c{*this} and \a other.
636
637 Unlike move assignment, neither of the two objects ever releases its
638 semaphore, if any, as a consequence of swapping.
639
640 Therefore this function is very fast and never fails.
641*/
642
643/*!
644 \fn QSemaphoreReleaser::semaphore() const
645
646 Returns a pointer to the QSemaphore object provided to the constructor,
647 or by the last move assignment, if any. Otherwise, returns \nullptr.
648*/
649
650/*!
651 \fn QSemaphoreReleaser::cancel()
652
653 Cancels this QSemaphoreReleaser such that the destructor will no longer
654 call \c{semaphore()->release()}. Returns the value of semaphore()
655 before this call. After this call, semaphore() will return \nullptr.
656
657 To enable again, assign a new QSemaphoreReleaser:
658
659 \snippet code/src_corelib_thread_qsemaphore.cpp 7
660*/
661
662
663QT_END_NAMESPACE
664

source code of qtbase/src/corelib/thread/qsemaphore.cpp