RateLimiterGuava包提供的限流器,采用了令牌桶算法,特定是均匀地向桶中添加令牌,每次消费时也必须持有令牌,否则就需要等待。应用场景之一是限制消息消费的速度,避免消息消费过快而对下游的数据库造成较大的压力。

本文主要介绍RateLimiter的源码,包括他的基本限流器SmoothBursty,以及带预热效果的SmoothWarmingUp



RateLimiter作为限流器的顶层类,只有两个属性:

  private final SleepingStopwatch stopwatch;
private volatile Object mutexDoNotUseDirectly;

stopwatch用来计算时间间隔,以及实现了当拿不到令牌时将线程阻塞的功能;mutexDoNotUseDirectly主要用来进行线程同步。

RateLimiter作为一个抽象类,本身不能直接实例化,可以使用静态工厂方法来创建:

 public static RateLimiter create(double permitsPerSecond);  //①
public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod); //②
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) //③

RateLimiter对外提供了3个构造器,分成两类,构造器①是第一类,底层会创建基本限流器SmoothBursty;构造器②和③是第二类,底层会创建带warm up效果的SmoothWarmingUp。参数permitsPerSecond表示每秒产生多少个令牌,参数warmupPeriod是限流器warm up阶段的时间,即限流器产生令牌从最慢到最快所需要的时间,参数unitwarm up的时间单位。

SmoothRateLimiter新增了4个属性:

  //桶中存储的令牌数
double storedPermits;
//桶中允许的最大令牌数
double maxPermits;
//稳定状态下产生令牌是速度,其值为1/permitsPerSecond
double stableIntervalMicros;
//下一次请求需要等待的时间
private long nextFreeTicketMicros = 0L; // could be either in the past or future

这其中比较有意思的是nextFreeTicketMicros字段,它表示下一次获取令牌的请求到来时需要等待的时间,该字段可以实现上一次获取令牌的请求预支的等待时间由下一次请求来兑现。

接下来先介绍下SmoothBursty的构造过程:

public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
} static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

构造SmoothBursty时出传入了两个参数,stopwatch好理解,第二个参数意思是当限流器长时间没用时,令牌桶内最多存储多少秒的令牌,这里限定了最多只存储1秒钟的令牌,也就是permitsPerSecond个。

我们继续分析setRate方法的实现:

  public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}

setRate方法先校验permitsPerSecond必须为整数,然后在同步块中执行doSetRate方法。mutex方法通过双重检测的方式实例化mutexDoNotUseDirectly字段,详细代码略去,doSetRate是抽象方法,其具体的实现在抽象子类SmoothRateLimiter中:

  final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}

doSetRate方法主要是设置了stableIntervalMicros字段,调用的两个方法resync和重载方法doSetRate我们接着分析。resync方法主要用来设置storedPermitsnextFreeTicketMicros这俩字段,代码如下:

  void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
//计算超过的这些时间里产生了多少新的令牌
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
//重新计算当前令牌桶内持有的令牌数
storedPermits = min(maxPermits, storedPermits + newPermits);
//更新下次准许获取令牌的时间为当前时间
nextFreeTicketMicros = nowMicros;
}
}

此方法会根据当前的时间决定是否进行字段赋值,如果当前时间已经超过了nextFreeTicketMicros的值,那么就重新计算storedPermitsnextFreeTicketMicros字段,其中计算storedPermits的代码虽然容易理解,但是思路挺巧妙。一般来说,令牌桶算法的令牌需要以固定的速率进行添加,那么很自然想到可以起一个任务,按照一定的速度产生令牌,但是起一个新任务会占用一定的资源,从而加重系统的负担,此处的实现是根据利用时间差来计算这段时间产生的令牌数,以简单的计算完成了新任务需要做的事情,开销大大减少了。coolDownIntervalMicros方法是抽象方法,在SmoothBurstySmoothWarmingUp有不同的实现,在SmoothBursty的实现是直接返回stableIntervalMicros字段,这个字段目前还没设置过值,取默认值0.0,这里double的除零操作并不会抛异常,而是会返回无穷大。

我们接着看一下doSetRate方法,这也是个抽象方法,在SmoothBursty的实现如下:

    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}

maxPermits在此之前并没有设置过值,因此默认是0.0,这里只是将storedPermits初始化成了0。不过这里的代码也说明,在执行期间maxPermits是可以在其他地方被修改的,如果出现了更改,就会等比例修改storedPermits的值。

到这里SmoothBursty的初始化过程就结束了,大体上是将内部的字段赋予了初始值。我们接下来看看SmoothBursty的使用:

  public double acquire() {
return acquire(1);
} public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

acquire方法用于从令牌桶中获取令牌,参数permits表示需要获取的令牌数量,如果当前没办法拿到需要的令牌,线程会阻塞一段时间,该方法返回等待的时间,reserve的实现如下:

  final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
} final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
//返回等待时间,如果不需要等待,返回0
return max(momentAvailable - nowMicros, 0);
} final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
//取可用的令牌与需要的令牌两者的最小值
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
//计算该次请求超出的令牌数
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros); this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
//扣减令牌桶库存
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

reserve的核心逻辑在reserveEarliestAvailable方法中,该方法的主要思想是检查当前令牌桶内令牌数是否满足需求,如果满足则不需要额外的等待时间,否则需要将额外等待时间追加到nextFreeTicketMicros,需要注意的是方法返回的不是更新过后的nextFreeTicketMicros,而是上一次请求更新的时间,这个时间就是当前线程需要阻塞的时间,也就是说,当前请求所需要等待的时间是由下次请求完成的,下次请求需要的等待时间由下下次请求完成,以此类推。当前请求的令牌数超过令牌桶中的令牌数越多,下次请求需要等待的时间就越长。并且这里并没有对requiredPermits的上限做检查,这就允许预支令牌,即假设桶的上限是100个令牌,一次请求可以允许超过100个令牌,只是生成多余令牌的时间需要算到下一个请求上。同时这里的逻辑也说明,获取令牌是直接成功的,只是获取完令牌后才需要一小段等待时间。

到这里SmoothBursty的初始化以及获取令牌的所有逻辑就介绍完了,接下来看看另一个类SmoothWarmingUp的源码。

  static RateLimiter create(
double permitsPerSecond,
long warmupPeriod,
TimeUnit unit,
double coldFactor,
SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

我们之前介绍的另外领个构造器的底层调用的是这个包级的create方法,该方法的5个参数中,只有coldFactor是新出现的,字面意思是冷启动因子,源码写死了是3.0,该值表示指在warm up阶段开始时,以多大的速率产生令牌,速率是稳定速率的三分之一,冷启动阶段结束后恢复到正常速率。

setRate方法底层会调用如下的doSetRate方法:

  final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
} void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
//设置冷启动生成令牌的间隔是正常值的3倍(codeFactor固定为3)
double coldIntervalMicros = stableIntervalMicros * coldFactor;
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
//slope是梯形部分斜线的斜率
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}



doSetRate的代码不容易理解,源码中利用图示介绍了几个变量之间的关系(但是本人仍然不是很理解,因此只能将结论放在这里,无法进行更多解释),如图所示,源码注释中说明了如下的两个等式:

  • 梯形的面积等于预热时间warmupPeriodMicros
warmupPeriodMicros = 0.5 * (coldIntervalMicros + stableIntervalMicros) * (maxPermits - thresholdPermits)

由此可以得到maxPermits的值:

maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
  • 左边矩形的面积是梯形面积的一半,由此可知:
warmupPeriodMicros * 0.5 = thresholdPermits * stableIntervalMicros

计算出thresholdPermits的值为:

thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros

SmoothWarmingUp的初始化逻辑到这里就结束了,接下来介绍下它获取令牌的流程,acquire方法的其他部分上文已经结束过,此处重点介绍storedPermitsToWaitTime方法:

    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
//存储的令牌数量超出thresholdPermits的部分,这部分反应在梯形区域
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveThreshold > 0.0) {
//permitsAboveThresholdToTake表示梯形区域的高
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
//length计算的是梯形的上底+下底
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
//梯形区域的面积,即生产梯形区域的令牌数所需要的时间
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
//扣除掉需要消耗的梯形区域的令牌数,表示还需要从左侧矩形区域取得的令牌数量
permitsToTake -= permitsAboveThresholdToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
//等待时间=梯形区域的时间+矩形区域的时间
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
} //由前文可知,slope = =y/x = 产生令牌间隔/令牌数,permits * slope表示产生令牌间隔的增量,加上stableIntervalMicros表示梯形的底
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}



此处的storedPermitsToWaitTimeSmoothBursty中的实现大不相同,SmoothBursty由于不需要预热,可以直接获取桶中的令牌,因此直接返回了0,而此处存在预热阶段,不能直接获取到令牌,因此计算逻辑稍微复杂些,总体来说,就是求图中阴影部分的面积。

RateLimiter源码解析的更多相关文章

  1. 常用限流算法与Guava RateLimiter源码解析

    在分布式系统中,应对高并发访问时,缓存.限流.降级是保护系统正常运行的常用方法.当请求量突发暴涨时,如果不加以限制访问,则可能导致整个系统崩溃,服务不可用.同时有一些业务场景,比如短信验证码,或者其它 ...

  2. kube-proxy源码解析

    kubernetes离线安装包,仅需三步 kube-proxy源码解析 ipvs相对于iptables模式具备较高的性能与稳定性, 本文讲以此模式的源码解析为主,如果想去了解iptables模式的原理 ...

  3. java.lang.Void类源码解析_java - JAVA

    文章来源:嗨学网 敏而好学论坛www.piaodoo.com 欢迎大家相互学习 在一次源码查看ThreadGroup的时候,看到一段代码,为以下: /* * @throws NullPointerEx ...

  4. 【原】Android热更新开源项目Tinker源码解析系列之三:so热更新

    本系列将从以下三个方面对Tinker进行源码解析: Android热更新开源项目Tinker源码解析系列之一:Dex热更新 Android热更新开源项目Tinker源码解析系列之二:资源文件热更新 A ...

  5. 【原】Android热更新开源项目Tinker源码解析系列之一:Dex热更新

    [原]Android热更新开源项目Tinker源码解析系列之一:Dex热更新 Tinker是微信的第一个开源项目,主要用于安卓应用bug的热修复和功能的迭代. Tinker github地址:http ...

  6. 【原】Android热更新开源项目Tinker源码解析系列之二:资源文件热更新

    上一篇文章介绍了Dex文件的热更新流程,本文将会分析Tinker中对资源文件的热更新流程. 同Dex,资源文件的热更新同样包括三个部分:资源补丁生成,资源补丁合成及资源补丁加载. 本系列将从以下三个方 ...

  7. 多线程爬坑之路-Thread和Runable源码解析之基本方法的运用实例

    前面的文章:多线程爬坑之路-学习多线程需要来了解哪些东西?(concurrent并发包的数据结构和线程池,Locks锁,Atomic原子类) 多线程爬坑之路-Thread和Runable源码解析 前面 ...

  8. jQuery2.x源码解析(缓存篇)

    jQuery2.x源码解析(构建篇) jQuery2.x源码解析(设计篇) jQuery2.x源码解析(回调篇) jQuery2.x源码解析(缓存篇) 缓存是jQuery中的又一核心设计,jQuery ...

  9. Spring IoC源码解析——Bean的创建和初始化

    Spring介绍 Spring(http://spring.io/)是一个轻量级的Java 开发框架,同时也是轻量级的IoC和AOP的容器框架,主要是针对JavaBean的生命周期进行管理的轻量级容器 ...

随机推荐

  1. Shpfile文件的字段类型说明

    Shpfile文件的字段类型设置如下表所示: 字段类型 字符 字段长度 长整型 N 9 短整型 N 4 浮点型 F 13 双精度 F 19 文本 C 50 特别需要注意的是字段长度,在导出SHP的时候 ...

  2. KafkaConsumer 简析

    使用方式 创建一个 KafkaConsumer 对象订阅主题并开始接收消息: Properties properties = new Properties(); properties.setPrope ...

  3. Linux 驱动框架---i2c驱动框架

    i2c驱动在Linux通过一个周的学习后发现i2c总线的驱动框架还是和Linux整体的驱动框架是相同的,思想并不特殊比较复杂的内容如i2c核心的内容都是内核驱动框架实现完成的,今天我们暂时只分析驱动开 ...

  4. React Security Best Practices All In One

    React Security Best Practices All In One Default XSS Protection with Data Binding Dangerous URLs Ren ...

  5. Python Web Frameworks

    Python Web Frameworks top 10 Python web frameworks Django (Full-stack framework) Flask (Micro framew ...

  6. Linux & bash & tcpdump

    Linux & bash & tcpdump Linux & tcpdump https://www.tecmint.com/12-tcpdump-commands-a-net ...

  7. Angular Routing

    Angular Routing v9.0.7 https://angular.io/start/start-routing

  8. CSS flex waterfall layout

    CSS flex waterfall layout https://github.com/YoneChen/waterfall-flexbox https://css-tricks.com/snipp ...

  9. Axios & POST & application/x-www-form-urlencoded

    Axios & POST & application/x-www-form-urlencoded application/x-www-form-urlencoded https://g ...

  10. Flutter: AnimatedList 一个滚动容器,可在插入或移除项目时为其设置动画

    Flutter Widget of the Week import 'dart:math'; import 'package:flutter/material.dart'; void main() = ...