Sentinel如何拦截异常流量

这篇文章主要介绍“Sentinel如何拦截异常流量”,在日常操作中,相信很多人在Sentinel如何拦截异常流量问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Sentinel如何拦截异常流量”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

各位在家里用电的过程中,一定也经历过「跳闸」。这个「」就是在电量超过负荷的时候用来保护我们用电安全的,也被称为「断路器」,还有个响亮的英文名 -- CircuitBreaker。

和用电安全一样,对于「限流」、「降级」、「熔断」...,你我应该也都耳熟能详。我们开发的各类软件、系统、互联网应用等为了不被异常流量压垮,也需要一个断路器。

在 Spring 应用中,使用断路器很方便,我们可以使用 Spring Cloud CircuitBreaker。

Spring Cloud Circuit Breaker 是啥?如果你熟悉 Spring 是什么人的话,你能猜个八九不离十。和Spring Data JPA 这些类似,Spring 他又搞了个抽象的,标准的API 出来。这次他抽象的是关于降级熔断的「断路器」。有了这一层,具体实现是谁可以方便的更换,我们使用的代码里改动基本为0。

我们先来从官方Demo有个初步印象:

@RestControllerpublic class DemoController {  private CircuitBreakerFactory circuitBreakerFactory;  private HttpBinService httpBin;  public DemoController(CircuitBreakerFactory circuitBreakerFactory, HttpBinService httpBinService) {    this.circuitBreakerFactory = circuitBreakerFactory;    this.httpBin = httpBinService;  }  @GetMapping("/delay/{seconds}")  public Map delay(@PathVariable int seconds) {    return circuitBreakerFactory.create("delay").run(httpBin.delaySuppplier(seconds), t -> {      Map<String, String> fallback = new HashMap<>();      fallback.put("hello", "world");      return fallback;    });  }}

千言万语,总结出来这样一句circuitBreakerFactory.create("delay").run() 

因为是抽象,对应的实现就有好多种啦。

目前支持的实现有:

  • Hystrix

  • Resilience4j

  • Sentinel

  • Spring Retry

而抽象相当于定了个标准,像JDBC一样,无论我们把数据库换成了MySQL,Oracle 还是SQLite,接口等非特定类型的代码都不需要改变。断路器也一样。

这里的断路器工厂,创建方法都是标准的。具体这里执行业务逻辑的时候断路器实现要怎样进行拦截降级,就可以交给具体的实现来完成。

这次,我们以开源的 Sentinel 为例,来看看他们是怎样拦住异常流量的。

首先,因为是Spring Cloud,所以还会基于 Spring Boot 的 Autoconfiguration。以下是配置类,我们看到生成了一个工厂。

public class SentinelCircuitBreakerAutoConfiguration {  @Bean  @ConditionalOnMissingBean(CircuitBreakerFactory.class)  public CircuitBreakerFactory sentinelCircuitBreakerFactory() {    return new SentinelCircuitBreakerFactory();  }  }

在我们实际代码执行逻辑的时候,create 出来的是什么呢?

是个断路器 CircuitBreaker,用来执行代码。

public interface CircuitBreaker {
 default <T> T run(Supplier<T> toRun) {    return run(toRun, throwable -> {      throw new NoFallbackAvailableException("No fallback available.", throwable);    });  };  <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback);}

包含两个执行的方法,需要在的时候可以指定fallback逻辑。具体到 Sentinel 是这样的:

  public CircuitBreaker create(String id) {    SentinelConfigBuilder.SentinelCircuitBreakerConfiguration conf = getConfigurations()        .computeIfAbsent(id, defaultConfiguration);    return new SentinelCircuitBreaker(id, conf.getEntryType(), conf.getRules());  }

你会看到创建了一个SentinelCircuitBreaker。我们的业务逻辑,就会在这个断路器里执行,run方法就是各个具体实现的舞台。

@Override  public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {    Entry entry = null;    try {      entry = SphU.entry(resourceName, entryType);      // If the SphU.entry() does not throw `BlockException`, it means that the      // request can pass.      return toRun.get();    }    catch (BlockException ex) {      // SphU.entry() may throw BlockException which indicates that      // the request was rejected (flow control or circuit breaking triggered).      // So it should not be counted as the business exception.      return fallback.apply(ex);    }    catch (Exception ex) {      // For other kinds of exceptions, we'll trace the exception count via      // Tracer.trace(ex).      Tracer.trace(ex);      return fallback.apply(ex);    }    finally {      // Guarantee the invocation has been completed.      if (entry != null) {        entry.exit();      }    }  }

OK,到此为止, Spring Cloud CircuitBreaker 已经展现完了。其它的细节都放到了具体实现的「盒子」里。下面我们把这个盒子打开。

Sentinel 是个熔断降级框架,官方这样自我介绍:

面向分布式服务架构的高可用流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统自适应保护等多个维度来帮助用户保障微服务的稳定性。

Sentinel如何拦截异常流量

官网的这张代码截图简洁的说明了他是怎样工作的

Sentinel如何拦截异常流量

挡在业务代码的前面,有事儿先冲它来,能通过之后才走业务逻辑,和各类闯关还真类似。

在上面 CircuitBreaker 的 run 方法里,咱们一定都注意到了这句

entry = SphU.entry(resourceName, entryType);

这就是一切拦截的秘密。

无论我们是通过前面的CircuitBreaker的方式,还是 @SentinelResource 这种注解形式,还是通过 Interceptor 的方式,没什么本质区别。只是触发点不一样。最后都是通过SphU来搞定。

既然是拦截,那一定要拦下来做这样或那样的检查。

实际检查的时候,entry 里核心代码有这些:

 Entry entryWithPriority(ResourceWrapper resourceWrapper, ...)        throws BlockException {        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);        Entry e = new CtEntry(resourceWrapper, chain, context);        try {            chain.entry(context, resourceWrapper,...);        } catch (BlockException e1) {            e.exit(count, args);            throw e1;        }         return e;    }

注意这里的ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);会在请求过来处理的时候,如果未初始化处理链,则进行初始化,将各种first,next设置好,后面的请求都会按这个来处理。所有需要拦截的Slot,都会加到这个 chain 里面,再逐个执行 chain 里的 slot。和Servlet Filter 类似。

chain里都加了些啥呢?

public class HotParamSlotChainBuilder implements SlotChainBuilder {    public ProcessorSlotChain build() {        ProcessorSlotChain chain = new DefaultProcessorSlotChain();        chain.addLast(new NodeSelectorSlot());        chain.addLast(new ClusterBuilderSlot());        chain.addLast(new LogSlot());        chain.addLast(new StatisticSlot());        chain.addLast(new ParamFlowSlot());        chain.addLast(new SystemSlot());        chain.addLast(new AuthoritySlot());        chain.addLast(new FlowSlot());        chain.addLast(new DegradeSlot());        return chain;    }

初始的时候,first 指向一个匿名内部类,这些加进来的slot,会在每次addLast的时候,做为链的next,    

AbstractLinkedProcessorSlot<?> end = first;
   @Override    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {        protocolProcessor.setNext(first.getNext());        first.setNext(protocolProcessor);        if (end == first) {            end = protocolProcessor;        }    }    @Override    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {        end.setNext(protocolProcessor);        end = protocolProcessor;    }

而每个 slot,有自己的特定用处,处理完自己的逻辑之后,会通过 fireEntry 来触发下一个 slot的执行。

给你一张长长的线程调用栈就会过分的明显了:

 java.lang.Thread.State: RUNNABLE    at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.checkFlow(FlowSlot.java:168)    at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.entry(FlowSlot.java:161)    at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.entry(FlowSlot.java:139)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)    at com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot.entry(AuthoritySlot.java:39)    at com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot.entry(AuthoritySlot.java:33)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)    at com.alibaba.csp.sentinel.slots.system.SystemSlot.entry(SystemSlot.java:36)    at com.alibaba.csp.sentinel.slots.system.SystemSlot.entry(SystemSlot.java:30)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)    at com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot.entry(ParamFlowSlot.java:39)    at com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot.entry(ParamFlowSlot.java:33)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)    at com.alibaba.csp.sentinel.slots.statistic.StatisticSlot.entry(StatisticSlot.java:57)    at com.alibaba.csp.sentinel.slots.statistic.StatisticSlot.entry(StatisticSlot.java:50)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)    at com.alibaba.csp.sentinel.slots.logger.LogSlot.entry(LogSlot.java:35)    at com.alibaba.csp.sentinel.slots.logger.LogSlot.entry(LogSlot.java:29)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)    at com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot.entry(ClusterBuilderSlot.java:101)    at com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot.entry(ClusterBuilderSlot.java:47)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)    at com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot.entry(NodeSelectorSlot.java:171)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)    at com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain$1.entry(DefaultProcessorSlotChain.java:31)    at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)    at com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain.entry(DefaultProcessorSlotChain.java:75)    at com.alibaba.csp.sentinel.CtSph.entryWithPriority(CtSph.java:148)    at com.alibaba.csp.sentinel.CtSph.entryWithType(CtSph.java:347)    at com.alibaba.csp.sentinel.CtSph.entryWithType(CtSph.java:340)    at com.alibaba.csp.sentinel.SphU.entry(SphU.java:285)

降级有三种类型

Sentinel如何拦截异常流量

每种类型,都会根据对应的配置项数据比对,不符合就中断,中断之后也不能一直断着,啥时候再恢复呢?就根据配置的时间窗口,会启动一个恢复线程,到时间就会调度,把中断标识恢复。

public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {        if (cut.get()) {            return false;        }        ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());        if (clusterNode == null) {            return true;        }        if (grade == RuleConstant.DEGRADE_GRADE_RT) {            double rt = clusterNode.avgRt();            if (rt < this.count) {                passCount.set(0);                return true;            }            // Sentinel will degrade the service only if count exceeds.            if (passCount.incrementAndGet() < rtSlowRequestAmount) {                return true;            }        } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {            double exception = clusterNode.exceptionQps();            double success = clusterNode.successQps();            double total = clusterNode.totalQps();            // If total amount is less than minRequestAmount, the request will pass.            if (total < minRequestAmount) {                return true;            }            // In the same aligned statistic time window,            // "success" (aka. completed count) = exception count + non-exception count (realSuccess)            double realSuccess = success - exception;            if (realSuccess <= 0 && exception < minRequestAmount) {                return true;            }            if (exception / success < count) {                return true;            }        } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {            double exception = clusterNode.totalException();            if (exception < count) {                return true;            }        }        if (cut.compareAndSet(false, true)) {            ResetTask resetTask = new ResetTask(this);            pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);        }        return false;    }

恢复做了两件事:一、把passCount设置成0,二、中断标识还原

上面介绍了对请求的拦截处理,这其中最核心的,也就是我们最主要配置的,一个是「流控」,一个是「降级」。这两个对应的Slot,会在处理请求的时候,根据配置好的 「规则」rule 来判断。比如我们上面看到的时间窗口、熔断时间等,以及流控的线程数,QPS数这些。

这些规则默认的配置在内存里,也可以通过不同的数据源加载进来。同时启用了Sentinel 控制台的话,在控制台 也可以配置规则。这些规则,会通过 HTTP 发送给对应使用了 sentinel 的应用实例节点。

收到更新内容后,实例里的「规则管理器」会重新加载一次 rule,下次请求处理就直接生效了。

到此,关于“Sentinel如何拦截异常流量”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!