Version

menu_open
Wwise SDK 2022.1.18
AkFifoQueue.h
Go to the documentation of this file.
1 /*******************************************************************************
2 The content of this file includes portions of the AUDIOKINETIC Wwise Technology
3 released in source code form as part of the SDK installer package.
4 
5 Commercial License Usage
6 
7 Licensees holding valid commercial licenses to the AUDIOKINETIC Wwise Technology
8 may use this file in accordance with the end user license agreement provided
9 with the software or, alternatively, in accordance with the terms contained in a
10 written agreement between you and Audiokinetic Inc.
11 
12 Apache License Usage
13 
14 Alternatively, this file may be used under the Apache License, Version 2.0 (the
15 "Apache License"); you may not use this file except in compliance with the
16 Apache License. You may obtain a copy of the Apache License at
17 http://www.apache.org/licenses/LICENSE-2.0.
18 
19 Unless required by applicable law or agreed to in writing, software distributed
20 under the Apache License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
21 OR CONDITIONS OF ANY KIND, either express or implied. See the Apache License for
22 the specific language governing permissions and limitations under the License.
23 
24  Copyright (c) 2024 Audiokinetic Inc.
25 *******************************************************************************/
26 
27 #ifndef _AKFIFOQUEUE_H
28 #define _AKFIFOQUEUE_H
29 
33 
34 /// AkFifoQueue is a lock-less, thread-safe, multi-producer-multi-consumer queue data structure.
35 /// It is designed to hold copyable values.
36 template<typename T, T TDEFAULT, class TAlloc = ArrayPoolDefault>
37 struct AkFifoQueue : public TAlloc
38 {
39 public:
40 
42  : m_buffer(nullptr)
43  , m_uQueueIndexMask(0)
44  , m_readPos(0)
45  , m_writePos(0)
46  {
47  }
48 
50  {
51  Term();
52  }
53 
54  /// Initializes the FifoQueue and allocates memory for the specified number of entries.
55  /// The number of entries is not growable after initialization.
57  AkUInt32 in_uMaxEntries ///< The number of entries. Must be a power of two.
58  )
59  {
60  // check that maxentries is a power of 2
61  AKASSERT((in_uMaxEntries & (in_uMaxEntries - 1)) == 0);
62 
63  m_uQueueIndexMask = in_uMaxEntries - 1;
64  m_writePos = 0;
65  m_readPos = 0;
66 
67  m_buffer = (FifoQueueEntry*)TAlloc::Alloc(sizeof(FifoQueueEntry) * in_uMaxEntries);
68  if (m_buffer == nullptr)
69  {
70  return AK_InsufficientMemory;
71  }
72 
73  AkZeroMemLarge(m_buffer, sizeof(FifoQueueEntry) * in_uMaxEntries);
74  for (AkUInt32 i = 0; i < in_uMaxEntries; ++i)
75  {
76  m_buffer[i].value = TDEFAULT;
77  AkAtomicStore64(&m_buffer[i].uSequence, i);
78  }
79 
80  return AK_Success;
81  }
82 
83  /// Free memory reserved for the queue and reset internal state
84  /// The queue MUST be empty when this is called!
85  void Term()
86  {
87  if (m_buffer)
88  {
89  AKASSERT(m_readPos == m_writePos);
90  TAlloc::Free(m_buffer);
91 
92  m_buffer = nullptr;
93  }
94  m_readPos = 0;
95  m_writePos = 0;
96  }
97 
98  /// Enqueues the provided value. The value will be copied to the queue's internal buffer.
99  /// Returns true if the enqueue was performed successfully.
100  /// Returns false if the enqueue could not be performed. This can happen if the queue is "full", and some dequeue operations have to occur.
101  AK_NODISCARD bool Enqueue(T in_value)
102  {
103  const AkUInt64 uQueueIndexMask = m_uQueueIndexMask;
104  FifoQueueEntry* pBuffer = m_buffer;
105 
106  AkInt64 writePos = AkAtomicLoad64(&m_writePos);
107  do {
108  // see where we are in the sequence, relative to where we can write data
109  AkInt64 sequenceDelta = AkAtomicLoad64(&pBuffer[writePos & uQueueIndexMask].uSequence) - writePos;
110  // if we're in the right spot, and we can successfully write an updated write position, break out and write the handle into the queue
111  if (sequenceDelta == 0)
112  {
113  if (AkAtomicCas64(&m_writePos, writePos + 1, writePos))
114  {
115  break;
116  }
117  }
118  else if (sequenceDelta < 0)
119  {
120  // we would have over-enqueued if we tried to write the position in. Return false; the user needs to decide how to handle things
121  return false;
122  }
123  else
124  {
125  // if it didn't work, reload writePos: someone else must have written to the sequence and we need to get caught up
126  writePos = AkAtomicLoad64(&m_writePos);
127  }
128  } while (true);
129 
130  // advance the sequence by one so that it can be dequeued
131  pBuffer[writePos & uQueueIndexMask].value = in_value;
132  AkAtomicStore64(&pBuffer[writePos & uQueueIndexMask].uSequence, writePos + 1);
133  return true;
134  }
135 
136  /// Dequeues a value from the specified queue, copying it to io_value
137  /// \return true if a value was successfully dequeued, false otherwise (if false, io_value will not be written to)
138  bool Dequeue(T& io_value)
139  {
140  const AkInt64 uQueueIndexMask = m_uQueueIndexMask;
141  FifoQueueEntry* pBuffer = m_buffer;
142 
143  AkInt64 readPos = AkAtomicLoad64(&m_readPos);
144  do {
145  // see where we are in the sequence relative to where we can write data
146  AkInt64 sequenceDelta = AkAtomicLoad64(&pBuffer[readPos & uQueueIndexMask].uSequence) - (readPos + 1);
147  // if we're in the right spot, and we can successfully write an updated read position, break out and read the entry
148  if (sequenceDelta == 0)
149  {
150  if (AkAtomicCas64(&m_readPos, readPos + 1, readPos))
151  {
152  break;
153  }
154  }
155  // if an entry has yet to be written, bail out
156  else if (sequenceDelta < 0)
157  {
158  return false;
159  }
160  else
161  {
162  // if it didn't work, reload readPos
163  readPos = AkAtomicLoad64(&m_readPos);
164  }
165  } while (true);
166 
167  // update the acceptable sequence value for this entry
168  io_value = pBuffer[readPos & uQueueIndexMask].value;
169  AkAtomicStore64(&pBuffer[readPos & uQueueIndexMask].uSequence, readPos + m_uQueueIndexMask + 1);
170 
171  return true;
172  }
173 
174  /// Checks if there is a value available to be dequeued
175  bool Empty()
176  {
177  AkInt64 readPos = AkAtomicLoad64(&m_readPos);
178  AkInt64 sequenceDelta = AkAtomicLoad64(&m_buffer[readPos & m_uQueueIndexMask].uSequence) - (readPos + 1);
179  return sequenceDelta < 0;
180  }
181 
182 private:
183  struct FifoQueueEntry
184  {
185  // Value actually contained in the queue
186  T value;
187  // Global index of the queue entry in the sequence, to detect when we are at a valid read or write pos
188  AkAtomic64 uSequence;
189  };
190 
191  // Buffer of QueueEntries
192  FifoQueueEntry* m_buffer;
193  // Mask to apply to the read/write position to clamp it to array bounds
194  AkInt64 m_uQueueIndexMask;
195 
196  // readIndex of where we are in the sequence
197  AkAtomic64 m_readPos;
198  // writeIndex of where we are in the sequence
199  AkAtomic64 m_writePos;
200 };
201 
202 #endif // _AKFIFOQUEUE_H
__forceinline int AkAtomicCas64(AkAtomic64 *pDest, long long proposed, long long expected)
Definition: AkAtomic.h:87
__forceinline void AkAtomicStore64(AkAtomic64 *pDest, long long value)
Definition: AkAtomic.h:79
AKSOUNDENGINE_API void Free(AkMemPoolId in_poolId, void *in_pMemAddress)
AKRESULT
Standard function call result.
Definition: AkTypes.h:199
bool Dequeue(T &io_value)
Definition: AkFifoQueue.h:138
@ AK_Success
The operation was successful.
Definition: AkTypes.h:201
#define AkZeroMemLarge(___Dest, ___Size)
bool Empty()
Checks if there is a value available to be dequeued.
Definition: AkFifoQueue.h:175
#define AKASSERT(Condition)
Definition: AkAssert.h:67
volatile long long AkAtomic64
Definition: AkAtomic.h:43
__forceinline long long AkAtomicLoad64(AkAtomic64 *pSrc)
Definition: AkAtomic.h:76
AK_NODISCARD bool Enqueue(T in_value)
Definition: AkFifoQueue.h:101
int64_t AkInt64
Signed 64-bit integer.
uint64_t AkUInt64
Unsigned 64-bit integer.
uint32_t AkUInt32
Unsigned 32-bit integer.
#define AK_NODISCARD
Definition: AkTypes.h:101
void Term()
Definition: AkFifoQueue.h:85
@ AK_InsufficientMemory
Memory error.
Definition: AkTypes.h:229
AKRESULT Init(AkUInt32 in_uMaxEntries)
Definition: AkFifoQueue.h:56

Was this page helpful?

Need Support?

Questions? Problems? Need more info? Contact us, and we can help!

Visit our Support page

Tell us about your project. We're here to help.

Register your project and we'll help you get started with no strings attached!

Get started with Wwise