在 kafka broker 内部,当执行一些需要等待的任务时(比如 broker 处理 producer 的消息,需要等待消息同步到其他副本),会使用到 DelayedOperationPurgatory 和 DelayedOperation,大致流程如下图:
顶层的类是 DelayedOperationPurgatory,它内部包含 2 个重要的属性:WatcherList 数组和哈希时间轮(timeoutTimer)。每个 WatcherList 可以近似看作一个 ConcurrentHashMap(内部是 Pool),保存 key 到 Watchers(一个 DelayedOperation 队列)的映射。之所以使用数组(共 512 个分片,按 key 的 hashCode 取模选择分片),是为了把锁拆分到各个分片上,从而降低锁竞争;哈希时间轮则负责在任务到期时及时地触发执行。
/**
 * An operation whose completion is delayed until either some condition is met (checked via tryComplete())
 * or its timeout of `delayMs` elapses (in which case the timer runs it and it is force-completed).
 *
 * @param delayMs timeout in milliseconds after which the operation is expired
 * @param lockOpt optional lock to guard tryComplete(); a new ReentrantLock is created when absent
 */
abstract class DelayedOperation(override val delayMs: Long,
                                lockOpt: Option[Lock] = None)
  extends TimerTask with Logging {

  private val completed = new AtomicBoolean(false)
  // Visible for testing
  private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)

  /**
   * Force completing the delayed operation, if not already completed.
   * This function can be triggered when
   *
   *  1. The operation has been verified to be completable inside tryComplete()
   *  2. The operation has expired and hence needs to be completed right now
   *
   * Concurrent threads can try to complete the same operation, but only the first thread
   * wins the compare-and-set and runs onComplete(); all others get false.
   *
   * @return true iff the operation is completed by the caller
   */
  def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
      // cancel the timeout timer
      cancel()
      onComplete()
      true
    } else {
      false
    }
  }

  /**
   * Check if the delayed operation is already completed
   */
  def isCompleted: Boolean = completed.get()

  /**
   * Call-back to execute when a delayed operation gets expired and hence forced to complete.
   * Invoked from run(), after onComplete() has already executed.
   */
  def onExpiration(): Unit

  /**
   * Process for completing an operation; This function needs to be defined
   * in subclasses and will be called exactly once in forceComplete()
   */
  def onComplete(): Unit

  /**
   * Try to complete the delayed operation by first checking if the operation
   * can be completed by now. If yes execute the completion logic by calling
   * forceComplete() and return true iff forceComplete returns true; otherwise return false
   *
   * This function needs to be defined in subclasses
   */
  def tryComplete(): Boolean

  /**
   * Thread-safe variant of tryComplete() and call extra function if first tryComplete returns false
   * @param f else function to be executed after first tryComplete returns false
   * @return result of tryComplete
   */
  private[server] def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {
    if (tryComplete()) true
    else {
      f
      // last completion check
      tryComplete()
    }
  }

  /**
   * Thread-safe variant of tryComplete()
   */
  private[server] def safeTryComplete(): Boolean = inLock(lock)(tryComplete())

  /**
   * The task executed by the timer on timeout: force-complete the operation and, only if this
   * call won the completion race, run the expiration callback.
   */
  override def run(): Unit = {
    if (forceComplete())
      onExpiration()
  }
}

object DelayedOperationPurgatory {

  private val Shards = 512 // Shard the watcher list to reduce lock contention

  def apply[T <: DelayedOperation](purgatoryName: String,
                                   brokerId: Int = 0,
                                   purgeInterval: Int = 1000,
                                   reaperEnabled: Boolean = true,
                                   timerEnabled: Boolean = true): DelayedOperationPurgatory[T] = {
    val timer = new SystemTimer(purgatoryName)
    new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval, reaperEnabled, timerEnabled)
  }

}

/**
 * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
 */
final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
                                                             timeoutTimer: Timer,
                                                             brokerId: Int = 0,
                                                             purgeInterval: Int = 1000,
                                                             reaperEnabled: Boolean = true,
                                                             timerEnabled: Boolean = true)
  extends Logging with KafkaMetricsGroup {

  /* a list of operation watching keys */
  private class WatcherList {
    val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))

    val watchersLock = new ReentrantLock()

    /*
     * Return all the current watcher lists,
     * note that the returned watchers may be removed from the list by other threads
     */
    def allWatchers = {
      watchersByKey.values
    }
  }

  private val watcherLists = Array.fill[WatcherList](DelayedOperationPurgatory.Shards)(new WatcherList)

  // Pick the shard for a key; taking the remainder before abs keeps the index in [0, Shards).
  private def watcherList(key: Any): WatcherList = {
    watcherLists(Math.abs(key.hashCode() % watcherLists.length))
  }

  // the number of estimated total operations in the purgatory
  private[this] val estimatedTotalOperations = new AtomicInteger(0)

  /* background thread expiring operations that have timed out */
  private val expirationReaper = new ExpiredOperationReaper()

  private val metricsTags = Map("delayedOperation" -> purgatoryName)
  newGauge("PurgatorySize", () => watched, metricsTags)
  newGauge("NumDelayedOperations", () => numDelayed, metricsTags)

  if (reaperEnabled)
    expirationReaper.start()

  /**
   * Check if the operation can be completed, if not watch it based on the given watch keys
   *
   * Note that a delayed operation can be watched on multiple keys. It is possible that
   * an operation is completed after it has been added to the watch list for some, but
   * not all of the keys. In this case, the operation is considered completed and won't
   * be added to the watch list of the remaining keys. The expiration reaper thread will
   * remove this operation from any watcher list in which the operation exists.
   *
   * @param operation the delayed operation to be checked
   * @param watchKeys keys for bookkeeping the operation
   * @return true iff the delayed operations can be completed by the caller
   */
  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
    assert(watchKeys.nonEmpty, "The watch key list can't be empty")

    // The cost of tryComplete() is typically proportional to the number of keys. Calling tryComplete() for each key is
    // going to be expensive if there are many keys. Instead, we do the check in the following way through safeTryCompleteOrElse().
    // If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At
    // this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering
    // event since the operation is already on the watcher list for all keys.
    //
    // ==============[story about lock]==============
    // Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to watch list and doing
    // the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and
    // checkAndComplete(). For example, the following deadlock can happen if the lock is only held for the final tryComplete()
    // 1) thread_a holds readlock of stateLock from TransactionStateManager
    // 2) thread_a is executing tryCompleteElseWatch()
    // 3) thread_a adds op to watch list
    // 4) thread_b requires writelock of stateLock from TransactionStateManager (blocked by thread_a)
    // 5) thread_c calls checkAndComplete() and holds lock of op
    // 6) thread_c is waiting readlock of stateLock to complete op (blocked by thread_b)
    // 7) thread_a is waiting lock of op to call the final tryComplete() (blocked by thread_c)
    //
    // Note that even with the current approach, deadlocks could still be introduced. For example,
    // 1) thread_a calls tryCompleteElseWatch() and gets lock of op
    // 2) thread_a adds op to watch list
    // 3) thread_a calls op#tryComplete and tries to require lock_b
    // 4) thread_b holds lock_b and calls checkAndComplete()
    // 5) thread_b sees op from watch list
    // 6) thread_b needs lock of op
    // To avoid the above scenario, we recommend DelayedOperationPurgatory.checkAndComplete() be called without holding
    // any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously,
    // holding an exclusive lock to make the call is often unnecessary.
    if (operation.safeTryCompleteOrElse {
      watchKeys.foreach(key => watchForOperation(key, operation))
      // Guarded separately because Scala's assert can be elided at compile time.
      if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
    }) return true

    // if it cannot be completed by now and hence is watched, add to the expire queue also
    if (!operation.isCompleted) {
      if (timerEnabled)
        timeoutTimer.add(operation)
      if (operation.isCompleted) {
        // completed concurrently after being added to the timer: cancel the timer task
        operation.cancel()
      }
    }

    false
  }

  /**
   * Check if some delayed operations can be completed with the given watch key,
   * and if yes complete them.
   *
   * @param key the watch key whose operations should be re-checked
   * @return the number of completed operations during this process
   */
  def checkAndComplete(key: Any): Int = {
    val wl = watcherList(key)
    val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) }
    val numCompleted = if (watchers == null)
      0
    else
      watchers.tryCompleteWatched()
    debug(s"Request key $key unblocked $numCompleted $purgatoryName operations")
    numCompleted
  }

  /**
   * Return the total size of watch lists the purgatory. Since an operation may be watched
   * on multiple lists, and some of its watched entries may still be in the watch lists
   * even when it has been completed, this number may be larger than the number of real operations watched
   */
  def watched: Int = {
    watcherLists.foldLeft(0) { case (sum, watcherList) => sum + watcherList.allWatchers.map(_.countWatched).sum }
  }

  /**
   * Return the number of delayed operations in the expiry queue
   */
  def numDelayed: Int = timeoutTimer.size

  /**
   * Cancel watching on any delayed operations for the given key. Note the operation will not be completed
   *
   * @param key the watch key to drop
   * @return the operations that were watched on `key` and whose timer tasks were cancelled; Nil if none
   */
  def cancelForKey(key: Any): List[T] = {
    val wl = watcherList(key)
    inLock(wl.watchersLock) {
      val watchers = wl.watchersByKey.remove(key)
      if (watchers != null)
        watchers.cancel()
      else
        Nil
    }
  }

  /*
   * Add the operation to the watchers of the given key, creating the watchers if absent.
   * The shard's watchersLock is held so that the operation is never added to a Watchers
   * instance that is concurrently being removed by cancelForKey() or removeKeyIfEmpty().
   */
  private def watchForOperation(key: Any, operation: T): Unit = {
    val wl = watcherList(key)
    inLock(wl.watchersLock) {
      val watcher = wl.watchersByKey.getAndMaybePut(key)
      watcher.watch(operation)
    }
  }

  /*
   * Remove the key from watcher lists if its list is empty
   */
  private def removeKeyIfEmpty(key: Any, watchers: Watchers): Unit = {
    val wl = watcherList(key)
    inLock(wl.watchersLock) {
      // Only remove the mapping if the key is still associated with these exact watchers: after a
      // cancelForKey(), watchForOperation() may have installed a fresh Watchers that must be kept.
      // Expressed as one condition to avoid a non-local `return` from inside the inLock closure.
      if (wl.watchersByKey.get(key) == watchers && watchers != null && watchers.isEmpty)
        wl.watchersByKey.remove(key)
    }
  }

  /**
   * Shutdown the expire reaper thread
   */
  def shutdown(): Unit = {
    if (reaperEnabled)
      expirationReaper.shutdown()
    timeoutTimer.shutdown()
    removeMetric("PurgatorySize", metricsTags)
    removeMetric("NumDelayedOperations", metricsTags)
  }

  /**
   * A linked list of watched delayed operations based on some key
   */
  private class Watchers(val key: Any) {
    private[this] val operations = new ConcurrentLinkedQueue[T]()

    // count the current number of watched operations. This is O(n), so use isEmpty() if possible
    def countWatched: Int = operations.size

    def isEmpty: Boolean = operations.isEmpty

    // add the element to watch
    def watch(t: T): Unit = {
      operations.add(t)
    }

    // traverse the list and try to complete some watched elements
    def tryCompleteWatched(): Int = {
      var completed = 0

      val iter = operations.iterator()
      while (iter.hasNext) {
        val curr = iter.next()
        if (curr.isCompleted) {
          // another thread has completed this operation, just remove it
          iter.remove()
        } else if (curr.safeTryComplete()) {
          iter.remove()
          completed += 1
        }
      }

      if (operations.isEmpty)
        removeKeyIfEmpty(key, this)

      completed
    }

    // drain the list, cancelling each operation's timer task, and return the drained operations
    def cancel(): List[T] = {
      val iter = operations.iterator()
      val cancelled = new ListBuffer[T]()
      while (iter.hasNext) {
        val curr = iter.next()
        curr.cancel()
        iter.remove()
        cancelled += curr
      }
      cancelled.toList
    }

    // traverse the list and purge elements that are already completed by others
    def purgeCompleted(): Int = {
      var purged = 0

      val iter = operations.iterator()
      while (iter.hasNext) {
        val curr = iter.next()
        if (curr.isCompleted) {
          iter.remove()
          purged += 1
        }
      }

      if (operations.isEmpty)
        removeKeyIfEmpty(key, this)

      purged
    }
  }

  /**
   * Advance the timeout timer (waiting up to `timeoutMs` for an expired bucket), then purge
   * completed-but-still-watched operations from the watch lists when there are too many of them.
   *
   * @param timeoutMs the maximum time in milliseconds the timer may wait for an expired bucket
   */
  def advanceClock(timeoutMs: Long): Unit = {
    timeoutTimer.advanceClock(timeoutMs)

    // Trigger a purge if the number of completed but still being watched operations is larger than
    // the purge threshold. That number is computed by the difference btw the estimated total number of
    // operations and the number of pending delayed operations.
    if (estimatedTotalOperations.get - numDelayed > purgeInterval) {
      // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
      // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
      // a little overestimated total number of operations.
      estimatedTotalOperations.set(numDelayed)
      debug("Begin purging watch lists")
      val purged = watcherLists.foldLeft(0) {
        case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
      }
      debug(s"Purged $purged elements from watch lists.")
    }
  }

  /**
   * A background reaper to expire delayed operations that have timed out.
   * Each doWork() iteration advances the clock, waiting at most 200 ms for an expired bucket.
   */
  private class ExpiredOperationReaper extends ShutdownableThread(
    "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
    false) {

    override def doWork(): Unit = {
      advanceClock(200L)
    }
  }
}
时间轮的实现类是 SystemTimer。值得注意的是,这是一个多层的时间轮;同时,时钟的推进依靠 DelayQueue 取出已到期的 bucket:advanceClock 只有在 DelayQueue 中取到到期的 bucket 时才推进时钟,否则最多等待 timeoutMs 后直接返回。这样就避免了在没有到期任务时逐个 tick 地做无效推进。
package kafka.utils.timer

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.concurrent.{DelayQueue, Executors, TimeUnit}

import kafka.utils.threadsafe
import org.apache.kafka.common.utils.{KafkaThread, Time}

trait Timer {
  /**
   * Add a new task to this executor. It will be executed after the task's delay
   * (beginning from the time of submission)
   * @param timerTask the task to add
   */
  def add(timerTask: TimerTask): Unit

  /**
   * Advance the internal clock, executing any tasks whose expiration has been
   * reached within the duration of the passed timeout.
   * @param timeoutMs the maximum time, in milliseconds, to wait for an expired bucket before giving up
   * @return whether or not any tasks were executed
   */
  def advanceClock(timeoutMs: Long): Boolean

  /**
   * Get the number of tasks pending execution
   * @return the number of tasks
   */
  def size: Int

  /**
   * Shutdown the timer service, leaving pending tasks unexecuted
   */
  def shutdown(): Unit
}

/**
 * A [[Timer]] backed by a [[TimingWheel]]. Expired tasks are run on a single dedicated
 * non-daemon executor thread. Buckets are registered in a DelayQueue, so the clock only
 * moves when a bucket has actually expired.
 *
 * @param executorName suffix of the executor thread name ("executor-" + executorName)
 * @param tickMs       duration of one tick of the lowest-level wheel, in milliseconds
 * @param wheelSize    number of buckets per wheel
 * @param startMs      initial time of the wheel, in milliseconds
 */
@threadsafe
class SystemTimer(executorName: String,
                  tickMs: Long = 1,
                  wheelSize: Int = 20,
                  startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {

  // timeout timer
  private[this] val taskExecutor = Executors.newFixedThreadPool(1,
    (runnable: Runnable) => KafkaThread.nonDaemon(s"executor-$executorName", runnable))

  private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  private[this] val taskCounter = new AtomicInteger(0)
  private[this] val timingWheel = new TimingWheel(
    tickMs = tickMs,
    wheelSize = wheelSize,
    startMs = startMs,
    taskCounter = taskCounter,
    delayQueue
  )

  // Locks used to protect data structures while ticking: add() takes the shared read lock so
  // many adds can proceed concurrently, while advanceClock() takes the exclusive write lock.
  private[this] val readWriteLock = new ReentrantReadWriteLock()
  private[this] val readLock = readWriteLock.readLock()
  private[this] val writeLock = readWriteLock.writeLock()

  def add(timerTask: TimerTask): Unit = {
    readLock.lock()
    try {
      // NOTE(review): the expiration uses Time.SYSTEM rather than a clock tied to `startMs`;
      // confirm this is intended when a non-default startMs is supplied (e.g. in tests).
      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
    } finally {
      readLock.unlock()
    }
  }

  // Insert the entry into the wheel; if the wheel rejects it, run it now unless it was cancelled.
  private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
    if (!timingWheel.add(timerTaskEntry)) {
      // Already expired or cancelled
      if (!timerTaskEntry.cancelled)
        taskExecutor.submit(timerTaskEntry.timerTask)
    }
  }

  /*
   * Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,
   * waits up to timeoutMs before giving up.
   */
  def advanceClock(timeoutMs: Long): Boolean = {
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (bucket != null) {
      writeLock.lock()
      try {
        // Drain every bucket that is already expired, re-adding its entries through
        // addTimerTaskEntry (which either re-buckets them or submits them for execution).
        while (bucket != null) {
          timingWheel.advanceClock(bucket.getExpiration)
          bucket.flush(addTimerTaskEntry)
          bucket = delayQueue.poll()
        }
      } finally {
        writeLock.unlock()
      }
      true
    } else {
      false
    }
  }

  def size: Int = taskCounter.get

  override def shutdown(): Unit = {
    taskExecutor.shutdown()
  }

}
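在 DelayedOperationPurgatory 中,推动时间轮转动的线程是 ExpiredOperationReaper。它在循环中反复调用 advanceClock(200L),每次最多在 DelayQueue 上等待 200ms。推进时钟之后,如果"已完成但仍留在 watcher 列表中"的操作数超过 purgeInterval,它还会顺带清理各个 watcher 列表。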
标签:operations,DelayedOperation,val,completed,kafka,key,operation,浅析,def From: https://www.cnblogs.com/allenwas3/p/17717093.html