Kafka --- 时间轮
Kafka 时间轮:
附上 kafka 官网介绍 https://www.confluent.io/blog/apache-kafka-purgatory-hierarchical-timing-wheels/
源码部分:
// tickMs 当前时间轮中一个时间格表示的时间跨度
// wheelSize 当前时间轮的格数,也是 buckets 数组的大小
// taskCounter 各层级时间轮中任务的总数
// startMs 当前时间轮的创建时间
// queue 整个时间轮公用的一个任务队列
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
//当前时间轮的跨度
private[this] val interval = tickMs * wheelSize
//其中每一项 都对应时间轮中的一个时间格,用于保存TimerTaskList的数组,
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
//时间轮的指针,
private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs
// overflowWheel can potentially be updated and read by two concurrent threads through add().
// Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
//上层时间轮的引用
@volatile private[this] var overflowWheel: TimingWheel = null
}
向时间轮中添加定时任务:
//向时间轮中添加定时任务,也会检查待添加的任务是否已经到期
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
val expiration = timerTaskEntry.expirationMs
//如果任务已经被取消
if (timerTaskEntry.cancelled) {
// Cancelled
false
//如果任务已经到期
} else if (expiration < currentTime + tickMs) {
// Already expired
false
//如果任务在当前时间轮的跨度范围内
} else if (expiration < currentTime + interval) {
// Put in its own bucket
//按照 任务的到期时间查找此任务属于的时间格,并将任务添加到对应的 TimerTaskList 中
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
/**
* 整个时间轮表示的时间跨度是不变的,随着表针 currentTime 的后移,当前时间轮能处理时间段也在不断后移,
* 新来的 timerTaskEntry 会复用原来已经清理过的 TimerTaskList(bucket) .
* 此时需要重置 TimerTaskList 的到期时间.并将bucket 重新添加到 DelayQueue 中
*/
// Set the bucket expiration time
//设置 bucket 的到期时间
if (bucket.setExpiration(virtualId * tickMs)) {
// The bucket needs to be enqueued because it was an expired bucket
// We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
// and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
// will pass in the same value and hence return false, thus the bucket with the same expiration will not
// be enqueued multiple times.
queue.offer(bucket)
}
true
} else {
//超出了当前时间轮的时间跨度范围,则将任务添加到上层时间轮中处理
// Out of the interval. Put it into the parent timer
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}
推进当前时间轮的表针:
//尝试推进当前时间轮的 表针 currentTime ,也会尝试推进 上层的时间轮的表针
def advanceClock(timeMs: Long): Unit = {
if (timeMs >= currentTime + tickMs) {
currentTime = timeMs - (timeMs % tickMs)
// Try to advance the clock of the overflow wheel if present
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
}
添加上层时间轮:
//当前时间轮的时间跨度不够的时候,添加上层时间轮。
//默认情况,上层时间轮的 tickMs 是当前整个时间轮的时间跨度 interval
private[this] def addOverflowWheel(): Unit = {
synchronized {
if (overflowWheel == null) {
/**
* 创建上层时间轮,注意,上层时间轮的tickMs更大,wheelSize不变,则表示的时间跨度也就大
* 随着上层时间轮表针的转动,任务还是会回到最底层的时间轮上,等待最终超时.
*/
overflowWheel = new TimingWheel(
tickMs = interval,
wheelSize = wheelSize,
startMs = currentTime,
//全局唯一的任务计数器
taskCounter = taskCounter,
//全局唯一的任务队列
queue
)
}
}
}