1/*
2Copyright 2018 Google Inc. All Rights Reserved.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS-IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17#include "utils/lockless_task_queue.h"
18
19#include <limits>
20
21#include "base/logging.h"
22
23namespace vraudio {
24
namespace {

// Sentinel stored in the index half of a tag-and-index word to mean "no
// node": it terminates a list and marks an empty list head. It is the largest
// value representable in 32 bits, widened to 64 bits.
constexpr uint64_t kInvalidIndex =
    static_cast<uint64_t>(std::numeric_limits<uint32_t>::max());

// Upper bound accepted for the |max_tasks| constructor argument.
constexpr uint64_t kMaxProducers = kInvalidIndex - 1U;

}  // namespace
34
35LocklessTaskQueue::LocklessTaskQueue(size_t max_tasks) {
36 CHECK_GT(max_tasks, 0U);
37 CHECK_LE(max_tasks, kMaxProducers);
38 Init(num_nodes: max_tasks);
39}
40
41LocklessTaskQueue::~LocklessTaskQueue() { Clear(); }
42
43void LocklessTaskQueue::Post(Task&& task) {
44 const TagAndIndex free_node_idx = PopNodeFromList(list_head: &free_list_head_idx_);
45 if (GetIndex(tag_and_index: free_node_idx) == kInvalidIndex) {
46 LOG(WARNING) << "Queue capacity reached - dropping task";
47 return;
48 }
49 nodes_[GetIndex(tag_and_index: free_node_idx)].task = std::move(task);
50 PushNodeToList(list_head: &task_list_head_idx_, node: free_node_idx);
51}
52
53void LocklessTaskQueue::Execute() {
54 const TagAndIndex old_flag_with_invalid_index =
55 (GetFlag(tag_and_index: task_list_head_idx_) << 32) + kInvalidIndex;
56 const TagAndIndex old_task_list_head_idx =
57 task_list_head_idx_.exchange(i: old_flag_with_invalid_index);
58 ProcessTaskList(list_head: old_task_list_head_idx, execute: true /*execute_tasks*/);
59}
60
61void LocklessTaskQueue::Clear() {
62 const TagAndIndex old_flag_with_invalid_index =
63 (GetFlag(tag_and_index: task_list_head_idx_) << 32) + kInvalidIndex;
64 const TagAndIndex old_task_list_head_idx =
65 task_list_head_idx_.exchange(i: old_flag_with_invalid_index);
66 ProcessTaskList(list_head: old_task_list_head_idx, execute: false /*execute_tasks*/);
67}
68
69LocklessTaskQueue::TagAndIndex LocklessTaskQueue::IncreaseTag(
70 TagAndIndex tag_and_index) {
71 // The most significant 32 bits a reserved for tagging. Overflows are
72 // acceptable.
73 return tag_and_index + (static_cast<uint64_t>(1) << 32);
74}
75
76LocklessTaskQueue::TagAndIndex LocklessTaskQueue::GetIndex(
77 TagAndIndex tag_and_index) {
78 // The least significant 32 bits a reserved for the index.
79 return tag_and_index & std::numeric_limits<uint32_t>::max();
80}
81
82// Extracts the flag in the most significant 32 bits from a TagAndIndex;
83LocklessTaskQueue::TagAndIndex LocklessTaskQueue::GetFlag(
84 TagAndIndex tag_and_index) {
85 // The most significant 32 bits a reserved for the flag.
86 return tag_and_index >> 32;
87}
88
89void LocklessTaskQueue::PushNodeToList(
90 std::atomic<TagAndIndex>* list_head_idx_ptr, TagAndIndex node_idx) {
91 DCHECK(list_head_idx_ptr);
92 TagAndIndex list_head_idx;
93 do {
94 list_head_idx = list_head_idx_ptr->load();
95 nodes_[GetIndex(tag_and_index: node_idx)].next = list_head_idx;
96 } while (!std::atomic_compare_exchange_strong(a: list_head_idx_ptr,
97 i1: &list_head_idx, i2: node_idx));
98}
99
100LocklessTaskQueue::TagAndIndex LocklessTaskQueue::PopNodeFromList(
101 std::atomic<TagAndIndex>* list_head_idx_ptr) {
102 DCHECK(list_head_idx_ptr);
103 TagAndIndex list_head_idx;
104 TagAndIndex list_head_next;
105 do {
106 list_head_idx = list_head_idx_ptr->load();
107 if (GetIndex(tag_and_index: list_head_idx) == kInvalidIndex) {
108 // End of list reached.
109 return kInvalidIndex;
110 }
111 list_head_next = nodes_[GetIndex(tag_and_index: list_head_idx)].next;
112 } while (!std::atomic_compare_exchange_strong(
113 a: list_head_idx_ptr, i1: &list_head_idx, i2: list_head_next));
114 return IncreaseTag(tag_and_index: list_head_idx);
115}
116
117void LocklessTaskQueue::ProcessTaskList(TagAndIndex list_head_idx,
118 bool execute) {
119 TagAndIndex node_itr = list_head_idx;
120 while (GetIndex(tag_and_index: node_itr) != kInvalidIndex) {
121 Node* node = &nodes_[GetIndex(tag_and_index: node_itr)];
122 TagAndIndex next_node = node->next;
123 temp_tasks_.emplace_back(args: std::move(node->task));
124 node->task = nullptr;
125 PushNodeToList(list_head_idx_ptr: &free_list_head_idx_, node_idx: node_itr);
126 node_itr = next_node;
127 }
128
129 if (execute) {
130 // Execute tasks in reverse order.
131 for (std::vector<Task>::reverse_iterator task_itr = temp_tasks_.rbegin();
132 task_itr != temp_tasks_.rend(); ++task_itr) {
133 if (*task_itr != nullptr) {
134 (*task_itr)();
135 }
136 }
137 }
138 temp_tasks_.clear();
139}
140
141void LocklessTaskQueue::Init(size_t num_nodes) {
142 nodes_.resize(new_size: num_nodes);
143 temp_tasks_.reserve(n: num_nodes);
144
145 // Initialize free list.
146 free_list_head_idx_ = 0;
147 for (size_t i = 0; i < num_nodes - 1; ++i) {
148 nodes_[i].next = i + 1;
149 }
150 nodes_[num_nodes - 1].next = kInvalidIndex;
151
152 // Initialize task list.
153 task_list_head_idx_ = kInvalidIndex;
154}
155
156} // namespace vraudio
157

source code of qtmultimedia/src/3rdparty/resonance-audio/resonance_audio/utils/lockless_task_queue.cc