AsyncSemaphore.swift (9690B)
// MIT License
// Copyright (C) 2022 Gwendal Roué
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to
// permit persons to whom the Software is furnished to do so, subject to
// the following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import Foundation

/// An object that controls access to a resource across multiple execution
/// contexts through use of a traditional counting semaphore.
///
/// You increment a semaphore count by calling the ``signal()`` method, and
/// decrement a semaphore count by calling ``wait()`` or one of its variants.
///
/// ## Topics
///
/// ### Creating a Semaphore
///
/// - ``init(value:)``
///
/// ### Signaling the Semaphore
///
/// - ``signal()``
///
/// ### Waiting for the Semaphore
///
/// - ``wait()``
/// - ``waitUnlessCancelled()``
public final class AsyncSemaphore: @unchecked Sendable {
    /// `Suspension` is the state of a task waiting for a signal.
    ///
    /// It is a class because instance identity helps `waitUnlessCancelled()`
    /// deal with both early and late cancellation.
    ///
    /// We make it @unchecked Sendable in order to prevent compiler warnings:
    /// instances are always protected by the semaphore's lock.
    private class Suspension: @unchecked Sendable {
        enum State {
            /// Initial state. Next is suspendedUnlessCancelled, or cancelled.
            case pending

            /// Waiting for a signal, with support for cancellation.
            case suspendedUnlessCancelled(UnsafeContinuation<Void, Error>)

            /// Waiting for a signal, with no support for cancellation.
            case suspended(UnsafeContinuation<Void, Never>)

            /// Cancelled before we have started waiting.
            case cancelled
        }

        var state: State

        init(state: State) {
            self.state = state
        }
    }

    // MARK: - Internal State

    /// The semaphore value.
    private var value: Int

    /// As many elements as there are suspended tasks waiting for a signal.
    ///
    /// New waiters are inserted at index 0 and `signal()` pops from the end,
    /// so the oldest waiter is resumed first. A suspension is present in this
    /// array if and only if its continuation has not been resumed yet.
    private var suspensions: [Suspension] = []

    /// The lock that protects `value` and `suspensions`.
    ///
    /// It is recursive in order to handle cancellation (see the implementation
    /// of ``waitUnlessCancelled()``).
    private let _lock = NSRecursiveLock()

    // MARK: - Creating a Semaphore

    /// Creates a semaphore.
    ///
    /// - parameter value: The starting value for the semaphore. Do not pass a
    ///   value less than zero.
    public init(value: Int) {
        precondition(value >= 0, "AsyncSemaphore requires a value equal or greater than zero")
        self.value = value
    }

    deinit {
        precondition(suspensions.isEmpty, "AsyncSemaphore is deallocated while some task(s) are suspended waiting for a signal.")
    }

    // MARK: - Locking

    // Let's hide the locking primitive in order to avoid a compiler warning:
    //
    // > Instance method 'lock' is unavailable from asynchronous contexts;
    // > Use async-safe scoped locking instead; this is an error in Swift 6.
    //
    // We're not sweeping bad stuff under the rug. We really need to protect
    // our inner state (`value` and `suspensions`) across the calls to
    // `withUnsafeContinuation`. Unfortunately, this method introduces a
    // suspension point. So we need a lock.
    private func lock() { _lock.lock() }
    private func unlock() { _lock.unlock() }

    // MARK: - Waiting for the Semaphore

    /// Waits for, or decrements, a semaphore.
    ///
    /// Decrement the counting semaphore. If the resulting value is less than
    /// zero, this function suspends the current task until a signal occurs,
    /// without blocking the underlying thread. Otherwise, no suspension happens.
    public func wait() async {
        lock()

        value -= 1
        if value >= 0 {
            unlock()
            return
        }

        // The lock acquired above is released from inside the continuation
        // closure, once the suspension is registered.
        await withUnsafeContinuation { continuation in
            // Register the continuation that `signal` will resume.
            let suspension = Suspension(state: .suspended(continuation))
            suspensions.insert(suspension, at: 0) // FIFO
            unlock()
        }
    }

    /// Waits for, or decrements, a semaphore, with support for cancellation.
    ///
    /// Decrement the counting semaphore. If the resulting value is less than
    /// zero, this function suspends the current task until a signal occurs,
    /// without blocking the underlying thread. Otherwise, no suspension happens.
    ///
    /// If the task is canceled before a signal occurs, this function
    /// throws `CancellationError`. If a signal has already resumed the task
    /// when the cancellation is delivered, the signal wins: the function
    /// returns normally and the semaphore value is left untouched.
    ///
    /// - throws: `CancellationError` if the task is cancelled before a
    ///   signal occurs.
    public func waitUnlessCancelled() async throws {
        lock()

        value -= 1
        if value >= 0 {
            defer { unlock() }

            do {
                // All code paths check for cancellation
                try Task.checkCancellation()
            } catch {
                // Cancellation is like a signal: we don't really "consume"
                // the semaphore, and restore the value.
                value += 1
                throw error
            }

            return
        }

        // Get ready for being suspended waiting for a continuation, or for
        // early cancellation.
        let suspension = Suspension(state: .pending)

        try await withTaskCancellationHandler {
            try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation<Void, Error>) in
                if case .cancelled = suspension.state {
                    // Early cancellation: waitUnlessCancelled() is called from
                    // a cancelled task, and the `onCancel` closure below
                    // has marked the suspension as cancelled.
                    // Resume with a CancellationError.
                    unlock()
                    continuation.resume(throwing: CancellationError())
                } else {
                    // Current task is not cancelled: register the continuation
                    // that `signal` will resume.
                    suspension.state = .suspendedUnlessCancelled(continuation)
                    suspensions.insert(suspension, at: 0) // FIFO
                    unlock()
                }
            }
        } onCancel: {
            // withTaskCancellationHandler may immediately call this block (if
            // the current task is cancelled), or call it later (if the task is
            // cancelled later). In the first case, we're still holding the lock,
            // waiting for the continuation. In the second case, we do not hold
            // the lock. Being able to handle both situations is the reason why
            // we use a recursive lock.
            lock()

            switch suspension.state {
            case .pending:
                // Early cancellation: waitUnlessCancelled() is called from
                // a cancelled task. We're no longer waiting for a signal:
                // restore the value.
                //
                // The next step is the `withTaskCancellationHandler`
                // operation closure right above.
                value += 1
                suspension.state = .cancelled
                unlock()

            case let .suspendedUnlessCancelled(continuation):
                guard let index = suspensions.firstIndex(where: { $0 === suspension }) else {
                    // `signal()` has already dequeued this suspension and
                    // resumed its continuation, but the task has not left
                    // `withTaskCancellationHandler` yet. The signal wins:
                    // resuming the continuation again would be a double
                    // resume, and restoring the value would cancel out the
                    // signal that was consumed.
                    unlock()
                    return
                }

                // Late cancellation: the task is cancelled while waiting
                // for the semaphore. We're no longer waiting for a signal:
                // unregister, restore the value, and resume with a
                // CancellationError.
                suspensions.remove(at: index)
                value += 1
                unlock()
                continuation.resume(throwing: CancellationError())

            case .suspended, .cancelled:
                // Not expected here: this suspension is created as `.pending`
                // and only this handler moves it to `.cancelled`.
                unlock()
            }
        }
    }

    // MARK: - Signaling the Semaphore

    /// Signals (increments) a semaphore.
    ///
    /// Increment the counting semaphore. If the previous value was less than
    /// zero, this function resumes a task currently suspended in ``wait()``
    /// or ``waitUnlessCancelled()``.
    ///
    /// - returns: This function returns true if a suspended task is
    ///   resumed. Otherwise, the result is false, meaning that no task was
    ///   waiting for the semaphore.
    @discardableResult
    public func signal() -> Bool {
        lock()

        value += 1

        // The continuation is resumed after the lock is released.
        switch suspensions.popLast()?.state { // FIFO
        case let .suspendedUnlessCancelled(continuation):
            unlock()
            continuation.resume()
            return true
        case let .suspended(continuation):
            unlock()
            continuation.resume()
            return true
        default:
            unlock()
            return false
        }
    }
}