引言 在分析Sentinel的上一篇文章中,我们知道了它是基于滑动窗口做的流量统计,那么在当我们能够根据流量统计算法拿到流量的实时数据后,下一步要做的事情自然就是基于这些数据做流控。在介绍Sentinel
的流控模型之前,我们先来简单看下 Sentinel 后台是如何去定义一个流控规则的
对于上图的配置Sentinel
把它抽象成一个FlowRule
类,与其属性一一对应
resource 资源名
limitApp 限流来源,默认为default不区分来源
grade 限流类型,有QPS和并发线程数两种类型
count 限流阈值
strategy 流控策略 1. 直接 2. 关联 3.链路
controlBehavior 流控效果 1.快速失败 2.预热启动 3.排队等待 4. 预热启动排队等待
warmUpPeriodSec 流控效果为预热启动时的预热时长(秒)
maxQueueingTimeMs 流控效果为排队等待时的等待时长 (毫秒)
下面我们来看下选择流控策略和流控效果的核心代码
1 2 3 4 5 6 7 8 9 private static boolean passLocalCheck (FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) { Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null ) { return true ; } return rule.getRater().canPass(selectedNode, acquireCount, prioritized); }
上面的代码比较简单流程也很清晰,首先根据我们配置的流控策略获取到合适维度的 Node 节点(Node节点是Sentinel做流量统计的基本单位),然后再获取到规则中配置的流控效果控制器(1. 直接拒绝 2. 预热启动 3. 排队等待 4.预热启动排队等待)。
流控策略 下面我们来看下选择流控策略的源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 static Node selectNodeByRequesterAndStrategy ( FlowRule rule, Context context, DefaultNode node) { String limitApp = rule.getLimitApp(); int strategy = rule.getStrategy(); String origin = context.getOrigin(); if (limitApp.equals(origin) && filterOrigin(origin)) { if (strategy == RuleConstant.STRATEGY_DIRECT) { return context.getOriginNode(); } return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { if (strategy == RuleConstant.STRATEGY_DIRECT) { return node.getClusterNode(); } return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { if (strategy == RuleConstant.STRATEGY_DIRECT) { return context.getOriginNode(); } return selectReferenceNode(rule, context, node); } return null ; } static Node selectReferenceNode (FlowRule rule, Context context, DefaultNode node) { String refResource = rule.getRefResource(); int strategy = rule.getStrategy(); if (StringUtil.isEmpty(refResource)) { return null ; } if (strategy == RuleConstant.STRATEGY_RELATE) { return ClusterBuilderSlot.getClusterNode(refResource); } if (strategy == RuleConstant.STRATEGY_CHAIN) { if (!refResource.equals(context.getName())) { return null ; } return node; } return null ; }
这段代码的逻辑判断比较多,我们稍微理一下整个过程
LimitApp
的作用域只在配置的流控策略为RuleConstant.STRATEGY_DIRECT
(直接关联)时起作用。其有三种配置,分别为default
,origin_name
,other
default 如果配置为default,表示统计不区分来源,当前资源的任何来源流量都会被统计(其实就是选择 Node 为 clusterNode 维度)
origin_name 如果配置为指定名称的 origin_name,则只会对当前配置的来源流量做统计
other 如果配置为other 则会对其他全部来源生效但不包括第二条配置的来源
当策略配置为 RuleConstant.STRATEGY_RELATE 或 RuleConstant.STRATEGY_CHAIN 时
STRATEGY_RELATE 关联其他的指定资源,如资源A想以资源B的流量状况来决定是否需要限流,这时资源A规则配置可以使用 STRATEGY_RELATE 策略
STRATEGY_CHAIN 对指定入口的流量限流,因为流量可以有多个不同的入口(EntranceNode)
对于上面几个节点之间的关系不清楚的可以去看我这篇文章开头的总览图 Sentinel源码解析一(流程总览)
流控效果 关于流控效果的配置有四种,我们来看下它们的初始化代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private static TrafficShapingController generateRater ( FlowRule rule) { if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { switch (rule.getControlBehavior()) { case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor); case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor); case RuleConstant.CONTROL_BEHAVIOR_DEFAULT: default : } } return new DefaultController(rule.getCount(), rule.getGrade()); }
可以比较清晰的看到总共对应有四种流控器的初始化
直接拒绝 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Override public boolean canPass (Node node, int acquireCount, boolean prioritized) { int curCount = avgUsedTokens(node); if (curCount + acquireCount > count) { if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); throw new PriorityWaitException(waitInMs); } } return false ; } return true ; }
此种策略比较简单粗暴,超过流量阈值的会直接拒绝。不过这里有一个小细节,如果入口流量prioritized为true,也就是优先级比较高,则会通过占用未来时间窗口的名额来实现。这个在上一篇文章有介绍到
预热启动 WarmUpController
主要是用来防止流量的突然上升,使系统本在稳定状态下能处理的,但是由于许多资源没有预热,导致处理不了了。注意这里的预热并不是指系统启动之后的一次性预热,而是指系统在运行的任何时候流量从低峰到突增的预热阶段 。
下面我们来看下WarmUpController
的具体实现类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 private void construct (double count, int warmUpPeriodInSec, int coldFactor) { if (coldFactor <= 1 ) { throw new IllegalArgumentException("Cold factor should be larger than 1" ); } this .count = count; this .coldFactor = coldFactor; warningToken = (int )(warmUpPeriodInSec * count) / (coldFactor - 1 ); maxToken = warningToken + (int )(2 * warmUpPeriodInSec * count / (1.0 + coldFactor)); slope = (coldFactor - 1.0 ) / count / (maxToken - warningToken); } @Override public boolean canPass (Node node, int acquireCount, boolean prioritized) { long passQps = (long ) node.passQps(); long previousQps = (long ) node.previousPassQps(); syncToken(previousQps); long restToken = storedTokens.get(); if (restToken >= warningToken) { long aboveToken = restToken - warningToken; double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); if (passQps + acquireCount <= warningQps) { return true ; } } else { if (passQps + acquireCount <= count) { return true ; } } return false ; }
首先是构造方法,主要关注2个重要参数
warningToken 剩余token的警戒值
maxToken 剩余的最大token数,如果剩余token数等于maxToken,则说明系统处于最冷阶段
要理解这两个参数的含义,可以参考令牌桶算法,每通过一个请求,就会从令牌桶中取走一个令牌。那么试想一下,当令牌桶中的令牌达到最大值是,是不是意味着系统目前处于最冷阶段,因为桶里的令牌始终处于一个非常饱和的状态。这里的令牌最大值对应的就是maxToken
,而warningToken
,则是对应了一个警戒值,当桶中的令牌数减少到一个指定的值时,说明系统已经度过了预热阶段
当一个请求进来时,首先需要计算当前桶中剩余的token数,具体逻辑在syncToken
方法中 当系统剩余Token大于warningToken时,说明系统仍处于预热阶段,故需要调整当前所能通过的最大qps阈值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 protected void syncToken (long passQps) { long currentTime = TimeUtil.currentTimeMillis(); currentTime = currentTime - currentTime % 1000 ; long oldLastFillTime = lastFilledTime.get(); if (currentTime <= oldLastFillTime) { return ; } long oldValue = storedTokens.get(); long newValue = coolDownTokens(currentTime, passQps); if (storedTokens.compareAndSet(oldValue, newValue)) { long currentValue = storedTokens.addAndGet(0 - passQps); if (currentValue < 0 ) { storedTokens.set(0L ); } lastFilledTime.set(currentTime); } }
获取当前时间
coolDownTokens 方法会判断是否需要往桶中放 token,并返回最新的token数
如果返回了最新的token数,则将当前剩余的token数减去已经通过的qps,得到最新的剩余token数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private long coolDownTokens (long currentTime, long passQps) { long oldValue = storedTokens.get(); long newValue = oldValue; if (oldValue < warningToken) { newValue = (long )(oldValue + (currentTime - lastFilledTime.get()) * count / 1000 ); } else if (oldValue > warningToken) { if (passQps < (int )count / coldFactor) { newValue = (long )(oldValue + (currentTime - lastFilledTime.get()) * count / 1000 ); } } return Math.min(newValue, maxToken); }
这里看一下会添加令牌的几种情况
系统初始启动阶段,oldvalue = 0,lastFilledTime也等于0,此时得到一个非常大的newValue,会取maxToken为当前token数量值
系统处于完成预热阶段,需要补充 token 使其稳定在一个范围内
系统处于预热阶段 且 当前qps小于 count / coldFactor
前2种情况比较好理解,这里主要解释一下第三种情况,为何 当前qps
小于count / coldFactor
时,需要往桶中添加Token?试想一下如果没有这一步会怎么样,如果没有这一步在比较低的qps情况下补充Token,系统最终也会慢慢度过预热阶段,但实际上这么低的qps(小于 count / coldFactor时
)不应该完成预热。所以这里才会在 qps低于count / coldFactor
时补充剩余token数,来让系统在低qps情况下始终处于预热状态下
排队等待 排队等待的实现相对预热启动实现比较简单
首先会通过我们的配置,计算出相邻两个请求允许通过的最小时间,然后会记录最近一个通过的时间。两者相加即是下一次请求允许通过的最小时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public boolean canPass (Node node, int acquireCount, boolean prioritized) { if (acquireCount <= 0 ) { return true ; } if (count <= 0 ) { return false ; } long currentTime = TimeUtil.currentTimeMillis(); long costTime = Math.round(1.0 * (acquireCount) / count * 1000 ); long expectedTime = costTime + latestPassedTime.get(); if (expectedTime <= currentTime) { latestPassedTime.set(currentTime); return true ; } else { long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); if (waitTime > maxQueueingTimeMs) { return false ; } else { long oldTime = latestPassedTime.addAndGet(costTime); try { waitTime = oldTime - TimeUtil.currentTimeMillis(); if (waitTime > maxQueueingTimeMs) { latestPassedTime.addAndGet(-costTime); return false ; } if (waitTime > 0 ) { Thread.sleep(waitTime); } return true ; } catch (InterruptedException e) { } } } return false ; }
排队等待控制器的核心策略其实就是围绕了latestPassedTime
进行的,latestPassedTime
指的是上一次请求通过的时间,通过latestPassedTime
+ costTime
来与当前时间做比较,来判断当前请求是否可以通过,无法通过的请求则会优先占用latestPassedTime
时间,直到sleep到可以通过的时间。当然我们也可以配置排队等待的最大时间,来限制目前排队等待通过的请求数量。
预热启动排队等待 预热排队等待,WarmUpRateLimiterController
实现类我们发现其继承了WarmUpController
,这是Sentinel在1.4版本后新加的一种控制器,其实就是预热启动和排队等待的结合体,具体源码我们就不做分析。
尾言 Sentinel
的流控策略和流控效果的相结合使用还是非常巧妙的,当中的一些设计思想还是非常有借鉴意义的
Sentinel系列
打赏支持
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!