第五章 Hystrix熔断机制及源码分析笔记
在分布式环境下,微服务之间不可避免的发生互相调用的情况,但是没有一个系统是能保证自身绝对正确的,在服务的调用过程中,很可能面临服务失败的问题,因此需要一个公共组件能够在服务通过网络请求访问其他微服务时,能对服务失效情况下有很强的容错能力,对微服务提供保护和监控。
服务雪崩
雪崩是系统中的蝴蝶效应导致其发生的原因多种多样,有不合理的容量设计,或者是高并发下某一个方法响应变慢,亦或是某台机器的资源耗尽。从源头上我们无法完全杜绝雪崩源头的发生,但是雪崩的根本原因来源于服务之间的强依赖,所以我们可以提前评估。当整个微服务系统中,有一个节点出现异常情况,就有可能在高并发的情况下出现雪崩,导致调用它的上游系统出现响应延迟,响应延迟就会导致tomcat连接本耗尽,导致该服务节点不能正常的接收到正常的情况,这就是服务雪崩行为。
服务隔离
如果整个系统雪崩是由于一个接口导致的,由于这一个接口响应不及时导致问题,那么我们就有必要对这个接口进行隔离,就是只允许这个接口最多能接受多少的并发,做了这样的限制后,该接口的主机就会空余线程出来接收其他的情况,不会被哪个坏了的接口占用满。Hystrix就是一个不错的服务隔离框架。
Hystrix是netflix的一个开源项目,他能够在依赖服务失效的情况下,通过隔离系统依赖的方式,防止服务的级联失败(服务的雪崩)。
对于服务的熔断机制,其实需要考虑两种情况:
·服务提供方存活,但调用接口报错。
·服务提供方本身就出问题了。
代码Git地址:https://gitee.com/hankin_chj/springcloud-micro-service.git
一、Hystrix基本使用
1、服务提供方报错
其实这种情况类似于异常捕获机制,当出现异常,返回一个通用的接口报文。
创建hystrix服务模块将springcloud-micro-product 复制一份成为springcloud-micro-product-hystrix
1.1、修改pom文件,增加 Hystrix依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
1.2、修改ProductController添加@HystrixCommand支持
@RestController
@RequestMapping("/prodcut")
public class ProductController {
@Resource
private IProductService iProductService;
// import com.netflix.discovery.DiscoveryClient; // 注意这个包引用方法是旧版本1.x使用的
// import org.springframework.cloud.client.discovery.DiscoveryClient; //2.x版本使用
@Resource
private DiscoveryClient client ; // 进行Eureka的发现服务
@RequestMapping(value="/get/{id}")
@HystrixCommand(fallbackMethod = "getFallback") //制定回调方法
public Object get(@PathVariable("id") long id) {
Product product = this.iProductService.get(id);
if(product == null) {
throw new RuntimeException("该产品已下架!") ;
}
return product;
}
@RequestMapping(value="/add")
public Object add(@RequestBody Product product) {
return this.iProductService.add(product) ;
}
@RequestMapping(value="/list")
public Object list() {
return this.iProductService.list() ;
}
/**
* product服务对于注册到Eureka上的服务,可以通过发现服务来获取一些服务信息
*/
@RequestMapping("/discover")
public Object discover() { // 直接返回发现服务信息
return this.client ;
}
// 遇到异常回调方法
public Object getFallback(@PathVariable("id") long id){
Product product = new Product();
product.setProductName("HystrixName");
product.setProductDesc("HystrixDesc");
product.setProductId(0L);
return product;
}
}
一旦 get()方法上抛出了错误的信息,那么就认为该服务有问题,会默认使用@HystrixCommand注解之中配置好的fallbackMethod调用类中的指定方法,返回相应数据。
1.3、product-hystrix模块修改启动类,增加对熔断的支持
@SpringBootApplication
@MapperScan("com.chj.mapper")
@EnableEurekaClient
@EnableDiscoveryClient // 添加服务发现注解
@EnableCircuitBreaker // 添加hystrix断路器支持
public class ProductHystrixApp {
public static void main(String[] args) {
SpringApplication.run(ProductHystrixApp.class,args);
}
}
1.4、启动服务测试:
访问地址:localhost:8080/prodcut/get/100,访问一条数据不存在的数据。
返回结果:{"productId":0,"productName":"HystrixName","productDesc":"HystrixDesc"}
说明方法报错进入了我们开始设置的getFallback回调方法中。
2、Hystrix服务隔离策略
2.1、线程池隔离
THREAD线程池隔离策略,独立线程接收请求,默认采用的就是线程池隔离。代码配置如下所示:
Command属性:
·execution.isolation.strategy:执行的隔离策略。
·THREAD:线程池隔离策略 独立线程接收请求。
·SEMAPHORE:信号量隔离策略 在调用线程上执行。
·execution.isolation.thread.timeoutInMilliseconds:设置HystrixCommand执行的超时时间,单位毫秒。
·execution.timeout.enabled:是否启动超时时间,true,false。
·execution.isolation.semaphore.maxConcurrentRequests:隔离策略为信号量的时候,该属性来配置信号量的大小,最大并发达到信号量时,后续请求被拒绝。
·circuitBreaker.enabled:是否开启断路器功能。
·circuitBreaker.requestVolumeThreshold:该属性设置在滚动时间窗口中,断路器的最小请求数。默认20,如果在窗口时间内请求次数19,即使19个全部失败,断路器也不会打开
·circuitBreaker.sleepWindowInMilliseconds:改属性用来设置当断路器打开之后的休眠时间,休眠时间结束后断路器为半开状态,断路器能接受请求,如果请求失败又重新回到打开状态,如果请求成功又回到关闭状态。
·circuitBreaker.errorThresholdPercentage:该属性设置断路器打开的错误百分比。
在滚动时间内,在请求数量超过circuitBreaker.requestVolumeThreshold,如果错误请求数的百分比超过这个比例,断路器就为打开状态。
· circuitBreaker.forceOpen:true表示强制打开断路器,拒绝所有请求。
· circuitBreaker.forceClosed:true表示强制进入关闭状态,接收所有请求。
· metrics.rollingStats.timeInMilliseconds:设置滚动时间窗的长度,单位毫秒。这个时间窗口就是断路器收集信息的持续时间。断路器在收集指标信息的时会根据这个时间窗口把这个窗口拆分成多个桶,每个桶代表一段时间的指标,默认10000。
·metrics.rollingStats.numBuckets:滚动时间窗统计指标信息划分的桶的数量,但是滚动时间必须能够整除这个桶的个数,要不然抛异常。
· requestCache.enabled:是否开启请求缓存,默认为true。
·requestLog.enabled:是否打印日志到HystrixRequestLog中,默认true。
·@HystrixCollapser:请求合并。
·maxRequestsInBatch:设置一次请求合并批处理中允许的最大请求数。
·timerDelayInMilliseconds:设置批处理过程中每个命令延迟时间。
·requestCache.enabled:批处理过程中是否开启请求缓存,默认true。
threadPoolProperties 属性:
· coreSize 执行命令线程池的最大线程数,也就是命令执行的最大并发数,默认10
@HystrixCommand(fallbackMethod = "queryContentsFallback",
commandKey = "queryContents",
groupKey = "querygroup-one",
commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.maxConcurrentRequests",value = "100"),
@HystrixProperty(name = "execution.isolation.strategy", value = "THREAD "),
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000000000")
},
threadPoolKey = "queryContentshystrixJackpool", threadPoolProperties = {
// @HystrixProperty(name = "coreSize", value = "100")
})
@Override
public List<ConsultContent> queryContents() {
log.info(Thread.currentThread().getName() + "========queryContents=========");
s.incrementAndGet();
List<ConsultContent> results = restTemplate.getForObject("http://"
+ SERVIER_NAME + "/user/queryContent", List.class);
return results;
}
@HystrixCommand(fallbackMethod = "queryContentsAsynFallback")
@Override
public Future<String> queryContentsAsyn() {
return new AsyncResult<String>() {
@Override
public String invoke() {
log.info("========queryContents=========");
List<ConsultContent> results = restTemplate.getForObject("http://"
+ SERVIER_NAME + "/user/queryContent", List.class);
return JSONObject.toJSONString(results);
}
};
}
默认线程池中有10个线程,可以配置线程池中线程大小:
@HystrixProperty(name = "coreSize", value = "100")
线程池隔离策略,hystrix是会单独创建线程的,单元测试如下:
可以看到,用户线程和业务类中的线程是不一样的。
2.2、信号量隔离
信号量隔离是采用一个全局变量来控制并发量,一个请求过来全局变量加1,单加到跟配置中的大小相等是就不再接受用户请求了。代码配置:
@HystrixCommand(fallbackMethod = "queryContentsFallback",
commandKey = "queryContents",
groupKey = "querygroup-one",
commandProperties = {
@HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests",value = "100"),
@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"),
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000000000")
},
threadPoolKey = "queryContentshystrixJackpool", threadPoolProperties = {
// @HystrixProperty(name = "coreSize", value = "100")
})
@Override
public List<ConsultContent> queryContents() {
log.info(Thread.currentThread().getName() + "========queryContents=========");
s.incrementAndGet();
List<ConsultContent> results = restTemplate.getForObject("http://"
+ SERVIER_NAME + "/user/queryContent", List.class);
return results;
}
@HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests",value = "100"),
这参数是用来控制信号量隔离级别的并发大小的。
单元测试:
可以看到,单元测试中的线程和业务类中的线程是一样的,没有单独开启线程。
3、服务器失连(Hystrix服务降级)
在某些情况下,服务提供方并没有失效,但可能由于网络原因,服务的消费方并不能调用到服务接口,在这种情况下,直接在服务的提供方提供熔断机制依然还是不够的,这方面的处理需要在服务的消费方进行服务的回退(服务的降级)处理。
服务的熔断:熔断指的是当服务的提供方不可使用的时候,程序不会出现异常,而会出现本地的操作调用,服务的熔断是在服务消费方实现的,在断网情况下服务提供方的任何处理都是没有意义的。
方式一:通过HystrixCommand注解方式实现Hystrix服务降级
服务降级是对服务调用过程的出现的异常的友好封装,当出现异常时,我们不希望直接把异常原样返回,所以当出现异常时我们需要对异常信息进行包装,抛一个友好的信息给前端。
Hystrix降级的使用比较简单,代码示例:
@HystrixCommand(fallbackMethod = "queryContentsFallback",
commandKey = "queryContents",
groupKey = "querygroup-one",
commandProperties = {
@HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests",value = "100"),
@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"),
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000000000")
},
threadPoolKey = "queryContentshystrixJackpool", threadPoolProperties = {
// @HystrixProperty(name = "coreSize", value = "100")
})
@Override
public List<ConsultContent> queryContents() {
log.info(Thread.currentThread().getName() + "========queryContents=========");
s.incrementAndGet();
List<ConsultContent> results = restTemplate.getForObject("http://"
+ SERVIER_NAME + "/user/queryContent", List.class);
return results;
}
通过配置参数:fallbackMethod = "queryContentsFallback",定义降级方法,降级方法的返回值和业务方法的方法值要一样:
public List<ConsultContent> queryContentsFallback() {
f.incrementAndGet();
log.info("===============queryContentsFallback=================");
return null;
}
方式二:通过FeignClient注解方式实现服务降级处理
3.1、新增IProductClientService的失败调用(降级处理)
springcloud-micro-service-feign公共接口模块新增一个IProductClientService的失败调用(降级处理),create方法里面返回IProductClientService的默认实现方法(这里我们只实现getProduct方法):
@Component
public class IProductClientServiceFallbackFactory implements FallbackFactory<IProductClientService> {
@Override
public IProductClientService create(Throwable throwable) {
return new IProductClientService() {
@Override
public Product getProduct(long id) {
Product product = new Product();
product.setProductId(999999L);
product.setProductName("feign-hystrix-Name");
product.setProductDesc("feign-hystrix-Desc");
return product;
}
@Override
public List<Product> listProduct() {
return null;
}
@Override
public boolean addPorduct(Product product) {
return false;
}
};
}
}
3.2、增加fallback配置
springcloud-micro-service-feign公共接口模块修改IProductClientService,增加fallback配置
/**
* feign接口定义,添加Hystrix支持(增加fallbackFactory方法)
*/
@FeignClient(name = "SPRINGCLOUD-MICRO-PRODUCT", configuration = FeignClientConfig.class,
fallbackFactory = IProductClientServiceFallbackFactory.class)
public interface IProductClientService {
@RequestMapping("/prodcut/get/{id}")
public Product getProduct(@PathVariable("id")long id);
@RequestMapping("/prodcut/list")
public List<Product> listProduct() ;
@RequestMapping("/prodcut/add")
public boolean addPorduct(Product product) ;
}
3.3、新建消费放的hystrix模块
将springcloud-micro-consumer-feign复制一份成为springcloud-micro-consumer-feign-hystrix模块
然后修改application.yml配置文件,启用hystrix配置:
feign:
hystrix: # 启用hystrix配置
enabled: true
compression:
request:
enabled: true
mime-types: # 可以被压缩的类型
- text/xml
- application/xml
- application/json
min-request-size: 2048 # 超过2048的字节进行压缩
3.4、修改启动类名称然后启动服务
注意:测试需要先启动eureka注册中心和product-hystrix服务
1)服务消费者访问:http://localhost/consumer/product/get?id=1,能正常访问。
返回结果:{"productId":null,"productName":"java编程","productDesc":"springcloud"}
2)关闭服务提供者:
访问:http://localhost/consumer/product/get?id=1,也能正常访问,此时返回异常情况结果如下:
{"productId":999999,"productName":"feign-hystrix-Name","productDesc":"feign-hystrix-Desc"}
4、HystrixDashboard数据监控
在hystrix里面提供一个Dashboard(仪表盘)的功能,他是一种监控的功能,可以利用它来进行整体服务的监控,新建一个模块springcloud-micro-consumer-hystrix-dashboard进行测试。
4.1、hystrix-dashboard模块pom文件添加依赖:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
</dependencies>
4.2、product-hystrix模块pom文件确保里面有健康检查模块
<!-- actuator健康检查 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
4.3、hystrix-dashboard模块修改application.yml配置文件
server:
port: 9001
4.4、hystrix-dashboard创建一个启动类
@SpringBootApplication
@EnableHystrixDashboard
public class HystrixDashboardApp {
public static void main(String[] args) {
SpringApplication.run(HystrixDashboardApp.class,args);
}
}
4.5、启动运行:
访问地址:http://localhost:9001/hystrix,然后在界面中输入需要监控的端点url:
http://localhost:8083/actuator/hystrix.stream
4.6、product-hystrix模块修改applcation.yml文件
打开所有监控端口
特别注意:springcloud1.x里面可以不用配置,但是2.x里面必须增加如下配置
management: # 打开监控的所有端口
endpoints:
web:
exposure:
include: '*'
product-hystrix模块启动,访问:localhost:8080/actuator/hystrix.stream
http://localhost:9001/hystrix 填写信息如下
http://admin:[email protected]:8080/actuator/hystrix.stream
这个时候对localhost:8080的访问都可以被监控到
访问地址:localhost:8080/prodcut/get/100,访问一条数据不存在的数据,频繁的访问会触发断路器开关,中断请求返回错误结果,隔段时间继续访问发现断路器开关关闭又可以继续正常访问。
5、Hystrix熔断
熔断就像家里的保险丝一样,家里的保险丝一旦断了,家里就没点了,家里用电器功率高了就会导致保险丝端掉。在我们springcloud领域也可以这样理解,如果并发高了就可能触发hystrix的熔断。
5.1、熔断发生的三个必要条件:
1)有一个统计的时间周期,滚动窗口
相应的配置属性:metrics.rollingStats.timeInMilliseconds,默认10000毫秒
2)请求次数必须达到一定数量
相应的配置属性:circuitBreaker.requestVolumeThreshold,默认20次
3)失败率达到默认失败率
相应的配置属性:circuitBreaker.errorThresholdPercentage,默认50%
上述3个条件缺一不可,必须全部满足才能开启hystrix的熔断功能。
当我们的对一个线程池大小是100的方法压测时看看hystrix的熔断效果:
Jmeter压测:
Hystrix dashboard界面:
可以看到失败率超过50%时,circuit的状态是open的。
5.2、熔断器的三个状态:
1)关闭状态
关闭状态时用户请求是可以到达服务提供方的
2)开启状态
开启状态时用户请求是不能到达服务提供方的,直接会走降级方法
3)半开状态
当hystrix熔断器开启时,过一段时间后,熔断器就会由开启状态变成半开状态。
半开状态的熔断器是可以接受用户请求并把请求传递给服务提供方的,这时候如果远程调用返回成功,那么熔断器就会有半开状态变成关闭状态,反之,如果调用失败,熔断器就会有半开状态变成开启状态。
Hystrix功能建议在并发比较高的方法上使用,并不是所有方法都得使用的。
二、Turbine模块支持
HystrixDashboard 前面已经知道了,它的主要功能是可以对某一项微服务进行监控,但真实情况下,不可能只对某一个服务进行监控,更多的是对很多服务进行一个整体的监控,这个时候就需要使用到turbine来完成了。为了演示监控多个服务模块,这个时候新建一个模块springcloud-micro-user-hystrix,为简单起见,这个模块并不连接数据库,也不做安全控制。
1、模块springcloud-micro-user-hystrix模块
1.1、springcloud-micro-user-hystrix模块pom文件如下
<dependencies>
<dependency>
<groupId>com.chj</groupId>
<artifactId>springcloud-micro-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
</dependencies>
1.2、springcloud-micro-api新增一个VO类:User
public class User implements Serializable {
private static final long serialVersionUID = -5938360470671579027L;
private String name;
private int age;
private String sex;
user-hystrix模块新建一个UserController
@RestController
@RequestMapping("/user")
public class UserController {
@RequestMapping("/get/{name}")
@HystrixCommand
public Object get(@PathVariable("name")String name) {
User user = new User();
user.setName(name);
user.setAge(31);
user.setSex("boy");
return user;
}
}
1.3、user-hystrix模块新增启动类
@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
public class UsersApp {
public static void main(String[] args) {
SpringApplication.run(UsersApp.class, args);
}
}
1.4、user-hystrix模块修改application.yml配置文件
server:
port: 8090
spring:
application:
name: springcloud-micro-user
logging:
level:
com.chj.mapper: debug
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://admin:[email protected]eureka1:7001/eureka,http://admin:enjoy@eureka2:7002/eureka,http://admin:[email protected]:7003/eureka
instance:
instance-id: springcloud-micro-user
prefer-ip-address: true
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
info:
app.name: springcloud-micro-user
company.name: hankin
build.artifactId: $project.artifactId$
build.modelVersion: $project.modelVersion$
# 打开所有监控端口
management:
endpoints:
web:
exposure:
include: '*'
1.5、运行启动
访问地址:http://localhost:8090/user/get/hankin
打开监控平台:http://localhost:9001/hystrix
hystrix监控地址:http://localhost:8090/actuator/hystrix.stream
2、创建turbine模块
前面准备工作完成后,如果想要实现turbine的配置,新增springcloud-micro-consumer-turbine模块。
2.1、consumer-turbine模块,pom文件如下:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-turbine</artifactId>
</dependency>
</dependencies>
2.2、consumer-turbine模块修改application.yml配置文件
server:
port: 9101
eureka:
client:
register-with-eureka: false
service-url:
defaultZone: http://admin:[email protected]eureka1:7001/eureka,http://admin:admin@eureka2:7002/eureka,http://admin:[email protected]:7003/eureka
turbine:
app-config: SPRINGCLOUD-MICRO-PRODUCT,SPRINGCLOUD-MICRO-USER
cluster-name-expression: new String("default")
注意:可以发现对于turbine,其实是从eureka配置在app-config中服务,然后进行监控。
2.3、consumer-turbine模块新建一个启动类
@SpringBootApplication
@EnableTurbine
public class TurbineApp {
public static void main(String[] args) {
SpringApplication.run(TurbineApp.class,args);
}
}
2.4、启动turbine服务
turbine监控地址:http://localhost:9101/turbine.stream
访问产品服务:localhost:8080/prodcut/get/1
访问用户服务:http://localhost:8090/user/get/hankin
启动Dashboard: http://localhost:9001/hystrix
然后在Dashboard里面填上turbine监控地址:http://localhost:9101/turbine.stream
发现目前turbine只监控了UserController的信息,看下turbine后台发现报错:
com.netflix.turbine.monitor.instance.InstanceMonitor$MisconfiguredHostException: [{"timestamp":"2019-11-22T15:31:15.648+0000","status":401,"error":"Unauthorized","message":"Unauthorized","path":"/actuator/hystrix.stream"}]
其实原因也很简单,User服务并不需要用户验证,所以能正常访问,但对于Product服务,配置了用户名密码的,turbine肯定无法访问。
2.5、turbine模块对于spring security的支持问题
springcloud-micro-security模块如果现在需要turbine进行加密服务的访问,那么只能折衷处理,让访问/actuator/hystrix.stream与/turbine.stream这两个地址的时候不需要用户密码验证。
security模块修改WebSecurityConfiguration文件,代码如下所示:
@Configuration
@EnableWebSecurity
public class WebSecurityConfiguration extends WebSecurityConfigurerAdapter {
@Override
public void configure(AuthenticationManagerBuilder auth) throws Exception {
auth.inMemoryAuthentication().passwordEncoder(
new BCryptPasswordEncoder()).withUser("root")
.password(new BCryptPasswordEncoder().encode("admin"))
.roles("USER").and().withUser("admin")
.password(new BCryptPasswordEncoder().encode("admin"))
.roles("USER", "ADMIN");
}
@Override
protected void configure(HttpSecurity http) throws Exception {
http.httpBasic().and().authorizeRequests().anyRequest().fullyAuthenticated();
http.sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS);
}
@Override
public void configure(WebSecurity web) throws Exception {
web.ignoring().antMatchers("/actuator/hystrix.stream","/turbine.stream") ;
}
}
turbine监控地址:http://localhost:9101/turbine.stream
启动Dashboard: http://localhost:9001/hystrix
在Dashboard里面填上turbine监控地址
刷新用户和产品接口:
http://localhost:8080/prodcut/get/1
http://localhost:8090/user/get/1
三、Hystrix源码解析:
1、Hystrix的AOP实现
1.1、Hystrix模块配置启动菜单入口:
spring-cloud-netflix-core-2.0.0.RELEASE.jar!/META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
HystrixCircuitBreakerConfiguration断路器配置代码:
@Configuration
public class HystrixCircuitBreakerConfiguration {
//注入一个AOP的切面
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
@Bean
public HystrixShutdownHook hystrixShutdownHook() {
return new HystrixShutdownHook();
}
1.2、处理被注解@HystrixCommand标记的方法
HystrixCommandAspect用于处理被注解@HystrixCommand标记的方法。通过名字可以知道,Hystrix基于AOP机制实现,对目标方法做了代理,然后实现了自己一系列功能特性。
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
1.3、AOP环绕通知方法实现
methodsAnnotatedWithHystrixCommand用来执行目标方法
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
//根据切点获取Method(被@HystrixCommand标记方法)
Method method = AopUtils.getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
} else {
HystrixCommandAspect.MetaHolderFactory metaHolderFactory = (HystrixCommandAspect.MetaHolderFactory)META_HOLDER_FACTORY_MAP.get(HystrixCommandAspect.HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
// 准备各种材料后,创建HystrixInvokable
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
try {
Object result;
if (!metaHolder.isObservable()) {
// 利用工具CommandExecutor来执行
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = this.executeObservable(invokable, executionType, metaHolder);
}
return result;
} catch (HystrixBadRequestException var9) {
throw (Throwable)(var9.getCause() != null ? var9.getCause() : var9);
} catch (HystrixRuntimeException var10) {
throw this.hystrixRuntimeExceptionToThrowable(metaHolder, var10);
}
}
}
HystrixInvokable 只是一个空接口,没有任何方法,只是用来标记具备可执行的能力。
2、创建HystrixInvokable
那HystrixInvokable又是如何创建的?它具体的实现类又是什么?
2.1、先看看HystrixCommandFactory.getInstance().create()的代码
com.netflix.hystrix.contrib.javanica.command.HystrixCommandFactory.create方法
public HystrixInvokable create(MetaHolder metaHolder) {
Object executable;
if (metaHolder.isCollapserAnnotationPresent()) {
executable = new CommandCollapser(metaHolder);
} else if (metaHolder.isObservable()) {
executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
} else {
executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
}
return (HystrixInvokable)executable;
}
这里可以看到实现类是 GenericCommand,接口关系图如下:
三个抽象父类 AbstractHystrixCommand、HystrixCommand、AbstractCommand帮助GenericCommand 做了不少公共的事情,而负责执行具体的方法和fallback时的方法。
2.2、GenericCommand.run()方法分析
hystrix-javanica-1.5.12.jar!\com\netflix\hystrix\contrib\javanica\command\GenericCommand.class
com.netflix.hystrix.contrib.javanica.command.GenericCommand
@ThreadSafe
public class GenericCommand extends AbstractHystrixCommand<Object> {
private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);
public GenericCommand(HystrixCommandBuilder builder) {
super(builder);
}
protected Object run() throws Exception {
LOGGER.debug("execute command: {}", this.getCommandKey().name());
return this.process(new AbstractHystrixCommand<Object>.Action() {
Object execute() {
return GenericCommand.this.getCommandAction().execute(GenericCommand.this.getExecutionType());
}
});
}
2.3、利用工具CommandExecutor来执行
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder)
throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
switch(executionType) {
case SYNCHRONOUS:
//以同步方式
return castToExecutable(invokable, executionType).execute();
case ASYNCHRONOUS:
//以异步方式,转为 HystrixExecutable 并执行
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
case OBSERVABLE:
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
if (invokable instanceof HystrixExecutable) {
return (HystrixExecutable)invokable;
} else {
throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
}
}
2.4、执行com.netflix.hystrix.HystrixCommand#execute方法
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R>
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
public Future<R> queue() {
//利用 JUC 的 Future 来异步执行
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) {
return false;
}
if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}
final boolean res = delegate.cancel(interruptOnFutureCancel.get());
if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
final Thread t = executionThread.get();
if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
}
}
return res;
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}
@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
};
if (f.isDone()) {
try {
f.get(); //执行正常的业务方法
return f;
} catch (Exception e) {
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
return f;
default:
throw hre;
}
} else {
throw Exceptions.sneakyThrow(t);
}
}
}
return f;
}
其实上述过程算是总览,可以知道Hystrix是通过AOP来实现的。
3、工作原理流程总结
1)构建命令:@HystrixCommand
2)执行命令:@com.netflix.hystrix.HystrixCommand#execute
3)检查缓存:
如果启用了Hystrix Cache,任务执行前将先判断是否有相同命令执行的缓存。如果有则直接返回缓存的结果;如果没有缓存的结果,但启动了缓存,将缓存本次执行结果以供后续使用。
4)检查断路器是否打开:
断路器(circuit-breaker)和保险丝类似,保险丝在发生危险时将会烧断以保护电路,而断路器可以在达到我们设定的阀值时触发短路(比如请求失败率达到50%),拒绝执行任何请求。如果断路器被打开,Hystrix将不会执行命令,直接进入Fallback处理逻辑。
5)检查线程池/信号量情况
Hystrix隔离方式有线程池隔离和信号量隔离。当使用Hystrix线程池时,Hystrix默认为每个依赖服务分配10个线程,当10个线程都繁忙时,将拒绝执行命令,信号量同理。
6)执行具体的任务
HystrixCommand.run() 来运行用户真正的任务。
7)计算链路健康情况
每次开始执行command、结束执行command以及发生异常等情况时,都会记录执行情况,例如:成功、失败、拒绝以及超时等情况,会定期处理这些数据,再根据设定的条件来判断是否开启断路器。
8)命令失败时执行Fallback逻辑
在命令失败时执行用户指定的Fallback逻辑。上图中的断路、线程池拒绝、信号量拒绝、执行执行、执行超时都会进入Fallback处理。
9)返回执行结果Observable
原始结果将以Observable形式返回,在返回给用户之前,会根据调用方式的不同做一些处理。
3.1、com.netflix.hystrix.AbstractCommand#toObservable
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
//TODO 命令执行结束后的清理者
final Action0 terminateCommandCleanup = new Action0() {...};
// TODO 取消订阅时处理者
final Action0 unsubscribeCommandCleanup = new Action0() {...};
//TODO Hystrix 核心逻辑: 断路器、隔离
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {...};
//TODO 发射数据(OnNext表示发射数据)时的Hook
final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {...};
//TODO 命令执行完成的Hook
final Action0 fireOnCompletedHook = new Action0() {...};
//TODO 通过Observable.defer()创建一个Observable
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
// TODO 首先尝试从请求缓存中获取结果
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
//TODO 使用上面的Func0:applyHystrixSemantics 来创建Observable
Observable<R> hystrixObservable =Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
//TODO 如果启用请求缓存,将Observable包装成HystrixCachedObservable并进行相关处理
if (requestCacheEnabled && cacheKey != null) {
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
...
} else {
afterCache = hystrixObservable;
}
//TODO 返回Observable
return afterCache .doOnTerminate(terminateCommandCleanup)
.doOnUnsubscribe(unsubscribeCommandCleanup)
.doOnCompleted(fireOnCompletedHook);
}
});
}
解释下Action0、Func1这种对象。Action、Func和Runnable、Callable类似,是一个可以被执行的实体。Action没有返回值,Action0…ActionN表示有0..N个参数,Action0就表示没有参数;Func有返值,0..N一样表示参数。
3.2、下面用核心的applyHystrixSemantics来阐述一下。
// applyHystrixSemantics 是一个Func0(理解为执行实体或处理者),表示没有参数,返回值是Observable。
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 如果未订阅,返回一个"哑炮" Observable, 即一个不会发射任何数据的Observable
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
// 调用applyHystrixSemantics()来创建Observable
return applyHystrixSemantics(_cmd);
}
};
toObservable 到底做了什么?
其实就是主备大量的处理中(观察者),实际使用时是最后的Observable.defer(new Func0>(){…}
3.3、Observable.defer方法分析
defer译为延迟,表示演讲者会等有观众来时才开始分享,Observable.defer()就是说:必须有观察者订阅时,Observable才开始发射数据。而defer()的参数是个Func0,是一个会返回Observable的执行实体。下面看看defer():
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
//TODO 再一次使用Observable.defer()技能,这次用的是applyHystrixSemantics这个Func0
Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
...
//TODO 此处忽略了请求缓存处理,上面已有提及
Observable<R> afterCache;
...
//TODO 为Observable绑定几个特定事件的处理者,这都是上门创建的Action0
return afterCache.doOnTerminate(terminateCommandCleanup)
.doOnUnsubscribe(unsubscribeCommandCleanup)
.doOnCompleted(fireOnCompletedHook);
}
});
3.4、调用applyHystrixSemantics()来创建Observable
com.netflix.hystrix.AbstractCommand.applyHystrixSemantics()
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// 源码中有很多executionHook、eventNotifier的操作,这是Hystrix拓展性的一种体现。
// 这里面啥事也没做,留了个口子,开发人员可以拓展
executionHook.onStart(_cmd);
// 判断断路器是否开启
if (circuitBreaker.attemptExecution()) {
// 获取执行信号
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
// 判断是否信号量拒绝
if (executionSemaphore.tryAcquire()) {
try {
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// 处理隔离策略和Fallback策略
return handleSemaphoreRejectionViaFallback();
}
} else { // 开启了断路器,执行Fallback
return handleShortCircuitViaFallback();
}
}
com.netflix.hystrix.AbstractCommand.executeCommandAndObserve(_cmd)方法:
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
final Action1<R> markEmits = new Action1<R>() {...};
final Action0 markOnCompleted = new Action0() {...};
//TODO 利用Func1获取处理Fallback的 Observable
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
//TODO 拒绝处理
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
//TODO 超时处理
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
...
return handleFailureViaFallback(e);
}
}
};
final Action1<Notification<? super R>> setRequestContext ...
Observable<R> execution;
// TODO 利用特定的隔离策略来处理
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
//TODO 绑定Fallback的处理者
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
3.5、利用特定的隔离策略来处理
com.netflix.hystrix.AbstractCommand.executeCommandWithSpecifiedIsolation(_cmd)方法:
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
//TODO 1、线程池隔离
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
//TODO 再次使用 Observable.defer(), 通过执行Func0来得到Observable
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
//TODO 收集metric信息
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
...
try {
...
//TODO 获取包裹实际Task的Observable
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
...
}
// TODO 绑定各种处理者
}).doOnTerminate(new Action0() {...})
.doOnUnsubscribe(new Action0() {...})
//TODO 绑定超时处理者
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get()
&& _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
}
//TODO 2、信号量隔离,和线程池大同小异
else {
return Observable.defer(new Func0<Observable<R>>() {...}
}
}
3.6、这里出现了两种隔离方式:
Thread Pools(线程池):
将各依赖服务的访问交由独立的线程池来处理,会为每个依赖服务创建一个线程池,虽然可以起到很好的隔离作用,但也增加了计算开。
Semaphores(信号量):
通过为各依赖服务设置信号量(或计数器)来限制并发调用,相当于对各依赖服务做限流。信号量模式下任务由当前线程直接处理,不涉及到线程切换,自然也就没有超时控制。
线程池模式核心代码是这句:subscribeOn(threadPool.getScheduler(new Func0<Boolean>()...
3.7、使用Scheduler来处理当前任务
com.netflix.hystrix.HystrixThreadPool.getScheduler方法处理任务
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
threadPool为HystrixThreadPool类型com.netflix.hystrix.AbstractCommand#initThreadPool里面初始化的
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
if (fromConstructor == null) {
//每个threadPoolKey会维护一个HystrixThreadPool
return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
} else {
return fromConstructor;
}
}
static class Factory为AbstractCommand的一个内部类实现,获取HystrixThreadPool
3.8、获取信号量getExecutionSemaphore()
再回到com.netflix.hystrix.AbstractCommand#applyHystrixSemantics方法分析getExecutionSemaphore();方法的实现:
// 获取信号量com.netflix.hystrix.AbstractCommand#getExecutionSemaphore
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
protected TryableSemaphore getExecutionSemaphore() {
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
if (executionSemaphoreOverride == null) {
TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
if (_s == null) {
executionSemaphorePerCircuit.putIfAbsent(commandKey.name(),
new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
return executionSemaphorePerCircuit.get(commandKey.name());
} else {
return _s;
}
} else {
return executionSemaphoreOverride;
}
} else {
// return NoOp implementation since we're not using SEMAPHORE isolation
return TryableSemaphoreNoOp.DEFAULT;
}
}
只有在隔离策略为SEMAPHORE时,才会创建TryableSemaphoreActual,否则返回一个什么也不做的TryableSemaphoreNoOp(tryAcquire()将永远返回true)。
static class TryableSemaphoreActual implements TryableSemaphore {
protected final HystrixProperty<Integer> numberOfPermits;
private final AtomicInteger count = new AtomicInteger(0);
public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
// 每个HystrixCommandKey默认信号量数量,默认10
this.numberOfPermits = numberOfPermits;
}
@Override
public boolean tryAcquire() {
int currentCount = count.incrementAndGet();
// 如果信号量超过设定的信号量,则启动信号量拒绝
if (currentCount > numberOfPermits.get()) {
count.decrementAndGet();
return false;
} else {
return true;
}
}
@Override
public void release() {
count.decrementAndGet();
}
@Override
public int getNumberOfPermitsUsed() {
return count.get();
}
}