rikako-note/spring/Spring Cloud/CircuitBreaker/CircuitBreaker.md
2025-09-04 11:10:04 +08:00

- [CircuitBreaker](#circuitbreaker)
- [Introduction](#introduction)
- [maven](#maven)
- [CircuitBreaker](#circuitbreaker-1)
- [State Machine](#state-machine)
- [Sliding Window](#sliding-window)
- [Count-based sliding window](#count-based-sliding-window)
- [Time-based sliding window](#time-based-sliding-window)
- [Failure rate and slow call rate thresholds](#failure-rate-and-slow-call-rate-thresholds)
- [Failure rate \& exception list](#failure-rate--exception-list)
- [Slow call rate](#slow-call-rate)
- [CircuitBreaker in `OPEN`/`HALF_OPEN` state](#circuitbreaker-in-openhalf_open-state)
- [Special States](#special-states)
- [thread-safe](#thread-safe)
- [CircuitBreakerRegistry](#circuitbreakerregistry)
- [Create and configure CircuitBreakerConfig](#create-and-configure-circuitbreakerconfig)
- [success/failure/ignore decision flow](#successfailureignore-decision-flow)
- [Decorate and execute a functional interface](#decorate-and-execute-a-functional-interface)
- [Consume emitted RegistryEvents](#consume-emitted-registryevents)
- [consume emitted CircuitBreakerEvents](#consume-emitted-circuitbreakerevents)
- [Bulkhead](#bulkhead)
- [create a BulkheadRegistry](#create-a-bulkheadregistry)
- [Create and configure a Bulkhead](#create-and-configure-a-bulkhead)
- [Create and configure a ThreadPoolBulkhead](#create-and-configure-a-threadpoolbulkhead)
- [decorate and execute a functional interface](#decorate-and-execute-a-functional-interface-1)
- [consume emitted RegistryEvents](#consume-emitted-registryevents-1)
- [consume emitted BulkheadEvents](#consume-emitted-bulkheadevents)
- [Drawbacks of ThreadPoolBulkhead](#drawbacks-of-threadpoolbulkhead)
- [RateLimiter](#ratelimiter)
- [Create RateLimiterRegistry](#create-ratelimiterregistry)
- [Create and configure a RateLimiter](#create-and-configure-a-ratelimiter)
- [decorate and execute a functional interface](#decorate-and-execute-a-functional-interface-2)
- [consume emitted RegistryEvents](#consume-emitted-registryevents-2)
- [consume emitted RateLimiterEvents](#consume-emitted-ratelimiterevents)
- [Retry](#retry)
- [Create a RetryRegistry](#create-a-retryregistry)
- [Create and configure Retry](#create-and-configure-retry)
- [Decorate and execute a functional interface](#decorate-and-execute-a-functional-interface-3)
- [consume emitted RegistryEvents](#consume-emitted-registryevents-3)
- [use custom IntervalFunction](#use-custom-intervalfunction)
- [TimeLimiter](#timelimiter)
- [Create a TimeLimiterRegistry](#create-a-timelimiterregistry)
- [Create and configure TimeLimiter](#create-and-configure-timelimiter)
- [decorate and execute a functional interface](#decorate-and-execute-a-functional-interface-4)
- [Spring Cloud CircuitBreaker](#spring-cloud-circuitbreaker)
- [core concepts](#core-concepts)
- [Configuration](#configuration)
# CircuitBreaker
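resilience4j is a lightweight fault tolerance library designed for functional programming. It provides higher-order functions (`decorators`) that enhance any `functional interface, lambda expression, or method reference`.
The decorators fall into the following categories: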
- CircuitBreaker
- Rate Limiter
- Retry
- Bulkhead
Any `functional interface, lambda expression, or method reference` can be decorated with more than one decorator.
## Introduction
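The following example shows how to decorate a lambda expression with a CircuitBreaker and a Retry so that, when an exception occurs, the call is attempted at most 3 times (including the initial call).
The interval between retries is configurable, and a custom backoff algorithm is also supported.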
```java
// Create a CircuitBreaker with default configuration
CircuitBreaker circuitBreaker = CircuitBreaker
    .ofDefaults("backendService");
// Create a Retry with default configuration:
// at most 3 attempts (including the first call), with a fixed 500ms wait between them
Retry retry = Retry
    .ofDefaults("backendService");
// Create a Bulkhead with default configuration
Bulkhead bulkhead = Bulkhead
    .ofDefaults("backendService");
Supplier<String> supplier = () -> backendService
    .doSomething(param1, param2);
// Decorate your call to backendService.doSomething()
// with a Bulkhead, CircuitBreaker and Retry
// **note: you will need the resilience4j-all dependency for this
Supplier<String> decoratedSupplier = Decorators.ofSupplier(supplier)
    .withCircuitBreaker(circuitBreaker)
    .withBulkhead(bulkhead)
    .withRetry(retry)
    .decorate();
// When you don't want to decorate your lambda expression,
// but just execute it and protect the call by a CircuitBreaker.
String result = circuitBreaker
    .executeSupplier(backendService::doSomething);
// You can also run the supplier asynchronously in a ThreadPoolBulkhead
ThreadPoolBulkhead threadPoolBulkhead = ThreadPoolBulkhead
    .ofDefaults("backendService");
// The Scheduler is needed to schedule a timeout
// on a non-blocking CompletableFuture
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
TimeLimiter timeLimiter = TimeLimiter.of(Duration.ofSeconds(1));
CompletableFuture<String> future = Decorators.ofSupplier(supplier)
    .withThreadPoolBulkhead(threadPoolBulkhead)
    .withTimeLimiter(timeLimiter, scheduledExecutorService)
    .withCircuitBreaker(circuitBreaker)
    .withFallback(asList(TimeoutException.class,
            CallNotPermittedException.class,
            BulkheadFullException.class),
        throwable -> "Hello from Recovery")
    .get().toCompletableFuture();
```
### maven
resilience4j requires JDK 17 or later. With Maven, it can be added as follows.
To pull in all modules at once:
```xml
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-all</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
```
To add only the modules you need:
```xml
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-cache</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-timelimiter</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
```
## CircuitBreaker
### State Machine
CircuitBreaker is implemented as a finite state machine with the following states:
- `CLOSED`
- `OPEN`
- `HALF_OPEN`
- `METRICS_ONLY`
- `DISABLED`
- `FORCED_OPEN`
The first three are normal states; the last three are special states.
<img alt="" loading="lazy" src="https://files.readme.io/39cdd54-state_machine.jpg" title="state_machine.jpg" align="" caption="" height="auto" width="auto">
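The state transitions shown above work as follows:
- In the `CLOSED` state, once the failure rate of the protected call reaches or exceeds the threshold, the breaker moves from `CLOSED` to `OPEN`
- After a wait duration in the `OPEN` state, the breaker moves from `OPEN` to `HALF_OPEN`
- In the `HALF_OPEN` state, if the failure rate is below the threshold, the breaker returns from `HALF_OPEN` to `CLOSED`
- In the `HALF_OPEN` state, if the failure rate still reaches or exceeds the threshold, the breaker returns from `HALF_OPEN` to `OPEN`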
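
The four transitions above can be written down as a small state machine. The following is only a sketch of the rules as these notes describe them, not resilience4j's internal implementation; the class `BreakerStateSketch` and its method names are made up for illustration, and the special states are left out.

```java
/** Illustrative model of the three normal states; not resilience4j's own code. */
final class BreakerStateSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private State state = State.CLOSED;
    private final float failureRateThreshold; // in percent, e.g. 50

    BreakerStateSketch(float failureRateThreshold) {
        this.failureRateThreshold = failureRateThreshold;
    }

    State state() {
        return state;
    }

    /** Called whenever a failure rate (in percent) has been computed. */
    void onFailureRate(float failureRate) {
        boolean tripped = failureRate >= failureRateThreshold;
        if (state == State.CLOSED && tripped) {
            state = State.OPEN;                           // CLOSED -> OPEN
        } else if (state == State.HALF_OPEN) {
            state = tripped ? State.OPEN : State.CLOSED;  // HALF_OPEN -> OPEN or CLOSED
        }
    }

    /** Called when the wait duration in the OPEN state has elapsed. */
    void onWaitDurationElapsed() {
        if (state == State.OPEN) {
            state = State.HALF_OPEN;                      // OPEN -> HALF_OPEN
        }
    }
}
```
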
### Sliding Window
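CircuitBreaker uses a `sliding window` to store and aggregate call outcomes. You can choose between a `count-based` and a `time-based` sliding window.
- `count-based`: aggregates the outcomes of the last `N` calls
- `time-based`: aggregates the outcomes of the calls made in the last `N` seconds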
#### Count-based sliding window
The count-based sliding window is implemented with a circular array of `N` measurements. If the count window size is 10, the circular array always holds 10 measurements.
The count-based implementation updates a `total aggregation` incrementally:
- When a new call outcome arrives, it is recorded as a measurement and added to the total aggregation
- When a new outcome arrives and the array is full, the oldest measurement is evicted: it is subtracted from the total aggregation and its bucket is reset (here a bucket is simply one measurement slot)
Reading the aggregated result costs `O(1)`, because it is `pre-aggregated` and independent of the window size.
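
A minimal sketch of this idea, assuming each measurement only records whether the call failed (the real measurements also carry slow-call and duration data). `CountWindowSketch` is a hypothetical class written for these notes, not part of resilience4j.

```java
/** Illustrative count-based window: a circular array plus a running total. */
final class CountWindowSketch {
    private final boolean[] failed;  // one measurement per slot: true = failed call
    private int head = 0;            // next slot to write (the oldest one once the array is full)
    private int recorded = 0;        // number of slots filled so far
    private int totalFailures = 0;   // the pre-aggregated total

    CountWindowSketch(int size) {
        failed = new boolean[size];
    }

    void record(boolean callFailed) {
        if (recorded == failed.length) {
            // The array is full: evict the oldest measurement from the total.
            if (failed[head]) {
                totalFailures--;
            }
        } else {
            recorded++;
        }
        failed[head] = callFailed;
        if (callFailed) {
            totalFailures++;
        }
        head = (head + 1) % failed.length;
    }

    /** O(1): reads the running total instead of scanning the array. */
    float failureRatePercent() {
        return recorded == 0 ? 0f : 100f * totalFailures / recorded;
    }
}
```

With a window of 3, recording two failures and then three successes drops the failure rate step by step as the two failures are evicted.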
#### Time-based sliding window
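The time-based sliding window is also implemented with a circular array, in this case of `N` partial aggregations (buckets).
If the time window size is 10 seconds, the circular array always holds 10 buckets. Each bucket corresponds to one epoch second and aggregates the outcomes of all calls that happened during that second (a `partial aggregation`).
The head bucket of the circular array stores the outcomes of the current epoch second, while the other partial aggregations store the outcomes of previous seconds. Unlike the `count-based` window, the time-based window does not store call outcomes individually; it updates the `partial aggregation` incrementally.
Besides the `partial aggregation`, the time-based window also updates the `total aggregation` whenever a new call outcome is recorded. When the oldest bucket is evicted, its partial aggregation is subtracted from the total aggregation and the bucket is reset.
Reading the aggregated result is again `O(1)`, since the time-based window is also `pre-aggregated`.
A partial aggregation contains 3 integers that record:
- the number of failed calls
- the number of slow calls
- the total number of calls
It also contains one long that stores the total duration of all calls.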
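
The bucket layout can be sketched as follows. This is an illustration of the description above rather than the library's code: `TimeWindowSketch`, its methods, and the choice to pass the current epoch second in as a parameter (instead of reading a clock) are all assumptions made to keep the example small and deterministic.

```java
/** Illustrative time-based window with one bucket per epoch second. */
final class TimeWindowSketch {
    /** One partial aggregation: three ints and one long. */
    private static final class Bucket {
        int failedCalls, slowCalls, totalCalls;
        long totalDurationMillis;

        void add(int failed, int slow, int calls, long duration) {
            failedCalls += failed;
            slowCalls += slow;
            totalCalls += calls;
            totalDurationMillis += duration;
        }

        void reset() {
            failedCalls = slowCalls = totalCalls = 0;
            totalDurationMillis = 0;
        }
    }

    private final Bucket[] buckets;
    private final Bucket total = new Bucket(); // pre-aggregated sum of all buckets
    private int headIndex = 0;                 // bucket of the current epoch second
    private long headSecond;

    TimeWindowSketch(int windowSeconds, long startEpochSecond) {
        buckets = new Bucket[windowSeconds];
        for (int i = 0; i < windowSeconds; i++) {
            buckets[i] = new Bucket();
        }
        headSecond = startEpochSecond;
    }

    void record(long nowEpochSecond, boolean failed, boolean slow, long durationMillis) {
        moveTo(nowEpochSecond);
        int f = failed ? 1 : 0;
        int s = slow ? 1 : 0;
        buckets[headIndex].add(f, s, 1, durationMillis); // partial aggregation
        total.add(f, s, 1, durationMillis);              // total aggregation
    }

    int totalCalls(long nowEpochSecond) {
        moveTo(nowEpochSecond);
        return total.totalCalls;
    }

    int failedCalls(long nowEpochSecond) {
        moveTo(nowEpochSecond);
        return total.failedCalls;
    }

    /** Advances the head one second at a time, evicting the oldest bucket on each step. */
    private void moveTo(long nowEpochSecond) {
        long steps = Math.min(nowEpochSecond - headSecond, buckets.length);
        for (long i = 0; i < steps; i++) {
            headIndex = (headIndex + 1) % buckets.length;
            Bucket oldest = buckets[headIndex];
            total.add(-oldest.failedCalls, -oldest.slowCalls,
                      -oldest.totalCalls, -oldest.totalDurationMillis);
            oldest.reset();
        }
        headSecond = Math.max(headSecond, nowEpochSecond);
    }
}
```
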
### Failure rate and slow call rate thresholds
#### Failure rate & exception list
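When the failure rate is `greater than or equal to` the configured threshold, the CircuitBreaker moves from `CLOSED` to `OPEN`.
By default, every thrown exception counts as a failure. You can also specify an `exception list`: exceptions in the list count as failures, and exceptions not in the list count as successes. In addition, exceptions can be `ignored`; an ignored exception counts `neither as a success nor as a failure`.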
#### Slow call rate
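When the slow call rate is greater than or equal to the configured threshold, the CircuitBreaker also moves from `CLOSED` to `OPEN`. Tripping on slow calls helps reduce the load on the external system.
The failure rate and slow call rate can only be calculated `once the number of recorded calls reaches a minimum`. For example, if the `minimum number of required calls` is 10, the rates are calculated only after 10 calls have been recorded. `If only 9 calls have been recorded, the CircuitBreaker does not open even if all 9 of them failed.`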
#### CircuitBreaker in `OPEN`/`HALF_OPEN` state
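In the `OPEN` state, the CircuitBreaker rejects all calls and throws `CallNotPermittedException`. After a wait duration, it moves from `OPEN` to `HALF_OPEN` and lets a `configurable number` of calls through to check whether the backend has recovered and can be reached again.
For a CircuitBreaker in the `HALF_OPEN` state with `permittedNumberOfCallsInHalfOpenState` set to 10, if 20 calls arrive, the first 10 are executed normally and the remaining 10 are rejected with `CallNotPermittedException`.
In the `HALF_OPEN` state, if the failure rate or the slow call rate is greater than or equal to the configured threshold, the CircuitBreaker moves back to `OPEN`. If both rates are below their thresholds, it moves to `CLOSED`.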
### Special States
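CircuitBreaker supports 3 special states:
- `METRICS_ONLY`: in this state
    - all `circuit breaker events` (apart from state transitions) are generated as usual, and metrics are recorded as usual
    - the behavior is similar to `CLOSED`, except that the CircuitBreaker does not switch to `OPEN` when a threshold is reached
- `DISABLED`:
    - no `CircuitBreakerEvent` (apart from the state transition) is generated, and no metrics are recorded
    - all calls are permitted
- `FORCED_OPEN`:
    - no `CircuitBreakerEvent` (apart from the state transition) is generated, and no metrics are recorded
    - all calls are rejected
There are two ways to leave these special states:
- trigger a state transition
- reset the CircuitBreaker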
### thread-safe
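`The CircuitBreaker is thread-safe`, but it does not serialize function calls, so `function calls may run concurrently when a CircuitBreaker is used`.
For a CircuitBreaker in the `CLOSED` state, if 20 threads call it at the same time, all 20 calls can run concurrently, even if the sliding window size is `15`, which is less than the number of concurrent calls. `The sliding window size does not limit concurrency.`
To limit concurrency, use a `Bulkhead`.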
### CircuitBreakerRegistry
resilience4j ships with an `in-memory` `CircuitBreakerRegistry` based on `ConcurrentHashMap`. The `CircuitBreakerRegistry` manages (creates and retrieves) CircuitBreaker instances. A `CircuitBreakerRegistry` that uses the `default CircuitBreakerConfig` can be created as follows:
```java
CircuitBreakerRegistry circuitBreakerRegistry =
CircuitBreakerRegistry.ofDefaults();
```
#### Create and configure CircuitBreakerConfig
Besides the default CircuitBreakerConfig, you can provide a custom `CircuitBreakerConfig`, created with its builder.
`CircuitBreakerConfig` supports the following properties:
- `failureRateThreshold`: the failure rate threshold, as a percentage
    - `default value`: 50
    - `description`: when the failure rate is `greater than or equal to` this threshold, the CircuitBreaker switches to `OPEN` and starts `short-circuiting calls`
- `slowCallRateThreshold`: the slow call rate threshold, as a percentage
    - `default value`: 100
    - `description`: when the percentage of slow calls is greater than or equal to this threshold, the CircuitBreaker switches to `OPEN` and starts short-circuiting calls
- `slowCallDurationThreshold`: the duration threshold for slow calls
    - `default value`: 60000 [ms]
    - `description`: a call that takes longer than this threshold is considered a slow call and increases the slow call rate
- `permittedNumberOfCallsInHalfOpenState`:
    - `default value`: 10
    - `description`: the number of calls permitted while the CircuitBreaker is half open
- `maxWaitDurationInHalfOpenState`:
    - `default value`: 0 [ms]
    - `description`: `the maximum time the CircuitBreaker may stay in the half-open state before it switches back to open`. The default `0` means there is no limit: it stays half open until all permitted calls have completed
- `slidingWindowType`: the type of the sliding window
    - `default value`: `COUNT_BASED`
    - `description`: the sliding window records call outcomes while the CircuitBreaker is closed. It can be `count-based` or `time-based`
        - `COUNT_BASED`: records and aggregates the outcomes of the last `slidingWindowSize` calls
        - `TIME_BASED`: records and aggregates the outcomes of the calls made in the last `slidingWindowSize` seconds
- `slidingWindowSize`:
    - `default value`: 100
    - `description`: the size of the sliding window
- `minimumNumberOfCalls`:
    - `default value`: 100
    - `description`: the minimum number of calls that must be recorded (per sliding window period) before the failure rate or slow call rate can be calculated. For example, if `minimumNumberOfCalls` is 10, at least 10 calls must be recorded before the failure rate is calculated. If only 9 calls have been recorded, the CircuitBreaker does not open even if all 9 failed
- `waitDurationInOpenState`:
    - `default value`: 60000 [ms]
    - `description`: how long `a CircuitBreaker in the OPEN state` waits before it switches to `HALF_OPEN`
- `automaticTransitionFromOpenToHalfOpenEnabled`:
    - `default value`: false
    - `description`:
        - `if set to true`, a thread is created that monitors all `CircuitBreaker` instances and moves each one from `OPEN` to `HALF_OPEN` once `waitDurationInOpenState` has passed
        - `if set to false (the default)`, the transition from `OPEN` to `HALF_OPEN` is not driven by a dedicated thread; `it is triggered by a call`. Even after `waitDurationInOpenState` has passed, the transition does not happen until a further call arrives
- `recordExceptions`:
    - `default value`: empty
    - `description`: a list of exception types that are recorded as failures and therefore increase the failure rate
        - any exception that matches or inherits from a type in the list counts as a failure, unless it is explicitly ignored through `ignoreExceptions`
        - if a list is specified through `recordExceptions`, all other exceptions count as `success`, unless they are explicitly listed in `ignoreExceptions`
- `ignoreExceptions`:
    - `default value`: empty
    - `description`: a list of exception types that are ignored, that is, recorded neither as failures nor as successes
        - any exception that matches or inherits from a type in the list is ignored, even if it is also listed in `recordExceptions`
- `recordFailurePredicate`:
    - `default value`: throwable -> true
    - `description`: a custom predicate that decides whether an exception is recorded as a failure. It should return true if the exception counts as a failure and false if it counts as a success
- `ignoreExceptionPredicate`:
    - `default value`: throwable -> false
    - `description`: a custom predicate that decides whether an exception is ignored or recorded as a failure/success
        - it should return true if the exception should be ignored
        - it should return false if the exception should not be ignored and should instead count as a failure/success
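##### success/failure/ignore decision flow
When a call completes, the outcome is classified as success, failure, or ignored as follows:
- If the call did not throw an exception, it is recorded as a success; otherwise continue
- If an exception was thrown, first check whether it is in `ignoreExceptions`; if so, ignore it, otherwise continue
- If `ignoreExceptionPredicate` is set, evaluate it; if it returns true, ignore the exception, otherwise continue
- Check whether the exception is in `recordExceptions`; if so, record it as a failure, otherwise continue
- If `recordFailurePredicate` is set, evaluate it; if it returns true, record the exception as a failure, otherwise continue
- If none of the above applies, record the call as a success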
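
The decision flow above can be sketched in plain Java. This mirrors the order of checks as these notes describe it and is not taken from resilience4j's source; `OutcomeClassifierSketch`, its constructor, and the `Outcome` enum are hypothetical names, and a `null` predicate stands for "not configured".

```java
import java.util.List;
import java.util.function.Predicate;

/** Illustrative classifier that follows the checks listed above, in order. */
final class OutcomeClassifierSketch {
    enum Outcome { SUCCESS, FAILURE, IGNORED }

    private final List<Class<? extends Throwable>> ignoreExceptions;
    private final Predicate<Throwable> ignoreExceptionPredicate; // may be null
    private final List<Class<? extends Throwable>> recordExceptions;
    private final Predicate<Throwable> recordFailurePredicate;   // may be null

    OutcomeClassifierSketch(List<Class<? extends Throwable>> ignoreExceptions,
                            Predicate<Throwable> ignoreExceptionPredicate,
                            List<Class<? extends Throwable>> recordExceptions,
                            Predicate<Throwable> recordFailurePredicate) {
        this.ignoreExceptions = ignoreExceptions;
        this.ignoreExceptionPredicate = ignoreExceptionPredicate;
        this.recordExceptions = recordExceptions;
        this.recordFailurePredicate = recordFailurePredicate;
    }

    /** Pass null when the call returned normally. */
    Outcome classify(Throwable thrown) {
        if (thrown == null) {
            return Outcome.SUCCESS;
        }
        if (matches(ignoreExceptions, thrown)) {
            return Outcome.IGNORED;
        }
        if (ignoreExceptionPredicate != null && ignoreExceptionPredicate.test(thrown)) {
            return Outcome.IGNORED;
        }
        if (matches(recordExceptions, thrown)) {
            return Outcome.FAILURE;
        }
        if (recordFailurePredicate != null && recordFailurePredicate.test(thrown)) {
            return Outcome.FAILURE;
        }
        return Outcome.SUCCESS;
    }

    /** True if the throwable is an instance of any listed type (subclasses included). */
    private static boolean matches(List<Class<? extends Throwable>> types, Throwable t) {
        for (Class<? extends Throwable> type : types) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }
}
```
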
An example of creating a custom `CircuitBreakerConfig`:
```java
// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.slowCallRateThreshold(50)
.waitDurationInOpenState(Duration.ofMillis(1000))
.slowCallDurationThreshold(Duration.ofSeconds(2))
.permittedNumberOfCallsInHalfOpenState(3)
.minimumNumberOfCalls(10)
.slidingWindowType(SlidingWindowType.TIME_BASED)
.slidingWindowSize(5)
.recordException(e -> INTERNAL_SERVER_ERROR
.equals(getResponse().getStatus()))
.recordExceptions(IOException.class, TimeoutException.class)
.ignoreExceptions(BusinessException.class, OtherBusinessException.class)
.build();
// Create a CircuitBreakerRegistry with a custom global configuration
CircuitBreakerRegistry circuitBreakerRegistry =
CircuitBreakerRegistry.of(circuitBreakerConfig);
// Get or create a CircuitBreaker from the CircuitBreakerRegistry
// with the global default configuration
CircuitBreaker circuitBreakerWithDefaultConfig =
circuitBreakerRegistry.circuitBreaker("name1");
// Get or create a CircuitBreaker from the CircuitBreakerRegistry
// with a custom configuration
CircuitBreaker circuitBreakerWithCustomConfig = circuitBreakerRegistry
.circuitBreaker("name2", circuitBreakerConfig);
```
You can also add configurations to the CircuitBreakerRegistry; such a configuration can be shared by multiple CircuitBreaker instances:
```java
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
    .failureRateThreshold(70)
    .build();
// Register the configuration under a name so that several breakers can share it
circuitBreakerRegistry.addConfiguration("someSharedConfig", circuitBreakerConfig);
CircuitBreaker circuitBreaker = circuitBreakerRegistry
    .circuitBreaker("name", "someSharedConfig");
```
The default configuration can also be overwritten:
```java
CircuitBreakerConfig defaultConfig = circuitBreakerRegistry
.getDefaultConfig();
CircuitBreakerConfig overwrittenConfig = CircuitBreakerConfig
.from(defaultConfig)
.waitDurationInOpenState(Duration.ofSeconds(20))
.build();
```
If you do not want to use a CircuitBreakerRegistry to manage CircuitBreaker instances, you can create them directly:
```java
// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.recordExceptions(IOException.class, TimeoutException.class)
.ignoreExceptions(BusinessException.class, OtherBusinessException.class)
.build();
CircuitBreaker customCircuitBreaker = CircuitBreaker
.of("testName", circuitBreakerConfig);
```
To plug in your own registry implementation, provide a custom `RegistryStore` implementation and plug it in through the builder:
```java
CircuitBreakerRegistry registry = CircuitBreakerRegistry.custom()
.withRegistryStore(new YourRegistryStoreImplementation())
.withCircuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
.build();
```
### Decorate and execute a functional interface
CircuitBreaker can decorate a `Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer, or CompletionStage`.
The decorated function can be invoked with `Try.of` or `Try.run` (from Vavr), which allows further functions to be chained:
```java
// Given
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
// When I decorate my function
CheckedFunction0<String> decoratedSupplier = CircuitBreaker
.decorateCheckedSupplier(circuitBreaker, () -> "This can be any method which returns: 'Hello");
// and chain an other function with map
Try<String> result = Try.of(decoratedSupplier)
.map(value -> value + " world'");
// Then the Try Monad returns a Success<String>, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
```
### Consume emitted RegistryEvents
You can register an event consumer on a CircuitBreakerRegistry and take an action whenever a `CircuitBreaker` is created, replaced, or deleted:
```java
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
circuitBreakerRegistry.getEventPublisher()
.onEntryAdded(entryAddedEvent -> {
CircuitBreaker addedCircuitBreaker = entryAddedEvent.getAddedEntry();
LOG.info("CircuitBreaker {} added", addedCircuitBreaker.getName());
})
.onEntryRemoved(entryRemovedEvent -> {
CircuitBreaker removedCircuitBreaker = entryRemovedEvent.getRemovedEntry();
LOG.info("CircuitBreaker {} removed", removedCircuitBreaker.getName());
});
```
### consume emitted CircuitBreakerEvents
A CircuitBreakerEvent is emitted in the following cases:
- state transition
- circuit breaker reset
- successful call
- recorded error
- ignored error
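Every event carries additional information, such as its creation time and the processing duration of the call. To consume these events, register an event consumer on the CircuitBreaker: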
```java
circuitBreaker.getEventPublisher()
.onSuccess(event -> logger.info(...))
.onError(event -> logger.info(...))
.onIgnoredError(event -> logger.info(...))
.onReset(event -> logger.info(...))
.onStateTransition(event -> logger.info(...));
// Or if you want to register a consumer listening
// to all events, you can do:
circuitBreaker.getEventPublisher()
.onEvent(event -> logger.info(...));
```
You can use a `CircularEventConsumer` to store events in a circular buffer with a fixed capacity:
```java
CircularEventConsumer<CircuitBreakerEvent> ringBuffer =
    new CircularEventConsumer<>(10);
circuitBreaker.getEventPublisher().onEvent(ringBuffer);
List<CircuitBreakerEvent> bufferedEvents = ringBuffer.getBufferedEvents();
```
## Bulkhead
resilience4j provides two bulkhead implementations, which can be used to limit the number of concurrent executions:
- `SemaphoreBulkhead`: based on a semaphore
- `FixedThreadPoolBulkhead`: uses `a bounded queue and a fixed thread pool`
### create a BulkheadRegistry
Like the CircuitBreaker module, the Bulkhead module provides in-memory registries, `BulkheadRegistry` and `ThreadPoolBulkheadRegistry`, for managing Bulkhead instances:
```java
BulkheadRegistry bulkheadRegistry = BulkheadRegistry.ofDefaults();
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry =
ThreadPoolBulkheadRegistry.ofDefaults();
```
### Create and configure a Bulkhead
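You can provide a custom global `BulkheadConfig`, created with the `BulkheadConfig` builder. It supports the following properties:
- `maxConcurrentCalls`:
    - `default value`: 25
    - `description`: the maximum number of concurrent executions the bulkhead allows
- `maxWaitDuration`:
    - `default value`: 0
    - `description`: the maximum time a thread waits when it tries to enter a saturated bulkhead
An example: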
```java
// Create a custom configuration for a Bulkhead
BulkheadConfig config = BulkheadConfig.custom()
.maxConcurrentCalls(150)
.maxWaitDuration(Duration.ofMillis(500))
.build();
// Create a BulkheadRegistry with a custom global configuration
BulkheadRegistry registry = BulkheadRegistry.of(config);
// Get or create a Bulkhead from the registry -
// bulkhead will be backed by the default config
Bulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");
// Get or create a Bulkhead from the registry,
// use a custom configuration when creating the bulkhead
Bulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", config);
```
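
To make the meaning of `maxConcurrentCalls` and `maxWaitDuration` concrete, here is a small semaphore-based sketch written with the JDK only. It is an illustration under simplifying assumptions, not the library's `SemaphoreBulkhead`: the class `SemaphoreBulkheadSketch` is hypothetical, and it throws a plain `IllegalStateException` where resilience4j would throw `BulkheadFullException`.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Illustrative semaphore bulkhead; one permit per concurrent call. */
final class SemaphoreBulkheadSketch {
    private final Semaphore permits;
    private final long maxWaitMillis;

    SemaphoreBulkheadSketch(int maxConcurrentCalls, long maxWaitMillis) {
        this.permits = new Semaphore(maxConcurrentCalls, true);
        this.maxWaitMillis = maxWaitMillis;
    }

    <T> T execute(Supplier<T> call) {
        boolean acquired;
        try {
            // Wait at most maxWaitMillis for a free permit (0 means do not wait).
            acquired = permits.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the bulkhead", e);
        }
        if (!acquired) {
            throw new IllegalStateException("bulkhead is full");
        }
        try {
            return call.get();
        } finally {
            permits.release(); // always free the permit, even if the call throws
        }
    }

    int availableConcurrentCalls() {
        return permits.availablePermits();
    }
}
```
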
### Create and configure a ThreadPoolBulkhead
You can provide a custom global `ThreadPoolBulkheadConfig`, created with its builder.
`ThreadPoolBulkheadConfig` supports the following properties:
- `maxThreadPoolSize`:
    - `default value`: `Runtime.getRuntime().availableProcessors()`
    - `description`: the maximum number of threads in the thread pool
- `coreThreadPoolSize`:
    - `default value`: `Runtime.getRuntime().availableProcessors()-1`
    - `description`: the number of core threads in the thread pool
- `queueCapacity`:
    - `default value`: 100
    - `description`: the capacity of the queue
- `keepAliveDuration`:
    - `default value`: 20 [ms]
    - `description`: when the pool has more threads than the core size, the maximum time an idle non-core thread is kept before it is terminated
- `writableStackTraceEnabled`:
    - `default value`: true
    - `description`: whether the stack trace is output when the bulkhead throws an exception; when set to false, only a single line is output
```java
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(10)
.coreThreadPoolSize(2)
.queueCapacity(20)
.build();
// Create a BulkheadRegistry with a custom global configuration
ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);
// Get or create a ThreadPoolBulkhead from the registry -
// bulkhead will be backed by the default config
ThreadPoolBulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");
// Get or create a Bulkhead from the registry,
// use a custom configuration when creating the bulkhead
ThreadPoolBulkheadConfig custom = ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(5)
.build();
ThreadPoolBulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);
```
### decorate and execute a functional interface
Bulkhead can decorate a `Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer, or CompletionStage`, for example:
```java
// Given
Bulkhead bulkhead = Bulkhead.of("name", config);
// When I decorate my function
CheckedFunction0<String> decoratedSupplier = Bulkhead
.decorateCheckedSupplier(bulkhead, () -> "This can be any method which returns: 'Hello");
// and chain an other function with map
Try<String> result = Try.of(decoratedSupplier)
.map(value -> value + " world'");
// Then the Try Monad returns a Success<String>, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
```
```java
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(10)
.coreThreadPoolSize(2)
.queueCapacity(20)
.build();
ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("name", config);
CompletionStage<String> supplier = ThreadPoolBulkhead
.executeSupplier(bulkhead, backendService::doSomething);
```
### consume emitted RegistryEvents
You can register an event consumer on a `BulkheadRegistry` to listen for Bulkhead creation, replacement, and deletion events:
```java
BulkheadRegistry registry = BulkheadRegistry.ofDefaults();
registry.getEventPublisher()
.onEntryAdded(entryAddedEvent -> {
Bulkhead addedBulkhead = entryAddedEvent.getAddedEntry();
LOG.info("Bulkhead {} added", addedBulkhead.getName());
})
.onEntryRemoved(entryRemovedEvent -> {
Bulkhead removedBulkhead = entryRemovedEvent.getRemovedEntry();
LOG.info("Bulkhead {} removed", removedBulkhead.getName());
});
```
### consume emitted BulkheadEvents
A Bulkhead emits BulkheadEvents of the following types:
- permitted execution
- rejected execution
- finished execution
To consume these events, register an event consumer as follows:
```java
bulkhead.getEventPublisher()
.onCallPermitted(event -> logger.info(...))
.onCallRejected(event -> logger.info(...))
.onCallFinished(event -> logger.info(...));
```
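### Drawbacks of ThreadPoolBulkhead
The ThreadPoolBulkhead implementation creates a separate thread pool for every bulkhead. Avoid creating a large number of them in one project, because the thread count balloons and context switching becomes expensive.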
## RateLimiter
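Resilience4j provides a RateLimiter that splits all nanoseconds from the start of the `epoch` (`a fixed but arbitrary point in time; see the Javadoc of System.nanoTime`) into a series of cycles. The duration of each cycle is configured by `RateLimiterConfig.limitRefreshPeriod`. At the start of each cycle, the RateLimiter sets the `active permissions number` to `RateLimiterConfig.limitForPeriod`.
The default RateLimiter implementation is `AtomicRateLimiter`, which manages its state through an `AtomicReference`. `AtomicRateLimiter.State` is immutable and has the following fields:
- `activeCycle`: the cycle number used by the last call
- `activePermissions`: the permissions available after the last call; this field can be negative
- `nanosToWait`: the number of nanoseconds the last call had to wait for its permission
Besides `AtomicRateLimiter`, there is also a `SemaphoreBasedRateLimiter`, which uses a semaphore and a scheduler to refresh the permissions after every `RateLimiterConfig#limitRefreshPeriod`.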
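
The cycle idea can be sketched as follows. This is a simplified illustration and not `AtomicRateLimiter` itself: `CycleRateLimiterSketch` is a hypothetical class, it takes the current time as a parameter instead of calling `System.nanoTime()`, and it rejects a call immediately instead of reserving a permission and waiting, so `activePermissions` never goes negative here.

```java
/** Illustrative cycle-based limiter: permissions are reset at the start of each cycle. */
final class CycleRateLimiterSketch {
    private final long cyclePeriodNanos; // plays the role of limitRefreshPeriod
    private final int limitForPeriod;
    private long activeCycle;            // cycle number used by the last call
    private int activePermissions;       // permissions left in that cycle

    CycleRateLimiterSketch(long cyclePeriodNanos, int limitForPeriod, long startNanos) {
        this.cyclePeriodNanos = cyclePeriodNanos;
        this.limitForPeriod = limitForPeriod;
        this.activeCycle = Math.floorDiv(startNanos, cyclePeriodNanos);
        this.activePermissions = limitForPeriod;
    }

    /** Tries to take one permission at the given time; returns false instead of waiting. */
    synchronized boolean tryAcquire(long nowNanos) {
        long currentCycle = Math.floorDiv(nowNanos, cyclePeriodNanos);
        if (currentCycle != activeCycle) {
            // A new cycle has started: refresh the permissions.
            activeCycle = currentCycle;
            activePermissions = limitForPeriod;
        }
        if (activePermissions > 0) {
            activePermissions--;
            return true;
        }
        return false;
    }
}
```
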
### Create RateLimiterRegistry
Like the CircuitBreaker module, the RateLimiter module provides an in-memory RateLimiterRegistry for managing RateLimiter instances.
```java
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.ofDefaults();
```
### Create and configure a RateLimiter
You can provide a custom `RateLimiterConfig`, created with its builder. `RateLimiterConfig` supports the following properties:
- `timeoutDuration`:
    - `default value`: 5 [s]
    - `description`: the default time a thread waits for a permission
- `limitRefreshPeriod`:
    - `default value`: 500 [ns]
    - `description`: the period of a limit refresh. After each period, the rate limiter sets its permission count back to `limitForPeriod`
- `limitForPeriod`:
    - `default value`: 50
    - `description`: the number of permissions available during one refresh period
An example that limits the call rate of an interface to at most `10 req/ms`:
```java
RateLimiterConfig config = RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofMillis(1))
.limitForPeriod(10)
.timeoutDuration(Duration.ofMillis(25))
.build();
// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
// Use registry
RateLimiter rateLimiterWithDefaultConfig = rateLimiterRegistry
.rateLimiter("name1");
RateLimiter rateLimiterWithCustomConfig = rateLimiterRegistry
.rateLimiter("name2", config);
```
### decorate and execute a functional interface
RateLimiter can decorate a `Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer, or CompletionStage`:
```java
// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
.decorateCheckedRunnable(rateLimiter, backendService::doSomething);
Try.run(restrictedCall)
.andThenTry(restrictedCall)
.onFailure((RequestNotPermitted throwable) -> LOG.info("Wait before call it again :)"));
```
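You can change the rate limiter parameters at runtime with `changeTimeoutDuration` and `changeLimitForPeriod`. A new timeout does not affect threads that are `currently waiting for permissions`. A new limit does not affect the permissions of the current period; it applies from the next period on: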
```java
// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
.decorateCheckedRunnable(rateLimiter, backendService::doSomething);
// during second refresh cycle limiter will get 100 permissions
rateLimiter.changeLimitForPeriod(100);
```
### consume emitted RegistryEvents
You can register an event consumer on a `RateLimiterRegistry` to listen for RateLimiter creation, replacement, and deletion events:
```java
RateLimiterRegistry registry = RateLimiterRegistry.ofDefaults();
registry.getEventPublisher()
.onEntryAdded(entryAddedEvent -> {
RateLimiter addedRateLimiter = entryAddedEvent.getAddedEntry();
LOG.info("RateLimiter {} added", addedRateLimiter.getName());
})
.onEntryRemoved(entryRemovedEvent -> {
RateLimiter removedRateLimiter = entryRemovedEvent.getRemovedEntry();
LOG.info("RateLimiter {} removed", removedRateLimiter.getName());
});
```
### consume emitted RateLimiterEvents
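A RateLimiter emits events in the following cases:
- a successful permission acquire
- acquire failure
Every event carries additional information, such as its creation time and the rate limiter name. To consume these events, register an event consumer on the RateLimiter: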
```java
rateLimiter.getEventPublisher()
.onSuccess(event -> logger.info(...))
.onFailure(event -> logger.info(...));
```
If you use Project Reactor, you can register a consumer as follows:
```java
ReactorAdapter.toFlux(rateLimiter.getEventPublisher())
    .filter(event -> event.getEventType() == FAILED_ACQUIRE)
    .subscribe(event -> logger.info(...));
```
## Retry
### Create a RetryRegistry
Like the CircuitBreaker module, the Retry module provides an in-memory RetryRegistry for managing Retry instances.
```java
RetryRegistry retryRegistry = RetryRegistry.ofDefaults();
```
### Create and configure Retry
You can provide a custom global RetryConfig, created with its builder.
RetryConfig supports the following properties:
- `maxAttempts`:
    - `default value`: 3
    - `description`: the maximum number of attempts, including the initial call (the initial call is the first attempt)
- `waitDuration`:
    - `default value`: 500 [ms]
    - `description`: the fixed wait time between retry attempts
- `intervalFunction`:
    - `default value`: `numOfAttempts -> waitDuration`
    - `description`: a function that modifies the wait time after a failure. By default the wait time after every failure is fixed at `waitDuration`
- `intervalBiFunction`:
    - `default value`: `(int numOfAttempts, Either<Throwable, T> result)->waitDuration`
    - `description`: a function that modifies the wait interval after a failure, based on the attempt number and the result or exception. Using `intervalFunction` and `intervalBiFunction` together throws an exception
- `retryOnResultPredicate`:
    - `default value`: `result -> false`
    - `description`: a predicate that decides whether a result should be retried. It returns true if the result should be retried and false otherwise
- `retryOnExceptionPredicate`:
    - `default value`: `throwable -> true`
    - `description`: a predicate that decides whether an exception should be retried. It returns true if the exception should be retried and false otherwise
- `retryExceptions`:
    - `default value`: empty
    - `description`: a list of exception types that are recorded as failures and are retried
- `ignoreExceptions`:
    - `default value`: empty
    - `description`: a list of exception types that are ignored and are not retried
- `failAfterMaxAttempts`:
    - `default value`: false
    - `description`: enables or disables throwing `MaxRetriesExceededException` when the Retry has reached the configured `maxAttempts` and the result still does not pass `retryOnResultPredicate`
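> When an exception is thrown, the retry decision works as follows:
> - A predicate decides whether the exception should be retried:
>     - if the exception is in `ignoreExceptions`, it is not retried
>     - if the exception is in `retryExceptions`, the predicate returns true
>     - if the exception is not in `retryExceptions`, `retryOnExceptionPredicate` decides whether it triggers a retry
> - If the predicate says the exception should be retried, the attempt count is incremented and compared with `maxAttempts`:
>     - if the limit has not been exceeded, the call is retried after waiting for the interval
>     - if the limit has been exceeded, the call is not retried and the exception is thrown
>
> When no exception is thrown, the retry decision works as follows:
> - First, `retryOnResultPredicate` decides whether the returned result should trigger a retry; if not, the flow ends
> - If it should, the attempt count is incremented and compared with `maxAttempts`:
>     - if the limit has not been exceeded, the call is retried after waiting for the interval
>     - if the limit has been exceeded, `failAfterMaxAttempts` decides whether an exception is thrown: true throws an exception, false does not. By default no exception is thrown
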
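
The two decision flows above can be condensed into one loop. The sketch below is a simplification written for these notes, not resilience4j's Retry: `RetrySketch.execute` is a hypothetical helper, the `retryOnException` predicate is assumed to already combine `ignoreExceptions`, `retryExceptions`, and `retryOnExceptionPredicate`, the wait between attempts is omitted, and a plain `IllegalStateException` stands in for `MaxRetriesExceededException`.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Illustrative retry loop following the decision flow described above. */
final class RetrySketch {
    static <T> T execute(Supplier<T> call,
                         int maxAttempts,
                         Predicate<T> retryOnResult,
                         Predicate<Throwable> retryOnException,
                         boolean failAfterMaxAttempts) {
        for (int attempt = 1; ; attempt++) {
            T result;
            try {
                result = call.get();
            } catch (RuntimeException e) {
                // Not retryable, or no attempts left: rethrow the exception.
                if (!retryOnException.test(e) || attempt >= maxAttempts) {
                    throw e;
                }
                continue; // a real retry would wait for the interval here
            }
            if (!retryOnResult.test(result)) {
                return result; // the result is acceptable
            }
            if (attempt >= maxAttempts) {
                if (failAfterMaxAttempts) {
                    throw new IllegalStateException("max attempts exceeded");
                }
                return result; // give up quietly and return the last result
            }
        }
    }
}
```
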
An example of creating a RetryConfig:
```java
RetryConfig config = RetryConfig.custom()
.maxAttempts(2)
.waitDuration(Duration.ofMillis(1000))
.retryOnResult(response -> response.getStatus() == 500)
.retryOnException(e -> e instanceof WebServiceException)
.retryExceptions(IOException.class, TimeoutException.class)
.ignoreExceptions(BusinessException.class, OtherBusinessException.class)
.failAfterMaxAttempts(true)
.build();
// Create a RetryRegistry with a custom global configuration
RetryRegistry registry = RetryRegistry.of(config);
// Get or create a Retry from the registry -
// Retry will be backed by the default config
Retry retryWithDefaultConfig = registry.retry("name1");
// Get or create a Retry from the registry,
// use a custom configuration when creating the retry
RetryConfig custom = RetryConfig.custom()
.waitDuration(Duration.ofMillis(100))
.build();
Retry retryWithCustomConfig = registry.retry("name2", custom);
```
### Decorate and execute a functional interface
Retry can decorate a `Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer, or CompletionStage`, for example:
```java
// Given I have a HelloWorldService which throws an exception
HelloWorldService helloWorldService = mock(HelloWorldService.class);
given(helloWorldService.sayHelloWorld())
    .willThrow(new WebServiceException("BAM!"));
// Create a Retry with default configuration
Retry retry = Retry.ofDefaults("id");
// Decorate the invocation of the HelloWorldService
CheckedFunction0<String> retryableSupplier = Retry
    .decorateCheckedSupplier(retry, helloWorldService::sayHelloWorld);
// When I invoke the function
Try<String> result = Try.of(retryableSupplier)
    .recover((throwable) -> "Hello world from recovery function");
// Then the helloWorldService should be invoked 3 times
BDDMockito.then(helloWorldService).should(times(3)).sayHelloWorld();
// and the exception should be handled by the recovery function
assertThat(result.get()).isEqualTo("Hello world from recovery function");
```
### consume emitted RegistryEvents
You can register listeners on the RetryRegistry to consume the create, replace, and delete events of Retry instances:
```java
RetryRegistry registry = RetryRegistry.ofDefaults();
registry.getEventPublisher()
    .onEntryAdded(entryAddedEvent -> {
        Retry addedRetry = entryAddedEvent.getAddedEntry();
        LOG.info("Retry {} added", addedRetry.getName());
    })
    .onEntryRemoved(entryRemovedEvent -> {
        Retry removedRetry = entryRemovedEvent.getRemovedEntry();
        LOG.info("Retry {} removed", removedRetry.getName());
    });
```
### use custom IntervalFunction
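If you do not want a fixed wait duration, you can supply a custom `IntervalFunction`, which computes the wait duration independently for each attempt. Resilience4j provides several factory methods for creating an IntervalFunction: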
```java
IntervalFunction defaultWaitInterval = IntervalFunction
    .ofDefaults();
// This interval function is used internally
// when you only configure waitDuration
IntervalFunction fixedWaitInterval = IntervalFunction
    .of(Duration.ofSeconds(5));
IntervalFunction intervalWithExponentialBackoff = IntervalFunction
    .ofExponentialBackoff();
IntervalFunction intervalWithCustomExponentialBackoff = IntervalFunction
    .ofExponentialBackoff(IntervalFunction.DEFAULT_INITIAL_INTERVAL, 2d);
IntervalFunction randomWaitInterval = IntervalFunction
    .ofRandomized();
// Overwrite the default intervalFunction with your custom one
RetryConfig retryConfig = RetryConfig.custom()
    .intervalFunction(intervalWithExponentialBackoff)
    .build();
```
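> `intervalFunction` and `intervalBiFunction` cannot both be specified; specifying both throws an exception.
>
> If `intervalFunction` is specified, the builder automatically derives `intervalBiFunction` from it when the RetryConfig is built.
>
> If either `intervalFunction` or `intervalBiFunction` is specified, the specified function is used to compute the wait duration. If neither is specified, the wait duration is fixed at the configured `waitDuration`.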
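
To make the idea of a per-attempt wait duration concrete, here is a minimal plain-Java sketch of a fixed interval and an exponential backoff. It does not use Resilience4j: the class `BackoffSketch` and its methods are hypothetical, the values 500 ms and 2.0 are illustrative, and the sketch assumes that attempts are numbered from 1 and that the first attempt waits exactly the initial interval.

```java
import java.util.function.IntToLongFunction;

// Hypothetical sketch of interval functions: attempt number in, wait time in milliseconds out.
class BackoffSketch {

    // Every attempt waits the same amount of time
    static IntToLongFunction fixed(long millis) {
        return attempt -> millis;
    }

    // Attempt 1 waits initialMillis; each later attempt multiplies the previous wait by multiplier
    static IntToLongFunction exponential(long initialMillis, double multiplier) {
        return attempt -> {
            long interval = initialMillis;
            for (int i = 1; i < attempt; i++) {
                interval = (long) (interval * multiplier);
            }
            return interval;
        };
    }

    public static void main(String[] args) {
        IntToLongFunction backoff = exponential(500L, 2.0);
        for (int attempt = 1; attempt <= 4; attempt++) {
            // Waits grow as 500, 1000, 2000, 4000 ms
            System.out.println("attempt " + attempt + " waits " + backoff.applyAsLong(attempt) + " ms");
        }
    }
}
```

A function of the attempt number alone corresponds to the `intervalFunction` idea; a function that also sees the previous result or exception would correspond to `intervalBiFunction`.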
## TimeLimiter
### Create a TimeLimiterRegistry
Like the `CircuitBreaker` module, the TimeLimiter module provides an in-memory TimeLimiterRegistry that can be used to manage TimeLimiter instances.
```java
TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.ofDefaults();
```
### Create and configure TimeLimiter
You can provide a custom global TimeLimiterConfig, which is created through its builder.
TimeLimiterConfig supports the following two parameters:
- the timeout duration
- whether cancel should be called on the running future

For example:
```java
TimeLimiterConfig config = TimeLimiterConfig.custom()
    .cancelRunningFuture(true)
    .timeoutDuration(Duration.ofMillis(500))
    .build();
// Create a TimeLimiterRegistry with a custom global configuration
TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.of(config);
// Get or create a TimeLimiter from the registry -
// TimeLimiter will be backed by the default config
TimeLimiter timeLimiterWithDefaultConfig = timeLimiterRegistry.timeLimiter("name1");
// Get or create a TimeLimiter from the registry,
// use a custom configuration when creating the TimeLimiter
TimeLimiterConfig customConfig = TimeLimiterConfig.custom()
    .cancelRunningFuture(false)
    .timeoutDuration(Duration.ofMillis(1000))
    .build();
TimeLimiter timeLimiterWithCustomConfig = timeLimiterRegistry.timeLimiter("name2", customConfig);
```
### decorate and execute a functional interface
TimeLimiter can decorate a `CompletionStage` or a `Future`. For example:
```java
// Given I have a helloWorldService.sayHelloWorld() method which takes too long
HelloWorldService helloWorldService = mock(HelloWorldService.class);
// Create a TimeLimiter
TimeLimiter timeLimiter = TimeLimiter.of(Duration.ofSeconds(1));
// The Scheduler is needed to schedule a timeout on a non-blocking CompletableFuture
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
// The non-blocking variant with a CompletableFuture
CompletableFuture<String> asyncResult = timeLimiter.executeCompletionStage(
    scheduler, () -> CompletableFuture.supplyAsync(helloWorldService::sayHelloWorld)).toCompletableFuture();
// The blocking variant which is basically future.get(timeoutDuration, MILLISECONDS)
String blockingResult = timeLimiter.executeFutureSupplier(
    () -> CompletableFuture.supplyAsync(helloWorldService::sayHelloWorld));
```
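In the TimeLimiter implementation, `timeoutDuration` sets the maximum time to wait for the future to complete. If that wait times out, a `TimeoutException` is thrown.
When the wait times out, the `shouldCancelRunningFuture` setting decides whether `cancel` is called on the future that has not yet completed.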
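
The blocking behavior described above can be sketched with the JDK alone. This is an illustration, not TimeLimiter's source: the class `TimeLimitSketch` and the method `await` are hypothetical names, and the sketch covers only the blocking `Future` variant.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch: wait for a future with a deadline and optionally cancel it on timeout.
class TimeLimitSketch {

    static <T> T await(Supplier<Future<T>> futureSupplier,
                       long timeoutMillis,
                       boolean cancelRunningFuture)
            throws TimeoutException, ExecutionException, InterruptedException {
        Future<T> future = futureSupplier.get();
        try {
            // Wait at most timeoutMillis for the future to complete
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // On timeout, cancel the unfinished future only if configured to do so
            if (cancelRunningFuture) {
                future.cancel(true);
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        // A future that never completes, to force the timeout path
        CompletableFuture<String> never = new CompletableFuture<>();
        try {
            await(() -> never, 100L, true);
        } catch (TimeoutException e) {
            System.out.println("timed out, cancelled = " + never.isCancelled());
        }
    }
}
```

With `cancelRunningFuture` set to `false`, the caller still receives the `TimeoutException`, but the underlying future is left running.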
## Spring Cloud CircuitBreaker
Spring Cloud Circuit Breaker provides an abstraction over circuit breakers, which lets developers choose the circuit breaker implementation freely. Spring Cloud currently supports the following implementations:
- Resilience4j
- Sentinel
- Spring Retry
### core concepts
Circuit breakers are created through the `CircuitBreakerFactory` API. When a Spring Cloud Circuit Breaker starter is on the classpath, a bean implementing `CircuitBreakerFactory` is created automatically. The following example shows how to use the API:
```java
@Service
public static class DemoControllerService {
    private RestTemplate rest;
    private CircuitBreakerFactory cbFactory;
    public DemoControllerService(RestTemplate rest, CircuitBreakerFactory cbFactory) {
        this.rest = rest;
        this.cbFactory = cbFactory;
    }
    public String slow() {
        return cbFactory.create("slow").run(() -> rest.getForObject("/slow", String.class), throwable -> "fallback");
    }
}
```
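The `CircuitBreakerFactory.create` method creates a `CircuitBreaker` instance. Its `run` method takes a `Supplier` and a `Function`:
- the supplier is the code that is actually wrapped in the circuit breaker
- the function is the fallback that runs once the circuit breaker is tripped
  - the function receives a `Throwable`, which is the exception that caused the fallback to be triggered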
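
The supplier/fallback contract of `run` can be sketched in plain Java as follows. This is a simplified illustration only: the class `FallbackSketch` is hypothetical, and a real circuit breaker additionally tracks failures and may skip the supplier entirely while the circuit is open, which this sketch does not model.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of the run(supplier, fallback) contract, without any circuit state.
class FallbackSketch {

    static <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
        try {
            // The supplier is the protected call
            return toRun.get();
        } catch (Exception e) {
            // The fallback receives the exception that triggered it
            return fallback.apply(e);
        }
    }

    public static void main(String[] args) {
        String ok = FallbackSketch.<String>run(() -> "response", t -> "fallback");
        String failed = FallbackSketch.<String>run(
                () -> { throw new IllegalStateException("boom"); },
                t -> "fallback after: " + t.getMessage());
        System.out.println(ok);
        System.out.println(failed);
    }
}
```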
### Configuration
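Circuit breakers are configured by creating beans of type `Customizer`.
In the `Resilience4JCircuitBreaker` implementation, `Customizer#customize` is invoked on every call to `CircuitBreaker#run`, which can be inefficient. To avoid this, create the Customizer with `Customizer#once`, which guarantees that the customizer logic runs at most once for each key produced by the key mapper (the circuit breaker name in the example below).
For example: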
```java
Customizer.once(circuitBreaker -> {
    circuitBreaker.getEventPublisher()
        .onStateTransition(event -> log.info("{}: {}", event.getCircuitBreakerName(), event.getStateTransition()));
}, CircuitBreaker::getName)
```
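
The "at most once per key" guarantee can be illustrated with a small plain-Java wrapper. This is a sketch of the idea, not Spring Cloud's source: the class `OnceSketch` is hypothetical, and it uses `java.util.function.Consumer` in place of the `Customizer` interface.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: wrap a customizer so it runs only once for each distinct key.
class OnceSketch {

    static <T, K> Consumer<T> once(Consumer<T> customizer,
                                   Function<? super T, ? extends K> keyMapper) {
        // Remembers which keys have already been customized
        ConcurrentMap<K, Boolean> customized = new ConcurrentHashMap<>();
        return target -> customized.computeIfAbsent(keyMapper.apply(target), key -> {
            customizer.accept(target);
            return Boolean.TRUE;
        });
    }

    public static void main(String[] args) {
        AtomicInteger invocations = new AtomicInteger();
        Consumer<String> onceByName =
                OnceSketch.<String, String>once(name -> invocations.incrementAndGet(), name -> name);
        onceByName.accept("slow");
        onceByName.accept("slow"); // same key, skipped
        onceByName.accept("fast"); // new key, runs
        System.out.println("customizer ran " + invocations.get() + " times");
    }
}
```

Keying by name is what makes the wrapper safe to call on every `run`: repeated calls for the same circuit breaker become a cheap map lookup.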