# CircuitBreaker

Resilience4j is a lightweight fault tolerance library designed for functional programming. It provides higher-order functions (`decorator`) that enhance any `functional interface, lambda expression, or method reference`.

The decorators fall into the following categories:

- CircuitBreaker
- Rate Limiter
- Retry
- Bulkhead

Any `functional interface, lambda expression, or method reference` can be wrapped with more than one decorator.

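## Introduction

The following example shows how to decorate a lambda expression with a CircuitBreaker and a Retry so that the call is retried at most 3 times when an exception occurs.

The wait interval between retries is configurable, and a custom backoff algorithm is also supported.
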
```java
// Imports needed by this snippet.
// backendService, param1 and param2 are placeholders defined elsewhere.
import static java.util.Arrays.asList;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.decorators.Decorators;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.timelimiter.TimeLimiter;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Create a CircuitBreaker with default configuration
CircuitBreaker circuitBreaker = CircuitBreaker
  .ofDefaults("backendService");

// Create a Retry with default configuration
// 3 retry attempts and a fixed time interval between retries of 500ms
Retry retry = Retry
  .ofDefaults("backendService");

// Create a Bulkhead with default configuration
Bulkhead bulkhead = Bulkhead
  .ofDefaults("backendService");

Supplier<String> supplier = () -> backendService
  .doSomething(param1, param2);

// Decorate your call to backendService.doSomething()
// with a Bulkhead, CircuitBreaker and Retry
// **note: you will need the resilience4j-all dependency for this
Supplier<String> decoratedSupplier = Decorators.ofSupplier(supplier)
  .withCircuitBreaker(circuitBreaker)
  .withBulkhead(bulkhead)
  .withRetry(retry)
  .decorate();

// When you don't want to decorate your lambda expression,
// but just execute it and protect the call by a CircuitBreaker.
String result = circuitBreaker
  .executeSupplier(backendService::doSomething);

// You can also run the supplier asynchronously in a ThreadPoolBulkhead
ThreadPoolBulkhead threadPoolBulkhead = ThreadPoolBulkhead
  .ofDefaults("backendService");

// The Scheduler is needed to schedule a timeout
// on a non-blocking CompletableFuture
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
TimeLimiter timeLimiter = TimeLimiter.of(Duration.ofSeconds(1));

CompletableFuture<String> future = Decorators.ofSupplier(supplier)
  .withThreadPoolBulkhead(threadPoolBulkhead)
  .withTimeLimiter(timeLimiter, scheduledExecutorService)
  .withCircuitBreaker(circuitBreaker)
  .withFallback(asList(TimeoutException.class,
      CallNotPermittedException.class,
      BulkheadFullException.class),
    throwable -> "Hello from Recovery")
  .get().toCompletableFuture();
```
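
The introduction mentions that the retry interval and the backoff algorithm are configurable. As a rough illustration of what an exponential backoff computes, here is a minimal sketch in plain Java. `BackoffSketch` and its method are hypothetical names invented for this example and are not part of resilience4j; in the library itself the interval is set on the retry configuration (for example through `RetryConfig`'s `intervalFunction` setting).

```java
import java.time.Duration;

/** Hypothetical helper, not part of resilience4j: computes the wait before each retry. */
final class BackoffSketch {

    /** Exponential backoff: initial * multiplier^(attempt - 1), with attempt starting at 1. */
    static Duration exponential(Duration initial, double multiplier, int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        double millis = initial.toMillis() * Math.pow(multiplier, attempt - 1);
        return Duration.ofMillis((long) millis);
    }

    public static void main(String[] args) {
        // Three retries starting at 500 ms and doubling each time.
        for (int attempt = 1; attempt <= 3; attempt++) {
            Duration wait = exponential(Duration.ofMillis(500), 2.0, attempt);
            System.out.println("retry " + attempt + " waits " + wait.toMillis() + " ms");
        }
    }
}
```

With an initial interval of 500 ms and a multiplier of 2, the three retries wait 500 ms, 1000 ms and 2000 ms.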

### maven

Resilience4j requires JDK 17 or later. With Maven, the dependencies can be declared as follows.

To pull in all modules:

```xml
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-all</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
```

To include only the modules you need:

```xml
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-circuitbreaker</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-retry</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-bulkhead</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-cache</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-timelimiter</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
```

## CircuitBreaker

### State Machine

The CircuitBreaker is implemented as a finite state machine with the following states:

- `CLOSED`
- `OPEN`
- `HALF_OPEN`
- `METRICS_ONLY`
- `DISABLED`
- `FORCED_OPEN`

The first three are normal states; the last three are special states.

<img alt="" loading="lazy" src="https://files.readme.io/39cdd54-state_machine.jpg" title="state_machine.jpg" align="" caption="" height="auto" width="auto">

The state transitions work as follows:

- In `CLOSED`, when the failure rate of the protected call reaches or exceeds the threshold, the state changes from `CLOSED` to `OPEN`.
- After a wait duration in `OPEN`, the state changes from `OPEN` to `HALF_OPEN`.
- In `HALF_OPEN`, if the failure rate is below the threshold, the state changes from `HALF_OPEN` back to `CLOSED`.
- In `HALF_OPEN`, if the failure rate is still at or above the threshold, the state changes from `HALF_OPEN` back to `OPEN`.

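
The transitions between the three normal states can be sketched as a small pure function. This is only an illustrative model under the assumption that a failure rate and a threshold (both in percent) are already available; `BreakerStateSketch` and its methods are hypothetical names and do not reflect how resilience4j implements its state machine.

```java
/** Hypothetical model of the three normal states; not resilience4j's implementation. */
final class BreakerStateSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    /** Next state once a failure rate has been computed for the current state. */
    static State onFailureRate(State current, double failureRate, double threshold) {
        boolean tripped = failureRate >= threshold;
        switch (current) {
            case CLOSED:
                return tripped ? State.OPEN : State.CLOSED;
            case HALF_OPEN:
                return tripped ? State.OPEN : State.CLOSED;
            default:
                // OPEN rejects calls, so no failure rate is evaluated there.
                return current;
        }
    }

    /** OPEN moves to HALF_OPEN once the wait duration has elapsed. */
    static State onWaitElapsed(State current) {
        return current == State.OPEN ? State.HALF_OPEN : current;
    }

    public static void main(String[] args) {
        State state = State.CLOSED;
        state = onFailureRate(state, 60.0, 50.0); // threshold reached
        System.out.println(state);
        state = onWaitElapsed(state);             // wait duration over
        System.out.println(state);
        state = onFailureRate(state, 10.0, 50.0); // trial calls succeeded
        System.out.println(state);
    }
}
```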

### Sliding Window

The CircuitBreaker uses a `sliding window` to store and aggregate the outcomes of calls. You can choose between a `count-based` and a `time-based` sliding window.

- `count-based`: aggregates the outcomes of the last `N` calls
- `time-based`: aggregates the outcomes of the calls made in the last `N` seconds


#### Count-based sliding window

The count-based sliding window is implemented with a circular array of `N` measurements. If the count window size is 10, the circular array always holds 10 measurements.

The count-based sliding window updates a `total aggregation` incrementally:

- When the outcome of a new call is recorded, it is added to the total aggregation.
- When a new call is recorded, the oldest measurement in the circular array is evicted: it is subtracted from the total aggregation and its bucket is reset (here a bucket holds a single measurement, so resetting the bucket means resetting the oldest measurement).

Retrieving the aggregated result costs `O(1)` regardless of the window size, because the result is `pre-aggregated`.

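
To make the eviction and the running total concrete, here is a minimal sketch of a count-based window that only tracks failures. It is a simplified model written for this document: `CountWindowSketch` and its members are hypothetical names, and it tracks how many slots have been filled instead of pre-filling the array with empty measurements.

```java
/** Hypothetical count-based window: a ring of the last N outcomes plus a running total. */
final class CountWindowSketch {
    private final boolean[] failed; // circular array, one slot per measurement
    private int next;               // slot to overwrite next (the oldest one when full)
    private int recorded;           // number of slots filled so far, capped at the size
    private int totalFailed;        // total aggregation: failed calls inside the window

    CountWindowSketch(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        failed = new boolean[size];
    }

    void record(boolean callFailed) {
        if (recorded == failed.length) {
            // Window is full: subtract the oldest measurement from the total.
            if (failed[next]) {
                totalFailed--;
            }
        } else {
            recorded++;
        }
        failed[next] = callFailed;
        if (callFailed) {
            totalFailed++;
        }
        next = (next + 1) % failed.length;
    }

    int totalCalls() {
        return recorded;
    }

    int failedCalls() {
        return totalFailed;
    }

    /** O(1): reads the pre-aggregated total; returns -1 if nothing was recorded yet. */
    float failureRate() {
        return recorded == 0 ? -1f : totalFailed * 100f / recorded;
    }
}
```

With a window of size 3, recording two failures and one success leaves 2 failed calls in the window; each further success evicts one of the old failures, so the count drops to 1 and then to 0 without ever rescanning the array.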
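
#### Time-based sliding window

The time-based sliding window is also implemented with a circular array, which holds `N` partial aggregations (buckets).

If the time window is 10 seconds, the circular array always has 10 buckets. Each bucket corresponds to one epoch second and aggregates the outcomes of all calls made during that second (a `partial aggregation`).

The head bucket of the circular array stores the outcomes of the calls made in the current epoch second, and the other partial aggregations store the outcomes from previous seconds. Unlike the `count-based` window, the time-based implementation does not store call outcomes individually; it updates the `partial aggregation` incrementally.

Besides updating the `partial aggregation`, the time-based sliding window also updates the `total aggregation` whenever the outcome of a new call is recorded. When the oldest bucket is evicted, its partial aggregation is subtracted from the total aggregation and the bucket is reset.

Retrieving the aggregated result is again `O(1)`, because the time-based sliding window is `pre-aggregated` as well.

A partial aggregation consists of 3 integers that count:

- failed calls
- slow calls
- total calls

In addition, a partial aggregation holds one long that stores the total duration of all calls.
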
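
The bucket layout described above can be sketched as follows. This is a simplified, single-threaded model under two stated assumptions: timestamps are non-negative epoch seconds, and they never decrease between calls. `TimeWindowSketch`, `Aggregation` and their members are hypothetical names chosen for this example, not resilience4j classes.

```java
/** Hypothetical time-based window: one bucket per epoch second plus running totals. */
final class TimeWindowSketch {

    /** Aggregation made of three int counters and one long for the total duration. */
    static final class Aggregation {
        int failedCalls;
        int slowCalls;
        int totalCalls;
        long totalDurationMillis;

        void add(long durationMillis, boolean failed, boolean slow) {
            totalCalls++;
            if (failed) failedCalls++;
            if (slow) slowCalls++;
            totalDurationMillis += durationMillis;
        }

        void subtract(Aggregation other) {
            totalCalls -= other.totalCalls;
            failedCalls -= other.failedCalls;
            slowCalls -= other.slowCalls;
            totalDurationMillis -= other.totalDurationMillis;
        }

        void reset() {
            totalCalls = 0;
            failedCalls = 0;
            slowCalls = 0;
            totalDurationMillis = 0L;
        }
    }

    private final Aggregation[] buckets;                 // circular array of partial aggregations
    private final Aggregation total = new Aggregation(); // total aggregation over the window
    private long headSecond = -1;                        // epoch second of the head bucket

    TimeWindowSketch(int windowSeconds) {
        if (windowSeconds <= 0) {
            throw new IllegalArgumentException("windowSeconds must be positive");
        }
        buckets = new Aggregation[windowSeconds];
        for (int i = 0; i < buckets.length; i++) {
            buckets[i] = new Aggregation();
        }
    }

    /** Records one call into the head bucket and into the total aggregation. */
    void record(long epochSecond, long durationMillis, boolean failed, boolean slow) {
        advanceTo(epochSecond);
        buckets[(int) (epochSecond % buckets.length)].add(durationMillis, failed, slow);
        total.add(durationMillis, failed, slow);
    }

    /** Returns the pre-aggregated totals for the window ending at epochSecond. */
    Aggregation snapshot(long epochSecond) {
        advanceTo(epochSecond);
        return total;
    }

    /** Evicts every bucket whose second has slid out of the window. */
    private void advanceTo(long now) {
        if (headSecond < 0) {
            headSecond = now;
            return;
        }
        long steps = Math.min(now - headSecond, buckets.length);
        for (long i = 0; i < steps; i++) {
            Aggregation expired = buckets[(int) ((now - i) % buckets.length)];
            total.subtract(expired);
            expired.reset();
        }
        headSecond = Math.max(headSecond, now);
    }
}
```

Reading the totals never iterates over individual calls: the only work besides the read is resetting the buckets for the seconds that have elapsed since the last update, which is bounded by the window size.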

### Failure rate and slow call rate thresholds

#### Failure rate & exception list

When the failure rate is `greater than or equal to` the configured threshold, the CircuitBreaker changes from `CLOSED` to `OPEN`.

By default, every thrown exception counts as a failure. You can also specify an `exception list`: only exceptions on the list count as failures, and any other exception counts as a success. Exceptions can additionally be `ignored`; an ignored exception counts `neither as a success nor as a failure`.

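
The classification rules above can be written down as a small decision function. The sketch below is an illustrative model only: `OutcomeClassifierSketch` is a hypothetical class, an empty record list is taken to mean "record every exception", and checking the ignore list before the record list is an assumption of this sketch. In resilience4j the two lists are supplied through the CircuitBreaker configuration (its `recordExceptions` and `ignoreExceptions` settings).

```java
import java.util.List;

/** Hypothetical classifier for call outcomes; not a resilience4j class. */
final class OutcomeClassifierSketch {

    enum Outcome { SUCCESS, FAILURE, IGNORED }

    private final List<Class<? extends Throwable>> recorded; // empty means "record everything"
    private final List<Class<? extends Throwable>> ignored;

    OutcomeClassifierSketch(List<Class<? extends Throwable>> recorded,
                            List<Class<? extends Throwable>> ignored) {
        this.recorded = recorded;
        this.ignored = ignored;
    }

    /** Pass null when the call returned normally. */
    Outcome classify(Throwable thrown) {
        if (thrown == null) {
            return Outcome.SUCCESS;
        }
        // Assumption of this sketch: the ignore list is checked first.
        if (matches(ignored, thrown)) {
            return Outcome.IGNORED;
        }
        if (recorded.isEmpty() || matches(recorded, thrown)) {
            return Outcome.FAILURE;
        }
        // An exception that is neither ignored nor on the record list counts as a success.
        return Outcome.SUCCESS;
    }

    private static boolean matches(List<Class<? extends Throwable>> types, Throwable thrown) {
        for (Class<? extends Throwable> type : types) {
            if (type.isInstance(thrown)) {
                return true;
            }
        }
        return false;
    }
}
```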
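
#### Slow call rate

When the slow call rate is greater than or equal to the configured threshold, the CircuitBreaker also changes from `CLOSED` to `OPEN`. Tripping on slow calls makes it possible to reduce the load on an external system.

The failure rate and the slow call rate can only be calculated `once a minimum number of calls has been recorded`. For example, if the `minimum number of required calls` is 10, the rates are calculated only after 10 calls have been recorded. `If only 9 calls have been recorded, the CircuitBreaker does not open even if all 9 of them failed.`
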
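
The minimum-calls rule is easy to get wrong, so here is the arithmetic as a short sketch. `RateSketch` and its methods are hypothetical helpers written for this document; rates are expressed in percent.

```java
import java.util.OptionalDouble;

/** Hypothetical helper showing when a failure rate is available and when it trips. */
final class RateSketch {

    /** Failure rate in percent, or empty while too few calls have been recorded. */
    static OptionalDouble failureRate(int failedCalls, int totalCalls, int minimumNumberOfCalls) {
        if (totalCalls == 0 || totalCalls < minimumNumberOfCalls) {
            return OptionalDouble.empty();
        }
        return OptionalDouble.of(failedCalls * 100.0 / totalCalls);
    }

    /** True only if a rate is available and it is at or above the threshold. */
    static boolean shouldOpen(int failedCalls, int totalCalls,
                              int minimumNumberOfCalls, double thresholdPercent) {
        OptionalDouble rate = failureRate(failedCalls, totalCalls, minimumNumberOfCalls);
        return rate.isPresent() && rate.getAsDouble() >= thresholdPercent;
    }

    public static void main(String[] args) {
        // 9 of 9 calls failed, but the minimum of 10 calls is not reached yet.
        System.out.println(shouldOpen(9, 9, 10, 50.0));
        // 5 of 10 calls failed: the rate is 50%, which reaches the threshold.
        System.out.println(shouldOpen(5, 10, 10, 50.0));
    }
}
```

The first call prints `false` and the second prints `true`, matching the 9-call example above.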

#### CircuitBreaker in `OPEN`/`HALF_OPEN` state

In the `OPEN` state the CircuitBreaker rejects every call with a `CallNotPermittedException`. After a wait duration, the `CircuitBreaker` changes from `OPEN` to `HALF_OPEN` and permits a `configurable number` of calls to go through, in order to check whether the backend has recovered and is reachable again.

Suppose `permittedNumberOfCalls` is 10 for a CircuitBreaker in `HALF_OPEN` and 20 calls arrive: the first 10 calls are executed, and the remaining 10 are rejected with a `CallNotPermittedException`.

In `HALF_OPEN`, if the failure rate or the slow call rate is greater than or equal to the configured threshold, the CircuitBreaker changes to OPEN. If both the failure rate and the slow call rate are below their thresholds, the CircuitBreaker changes to CLOSED.

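
The "10 of 20 calls" example can be modelled with a simple permit counter. The sketch below is only an illustration: `HalfOpenGateSketch` is a hypothetical class, and it returns `false` for a rejected call where resilience4j would throw a `CallNotPermittedException`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical gate that hands out a fixed number of trial permits. */
final class HalfOpenGateSketch {
    private final AtomicInteger permits;

    HalfOpenGateSketch(int permittedNumberOfCalls) {
        this.permits = new AtomicInteger(permittedNumberOfCalls);
    }

    /** Returns true while trial permits remain; later callers are rejected. */
    boolean tryAcquirePermission() {
        // getAndUpdate returns the previous value, so a positive result means a permit was taken.
        return permits.getAndUpdate(p -> p > 0 ? p - 1 : 0) > 0;
    }

    public static void main(String[] args) {
        HalfOpenGateSketch gate = new HalfOpenGateSketch(10);
        int permitted = 0;
        int rejected = 0;
        for (int call = 0; call < 20; call++) {
            if (gate.tryAcquirePermission()) {
                permitted++;
            } else {
                rejected++;
            }
        }
        System.out.println(permitted + " permitted, " + rejected + " rejected");
    }
}
```

Running `main` prints `10 permitted, 10 rejected`.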

### Special States

The CircuitBreaker supports 3 special states:

- `METRICS_ONLY`: in this state
    - all `circuit breaker events` are generated as usual (except state transitions) and metrics are recorded
    - the CircuitBreaker behaves like in `CLOSED`, but it does not switch to OPEN when a threshold is reached
- `DISABLED`:
    - no `CircuitBreakerEvent` is generated and no metrics are recorded
    - all calls are allowed
- `FORCED_OPEN`:
    - no `CircuitBreakerEvent` is generated and no metrics are recorded
    - all calls are rejected

These special states can be left in the following ways:

- triggering a state transition
- resetting the CircuitBreaker

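
As a brief illustration of entering and leaving the special states, the fragment below uses the manual transition methods on the `CircuitBreaker` interface. It assumes the `resilience4j-circuitbreaker` dependency is on the classpath; the class name `SpecialStatesExample` and the instance name are placeholders for this example.

```java
import io.github.resilience4j.circuitbreaker.CircuitBreaker;

final class SpecialStatesExample {
    public static void main(String[] args) {
        CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("backendService");

        // DISABLED: every call is allowed, nothing is recorded.
        circuitBreaker.transitionToDisabledState();
        // Leave the special state through an explicit state transition.
        circuitBreaker.transitionToClosedState();

        // FORCED_OPEN: every call is rejected, nothing is recorded.
        circuitBreaker.transitionToForcedOpenState();
        // Leave the special state through a reset.
        circuitBreaker.reset();

        // METRICS_ONLY: metrics are recorded, but the breaker never opens.
        circuitBreaker.transitionToMetricsOnlyState();
        System.out.println(circuitBreaker.getState());
    }
}
```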
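
### thread-safe

The `CircuitBreaker is thread-safe`, but it does not serialize function calls, so `function calls may run concurrently when a CircuitBreaker is used`.

For a CircuitBreaker in the CLOSED state, if 20 threads call it at the same time, all 20 calls can run concurrently, even though the sliding window size of `15` is smaller than the number of concurrent calls. `The sliding window size does not limit concurrency`.

To limit concurrency, use a `Bulkhead`.
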
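
To show what limiting concurrency means in practice, here is a minimal semaphore-based sketch of the bulkhead idea. It is not resilience4j's `Bulkhead`: `SemaphoreBulkheadSketch` is a hypothetical class, and it throws a plain `IllegalStateException` when no permit is free. In resilience4j the equivalent limit is the bulkhead's `maxConcurrentCalls` setting.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical bulkhead: at most maxConcurrentCalls run at the same time. */
final class SemaphoreBulkheadSketch {
    private final Semaphore permits;

    SemaphoreBulkheadSketch(int maxConcurrentCalls) {
        this.permits = new Semaphore(maxConcurrentCalls);
    }

    /** Runs the call if a permit is free, otherwise rejects it immediately. */
    <T> T execute(Supplier<T> call) {
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("bulkhead is full");
        }
        try {
            return call.get();
        } finally {
            // Always give the permit back, even if the call throws.
            permits.release();
        }
    }
}
```

A CircuitBreaker and such a limiter answer different questions: the breaker decides whether calls should be attempted at all, while the limiter decides how many may be in flight at once.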