313 lines
18 KiB
Markdown
313 lines
18 KiB
Markdown
# CircuitBreaker
|
||
resilience4j是一个轻量级的fault tolerance library,其针对函数式编程进行设计。resilence4j提供了更高阶的函数(`decorator`)来对`function interface, lambda expression, method reference`等内容进行增强。
|
||
|
||
decorators包含如下分类:
|
||
- CircuitBreaker
|
||
- Rate Limiter
|
||
- Retry
|
||
- Bulkhead
|
||
|
||
对于任何`function interface, lambda expression, method reference`,都可以使用多个decorators进行装饰。
|
||
|
||
## Introduction
|
||
在如下示例中,会展示如何通过CircuitBreaker和Retry来对lambda expression进行装饰,令lambda在发生异常时最多重试3次。
|
||
|
||
可以针对多次retry之间的interval进行配置,也支持自定义的backoff algorithm。
|
||
|
||
```java
|
||
// Create a CircuitBreaker with default configuration
|
||
CircuitBreaker circuitBreaker = CircuitBreaker
|
||
.ofDefaults("backendService");
|
||
|
||
// Create a Retry with default configuration
|
||
// 3 retry attempts and a fixed time interval between retries of 500ms
|
||
Retry retry = Retry
|
||
.ofDefaults("backendService");
|
||
|
||
// Create a Bulkhead with default configuration
|
||
Bulkhead bulkhead = Bulkhead
|
||
.ofDefaults("backendService");
|
||
|
||
Supplier<String> supplier = () -> backendService
|
||
.doSomething(param1, param2)
|
||
|
||
// Decorate your call to backendService.doSomething()
|
||
// with a Bulkhead, CircuitBreaker and Retry
|
||
// **note: you will need the resilience4j-all dependency for this
|
||
Supplier<String> decoratedSupplier = Decorators.ofSupplier(supplier)
|
||
.withCircuitBreaker(circuitBreaker)
|
||
.withBulkhead(bulkhead)
|
||
.withRetry(retry)
|
||
.decorate();
|
||
|
||
// When you don't want to decorate your lambda expression,
|
||
// but just execute it and protect the call by a CircuitBreaker.
|
||
String result = circuitBreaker
|
||
.executeSupplier(backendService::doSomething);
|
||
|
||
// You can also run the supplier asynchronously in a ThreadPoolBulkhead
|
||
ThreadPoolBulkhead threadPoolBulkhead = ThreadPoolBulkhead
|
||
.ofDefaults("backendService");
|
||
|
||
// The Scheduler is needed to schedule a timeout
|
||
// on a non-blocking CompletableFuture
|
||
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
|
||
TimeLimiter timeLimiter = TimeLimiter.of(Duration.ofSeconds(1));
|
||
|
||
CompletableFuture<String> future = Decorators.ofSupplier(supplier)
|
||
.withThreadPoolBulkhead(threadPoolBulkhead)
|
||
.withTimeLimiter(timeLimiter, scheduledExecutorService)
|
||
.withCircuitBreaker(circuitBreaker)
|
||
.withFallback(asList(TimeoutException.class,
|
||
CallNotPermittedException.class,
|
||
BulkheadFullException.class),
|
||
throwable -> "Hello from Recovery")
|
||
.get().toCompletableFuture();
|
||
```
|
||
### maven
|
||
resilence4j需要jdk17及以上,如果使用maven,可以按照如下方式来引入
|
||
|
||
引入所有包的方式如下
|
||
```xml
|
||
<dependency>
|
||
<groupId>io.github.resilience4j</groupId>
|
||
<artifactId>resilience4j-all</artifactId>
|
||
<version>${resilience4jVersion}</version>
|
||
</dependency>
|
||
```
|
||
|
||
按需引入方式如下
|
||
```xml
|
||
<dependency>
|
||
<groupId>io.github.resilience4j</groupId>
|
||
<artifactId>resilience4j-circuitbreaker</artifactId>
|
||
<version>${resilience4jVersion}</version>
|
||
</dependency>
|
||
<dependency>
|
||
<groupId>io.github.resilience4j</groupId>
|
||
<artifactId>resilience4j-ratelimiter</artifactId>
|
||
<version>${resilience4jVersion}</version>
|
||
</dependency>
|
||
<dependency>
|
||
<groupId>io.github.resilience4j</groupId>
|
||
<artifactId>resilience4j-retry</artifactId>
|
||
<version>${resilience4jVersion}</version>
|
||
</dependency>
|
||
<dependency>
|
||
<groupId>io.github.resilience4j</groupId>
|
||
<artifactId>resilience4j-bulkhead</artifactId>
|
||
<version>${resilience4jVersion}</version>
|
||
</dependency>
|
||
<dependency>
|
||
<groupId>io.github.resilience4j</groupId>
|
||
<artifactId>resilience4j-cache</artifactId>
|
||
<version>${resilience4jVersion}</version>
|
||
</dependency>
|
||
<dependency>
|
||
<groupId>io.github.resilience4j</groupId>
|
||
<artifactId>resilience4j-timelimiter</artifactId>
|
||
<version>${resilience4jVersion}</version>
|
||
</dependency>
|
||
```
|
||
|
||
## CircuitBreaker
|
||
### State Machine
|
||
CircuitBreaker通过有限状态机实现,其拥有如下状态:
|
||
- `CLOSED`
|
||
- `OPEN`
|
||
- `HALF_OPEN`
|
||
- `METRICS_ONLY`
|
||
- `DISABLED`
|
||
- `FORCED_OPEN`
|
||
|
||
其中,前三个状态为正常状态,后三个状态为特殊状态。
|
||
|
||
<img alt="" loading="lazy" src="https://files.readme.io/39cdd54-state_machine.jpg" title="state_machine.jpg" align="" caption="" height="auto" width="auto">
|
||
|
||
上述circuitbreaker状态转换逻辑如下所示:
|
||
- 处于`CLOSED`状态时,如果实际接口的失败率超过上限后,会从`CLOSED`状态转换为`OPEN`状态
|
||
- 处于`OPEN`状态下经过一段时间后,会从`OPEN`状态转换为`HALF_OPEN`状态
|
||
- 处于`HALF_OPEN`状态下,如果失败率小于上限,会从`HALF_OPEN`状态重新变为`CLOSED`状态
|
||
- 如果`HALF_OPEN`状态下,失败率仍然超过上限,则会从`HALF_OPEN`状态重新变为`OPEN`状态
|
||
|
||
### Sliding Window
|
||
CircuitBreaker会使用`滑动窗口`来存储和聚合调用结果。在使用CircuitBreaker时,可以选择`count-based`的滑动窗口还是`time-based`滑动窗口。
|
||
|
||
- `count-based`:`count-based`滑动窗口会对最近`N`次调用的结果进行聚合
|
||
- `time-based`:`time-based`滑动窗口将会对最近`N`秒的调用结果进行聚合
|
||
|
||
#### Count-based sliding window
|
||
count-based sliding window是通过循环数组来实现的,循环数组中包含了n个measurements。如果count window的大小为10,那么circular array一直都会有10个measurements。
|
||
|
||
count-based的滑动窗口实现会`total aggregation`结果进行更新,更新逻辑如下:
|
||
- 当一个新的调用返回结果后,其结果将会被记录,并且total aggregation也会被更新,将新调用的结果加到total aggregation中
|
||
- 发生新调用时,循环数组中最老(oldest)的measurement将会被淘汰,并且measurement也会从total aggregation中被减去,bucket也会被重置(bucket即measurement,bucket被重置即代表oldest measurement会被重置)
|
||
|
||
对于聚合结果的检查的开销是`O(1)`的,因为其是`pre-aggregated`的,并且和window size无关。
|
||
|
||
#### Time-based sliding window
|
||
Time-based sliding window其也是通过循环数组实现,数组中含有`N`个partial aggregation(bucket)。
|
||
|
||
如果time window大小是10秒,那么circular array一直都会有10的buckets。每个bucket都对应了一个epoch second,bucket会对该epoch second内发生的调用结果进行聚合。(`Partial aggregation`)。
|
||
|
||
在循环数组中,head buket中存储了当前epoch second中发生的调用结果,而其他的partial aggregation则存储的是之前second发生的调用结果。在Time-based的滑动窗口实现中,并不会像`Count-based`那样独立的存储调用结果,而是增量的对`partial aggregation`进行更新。
|
||
|
||
除了更新`Partial aggregation`外,time-based滑动窗口还会在新请求结果返回时,对`total aggregation`进行更新。当oldest bucket被淘汰时,该bucket的partial aggregation也会从total aggregation中被减去,并且bucket也会被重置。
|
||
|
||
检查聚合结果的开销也是`O(1)`的,Time-based滑动窗口也是`pre-aggregated`的。
|
||
|
||
partial aggregation中包含了3个integer,用于记录如下信息:
|
||
- failed calls次数
|
||
- slow calls次数
|
||
- 总共的call次数
|
||
|
||
除此之外,partial aggregation中还会包含一个long,用于存储所有请求的总耗时
|
||
|
||
### Failure rate and slow call rate thresholds
|
||
#### Failure rate & exception list
|
||
当failure rate`大于等于`配置的threshold时,CircuitBreaker的状态将会从`CLOSED`变为`OPEN`。
|
||
|
||
默认情况下,所有的抛出的异常都会被统计为failure,在使用时也可以指定一个`exception list`,在exception list中的异常才会被统计为failure,而不在exception list中的异常会被视为success。除此之外,还可以对异常进行`ignored`,被忽视的异常`既不会被统计为success,也不会被统计为failure`。
|
||
|
||
#### Slow call rate
|
||
当slow call rate大于或等于配置的threshold时,CircuitBreaker的状态也会从`CLOSED`变为`OPEN`。通过slow call rate,可以降低外部系统的负载。
|
||
|
||
只有`当记录的call数量达到最小数量时`,failure rate和slow call rate才能被计算。例如,`minimum number of required calls`为10,只有当被记录的calls次数达到10时,failure rate和slow call rate才能被计算。`如果当前只记录了9个calls,即使9次调用全部都失败,circuitbreaker也不会变为open状态。`
|
||
|
||
#### CircuitBreaker in `OPEN`/`HALF_OPEN` state
|
||
circuitbreaker在`OPEN`状态时,会拒绝所有的调用,并且抛出`CallNotPermittedException`。在等待一段时间后,`CircuitBreaker`将会从`OPEN`状态转为`HALF_OPEN`状态,并允许一个`configurable number`数量的请求进行实际调用,从而检测是否backend已经恢复并且可以再次访问。
|
||
|
||
处于`HALF_OPEN`状态的circuitbreaker,假设`permittedNumberOfCalls`的数量为10,此时存在20个调用,那么前10个调用都能正常调用,而后10个调用将会被拒绝,并且抛出`CallNotPermittedException`。
|
||
|
||
在`HALF_OPEN`状态下,如果failure rate或是slow call rate大于等于配置的threshold,那么circuitbreaker状态将会转为OPEN。如果failure rate和slow call rate小于threshold,那么circuitbreaker状态将变为CLOSED。
|
||
|
||
### Special States
|
||
CircuitBreaker支持3个特殊状态:
|
||
- `METRICS_ONLY`:处于该状态时,其行为如下
|
||
- 所有`circuit breaker events`都会正常生成(除了state transition外),并且metrics会正常记录
|
||
- 该状态和`CLOSED`状态类似,但是circuitbreaker在threshold达到时,不会切换为OPEN状态
|
||
- `DISABLED`:
|
||
- 没有`CircuitBreakerEvent`会被产生,metrics也不会被记录
|
||
- 会允许所有的访问
|
||
- `FORCED_OPEN`:
|
||
- 没有`CircuitBreakerEvent`会被产生,metrics也不会被记录
|
||
- 会拒绝所有的访问
|
||
|
||
退出这些特殊状态的方式如下:
|
||
- 触发state transition
|
||
- 对CircuitBreaker执行reset
|
||
|
||
### thread-safe
|
||
`CircuitBreaker线程安全`,但是CircuitBreaker并不会对function call进行串行化,故而`在使用CircuitBreaker时,function call可能会并行执行`。
|
||
|
||
对于Closed状态的CircuitBreaker而言,如果20个线程同时对cirbuitbreaker进行访问,那么所有的方法调用都能同时并行执行,即使滑动窗口的大小为`15`小于并行数量。`滑动窗口的大小不会对方法的并行程度造成影响`。
|
||
|
||
如果想要对并行程度做出限制,可以使用`Bulkhead`。
|
||
|
||
### CircuitBreakerRegistry
|
||
resilence4j附带了一个`in-memory`的`CircuitBreakerRegistry`,基于`ConcurrentHashMap`实现。可以通过`CircuitBreakerRegistry`来管理(创建和获取)CircuitBreaker实例。可以按照如下示例,根据`默认的CircuitBreakerConfig`来创建`CircuitBreakerRegistry`:
|
||
```java
|
||
CircuitBreakerRegistry circuitBreakerRegistry =
|
||
CircuitBreakerRegistry.ofDefaults();
|
||
```
|
||
|
||
#### Create and configure CircuitBreakerConfig
|
||
除了使用默认的CircuitBreakerConfig外,还可以提供自定义的`CircuitBreakerConfig`,对象可以通过builder来构建。
|
||
|
||
`CircuitBreakerConfig`的可配置属性如下:
|
||
- `failureRateThreshold`:配置failure rate threshold的默认百分比
|
||
- `default value`: 50
|
||
- `description`:当failure rate`大于等于`该threshold值时,CircuitBreaker会切为`OPEN`状态,并开始`short-circuiting calls`
|
||
- `slowCallRateThreshold`:配置threshold百分比
|
||
- `default value`: 100
|
||
- `description`: 当slow calls的百分比等于或超过该threshold时,CircuitBreaker会切换到`OPEN`状态,并且开始short-circuiting calls
|
||
- `slowCallDurationThreshold`: 配置slow calls的duration threshold
|
||
- `default value`: 60000 [ms]
|
||
- `description`: 当call的耗时超过该duration threshold限制时,会被认定为slow call,并且会增加slow call rate
|
||
- `permittedNumberOfCallsInHalfOpenState`:
|
||
- `default value`: 10
|
||
- `description`:配置circuitbreaker切换到half open状态时,permitted calls的数量
|
||
- `maxWairDurationInfHalfOpenState`:
|
||
- `default value`: 0 [ms]
|
||
- `description`:配置`在CircuitBreaker从Half Open状态切换回Open状态前,其可以处于Half Open抓过你太的最长时间`。默认值为`0`,代表其等待时间没有上限,直到所有的permitted calls都执行完成
|
||
- `slidingWindowType`:配置滑动窗口的类型
|
||
- `default value`: `COUNT_BASED`
|
||
- `desceiption`:在CircuitBreaker处于closed状态时,滑动窗口用于记录调用的结果。滑动窗口类型可以是`count-based`和`time-based`
|
||
- `counted_based`:会记录最后`slidingWindowSize`个请求的结果并对其进行聚合
|
||
- `time_based`:会记录最后`slidingWindowSize`秒的调用结果,并且会对其进行聚合
|
||
- `slidingWindowSize`:
|
||
- `default value`: 100
|
||
- `description`:用于配置滑动窗口的大小
|
||
- `minimumNumberOfCalls`:
|
||
- `default value`:100
|
||
- `description`: 在可以计算error rate和slow call rate之前,至少需要记录`minimum number`个调用。例如,如果`minimumNumberOfCalls`为10,那么在可以计算failure rate前,必须至少记录10个calls(范围是整个滑动窗口期间范围内)。如果已经计算了9个calls,即使9个calls都调用失败,在记录的请求数达到`minimumNumberOfCalls`之前,也不会切换到open状态
|
||
- `waitDurationInOpenState`:
|
||
- `default value`: 60000 [ms]
|
||
- `description`:该值代表`处于OPEN状态的CircuitBreaker`在切换为`HALF-OPEN`之前,会等待的时间长度
|
||
- `automaticTransitionFromOpenToHalfOpenEnabled`:
|
||
- `default value`: false
|
||
- `description`:
|
||
- `如果该值设置为true`,会创建一个线程来对所有`CircuitBreakers`对象进行监控,并在waitDurationInOpenState设置的时间达到后将CircuitBreaker从Open切换到HALF_OPEN状态
|
||
- `如果该值设置为false(默认)`,那么`CircuitBreaker`从`OPEN`状态切换到`HALF_OPEN`状态的过程并不由专门的线程来触发,`而是由请求来触发`。如果该值设置为false(默认),即使`waitDurationInOpenState`设置的时间达到,如果没有后续请求,那么从`OPEN`到`HALF_OPEN`的变化也不会自动被触发
|
||
- `recordExceptions`:
|
||
- `default value`: empty
|
||
- `description`: 该属性用于配置exception list,位于该exception list中的异常类型将会被视为failure,并增加failure rate。
|
||
- 任何匹配或者继承exception list中的异常将会被视为failure,除非通过`ignoreExceptions`显式的忽略了该异常
|
||
- 如果通过`recordExcepions`指定了exception list,那么所有其他的异常都会被视为`success`,除非显式被`ignoreExceptions`指定
|
||
- `ignoreExceptions`:
|
||
- `default value`: empty
|
||
- `description`: 用于配置exception list,该list中配置的异常类型将会被忽略,既不会被记录为failure也不会被记录为success
|
||
- 任何匹配或者继承了该list中异常类型的异常将会被忽略,即使该异常在`recordExceptions`中被指定
|
||
- `recordFailurePredicate`:
|
||
- `default value`: throwable -> true
|
||
- `description`: 一个自定义的predicate,用于评估一个异常是否应该被记录为failure。如果该异常应当被记录为failure,那么该predicate应当返回true;如果该异常应当被记录为failure,那么该predicate应当返回false
|
||
- `ignoreExceptionPredicate`:
|
||
- `default value`: throwable -> false
|
||
- `description`: 一个自定义的predicate,用于评估一个异常是否应当被忽略或是记录为failure/success。
|
||
- 如果异常应当被忽略,那么该predicate应当返回true
|
||
- 如果异常不应该被忽略,predicate应当返回为false,该异常应该被视为failure/success
|
||
|
||
##### success/failure/ignore判断流程
|
||
在实际调用发生异常时,决定将异常视为success/failure/ignore的判断流程如下:
|
||
- 如果实际调用未抛出异常,则记录为调用成功,否则继续
|
||
- 如果抛出异常,首先会检查异常是否位于`Ignored Exceptions`中,如果位于其中,则忽略该异常,否则继续
|
||
- 如果ignoreExceptionPredicate不为空,根据该predicate进行判断,如果返回为true,则忽略该异常,否则继续
|
||
- 校验异常是否位于`recordExceptions`中,如果位于其中,则将其视为failure,否则继续
|
||
- 如果recordFailurePredicate不为空,根据`recordFailurePredicate`判断是否该异常应当被视为failure,如果返回为true,将其视为failure,否则继续
|
||
- 如果上述都不满足,那么将其视为success
|
||
|
||
创建自定义`CircuitBreakerConfig`的示例如下所示:
|
||
```java
|
||
// Create a custom configuration for a CircuitBreaker
|
||
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
|
||
.failureRateThreshold(50)
|
||
.slowCallRateThreshold(50)
|
||
.waitDurationInOpenState(Duration.ofMillis(1000))
|
||
.slowCallDurationThreshold(Duration.ofSeconds(2))
|
||
.permittedNumberOfCallsInHalfOpenState(3)
|
||
.minimumNumberOfCalls(10)
|
||
.slidingWindowType(SlidingWindowType.TIME_BASED)
|
||
.slidingWindowSize(5)
|
||
.recordException(e -> INTERNAL_SERVER_ERROR
|
||
.equals(getResponse().getStatus()))
|
||
.recordExceptions(IOException.class, TimeoutException.class)
|
||
.ignoreExceptions(BusinessException.class, OtherBusinessException.class)
|
||
.build();
|
||
|
||
// Create a CircuitBreakerRegistry with a custom global configuration
|
||
CircuitBreakerRegistry circuitBreakerRegistry =
|
||
CircuitBreakerRegistry.of(circuitBreakerConfig);
|
||
|
||
// Get or create a CircuitBreaker from the CircuitBreakerRegistry
|
||
// with the global default configuration
|
||
CircuitBreaker circuitBreakerWithDefaultConfig =
|
||
circuitBreakerRegistry.circuitBreaker("name1");
|
||
|
||
// Get or create a CircuitBreaker from the CircuitBreakerRegistry
|
||
// with a custom configuration
|
||
CircuitBreaker circuitBreakerWithCustomConfig = circuitBreakerRegistry
|
||
.circuitBreaker("name2", circuitBreakerConfig);
|
||
```
|
||
|
||
|