- CircuitBreaker
- Introduction
- CircuitBreaker
- Bulkhead
- RateLimiter
- Retry
- TimeLimiter
- Spring Cloud CircuitBreaker
CircuitBreaker
Resilience4j is a lightweight fault tolerance library designed for functional programming. It provides higher-order functions (decorators) that enhance any functional interface, lambda expression or method reference.
The decorators fall into the following categories:
- CircuitBreaker
- Rate Limiter
- Retry
- Bulkhead
Any functional interface, lambda expression or method reference can be decorated with several decorators stacked together.
Introduction
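The following example shows how to decorate a lambda expression with a CircuitBreaker, a Bulkhead and a Retry, so that the call is attempted at most 3 times (the initial call plus up to 2 retries) when an exception occurs.
The interval between retries is configurable, and custom backoff algorithms are supported.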
// Create a CircuitBreaker with default configuration
CircuitBreaker circuitBreaker = CircuitBreaker
.ofDefaults("backendService");
// Create a Retry with default configuration
// maxAttempts = 3 (including the initial call) and a fixed 500ms wait between attempts
Retry retry = Retry
.ofDefaults("backendService");
// Create a Bulkhead with default configuration
Bulkhead bulkhead = Bulkhead
.ofDefaults("backendService");
Supplier<String> supplier = () -> backendService
.doSomething(param1, param2);
// Decorate your call to backendService.doSomething()
// with a Bulkhead, CircuitBreaker and Retry
// **note: you will need the resilience4j-all dependency for this
Supplier<String> decoratedSupplier = Decorators.ofSupplier(supplier)
.withCircuitBreaker(circuitBreaker)
.withBulkhead(bulkhead)
.withRetry(retry)
.decorate();
// When you don't want to decorate your lambda expression,
// but just execute it and protect the call by a CircuitBreaker.
String result = circuitBreaker
.executeSupplier(backendService::doSomething);
// You can also run the supplier asynchronously in a ThreadPoolBulkhead
ThreadPoolBulkhead threadPoolBulkhead = ThreadPoolBulkhead
.ofDefaults("backendService");
// The Scheduler is needed to schedule a timeout
// on a non-blocking CompletableFuture
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
TimeLimiter timeLimiter = TimeLimiter.of(Duration.ofSeconds(1));
CompletableFuture<String> future = Decorators.ofSupplier(supplier)
.withThreadPoolBulkhead(threadPoolBulkhead)
.withTimeLimiter(timeLimiter, scheduledExecutorService)
.withCircuitBreaker(circuitBreaker)
.withFallback(asList(TimeoutException.class,
CallNotPermittedException.class,
BulkheadFullException.class),
throwable -> "Hello from Recovery")
.get().toCompletableFuture();
maven
Resilience4j requires JDK 17 or later. With Maven, the dependencies can be added as follows.
To add all modules at once:
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-all</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
To add only the modules you need:
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-cache</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-timelimiter</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
CircuitBreaker
State Machine
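The CircuitBreaker is implemented as a finite state machine with the following states:
- CLOSED
- OPEN
- HALF_OPEN
- METRICS_ONLY
- DISABLED
- FORCED_OPEN
The first three are the normal states; the last three are special states.
The normal states transition as follows:
- In CLOSED, when the failure rate reaches or exceeds the configured threshold, the CircuitBreaker transitions from CLOSED to OPEN.
- After a configured wait time in OPEN, it transitions from OPEN to HALF_OPEN.
- In HALF_OPEN, if the failure rate is below the threshold, it transitions back to CLOSED.
- In HALF_OPEN, if the failure rate still reaches or exceeds the threshold, it transitions back to OPEN.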
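A minimal plain-Java sketch of the normal-state transitions above. It assumes a single failure-rate threshold and ignores the slow call rate; the names BreakerState, BreakerStateMachine, onFailureRateEvaluated and onWaitDurationElapsed are hypothetical and not part of the Resilience4j API.

```java
/** Simplified model of the three normal CircuitBreaker states (illustrative, not Resilience4j's code). */
enum BreakerState { CLOSED, OPEN, HALF_OPEN }

final class BreakerStateMachine {
    private final float failureRateThreshold; // percentage, e.g. 50f
    private BreakerState state = BreakerState.CLOSED;

    BreakerStateMachine(float failureRateThreshold) {
        this.failureRateThreshold = failureRateThreshold;
    }

    BreakerState state() {
        return state;
    }

    /** Called whenever enough calls have been recorded to compute a failure rate. */
    void onFailureRateEvaluated(float failureRatePercent) {
        boolean exceeded = failureRatePercent >= failureRateThreshold;
        switch (state) {
            case CLOSED -> {
                if (exceeded) state = BreakerState.OPEN;
            }
            case HALF_OPEN -> state = exceeded ? BreakerState.OPEN : BreakerState.CLOSED;
            case OPEN -> {
                // Calls are rejected while OPEN, so no new failure rate is produced here.
            }
        }
    }

    /** Called once the wait duration in OPEN has elapsed. */
    void onWaitDurationElapsed() {
        if (state == BreakerState.OPEN) state = BreakerState.HALF_OPEN;
    }
}
```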
Sliding Window
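The CircuitBreaker uses a sliding window to store and aggregate the outcome of calls. You can choose between a count-based and a time-based sliding window:
- count-based: aggregates the outcome of the last N calls
- time-based: aggregates the outcome of the calls made in the last N seconds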
Count-based sliding window
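The count-based sliding window is implemented with a circular array of N measurements. If the window size is 10, the circular array always holds 10 measurements.
The count-based window updates a total aggregation incrementally:
- When a call completes, its outcome is recorded and added to the total aggregation.
- When a new call is recorded (once the array is full), the oldest measurement in the circular array is evicted: it is subtracted from the total aggregation and its bucket is reset (a bucket is a single measurement here, so resetting the bucket resets the oldest measurement).
Checking the aggregated result costs O(1), because the data is pre-aggregated and does not depend on the window size.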
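The subtract-on-evict idea can be sketched as a circular array plus running totals. This is an illustrative, single-threaded model (the class CountBasedWindow is hypothetical and tracks only failures), not Resilience4j's implementation:

```java
/** Simplified count-based sliding window: a circular array plus a running total (subtract-on-evict). */
final class CountBasedWindow {
    private final boolean[] failed; // one measurement per slot: true = failed call
    private final boolean[] filled; // whether the slot currently holds a measurement
    private int head = 0;           // next slot to overwrite (the oldest one once the array is full)
    private int totalCalls = 0;
    private int totalFailures = 0;

    CountBasedWindow(int size) {
        failed = new boolean[size];
        filled = new boolean[size];
    }

    void record(boolean callFailed) {
        // Evict the oldest measurement: subtract it from the total aggregation, then reset the bucket.
        if (filled[head]) {
            totalCalls--;
            if (failed[head]) totalFailures--;
        }
        // Store the new outcome and add it to the total aggregation.
        failed[head] = callFailed;
        filled[head] = true;
        totalCalls++;
        if (callFailed) totalFailures++;
        head = (head + 1) % failed.length;
    }

    int totalCalls() { return totalCalls; }

    int totalFailures() { return totalFailures; }

    /** O(1): reads the pre-aggregated totals, independent of the window size. */
    float failureRatePercent() {
        return totalCalls == 0 ? 0f : totalFailures * 100f / totalCalls;
    }
}
```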
Time-based sliding window
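The time-based sliding window is also implemented with a circular array, holding N partial aggregations (buckets).
If the time window is 10 seconds, the circular array always holds 10 buckets. Each bucket corresponds to one epoch second and aggregates the outcomes of the calls made during that second (a partial aggregation).
The head bucket of the circular array stores the outcomes of calls made in the current epoch second, while the other partial aggregations store the outcomes from previous seconds. Unlike the count-based window, the time-based window does not store call outcomes individually; it updates the partial aggregations incrementally.
Besides updating the partial aggregation, the time-based window also updates the total aggregation whenever a call completes. When the oldest bucket is evicted, its partial aggregation is subtracted from the total aggregation and the bucket is reset.
Checking the aggregated result also costs O(1), since the time-based window is pre-aggregated as well.
A partial aggregation consists of 3 integers that count:
- failed calls
- slow calls
- total calls
It also contains one long that stores the total duration of all calls.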
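A rough sketch of the time-based window, assuming the caller passes in the current epoch second and that the clock never moves backwards. TimeBasedWindow is a hypothetical, single-threaded class; its Bucket holds the three int counters and the long duration described above, and a long gap between calls evicts every stale bucket, not just one:

```java
/** Simplified time-based sliding window: one partial aggregation (bucket) per epoch second. */
final class TimeBasedWindow {
    /** Partial aggregation: three int counters plus the summed call duration. */
    private static final class Bucket {
        int failedCalls, slowCalls, totalCalls;
        long totalDurationNanos;
    }

    private final Bucket[] buckets;
    private long headSecond = Long.MIN_VALUE; // epoch second of the head bucket; MIN_VALUE = empty
    private int totalCalls, totalFailed, totalSlow;
    private long totalDurationNanos;

    TimeBasedWindow(int windowSeconds) {
        buckets = new Bucket[windowSeconds];
        for (int i = 0; i < windowSeconds; i++) buckets[i] = new Bucket();
    }

    private Bucket bucketFor(long epochSecond) {
        return buckets[(int) Math.floorMod(epochSecond, (long) buckets.length)];
    }

    /** Moves the head to nowSecond, evicting every bucket that slid out of the window. */
    private void moveHeadTo(long nowSecond) {
        if (nowSecond <= headSecond) return; // same second (a monotonic clock is assumed)
        long first = headSecond == Long.MIN_VALUE
                ? nowSecond
                : Math.max(headSecond + 1, nowSecond - buckets.length + 1);
        for (long s = first; s <= nowSecond; s++) {
            Bucket b = bucketFor(s);
            // Subtract the evicted partial aggregation from the total aggregation, then reset it.
            totalCalls -= b.totalCalls;
            totalFailed -= b.failedCalls;
            totalSlow -= b.slowCalls;
            totalDurationNanos -= b.totalDurationNanos;
            b.failedCalls = b.slowCalls = b.totalCalls = 0;
            b.totalDurationNanos = 0;
        }
        headSecond = nowSecond;
    }

    void record(long epochSecond, boolean failed, boolean slow, long durationNanos) {
        moveHeadTo(epochSecond);
        Bucket head = bucketFor(headSecond);
        head.totalCalls++;
        totalCalls++;
        if (failed) { head.failedCalls++; totalFailed++; }
        if (slow) { head.slowCalls++; totalSlow++; }
        head.totalDurationNanos += durationNanos;
        totalDurationNanos += durationNanos;
    }

    int totalCalls() { return totalCalls; }
    int totalFailed() { return totalFailed; }
    int totalSlow() { return totalSlow; }
    long totalDurationNanos() { return totalDurationNanos; }
}
```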
Failure rate and slow call rate thresholds
Failure rate & exception list
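When the failure rate is greater than or equal to the configured threshold, the CircuitBreaker transitions from CLOSED to OPEN.
By default, every thrown exception counts as a failure. You can instead specify an exception list: only exceptions in the list are counted as failures, and all other exceptions are counted as successes. Exceptions can also be ignored; an ignored exception is counted neither as a success nor as a failure.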
Slow call rate
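When the slow call rate is greater than or equal to the configured threshold, the CircuitBreaker also transitions from CLOSED to OPEN. The slow call rate helps reduce the load on an external system.
The failure rate and the slow call rate can only be calculated once a minimum number of calls has been recorded. For example, if the minimum number of required calls is 10, both rates are calculated only after 10 calls have been recorded. If only 9 calls have been recorded, the CircuitBreaker does not open even if all 9 of them failed.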
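The minimum-calls gate and the "greater than or equal" comparisons can be expressed as a small pure function. ThresholdCheck.shouldOpen is a hypothetical helper, and the rates are computed as percentages to match how the thresholds are configured:

```java
/** Sketch of the threshold check gated by minimumNumberOfCalls (simplified; not the library's code). */
final class ThresholdCheck {
    static boolean shouldOpen(int recordedCalls, int failedCalls, int slowCalls,
                              int minimumNumberOfCalls,
                              float failureRateThreshold, float slowCallRateThreshold) {
        if (recordedCalls < minimumNumberOfCalls) {
            return false; // rates are not computed yet, even if every recorded call failed
        }
        float failureRate = failedCalls * 100f / recordedCalls;
        float slowCallRate = slowCalls * 100f / recordedCalls;
        return failureRate >= failureRateThreshold || slowCallRate >= slowCallRateThreshold;
    }
}
```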
CircuitBreaker in OPEN/HALF_OPEN state
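While OPEN, the CircuitBreaker rejects all calls with a CallNotPermittedException. After a wait period it transitions from OPEN to HALF_OPEN and permits a configurable number of calls to go through, in order to check whether the backend has recovered and is reachable again.
In HALF_OPEN, if permittedNumberOfCallsInHalfOpenState is 10 and 20 calls arrive, the first 10 calls are executed normally and the remaining 10 are rejected with a CallNotPermittedException.
In HALF_OPEN, if the failure rate or the slow call rate is greater than or equal to the configured threshold, the CircuitBreaker transitions back to OPEN. If both rates are below their thresholds, it transitions to CLOSED.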
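Limiting HALF_OPEN to a fixed number of trial calls amounts to a thread-safe counter of remaining permits. In this hypothetical sketch (HalfOpenPermits is not a Resilience4j class), a false return stands for the call being rejected with CallNotPermittedException; with 10 permits and 20 callers, exactly 10 get through:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: admit at most permittedNumberOfCallsInHalfOpenState trial calls (not Resilience4j's code). */
final class HalfOpenPermits {
    private final AtomicInteger remaining;

    HalfOpenPermits(int permittedNumberOfCallsInHalfOpenState) {
        remaining = new AtomicInteger(permittedNumberOfCallsInHalfOpenState);
    }

    /** Returns true if the call may proceed; false means it would be rejected. */
    boolean tryAcquirePermission() {
        while (true) {
            int current = remaining.get();
            if (current <= 0) return false;
            // CAS loop so concurrent callers can never take more permits than configured.
            if (remaining.compareAndSet(current, current - 1)) return true;
        }
    }
}
```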
Special States
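The CircuitBreaker supports 3 special states:
- METRICS_ONLY:
  - All circuit breaker events are generated normally (except state transitions), and metrics are recorded normally.
  - It behaves like CLOSED, except that the CircuitBreaker does not transition to OPEN when a threshold is reached.
- DISABLED:
  - No CircuitBreakerEvent is produced and no metrics are recorded.
  - All calls are permitted.
- FORCED_OPEN:
  - No CircuitBreakerEvent is produced and no metrics are recorded.
  - All calls are rejected.
These special states can be left in the following ways:
- by triggering a state transition
- by resetting the CircuitBreaker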
thread-safe
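The CircuitBreaker is thread-safe, but it does not serialize function calls, so decorated calls may run in parallel.
For a CLOSED CircuitBreaker, if 20 threads call it at the same time, all 20 calls are allowed to run concurrently, even though the sliding window size (15) is smaller than the number of threads. The sliding window size has no effect on the degree of concurrency.
To limit concurrency, use a Bulkhead.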
CircuitBreakerRegistry
Resilience4j ships with an in-memory CircuitBreakerRegistry based on ConcurrentHashMap. The registry is used to manage (create and retrieve) CircuitBreaker instances. A registry that uses the default CircuitBreakerConfig can be created as follows:
CircuitBreakerRegistry circuitBreakerRegistry =
CircuitBreakerRegistry.ofDefaults();
Create and configure CircuitBreakerConfig
Instead of the default CircuitBreakerConfig, you can supply a custom one, built with its builder.
CircuitBreakerConfig supports the following properties:
failureRateThreshold
- default value: 50
- description: the failure rate threshold as a percentage. When the failure rate is greater than or equal to this threshold, the CircuitBreaker transitions to OPEN and starts short-circuiting calls.
slowCallRateThreshold
- default value: 100
- description: the slow call rate threshold as a percentage. When the percentage of slow calls is greater than or equal to this threshold, the CircuitBreaker transitions to OPEN and starts short-circuiting calls.
slowCallDurationThreshold
- default value: 60000 [ms]
- description: the duration threshold above which a call is considered slow and counted towards the slow call rate.
permittedNumberOfCallsInHalfOpenState
- default value: 10
- description: the number of calls permitted while the CircuitBreaker is HALF_OPEN.
maxWaitDurationInHalfOpenState
- default value: 0 [ms]
- description: the maximum time the CircuitBreaker may stay HALF_OPEN before it switches back to OPEN. The default of 0 means it waits without limit until all permitted calls have completed.
slidingWindowType
- default value: COUNT_BASED
- description: the type of sliding window used to record call outcomes while the CircuitBreaker is CLOSED. It can be count-based or time-based:
  - COUNT_BASED: records and aggregates the outcome of the last slidingWindowSize calls
  - TIME_BASED: records and aggregates the outcome of the calls made in the last slidingWindowSize seconds
slidingWindowSize
- default value: 100
- description: the size of the sliding window.
minimumNumberOfCalls
- default value: 100
- description: the minimum number of calls that must be recorded (within the sliding window) before the failure rate and slow call rate can be calculated. For example, if minimumNumberOfCalls is 10, at least 10 calls must be recorded before the failure rate is calculated. If only 9 calls have been recorded, the CircuitBreaker does not transition to OPEN even if all 9 failed.
waitDurationInOpenState
- default value: 60000 [ms]
- description: how long an OPEN CircuitBreaker waits before it transitions to HALF_OPEN.
automaticTransitionFromOpenToHalfOpenEnabled
- default value: false
- description: if true, a dedicated thread monitors all CircuitBreaker instances and moves them from OPEN to HALF_OPEN once waitDurationInOpenState has elapsed. If false (the default), the transition is not driven by a dedicated thread but by incoming calls: even after waitDurationInOpenState has elapsed, the transition from OPEN to HALF_OPEN only happens when a subsequent call arrives.
recordExceptions
- default value: empty
- description: a list of exception types that are recorded as failures and increase the failure rate.
  - Any exception that matches or inherits from one of the listed types counts as a failure, unless it is explicitly ignored via ignoreExceptions.
  - If an exception list is specified via recordExceptions, all other exceptions count as successes, unless they are explicitly ignored via ignoreExceptions.
ignoreExceptions
- default value: empty
- description: a list of exception types that are ignored and counted neither as failures nor as successes.
  - Any exception that matches or inherits from one of the listed types is ignored, even if it is also listed in recordExceptions.
recordFailurePredicate
- default value: throwable -> true
- description: a custom predicate that decides whether an exception should be recorded as a failure. It should return true if the exception should count as a failure, and false if it should count as a success.
ignoreExceptionPredicate
- default value: throwable -> false
- description: a custom predicate that decides whether an exception should be ignored or recorded as a failure/success.
  - It should return true if the exception should be ignored.
  - It should return false if the exception should not be ignored; the exception is then counted as a failure or success.
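success/failure/ignore decision flow
When a call completes, its outcome is classified as success, failure or ignored as follows:
- If the call did not throw an exception, it is recorded as a success. Otherwise, continue.
- If the exception is in ignoreExceptions, it is ignored. Otherwise, continue.
- If ignoreExceptionPredicate is set and returns true, the exception is ignored. Otherwise, continue.
- If the exception is in recordExceptions, it is recorded as a failure. Otherwise, continue.
- If recordFailurePredicate is set and returns true, the exception is recorded as a failure. Otherwise, continue.
- If none of the above applies, the call is recorded as a success.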
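The decision flow above translates almost line by line into code. This sketch is illustrative only: OutcomeClassifier and Outcome are hypothetical names, null stands for "predicate not configured", and Class.isInstance implements "matches or inherits from":

```java
import java.util.List;
import java.util.function.Predicate;

enum Outcome { SUCCESS, FAILURE, IGNORED }

/** Sketch of the success/failure/ignore decision flow (illustrative, not the library's code). */
final class OutcomeClassifier {
    private final List<Class<? extends Throwable>> ignoreExceptions;
    private final Predicate<Throwable> ignoreExceptionPredicate; // null = not configured
    private final List<Class<? extends Throwable>> recordExceptions;
    private final Predicate<Throwable> recordFailurePredicate;   // null = not configured

    OutcomeClassifier(List<Class<? extends Throwable>> ignoreExceptions,
                      Predicate<Throwable> ignoreExceptionPredicate,
                      List<Class<? extends Throwable>> recordExceptions,
                      Predicate<Throwable> recordFailurePredicate) {
        this.ignoreExceptions = ignoreExceptions;
        this.ignoreExceptionPredicate = ignoreExceptionPredicate;
        this.recordExceptions = recordExceptions;
        this.recordFailurePredicate = recordFailurePredicate;
    }

    /** @param thrown the exception thrown by the call, or null if the call completed normally */
    Outcome classify(Throwable thrown) {
        if (thrown == null) return Outcome.SUCCESS;
        if (matchesAny(ignoreExceptions, thrown)) return Outcome.IGNORED;
        if (ignoreExceptionPredicate != null && ignoreExceptionPredicate.test(thrown)) return Outcome.IGNORED;
        if (matchesAny(recordExceptions, thrown)) return Outcome.FAILURE;
        if (recordFailurePredicate != null && recordFailurePredicate.test(thrown)) return Outcome.FAILURE;
        return Outcome.SUCCESS;
    }

    /** "Matches or inherits from": Class.isInstance also accepts subclasses. */
    private static boolean matchesAny(List<Class<? extends Throwable>> types, Throwable thrown) {
        return types.stream().anyMatch(type -> type.isInstance(thrown));
    }
}
```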
A custom CircuitBreakerConfig can be created as follows:
// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.slowCallRateThreshold(50)
.waitDurationInOpenState(Duration.ofMillis(1000))
.slowCallDurationThreshold(Duration.ofSeconds(2))
.permittedNumberOfCallsInHalfOpenState(3)
.minimumNumberOfCalls(10)
.slidingWindowType(SlidingWindowType.TIME_BASED)
.slidingWindowSize(5)
.recordException(e -> INTERNAL_SERVER_ERROR
.equals(getResponse().getStatus()))
.recordExceptions(IOException.class, TimeoutException.class)
.ignoreExceptions(BusinessException.class, OtherBusinessException.class)
.build();
// Create a CircuitBreakerRegistry with a custom global configuration
CircuitBreakerRegistry circuitBreakerRegistry =
CircuitBreakerRegistry.of(circuitBreakerConfig);
// Get or create a CircuitBreaker from the CircuitBreakerRegistry
// with the global default configuration
CircuitBreaker circuitBreakerWithDefaultConfig =
circuitBreakerRegistry.circuitBreaker("name1");
// Get or create a CircuitBreaker from the CircuitBreakerRegistry
// with a custom configuration
CircuitBreaker circuitBreakerWithCustomConfig = circuitBreakerRegistry
.circuitBreaker("name2", circuitBreakerConfig);
You can also add a named configuration to the CircuitBreakerRegistry and share it across multiple CircuitBreaker instances:
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(70)
.build();
circuitBreakerRegistry.addConfiguration("someSharedConfig", circuitBreakerConfig);
CircuitBreaker circuitBreaker = circuitBreakerRegistry
.circuitBreaker("name", "someSharedConfig");
You can also derive a new configuration from the default one and overwrite selected values:
CircuitBreakerConfig defaultConfig = circuitBreakerRegistry
.getDefaultConfig();
CircuitBreakerConfig overwrittenConfig = CircuitBreakerConfig
.from(defaultConfig)
.waitDurationInOpenState(Duration.ofSeconds(20))
.build();
If you do not want to manage CircuitBreaker instances through a CircuitBreakerRegistry, you can also create them directly:
// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.recordExceptions(IOException.class, TimeoutException.class)
.ignoreExceptions(BusinessException.class, OtherBusinessException.class)
.build();
CircuitBreaker customCircuitBreaker = CircuitBreaker
.of("testName", circuitBreakerConfig);
To plug in your own registry implementation, provide a custom RegistryStore implementation and pass it in through the builder:
CircuitBreakerRegistry registry = CircuitBreakerRegistry.custom()
.withRegistryStore(new YourRegistryStoreImplementation())
.withCircuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
.build();
Decorate and execute a functional interface
A CircuitBreaker can decorate Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer and CompletionStage.
The decorated function can be invoked with Vavr's Try.of or Try.run, which allows further calls to be chained, for example:
// Given
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
// When I decorate my function
CheckedFunction0<String> decoratedSupplier = CircuitBreaker
.decorateCheckedSupplier(circuitBreaker, () -> "This can be any method which returns: 'Hello");
// and chain an other function with map
Try<String> result = Try.of(decoratedSupplier)
.map(value -> value + " world'");
// Then the Try Monad returns a Success<String>, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
Consume emitted RegistryEvents
You can register an event consumer on a CircuitBreakerRegistry to take action whenever a CircuitBreaker is created, replaced or removed:
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
circuitBreakerRegistry.getEventPublisher()
.onEntryAdded(entryAddedEvent -> {
CircuitBreaker addedCircuitBreaker = entryAddedEvent.getAddedEntry();
LOG.info("CircuitBreaker {} added", addedCircuitBreaker.getName());
})
.onEntryRemoved(entryRemovedEvent -> {
CircuitBreaker removedCircuitBreaker = entryRemovedEvent.getRemovedEntry();
LOG.info("CircuitBreaker {} removed", removedCircuitBreaker.getName());
});
consume emitted CircuitBreakerEvents
A CircuitBreakerEvent is emitted on:
- state transition
- circuit breaker reset
- successful call
- recorded error
- ignored error
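All events carry additional information such as the event creation time and the call's processing duration. To consume these events, register an event consumer on the CircuitBreaker: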
circuitBreaker.getEventPublisher()
.onSuccess(event -> logger.info(...))
.onError(event -> logger.info(...))
.onIgnoredError(event -> logger.info(...))
.onReset(event -> logger.info(...))
.onStateTransition(event -> logger.info(...));
// Or if you want to register a consumer listening
// to all events, you can do:
circuitBreaker.getEventPublisher()
.onEvent(event -> logger.info(...));
To keep events around, use a CircularEventConsumer, which stores them in a circular buffer with a fixed capacity:
CircularEventConsumer<CircuitBreakerEvent> ringBuffer =
new CircularEventConsumer<>(10);
circuitBreaker.getEventPublisher().onEvent(ringBuffer);
List<CircuitBreakerEvent> bufferedEvents = ringBuffer.getBufferedEvents();
Bulkhead
Resilience4j provides two Bulkhead implementations, both of which limit the number of concurrent executions:
- SemaphoreBulkhead: based on a semaphore
- FixedThreadPoolBulkhead: uses a bounded queue and a fixed thread pool
create a BulkheadRegistry
Like the CircuitBreaker module, the Bulkhead module provides an in-memory BulkheadRegistry and ThreadPoolBulkheadRegistry for managing Bulkhead instances:
BulkheadRegistry bulkheadRegistry = BulkheadRegistry.ofDefaults();
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry =
ThreadPoolBulkheadRegistry.ofDefaults();
Create and configure a Bulkhead
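You can supply your own global BulkheadConfig, built with the BulkheadConfig builder. It supports the following properties:
maxConcurrentCalls
- default value: 25
- description: the maximum number of parallel executions allowed by the bulkhead.
maxWaitDuration
- default value: 0
- description: the maximum time a thread waits when it tries to enter a saturated bulkhead.
For example: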
// Create a custom configuration for a Bulkhead
BulkheadConfig config = BulkheadConfig.custom()
.maxConcurrentCalls(150)
.maxWaitDuration(Duration.ofMillis(500))
.build();
// Create a BulkheadRegistry with a custom global configuration
BulkheadRegistry registry = BulkheadRegistry.of(config);
// Get or create a Bulkhead from the registry -
// bulkhead will be backed by the default config
Bulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");
// Get or create a Bulkhead from the registry,
// use a custom configuration when creating the bulkhead
BulkheadConfig custom = BulkheadConfig.custom()
.maxConcurrentCalls(10)
.build();
Bulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);
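To see what maxConcurrentCalls and maxWaitDuration mean in practice, here is a semaphore-based bulkhead built only on the JDK. It is a hypothetical sketch (SimpleSemaphoreBulkhead is not a Resilience4j class), and it throws IllegalStateException where Resilience4j would throw BulkheadFullException:

```java
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Semaphore-based bulkhead sketch (illustrative; Resilience4j's SemaphoreBulkhead is more complete). */
final class SimpleSemaphoreBulkhead {
    private final Semaphore permits;
    private final Duration maxWaitDuration;

    SimpleSemaphoreBulkhead(int maxConcurrentCalls, Duration maxWaitDuration) {
        this.permits = new Semaphore(maxConcurrentCalls, true); // fair: waiting threads served in order
        this.maxWaitDuration = maxWaitDuration;
    }

    <T> T execute(Supplier<T> call) {
        boolean acquired;
        try {
            // Wait at most maxWaitDuration for a free slot (Duration.ZERO = fail immediately).
            acquired = permits.tryAcquire(maxWaitDuration.toNanos(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the bulkhead", e);
        }
        if (!acquired) {
            // Resilience4j throws BulkheadFullException here; a JDK exception stands in for it.
            throw new IllegalStateException("Bulkhead is full");
        }
        try {
            return call.get();
        } finally {
            permits.release(); // always free the slot, even if the call throws
        }
    }

    int availableConcurrentCalls() {
        return permits.availablePermits();
    }
}
```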
Create and configure a ThreadPoolBulkhead
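You can supply a custom global ThreadPoolBulkheadConfig, built with its builder.
ThreadPoolBulkheadConfig supports the following properties:
maxThreadPoolSize
- default value: Runtime.getRuntime().availableProcessors()
- description: the maximum number of threads in the pool.
coreThreadPoolSize
- default value: Runtime.getRuntime().availableProcessors() - 1
- description: the core number of threads in the pool.
queueCapacity
- default value: 100
- description: the capacity of the queue.
keepAliveDuration
- default value: 20 [ms]
- description: when the number of threads exceeds the core size, the maximum time an idle non-core thread waits before it is terminated.
writableStackTraceEnabled
- default value: true
- description: whether the full stack trace is printed when the bulkhead throws an exception; when set to false, only a single line is printed.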
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(10)
.coreThreadPoolSize(2)
.queueCapacity(20)
.build();
// Create a ThreadPoolBulkheadRegistry with a custom global configuration
ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);
// Get or create a ThreadPoolBulkhead from the registry -
// bulkhead will be backed by the default config
ThreadPoolBulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");
// Get or create a ThreadPoolBulkhead from the registry,
// use a custom configuration when creating the bulkhead
ThreadPoolBulkheadConfig custom = ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(5)
.build();
ThreadPoolBulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);
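The ThreadPoolBulkheadConfig properties map naturally onto a JDK ThreadPoolExecutor with a bounded queue. The sketch below assumes that mapping (SimpleThreadPoolBulkhead is hypothetical, not Resilience4j's implementation); it also makes visible that every thread-pool bulkhead owns its own threads:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Sketch: ThreadPoolBulkheadConfig-style settings mapped onto a plain JDK ThreadPoolExecutor. */
final class SimpleThreadPoolBulkhead {
    private final ThreadPoolExecutor executor;

    SimpleThreadPoolBulkhead(int coreThreadPoolSize, int maxThreadPoolSize,
                             int queueCapacity, long keepAliveMillis) {
        executor = new ThreadPoolExecutor(
                coreThreadPoolSize, maxThreadPoolSize,
                keepAliveMillis, TimeUnit.MILLISECONDS,     // idle time before a non-core thread exits
                new ArrayBlockingQueue<>(queueCapacity),    // bounded queue = queueCapacity
                new ThreadPoolExecutor.AbortPolicy());      // full pool + full queue -> rejection
    }

    /** Runs the call on this bulkhead's own pool and returns a CompletableFuture. */
    <T> CompletableFuture<T> submit(Supplier<T> call) {
        return CompletableFuture.supplyAsync(call, executor);
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

When both the queue and the pool are full, the AbortPolicy makes supplyAsync throw RejectedExecutionException, which plays the role of BulkheadFullException in this sketch.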
decorate and execute a functional interface
A Bulkhead can decorate Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer and CompletionStage, for example:
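// Given a Bulkhead that permits only one concurrent call
BulkheadConfig bulkheadConfig = BulkheadConfig.custom()
.maxConcurrentCalls(1)
.build();
Bulkhead bulkhead = Bulkhead.of("name", bulkheadConfig);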
// When I decorate my function
CheckedFunction0<String> decoratedSupplier = Bulkhead
.decorateCheckedSupplier(bulkhead, () -> "This can be any method which returns: 'Hello");
// and chain an other function with map
Try<String> result = Try.of(decoratedSupplier)
.map(value -> value + " world'");
// Then the Try Monad returns a Success<String>, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
ThreadPoolBulkheadConfig threadPoolConfig = ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(10)
.coreThreadPoolSize(2)
.queueCapacity(20)
.build();
ThreadPoolBulkhead threadPoolBulkhead = ThreadPoolBulkhead.of("name", threadPoolConfig);
// executeSupplier runs the supplier on the bulkhead's thread pool and returns a CompletionStage
CompletionStage<String> future = threadPoolBulkhead
.executeSupplier(backendService::doSomething);
consume emitted RegistryEvents
You can register an event consumer on a BulkheadRegistry to listen for Bulkhead creation, replacement and removal events:
BulkheadRegistry registry = BulkheadRegistry.ofDefaults();
registry.getEventPublisher()
.onEntryAdded(entryAddedEvent -> {
Bulkhead addedBulkhead = entryAddedEvent.getAddedEntry();
LOG.info("Bulkhead {} added", addedBulkhead.getName());
})
.onEntryRemoved(entryRemovedEvent -> {
Bulkhead removedBulkhead = entryRemovedEvent.getRemovedEntry();
LOG.info("Bulkhead {} removed", removedBulkhead.getName());
});
consume emitted BulkheadEvents
A Bulkhead emits BulkheadEvents of the following types:
- permitted execution
- rejected execution
- finished execution
To consume these events, register an event consumer as follows:
bulkhead.getEventPublisher()
.onCallPermitted(event -> logger.info(...))
.onCallRejected(event -> logger.info(...))
.onCallFinished(event -> logger.info(...));
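ThreadPoolBulkhead drawbacks
The ThreadPoolBulkhead implementation creates a separate thread pool for every bulkhead. Avoid creating a large number of them in one application; otherwise the thread count grows quickly and context switching adds significant overhead.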
RateLimiter
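Resilience4j provides a RateLimiter that splits all nanoseconds since an epoch (a fixed point in time; see the Javadoc of System.nanoTime for details) into cycles. The length of each cycle is configured with RateLimiterConfig.limitRefreshPeriod. At the start of each cycle, the RateLimiter resets the number of active permissions to RateLimiterConfig.limitForPeriod.
The default RateLimiter implementation is AtomicRateLimiter, which manages its state through an AtomicReference. AtomicRateLimiter.State is immutable and has the following fields:
- activeCycle: the cycle number used by the last call
- activePermissions: the number of permissions available after the last call; this field can be negative
- nanosToWait: the number of nanoseconds the last call had to wait for permission
Besides AtomicRateLimiter there is also SemaphoreBasedRateLimiter, which uses a semaphore and a scheduler that refreshes the permissions after each RateLimiterConfig#limitRefreshPeriod.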
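The cycle-based bookkeeping can be sketched with an immutable state record swapped through an AtomicReference. This is a simplified model under stated assumptions, not the AtomicRateLimiter source: the clock is injected as a LongSupplier of nanos since a fixed origin and assumed monotonic, each call requests exactly one permission, and the method returns how long the caller would have to wait (or -1 if that exceeds its timeout) instead of actually parking the thread. CycleRateLimiter and reservePermission are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/** Simplified cycle-based rate limiter sketch (not the actual AtomicRateLimiter source). */
final class CycleRateLimiter {
    /** Immutable state, analogous to the activeCycle / activePermissions / nanosToWait fields. */
    record State(long activeCycle, long activePermissions, long nanosToWait) {}

    private final long cyclePeriodNanos;  // limitRefreshPeriod
    private final int limitForPeriod;
    private final LongSupplier nanoClock; // nanos since a fixed origin, assumed monotonic
    private final AtomicReference<State> state;

    CycleRateLimiter(long cyclePeriodNanos, int limitForPeriod, LongSupplier nanoClock) {
        this.cyclePeriodNanos = cyclePeriodNanos;
        this.limitForPeriod = limitForPeriod;
        this.nanoClock = nanoClock;
        this.state = new AtomicReference<>(new State(0, limitForPeriod, 0));
    }

    /** Returns the nanos to wait for one permission, or -1 if that exceeds timeoutNanos. */
    long reservePermission(long timeoutNanos) {
        State next = state.updateAndGet(prev -> nextState(prev, timeoutNanos));
        return next.nanosToWait() <= timeoutNanos ? next.nanosToWait() : -1;
    }

    private State nextState(State prev, long timeoutNanos) {
        long now = nanoClock.getAsLong();
        long currentCycle = now / cyclePeriodNanos;
        long permissions = prev.activePermissions();
        if (currentCycle > prev.activeCycle()) {
            // New cycle(s) started: refill, but never above limitForPeriod.
            long refill = (currentCycle - prev.activeCycle()) * limitForPeriod;
            permissions = Math.min(permissions + refill, limitForPeriod);
        }
        long nanosToWait = 0;
        if (permissions < 1) {
            long nanosToNextCycle = (currentCycle + 1) * cyclePeriodNanos - now;
            long missing = 1 - (permissions + limitForPeriod); // still missing after the next refill
            long extraCycles = missing <= 0 ? 0 : (missing + limitForPeriod - 1) / limitForPeriod;
            nanosToWait = nanosToNextCycle + extraCycles * cyclePeriodNanos;
        }
        // Reserve only if the caller is willing to wait that long; permissions may go negative.
        long remaining = nanosToWait <= timeoutNanos ? permissions - 1 : permissions;
        return new State(currentCycle, remaining, nanosToWait);
    }
}
```

Allowing activePermissions to go negative is what lets a caller reserve a permission from a future cycle while it waits, which matches the field description above.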
Create RateLimiterRegistry
Like the CircuitBreaker module, the RateLimiter module provides an in-memory RateLimiterRegistry for managing RateLimiter instances.
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.ofDefaults();
Create and configure a RateLimiter
You can supply a custom RateLimiterConfig, built with its builder. RateLimiterConfig supports the following properties:
timeoutDuration
- default value: 5 [s]
- description: the default time a thread waits for a permission.
limitRefreshPeriod
- default value: 500 [ns]
- description: the limit refresh period. After each period, the rate limiter resets its permission count to limitForPeriod.
limitForPeriod
- default value: 50
- description: the number of permissions available during one refresh period.
The following example limits calls to at most 10 requests per millisecond:
RateLimiterConfig config = RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofMillis(1))
.limitForPeriod(10)
.timeoutDuration(Duration.ofMillis(25))
.build();
// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
// Use registry
RateLimiter rateLimiterWithDefaultConfig = rateLimiterRegistry
.rateLimiter("name1");
RateLimiter rateLimiterWithCustomConfig = rateLimiterRegistry
.rateLimiter("name2", config);
decorate and execute a functional interface
A RateLimiter can decorate Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer and CompletionStage:
// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
.decorateCheckedRunnable(rateLimiter, backendService::doSomething);
Try.run(restrictedCall)
.andThenTry(restrictedCall)
.onFailure(throwable -> {
if (throwable instanceof RequestNotPermitted) {
LOG.info("Wait before call it again :)");
}
});
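You can change the RateLimiter's parameters at runtime with changeTimeoutDuration and changeLimitForPeriod. A new timeout does not affect threads that are already waiting for permissions, and a new limit does not affect the permissions of the current period; it takes effect from the next period: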
// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
.decorateCheckedRunnable(rateLimiter, backendService::doSomething);
// during second refresh cycle limiter will get 100 permissions
rateLimiter.changeLimitForPeriod(100);
consume emitted RegistryEvents
You can register an event consumer on a RateLimiterRegistry to listen for RateLimiter creation, replacement and removal events:
RateLimiterRegistry registry = RateLimiterRegistry.ofDefaults();
registry.getEventPublisher()
.onEntryAdded(entryAddedEvent -> {
RateLimiter addedRateLimiter = entryAddedEvent.getAddedEntry();
LOG.info("RateLimiter {} added", addedRateLimiter.getName());
})
.onEntryRemoved(entryRemovedEvent -> {
RateLimiter removedRateLimiter = entryRemovedEvent.getRemovedEntry();
LOG.info("RateLimiter {} removed", removedRateLimiter.getName());
});
consume emitted RateLimiterEvents
A RateLimiter emits events on:
- a successful permission acquire
- acquire failure
All events carry additional information such as the event creation time and the rate limiter name. To consume these events, register an event consumer on the RateLimiter:
rateLimiter.getEventPublisher()
.onSuccess(event -> logger.info(...))
.onFailure(event -> logger.info(...));
With Project Reactor, you can subscribe to the events like this:
ReactorAdapter.toFlux(rateLimiter.getEventPublisher())
.filter(event -> event.getEventType() == FAILED_ACQUIRE)
.subscribe(event -> logger.info(...));
Retry
Create a RetryRegistry
Like the CircuitBreaker module, this module provides an in-memory RetryRegistry for managing Retry instances.
RetryRegistry retryRegistry = RetryRegistry.ofDefaults();
Create and configure Retry
You can supply a custom global RetryConfig, built with its builder.
RetryConfig supports the following properties:
maxAttempts
- default value: 3
- description: the maximum number of attempts, including the initial call (the first attempt).
waitDuration
- default value: 500 [ms]
- description: the fixed wait interval between retry attempts.
intervalFunction
- default value: numOfAttempts -> waitDuration
- description: a function that modifies the wait interval after a failure. By default the wait interval is fixed at waitDuration after every failure.
intervalBiFunction
- default value: (int numOfAttempts, Either<Throwable, T> result) -> waitDuration
- description: a function that modifies the wait interval after a failure, computing it from the attempt number and the result or exception. Configuring both intervalFunction and intervalBiFunction throws an exception.
retryOnResultPredicate
- default value: result -> false
- description: a predicate that decides whether a result should be retried. It returns true if the result should be retried, false otherwise.
retryOnExceptionPredicate
- default value: throwable -> true
- description: a predicate that decides whether an exception should be retried. It returns true if the exception should be retried, false otherwise.
retryExceptions
- default value: empty
- description: a list of exception types that are recorded as failures and retried.
ignoreExceptions
- default value: empty
- description: a list of exception types that are ignored and not retried.
failAfterMaxAttempts
- default value: false
- description: enables or disables throwing a MaxRetriesExceededException when the Retry has reached the configured maxAttempts and the result still matches retryOnResultPredicate (that is, it would still be retried).
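When a call throws an exception, the retry decision is:
- First, decide whether the exception should be retried:
  - If the exception is in ignoreExceptions, it is not retried.
  - If the exception is in retryExceptions, it is retried.
  - Otherwise, retryOnExceptionPredicate decides whether the exception should be retried.
- If the exception should be retried, increment the attempt count and compare it with maxAttempts:
  - If maxAttempts has not been reached, retry after waiting for the interval.
  - If maxAttempts has been reached, stop retrying and rethrow the exception.
When a call returns without an exception, the retry decision is:
- First, retryOnResultPredicate decides whether the result should trigger a retry. If not, the call completes.
- If it should, increment the attempt count and compare it with maxAttempts:
  - If maxAttempts has not been reached, retry after waiting for the interval.
  - If maxAttempts has been reached, failAfterMaxAttempts decides whether an exception is thrown: when true, an exception is thrown; when false (the default), no exception is thrown.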
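Both decision flows can be folded into one loop. This is a simplified, synchronous sketch under assumptions: SimpleRetry is hypothetical, retryOnException is assumed to already combine retryExceptions, ignoreExceptions and retryOnExceptionPredicate, only unchecked exceptions are handled, the last result is returned when attempts run out without failAfterMaxAttempts, and IllegalStateException stands in for MaxRetriesExceededException:

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Simplified synchronous retry loop (illustrative, not Resilience4j's implementation). */
final class SimpleRetry<T> {
    private final int maxAttempts;                              // includes the initial call
    private final Predicate<RuntimeException> retryOnException; // assumed to fold in retry/ignore lists
    private final Predicate<T> retryOnResult;
    private final boolean failAfterMaxAttempts;
    private final long waitMillis;

    SimpleRetry(int maxAttempts, Predicate<RuntimeException> retryOnException,
                Predicate<T> retryOnResult, boolean failAfterMaxAttempts, long waitMillis) {
        this.maxAttempts = maxAttempts;
        this.retryOnException = retryOnException;
        this.retryOnResult = retryOnResult;
        this.failAfterMaxAttempts = failAfterMaxAttempts;
        this.waitMillis = waitMillis;
    }

    T execute(Supplier<T> call) {
        for (int attempt = 1; ; attempt++) {
            T result;
            try {
                result = call.get();
            } catch (RuntimeException e) {
                // Exception path: rethrow if it should not be retried or no attempts are left.
                if (!retryOnException.test(e) || attempt >= maxAttempts) throw e;
                pause();
                continue;
            }
            // Result path: accept the result unless retryOnResult asks for another attempt.
            if (!retryOnResult.test(result)) return result;
            if (attempt >= maxAttempts) {
                if (failAfterMaxAttempts) {
                    throw new IllegalStateException("Max attempts (" + maxAttempts + ") exceeded");
                }
                return result; // hand back the last result without throwing
            }
            pause();
        }
    }

    private void pause() {
        try {
            Thread.sleep(waitMillis); // fixed waitDuration; an IntervalFunction would vary this
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted between attempts", e);
        }
    }
}
```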
An example of creating a custom RetryConfig:
RetryConfig config = RetryConfig.custom()
.maxAttempts(2)
.waitDuration(Duration.ofMillis(1000))
.retryOnResult(response -> response.getStatus() == 500)
.retryOnException(e -> e instanceof WebServiceException)
.retryExceptions(IOException.class, TimeoutException.class)
.ignoreExceptions(BusinessException.class, OtherBusinessException.class)
.failAfterMaxAttempts(true)
.build();
// Create a RetryRegistry with a custom global configuration
RetryRegistry registry = RetryRegistry.of(config);
// Get or create a Retry from the registry -
// Retry will be backed by the default config
Retry retryWithDefaultConfig = registry.retry("name1");
// Get or create a Retry from the registry,
// use a custom configuration when creating the retry
RetryConfig custom = RetryConfig.custom()
.waitDuration(Duration.ofMillis(100))
.build();
Retry retryWithCustomConfig = registry.retry("name2", custom);
Decorate and execute a functional interface
A Retry can decorate Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer and CompletionStage, for example:
// Given I have a HelloWorldService which throws an exception
HelloWorldService helloWorldService = mock(HelloWorldService.class);
given(helloWorldService.sayHelloWorld())
.willThrow(new WebServiceException("BAM!"));
// Create a Retry with default configuration
Retry retry = Retry.ofDefaults("id");
// Decorate the invocation of the HelloWorldService
CheckedFunction0<String> retryableSupplier = Retry
.decorateCheckedSupplier(retry, helloWorldService::sayHelloWorld);
// When I invoke the function
Try<String> result = Try.of(retryableSupplier)
.recover((throwable) -> "Hello world from recovery function");
// Then the helloWorldService should be invoked 3 times
BDDMockito.then(helloWorldService).should(times(3)).sayHelloWorld();
// and the exception should be handled by the recovery function
assertThat(result.get()).isEqualTo("Hello world from recovery function");
consume emitted RegistryEvents
You can register an event consumer on a RetryRegistry to consume Retry creation, replacement and removal events:
RetryRegistry registry = RetryRegistry.ofDefaults();
registry.getEventPublisher()
.onEntryAdded(entryAddedEvent -> {
Retry addedRetry = entryAddedEvent.getAddedEntry();
LOG.info("Retry {} added", addedRetry.getName());
})
.onEntryRemoved(entryRemovedEvent -> {
Retry removedRetry = entryRemovedEvent.getRemovedEntry();
LOG.info("Retry {} removed", removedRetry.getName());
});
use custom IntervalFunction
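If you don't want a fixed wait duration, you can configure a custom IntervalFunction, which computes the wait duration independently for each attempt. Resilience4j provides several factory methods for creating an IntervalFunction: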
IntervalFunction defaultWaitInterval = IntervalFunction
.ofDefaults();
// This interval function is used internally
// when you only configure waitDuration
IntervalFunction fixedWaitInterval = IntervalFunction
.of(Duration.ofSeconds(5));
IntervalFunction intervalWithExponentialBackoff = IntervalFunction
.ofExponentialBackoff();
IntervalFunction intervalWithCustomExponentialBackoff = IntervalFunction
.ofExponentialBackoff(IntervalFunction.DEFAULT_INITIAL_INTERVAL, 2d);
IntervalFunction randomWaitInterval = IntervalFunction
.ofRandomized();
// Overwrite the default intervalFunction with your custom one
RetryConfig retryConfig = RetryConfig.custom()
.intervalFunction(intervalWithExponentialBackoff)
.build();
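intervalFunction and intervalBiFunction cannot both be configured; configuring both throws an exception.
If intervalFunction is configured, the RetryConfig builder automatically derives intervalBiFunction from it as well.
If either intervalFunction or intervalBiFunction is configured, that function computes the wait duration; if neither is configured, the wait between attempts is fixed at waitDuration.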
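As a worked example of exponential backoff, the wait before attempt n is commonly initialInterval * multiplier^(n - 1); with 500 ms and a multiplier of 2 that gives 500, 1000, 2000 ms. The helper below assumes that formula and an attempt counter starting at 1 (Backoff.exponentialMillis is a hypothetical name); check the IntervalFunction Javadoc for the exact semantics of your Resilience4j version:

```java
/** Worked example of exponential backoff: initialIntervalMillis * multiplier^(attempt - 1). */
final class Backoff {
    static long exponentialMillis(long initialIntervalMillis, double multiplier, int attempt) {
        if (attempt < 1) throw new IllegalArgumentException("attempt starts at 1");
        // Each further attempt multiplies the previous wait by the multiplier.
        return Math.round(initialIntervalMillis * Math.pow(multiplier, attempt - 1));
    }
}
```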
TimeLimiter
Create a TimeLimiterRegistry
Like the CircuitBreaker module, the TimeLimiter module provides an in-memory TimeLimiterRegistry for managing TimeLimiter instances.
TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.ofDefaults();
Create and configure TimeLimiter
You can supply a custom global TimeLimiterConfig, built with its builder.
TimeLimiterConfig supports two parameters:
- the timeout duration
- whether cancel should be called on the running future
For example:
TimeLimiterConfig config = TimeLimiterConfig.custom()
.cancelRunningFuture(true)
.timeoutDuration(Duration.ofMillis(500))
.build();
// Create a TimeLimiterRegistry with a custom global configuration
TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.of(config);
// Get or create a TimeLimiter from the registry -
// TimeLimiter will be backed by the default config
TimeLimiter timeLimiterWithDefaultConfig = timeLimiterRegistry.timeLimiter("name1");
// Get or create a TimeLimiter from the registry,
// use a custom configuration when creating the TimeLimiter
TimeLimiterConfig customConfig = TimeLimiterConfig.custom()
.cancelRunningFuture(false)
.timeoutDuration(Duration.ofMillis(1000))
.build();
TimeLimiter timeLimiterWithCustomConfig = timeLimiterRegistry.timeLimiter("name2", customConfig);
decorate and execute a functional interface
A TimeLimiter can decorate CompletionStage and Future, for example:
// Given I have a helloWorldService.sayHelloWorld() method which takes too long
HelloWorldService helloWorldService = mock(HelloWorldService.class);
// Create a TimeLimiter
TimeLimiter timeLimiter = TimeLimiter.of(Duration.ofSeconds(1));
// The Scheduler is needed to schedule a timeout on a non-blocking CompletableFuture
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
// The non-blocking variant with a CompletableFuture
CompletableFuture<String> result = timeLimiter.executeCompletionStage(
scheduler, () -> CompletableFuture.supplyAsync(helloWorldService::sayHelloWorld)).toCompletableFuture();
// The blocking variant, which is basically future.get(timeoutDuration, MILLISECONDS)
String blockingResult = timeLimiter.executeFutureSupplier(
() -> CompletableFuture.supplyAsync(helloWorldService::sayHelloWorld));
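In the TimeLimiter implementation, timeoutDuration sets the maximum time to wait for the future to complete; if this time is exceeded, a TimeoutException is thrown.
On timeout, the cancelRunningFuture setting (exposed as shouldCancelRunningFuture) decides whether cancel is called on the future that has not yet completed.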
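The timeout-then-maybe-cancel behaviour can be reproduced with the plain Future API. SimpleTimeLimiter.getWithTimeout is a hypothetical helper that mirrors the blocking variant only; it is not how Resilience4j implements the non-blocking CompletionStage path:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** JDK-only sketch of the blocking TimeLimiter behaviour described above. */
final class SimpleTimeLimiter {
    static <T> T getWithTimeout(Future<T> future, long timeoutMillis, boolean cancelRunningFuture)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            // Wait at most timeoutMillis for the future to complete.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            if (cancelRunningFuture) {
                future.cancel(true); // only when configured to cancel the still-running future
            }
            throw e;
        }
    }
}
```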
Spring Cloud CircuitBreaker
Spring Cloud Circuit Breaker provides an abstraction over circuit breakers that lets developers choose the implementation. Spring Cloud currently supports the following implementations:
- Resilience4j
- Sentinel
- Spring Retry
core concepts
Circuit breakers are created through the CircuitBreakerFactory API. When a Spring Cloud Circuit Breaker starter is on the classpath, a bean implementing CircuitBreakerFactory is created automatically. The following example shows how to use this API:
@Service
public static class DemoControllerService {
private RestTemplate rest;
private CircuitBreakerFactory cbFactory;
public DemoControllerService(RestTemplate rest, CircuitBreakerFactory cbFactory) {
this.rest = rest;
this.cbFactory = cbFactory;
}
public String slow() {
return cbFactory.create("slow").run(() -> rest.getForObject("/slow", String.class), throwable -> "fallback");
}
}
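CircuitBreakerFactory.create creates a CircuitBreaker instance. Its run method takes a Supplier and a Function:
- the Supplier is the code that is wrapped in the circuit breaker
- the Function is the fallback that runs when the circuit breaker is tripped
- the Function receives a Throwable, which is the exception that caused the fallback to be triggered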
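Stripped of the circuit breaker itself, the run(supplier, fallback) contract is a try/catch. FallbackRunner is a hypothetical plain-Java sketch, not Spring Cloud's implementation; in the real implementation the fallback is also invoked when the breaker rejects the call (for example with Resilience4j's CallNotPermittedException):

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Plain-Java sketch of the run(supplier, fallback) contract (not Spring Cloud's implementation). */
final class FallbackRunner {
    static <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
        try {
            return toRun.get();       // the code protected by the circuit breaker
        } catch (RuntimeException e) {
            return fallback.apply(e); // the fallback receives the exception that triggered it
        }
    }
}
```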
Configuration
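Circuit breakers can be configured by creating beans of type Customizer.
In the Resilience4JCircuitBreaker implementation, Customizer#customize is invoked on every call to CircuitBreaker#run, which can be inefficient. Customizer#once creates a Customizer whose logic is guaranteed to run at most once (per key returned by the key function).
For example: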
Customizer.once(circuitBreaker -> {
circuitBreaker.getEventPublisher()
.onStateTransition(event -> log.info("{}: {}", event.getCircuitBreakerName(), event.getStateTransition()));
}, CircuitBreaker::getName)
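The "at most once per key" behaviour of Customizer.once can be illustrated with a concurrent set. OnceCustomizer is a hypothetical stand-in rather than Spring Cloud's class; the key function plays the role of CircuitBreaker::getName in the example above:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch: apply a customizer at most once per key (not Spring Cloud's Customizer). */
final class OnceCustomizer<T> {
    private final Set<Object> customizedKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<T> customizer;
    private final Function<T, ?> keyExtractor;

    OnceCustomizer(Consumer<T> customizer, Function<T, ?> keyExtractor) {
        this.customizer = customizer;
        this.keyExtractor = keyExtractor;
    }

    /** Safe to call on every run(): only the first call per key executes the customizer. */
    void customize(T target) {
        // add() on a concurrent key set is atomic and returns false if the key was already present.
        if (customizedKeys.add(keyExtractor.apply(target))) {
            customizer.accept(target);
        }
    }
}
```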