- [CircuitBreaker](#circuitbreaker)
  - [Introduction](#introduction)
    - [Maven](#maven)
  - [CircuitBreaker](#circuitbreaker-1)
    - [State Machine](#state-machine)
    - [Sliding Window](#sliding-window)
      - [Count-based sliding window](#count-based-sliding-window)
      - [Time-based sliding window](#time-based-sliding-window)
    - [Failure rate and slow call rate thresholds](#failure-rate-and-slow-call-rate-thresholds)
      - [Failure rate \& exception list](#failure-rate--exception-list)
      - [Slow call rate](#slow-call-rate)
      - [CircuitBreaker in `OPEN`/`HALF_OPEN` state](#circuitbreaker-in-openhalf_open-state)
    - [Special States](#special-states)
    - [Thread-safe](#thread-safe)
    - [CircuitBreakerRegistry](#circuitbreakerregistry)
      - [Create and configure CircuitBreakerConfig](#create-and-configure-circuitbreakerconfig)
        - [success/failure/ignore decision flow](#successfailureignore-decision-flow)
    - [Decorate and execute a functional interface](#decorate-and-execute-a-functional-interface)
    - [Consume emitted RegistryEvents](#consume-emitted-registryevents)
    - [Consume emitted CircuitBreakerEvents](#consume-emitted-circuitbreakerevents)
  - [Bulkhead](#bulkhead)
    - [Create a BulkheadRegistry](#create-a-bulkheadregistry)
    - [Create and configure a Bulkhead](#create-and-configure-a-bulkhead)
    - [Create and configure a ThreadPoolBulkhead](#create-and-configure-a-threadpoolbulkhead)
    - [Decorate and execute a functional interface](#decorate-and-execute-a-functional-interface-1)
    - [Consume emitted RegistryEvents](#consume-emitted-registryevents-1)
    - [Consume emitted BulkheadEvents](#consume-emitted-bulkheadevents)
    - [ThreadPoolBulkhead drawbacks](#threadpoolbulkhead-drawbacks)
  - [RateLimiter](#ratelimiter)
    - [Create RateLimiterRegistry](#create-ratelimiterregistry)
    - [Create and configure a RateLimiter](#create-and-configure-a-ratelimiter)
    - [Decorate and execute a functional interface](#decorate-and-execute-a-functional-interface-2)
    - [Consume emitted RegistryEvents](#consume-emitted-registryevents-2)
    - [Consume emitted RateLimiterEvents](#consume-emitted-ratelimiterevents)
  - [Retry](#retry)
    - [Create a RetryRegistry](#create-a-retryregistry)
    - [Create and configure Retry](#create-and-configure-retry)
    - [Decorate and execute a functional interface](#decorate-and-execute-a-functional-interface-3)
    - [Consume emitted RegistryEvents](#consume-emitted-registryevents-3)
    - [Use custom IntervalFunction](#use-custom-intervalfunction)
  - [TimeLimiter](#timelimiter)
    - [Create a TimeLimiterRegistry](#create-a-timelimiterregistry)
    - [Create and configure TimeLimiter](#create-and-configure-timelimiter)
    - [Decorate and execute a functional interface](#decorate-and-execute-a-functional-interface-4)
  - [Spring Cloud CircuitBreaker](#spring-cloud-circuitbreaker)
    - [Core concepts](#core-concepts)
    - [Configuration](#configuration)

# CircuitBreaker

Resilience4j is a lightweight fault tolerance library designed for functional programming. It provides higher-order functions (decorators) that enhance any functional interface, lambda expression, or method reference.

The decorators fall into the following categories:

- CircuitBreaker
- Rate Limiter
- Retry
- Bulkhead

You can stack more than one decorator on any functional interface, lambda expression, or method reference.

## Introduction

The following example decorates a lambda expression with a Bulkhead, a CircuitBreaker, and a Retry. As a result, the call is attempted at most 3 times when an exception occurs.

You can configure the wait interval between retries. You can also plug in a custom backoff algorithm.

```java
// Create a CircuitBreaker with default configuration
CircuitBreaker circuitBreaker = CircuitBreaker
  .ofDefaults("backendService");

// Create a Retry with default configuration:
// at most 3 attempts (the initial call plus 2 retries)
// and a fixed wait interval of 500ms between attempts
Retry retry = Retry
  .ofDefaults("backendService");

// Create a Bulkhead with default configuration
Bulkhead bulkhead = Bulkhead
  .ofDefaults("backendService");

Supplier<String> supplier = () -> backendService
  .doSomething(param1, param2);

// Decorate your call to backendService.doSomething()
// with a Bulkhead, CircuitBreaker and Retry
// **note: you will need the resilience4j-all dependency for this
Supplier<String> decoratedSupplier = Decorators.ofSupplier(supplier)
  .withCircuitBreaker(circuitBreaker)
  .withBulkhead(bulkhead)
  .withRetry(retry)
  .decorate();

// When you don't want to decorate your lambda expression,
// but just execute it and protect the call by a CircuitBreaker.
String result = circuitBreaker
  .executeSupplier(backendService::doSomething);

// You can also run the supplier asynchronously in a ThreadPoolBulkhead
ThreadPoolBulkhead threadPoolBulkhead = ThreadPoolBulkhead
  .ofDefaults("backendService");

// The Scheduler is needed to schedule a timeout
// on a non-blocking CompletableFuture
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
TimeLimiter timeLimiter = TimeLimiter.of(Duration.ofSeconds(1));

CompletableFuture<String> future = Decorators.ofSupplier(supplier)
    .withThreadPoolBulkhead(threadPoolBulkhead)
    .withTimeLimiter(timeLimiter, scheduledExecutorService)
    .withCircuitBreaker(circuitBreaker)
    .withFallback(asList(TimeoutException.class, CallNotPermittedException.class, BulkheadFullException.class),
      throwable -> "Hello from Recovery")
    .get().toCompletableFuture();
```

### Maven

Resilience4j requires JDK 17 or later. With Maven, add the dependencies as follows.

To add all modules at once:

```xml
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-all</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
```

To add only the modules you need:

```xml
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-circuitbreaker</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-retry</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-bulkhead</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-cache</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-timelimiter</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
```

## CircuitBreaker

### State Machine

The CircuitBreaker is implemented as a finite state machine with the following states:

- `CLOSED`
- `OPEN`
- `HALF_OPEN`
- `METRICS_ONLY`
- `DISABLED`
- `FORCED_OPEN`

The first three are the normal states, and the last three are special states.

Transitions between the normal states work as follows:

- In `CLOSED`, when the failure rate (or slow call rate) of the protected calls reaches the configured threshold, the CircuitBreaker transitions from `CLOSED` to `OPEN`.
- After a configured wait duration in `OPEN`, it transitions from `OPEN` to `HALF_OPEN`.
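- In `HALF_OPEN`, if the failure rate (and slow call rate) is below the threshold, it transitions back from `HALF_OPEN` to `CLOSED`.
- In `HALF_OPEN`, if the failure rate (or slow call rate) is still at or above the threshold, it transitions back from `HALF_OPEN` to `OPEN`.

### Sliding Window

The CircuitBreaker uses a sliding window to store and aggregate the outcomes of calls. You can choose between a count-based and a time-based sliding window:

- `count-based`: aggregates the outcomes of the last `N` calls
- `time-based`: aggregates the outcomes of the calls made in the last `N` seconds

#### Count-based sliding window

The count-based sliding window is implemented as a circular array of `N` measurements. If the window size is 10, the circular array always holds 10 measurements.

The window updates a total aggregation incrementally:

- When a call completes, its outcome is recorded and added to the total aggregation.
- When a new outcome is recorded, the oldest measurement in the circular array is evicted and subtracted from the total aggregation. Its bucket is then reset. Here a bucket is a single measurement slot, so resetting it clears the oldest measurement.

Checking the aggregated result costs `O(1)`, independent of the window size, because the aggregation is pre-computed.

#### Time-based sliding window

The time-based sliding window is also implemented as a circular array, holding `N` partial aggregations (buckets).

If the window size is 10 seconds, the circular array always holds 10 buckets. Each bucket corresponds to one epoch second and aggregates the outcomes of all calls made during that second (a partial aggregation).

The head bucket stores the outcomes of calls in the current epoch second. The other partial aggregations hold the outcomes of previous seconds. Unlike the count-based window, the time-based window does not store call outcomes individually. Instead, it updates the partial aggregations incrementally.

Besides the partial aggregations, the total aggregation is also updated whenever a new call outcome is recorded. When the oldest bucket is evicted, its partial aggregation is subtracted from the total aggregation and the bucket is reset.

Checking the aggregated result also costs `O(1)`, because the time-based window is pre-aggregated as well.

A partial aggregation consists of three integers that count:

- failed calls
- slow calls
- total calls

It also holds one long that stores the total duration of all calls.

### Failure rate and slow call rate thresholds

#### Failure rate & exception list

When the failure rate is greater than or equal to the configured threshold, the CircuitBreaker transitions from `CLOSED` to `OPEN`.

By default, every thrown exception counts as a failure. You can instead specify an exception list. In that case, only exceptions in the list count as failures, and all other exceptions count as successes. Exceptions can also be ignored: an ignored exception counts as neither a success nor a failure.

#### Slow call rate

The CircuitBreaker also transitions from `CLOSED` to `OPEN` when the slow call rate is greater than or equal to the configured threshold. This helps reduce the load on an external system.

The failure rate and slow call rate can only be calculated once a minimum number of calls has been recorded. For example, if the minimum number of required calls is 10, at least 10 calls must be recorded before either rate is calculated. **If only 9 calls have been recorded, the CircuitBreaker will not open even if all 9 calls failed.**

#### CircuitBreaker in `OPEN`/`HALF_OPEN` state

While `OPEN`, the CircuitBreaker rejects all calls with a `CallNotPermittedException`. After a wait time elapses, it transitions from `OPEN` to `HALF_OPEN`. It then lets a configurable number of calls through to check whether the backend has recovered and is reachable again.

For example, suppose `permittedNumberOfCallsInHalfOpenState` is 10 and 20 calls arrive while the CircuitBreaker is `HALF_OPEN`. The first 10 calls are executed, and the remaining 10 are rejected with `CallNotPermittedException`.

In `HALF_OPEN`, if the failure rate or slow call rate is greater than or equal to the configured threshold, the CircuitBreaker transitions to `OPEN`. If both rates are below their thresholds, it transitions to `CLOSED`.

### Special States

The CircuitBreaker supports three special states:

- `METRICS_ONLY`:
  - all circuit breaker events are emitted as usual (except state transitions), and metrics are recorded
  - it behaves like `CLOSED`, except that the CircuitBreaker never transitions to `OPEN` when a threshold is reached
- `DISABLED`:
  - no `CircuitBreakerEvent` is emitted and no metrics are recorded
  - all calls are permitted
- `FORCED_OPEN`:
  - no `CircuitBreakerEvent` is emitted and no metrics are recorded
  - all calls are rejected

You can leave these special states only by:

- triggering a state transition
- resetting the CircuitBreaker

### Thread-safe

The CircuitBreaker is thread-safe, but it does not serialize function calls. The decorated function may therefore run on several threads in parallel.

For example, if 20 threads call a `CLOSED` CircuitBreaker at the same time, all 20 calls run in parallel. This holds even when the sliding window size (say, 15) is smaller than the number of threads. **The sliding window size does not limit concurrency.**

To limit concurrency, use a `Bulkhead`.

### CircuitBreakerRegistry

Resilience4j ships with an in-memory `CircuitBreakerRegistry` based on a `ConcurrentHashMap`. Use it to manage (create and retrieve) CircuitBreaker instances. To create a registry that uses the default `CircuitBreakerConfig` for all CircuitBreakers:

```java
CircuitBreakerRegistry circuitBreakerRegistry =
  CircuitBreakerRegistry.ofDefaults();
```

#### Create and configure CircuitBreakerConfig

Instead of the default, you can provide a custom global `CircuitBreakerConfig`, built with its builder.

`CircuitBreakerConfig` supports the following properties: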
- `failureRateThreshold`: the failure rate threshold, as a percentage
  - `default value`: 50
  - `description`: when the failure rate is greater than or equal to this threshold, the CircuitBreaker transitions to `OPEN` and starts short-circuiting calls
- `slowCallRateThreshold`: the slow call rate threshold, as a percentage
  - `default value`: 100
  - `description`: when the percentage of slow calls is equal to or greater than this threshold, the CircuitBreaker transitions to `OPEN` and starts short-circuiting calls
- `slowCallDurationThreshold`: the duration threshold for slow calls
  - `default value`: 60000 [ms]
  - `description`: a call that takes longer than this threshold is counted as a slow call and increases the slow call rate
- `permittedNumberOfCallsInHalfOpenState`:
  - `default value`: 10
  - `description`: the number of calls permitted while the CircuitBreaker is `HALF_OPEN`
- `maxWaitDurationInHalfOpenState`:
  - `default value`: 0 [ms]
  - `description`: the maximum time the CircuitBreaker may stay in `HALF_OPEN` before it switches back to `OPEN`. The default of `0` means it waits without limit until all permitted calls have completed
- `slidingWindowType`: the type of the sliding window
  - `default value`: `COUNT_BASED`
  - `description`: the sliding window records call outcomes while the CircuitBreaker is `CLOSED`. It can be count-based or time-based:
    - `COUNT_BASED`: records and aggregates the outcomes of the last `slidingWindowSize` calls
    - `TIME_BASED`: records and aggregates the outcomes of the calls made in the last `slidingWindowSize` seconds
- `slidingWindowSize`:
  - `default value`: 100
  - `description`: the size of the sliding window
- `minimumNumberOfCalls`:
  - `default value`: 100
  - `description`: the minimum number of calls that must be recorded (within the sliding window period) before the error rate and slow call rate can be calculated
    - For example, if `minimumNumberOfCalls` is 10, at least 10 calls must be recorded before the failure rate can be calculated.
    - If only 9 calls have been recorded, the CircuitBreaker does not transition to `OPEN`, even if all 9 calls failed.
- `waitDurationInOpenState`:
  - `default value`: 60000 [ms]
  - `description`: how long an `OPEN` CircuitBreaker waits before it transitions to `HALF_OPEN`
- `automaticTransitionFromOpenToHalfOpenEnabled`:
  - `default value`: false
  - `description`:
    - if `true`, a thread monitors all CircuitBreaker instances and moves each one from `OPEN` to `HALF_OPEN` once `waitDurationInOpenState` has elapsed
    - if `false` (the default), no dedicated thread performs this transition; a call triggers it instead. Even after `waitDurationInOpenState` has elapsed, the CircuitBreaker stays `OPEN` until the next call arrives
- `recordExceptions`:
  - `default value`: empty
  - `description`: a list of exception types that are recorded as failures and therefore increase the failure rate
    - Any exception that matches or inherits from a listed type counts as a failure, unless it is explicitly ignored via `ignoreExceptions`.
    - If you specify a list with `recordExceptions`, all other exceptions count as successes, unless they are explicitly ignored via `ignoreExceptions`.
- `ignoreExceptions`:
  - `default value`: empty
  - `description`: a list of exception types that are ignored and counted as neither failure nor success
    - Any exception that matches or inherits from a listed type is ignored, even if it is also listed in `recordExceptions`.
- `recordFailurePredicate`:
  - `default value`: `throwable -> true`
  - `description`: a custom predicate that decides whether an exception should be recorded as a failure. It must return `true` if the exception should count as a failure, and `false` if it should count as a success
- `ignoreExceptionPredicate`:
  - `default value`: `throwable -> false`
  - `description`: a custom predicate that decides whether an exception should be ignored or recorded as a failure/success
    - It must return `true` if the exception should be ignored.
    - It must return `false` if the exception should not be ignored. The exception is then evaluated as a failure or success.

##### success/failure/ignore decision flow

A call's outcome is classified as success, failure, or ignored as follows:

- If the call does not throw an exception, it is recorded as a success. Otherwise, continue.
- If the exception is in `ignoreExceptions`, it is ignored. Otherwise, continue.
- If `ignoreExceptionPredicate` is set and returns `true`, the exception is ignored. Otherwise, continue.
- If the exception is in `recordExceptions`, it is recorded as a failure. Otherwise, continue.
- If `recordFailurePredicate` is set and returns `true`, the exception is recorded as a failure. Otherwise, continue.
- If none of the above applies, the call is recorded as a success.

The following example creates a custom `CircuitBreakerConfig`:

```java
// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
  .failureRateThreshold(50)
  .slowCallRateThreshold(50)
  .waitDurationInOpenState(Duration.ofMillis(1000))
  .slowCallDurationThreshold(Duration.ofSeconds(2))
  .permittedNumberOfCallsInHalfOpenState(3)
  .minimumNumberOfCalls(10)
  .slidingWindowType(SlidingWindowType.TIME_BASED)
  .slidingWindowSize(5)
  .recordException(e -> INTERNAL_SERVER_ERROR
                 .equals(getResponse().getStatus()))
  .recordExceptions(IOException.class, TimeoutException.class)
  .ignoreExceptions(BusinessException.class, OtherBusinessException.class)
  .build();

// Create a CircuitBreakerRegistry with a custom global configuration
CircuitBreakerRegistry circuitBreakerRegistry =
  CircuitBreakerRegistry.of(circuitBreakerConfig);

// Get or create a CircuitBreaker from the CircuitBreakerRegistry
// with the global default configuration
CircuitBreaker circuitBreakerWithDefaultConfig =
  circuitBreakerRegistry.circuitBreaker("name1");

// Get or create a CircuitBreaker from the CircuitBreakerRegistry
// with a custom configuration
CircuitBreaker circuitBreakerWithCustomConfig = circuitBreakerRegistry
  .circuitBreaker("name2", circuitBreakerConfig);
```

You can also add a named configuration to the registry and share it across multiple CircuitBreaker instances:

```java
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
  .failureRateThreshold(70)
  .build();

// Register the configuration under a name so other CircuitBreakers can reuse it
circuitBreakerRegistry.addConfiguration("someSharedConfig", circuitBreakerConfig);

CircuitBreaker circuitBreaker = circuitBreakerRegistry
  .circuitBreaker("name", "someSharedConfig");
```

You can also overwrite properties of the default configuration:

```java
CircuitBreakerConfig defaultConfig = circuitBreakerRegistry
   .getDefaultConfig();

CircuitBreakerConfig overwrittenConfig = CircuitBreakerConfig
  .from(defaultConfig)
  .waitDurationInOpenState(Duration.ofSeconds(20))
  .build();
```

If you don't want to manage CircuitBreaker instances through the CircuitBreakerRegistry, you can create them directly:

```java
// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
  .recordExceptions(IOException.class, TimeoutException.class)
  .ignoreExceptions(BusinessException.class, OtherBusinessException.class)
  .build();

CircuitBreaker customCircuitBreaker = CircuitBreaker
  .of("testName", circuitBreakerConfig);
```

To plug in your own registry implementation, provide a custom implementation of `RegistryStore` and register it through the builder:

```java
CircuitBreakerRegistry registry = CircuitBreakerRegistry.custom()
  .withRegistryStore(new YourRegistryStoreImplementation())
  .withCircuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
  .build();
```

### Decorate and execute a functional interface

A CircuitBreaker can decorate any `Callable`, `Supplier`, `Runnable`, `Consumer`, `CheckedRunnable`, `CheckedSupplier`, `CheckedConsumer`, or `CompletionStage`.

You can invoke the decorated function with Vavr's `Try.of(...)` or `Try.run(...)`, which lets you chain further functions:

```java
// Given
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");

// When I decorate my function
CheckedFunction0<String> decoratedSupplier = CircuitBreaker
        .decorateCheckedSupplier(circuitBreaker, () -> "This can be any method which returns: 'Hello");

// and chain an other function with map
Try<String> result = Try.of(decoratedSupplier)
                .map(value -> value + " world'");

// Then the Try Monad returns a Success, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
```

### Consume emitted RegistryEvents

You can register an event consumer on a CircuitBreakerRegistry to take action whenever a `CircuitBreaker` is created, replaced, or deleted:

```java
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
circuitBreakerRegistry.getEventPublisher()
  .onEntryAdded(entryAddedEvent -> {
    CircuitBreaker addedCircuitBreaker = entryAddedEvent.getAddedEntry();
    LOG.info("CircuitBreaker {} added", addedCircuitBreaker.getName());
  })
  .onEntryRemoved(entryRemovedEvent -> {
    CircuitBreaker removedCircuitBreaker = entryRemovedEvent.getRemovedEntry();
    LOG.info("CircuitBreaker {} removed", removedCircuitBreaker.getName());
  });
```

### Consume emitted CircuitBreakerEvents

A CircuitBreakerEvent is emitted on:

- a state transition
- a circuit breaker reset
- a successful call
- a recorded error
- an ignored error

All events carry additional information, such as the event creation time and the call's processing duration. To consume these events, register an event consumer on the CircuitBreaker:

```java
circuitBreaker.getEventPublisher()
    .onSuccess(event -> logger.info("{}", event))
    .onError(event -> logger.info("{}", event))
    .onIgnoredError(event -> logger.info("{}", event))
    .onReset(event -> logger.info("{}", event))
    .onStateTransition(event -> logger.info("{}", event));
// Or if you want to register a consumer listening
// to all events, you can do:
circuitBreaker.getEventPublisher()
    .onEvent(event -> logger.info("{}", event));
```

You can use a `CircularEventConsumer` to store events in a circular buffer with a fixed capacity:

```java
CircularEventConsumer<CircuitBreakerEvent> ringBuffer =
  new CircularEventConsumer<>(10);
circuitBreaker.getEventPublisher().onEvent(ringBuffer);
List<CircuitBreakerEvent> bufferedEvents = ringBuffer.getBufferedEvents();
```

## Bulkhead

Resilience4j provides two bulkhead implementations, both of which limit the number of concurrent executions:

- `SemaphoreBulkhead`: based on semaphores
- `FixedThreadPoolBulkhead`: uses a bounded queue and a fixed thread pool

### Create a BulkheadRegistry

Like the CircuitBreaker module, the Bulkhead module provides an in-memory `BulkheadRegistry` and a `ThreadPoolBulkheadRegistry` for managing Bulkhead instances:

```java
BulkheadRegistry bulkheadRegistry = BulkheadRegistry.ofDefaults();

ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry =
  ThreadPoolBulkheadRegistry.ofDefaults();
```

### Create and configure a Bulkhead

You can provide a custom global `BulkheadConfig`, built with the `BulkheadConfig` builder. It supports the following properties:

- `maxConcurrentCalls`:
  - `default value`: 25
  - `description`: the maximum number of parallel executions allowed by the bulkhead
- `maxWaitDuration`:
  - `default value`: 0
  - `description`: the maximum time a thread waits when it tries to enter a saturated bulkhead

For example:

```java
// Create a custom configuration for a Bulkhead
BulkheadConfig config = BulkheadConfig.custom()
  .maxConcurrentCalls(150)
  .maxWaitDuration(Duration.ofMillis(500))
  .build();

// Create a BulkheadRegistry with a custom global configuration
BulkheadRegistry registry = BulkheadRegistry.of(config);

// Get or create a Bulkhead from the registry -
// bulkhead will be backed by the default config
Bulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");

// Get or create a Bulkhead from the registry,
// use a custom configuration when creating the bulkhead
BulkheadConfig custom = BulkheadConfig.custom()
  .maxWaitDuration(Duration.ofMillis(100))
  .build();

Bulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);
```

### Create and configure a ThreadPoolBulkhead

You can provide a custom global `ThreadPoolBulkheadConfig`, built with its builder.

`ThreadPoolBulkheadConfig` supports the following properties:

- `maxThreadPoolSize`:
  - `default value`: `Runtime.getRuntime().availableProcessors()`
  - `description`: the maximum number of threads in the pool
- `coreThreadPoolSize`:
  - `default value`: `Runtime.getRuntime().availableProcessors() - 1`
  - `description`: the number of core threads in the pool
- `queueCapacity`:
  - `default value`: 100
  - `description`: the capacity of the queue
- `keepAliveDuration`:
  - `default value`: 20 [ms]
  - `description`: when the pool has more threads than the core size, how long an idle non-core thread waits before it is terminated
- `writableStackTraceEnabled`:
  - `default value`: true
  - `description`: whether the stack trace is printed when the bulkhead throws an exception. If `false`, only a single line is printed

```java
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
  .maxThreadPoolSize(10)
  .coreThreadPoolSize(2)
  .queueCapacity(20)
  .build();

// Create a ThreadPoolBulkheadRegistry with a custom global configuration
ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);

// Get or create a ThreadPoolBulkhead from the registry -
// bulkhead will be backed by the default config
ThreadPoolBulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");

// Get or create a Bulkhead from the registry,
// use a custom configuration when creating the bulkhead
ThreadPoolBulkheadConfig custom = ThreadPoolBulkheadConfig.custom()
  .maxThreadPoolSize(5)
  .build();

ThreadPoolBulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);
```

### Decorate and execute a functional interface

A Bulkhead can decorate any `Callable`, `Supplier`, `Runnable`, `Consumer`, `CheckedRunnable`, `CheckedSupplier`, `CheckedConsumer`, or `CompletionStage`:

```java
// Given
Bulkhead bulkhead = Bulkhead.of("name", config);

// When I decorate my function
CheckedFunction0<String> decoratedSupplier = Bulkhead
  .decorateCheckedSupplier(bulkhead, () -> "This can be any method which returns: 'Hello");

// and chain an other function with map
Try<String> result = Try.of(decoratedSupplier)
                .map(value -> value + " world'");

// Then the Try Monad returns a Success, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
```

```java
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
  .maxThreadPoolSize(10)
  .coreThreadPoolSize(2)
  .queueCapacity(20)
  .build();

ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("name", config);

// Runs the supplier on the bulkhead's thread pool and returns its result asynchronously
CompletionStage<String> future = bulkhead
  .executeSupplier(backendService::doSomething);
```

### Consume emitted RegistryEvents

You can register an event consumer on a `BulkheadRegistry` to listen for Bulkhead creation, replacement, and deletion:

```java
BulkheadRegistry registry = BulkheadRegistry.ofDefaults();
registry.getEventPublisher()
  .onEntryAdded(entryAddedEvent -> {
    Bulkhead addedBulkhead = entryAddedEvent.getAddedEntry();
    LOG.info("Bulkhead {} added", addedBulkhead.getName());
  })
  .onEntryRemoved(entryRemovedEvent -> {
    Bulkhead removedBulkhead = entryRemovedEvent.getRemovedEntry();
    LOG.info("Bulkhead {} removed", removedBulkhead.getName());
  });
```

### Consume emitted BulkheadEvents

A Bulkhead emits BulkheadEvents of the following types:

- permitted execution
- rejected execution
- finished execution

To consume these events, register an event consumer:

```java
bulkhead.getEventPublisher()
    .onCallPermitted(event -> logger.info("{}", event))
    .onCallRejected(event -> logger.info("{}", event))
    .onCallFinished(event -> logger.info("{}", event));
```

### ThreadPoolBulkhead drawbacks

ThreadPoolBulkhead creates a separate thread pool for every bulkhead. Avoid creating a large number of them in one application. Otherwise the thread count grows quickly, and context switching between threads becomes expensive.

## RateLimiter

Resilience4j's RateLimiter divides all nanoseconds since an epoch into cycles. The epoch is a fixed but arbitrary origin; see the Javadoc of `System.nanoTime()`. The duration of each cycle is configured with `RateLimiterConfig.limitRefreshPeriod`. At the start of each cycle, the RateLimiter resets the number of active permissions to `RateLimiterConfig.limitForPeriod`.

The default RateLimiter implementation, `AtomicRateLimiter`, manages its state through an `AtomicReference`. `AtomicRateLimiter.State` is immutable and has the following fields:

- `activeCycle`: the cycle number used by the last call
- `activePermissions`: the number of permissions available after the last call. This value can be negative if some permissions were reserved
- `nanosToWait`: the number of nanoseconds the last call had to wait for a permission

Besides `AtomicRateLimiter`, there is also a `SemaphoreBasedRateLimiter` implementation. It uses a semaphore and a scheduler that refreshes permissions after each `RateLimiterConfig#limitRefreshPeriod`.

### Create RateLimiterRegistry

Like the CircuitBreaker module, the RateLimiter module provides an in-memory `RateLimiterRegistry` for managing RateLimiter instances.

```java
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.ofDefaults();
```

### Create and configure a RateLimiter

You can provide a custom global `RateLimiterConfig`, built with its builder. `RateLimiterConfig` supports the following properties:

- `timeoutDuration`:
  - `default value`: 5 [s]
  - `description`: the default time a thread waits for a permission
- `limitRefreshPeriod`:
  - `default value`: 500 [ns]
  - `description`: the period of a limit refresh. After each period, the rate limiter resets its permission count to `limitForPeriod`
- `limitForPeriod`:
  - `default value`: 50
  - `description`: the number of permissions available during one refresh period

For example, to restrict the call rate of some method to no more than 10 req/ms:

```java
RateLimiterConfig config = RateLimiterConfig.custom()
  .limitRefreshPeriod(Duration.ofMillis(1))
  .limitForPeriod(10)
  .timeoutDuration(Duration.ofMillis(25))
  .build();

// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);

// Use registry
RateLimiter rateLimiterWithDefaultConfig = rateLimiterRegistry
  .rateLimiter("name1");

RateLimiter rateLimiterWithCustomConfig = rateLimiterRegistry
  .rateLimiter("name2", config);
```

### Decorate and execute a functional interface

A RateLimiter can decorate any `Callable`, `Supplier`, `Runnable`, `Consumer`, `CheckedRunnable`, `CheckedSupplier`, `CheckedConsumer`, or `CompletionStage`:

```java
// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
    .decorateCheckedRunnable(rateLimiter, backendService::doSomething);

Try.run(restrictedCall)
    .andThenTry(restrictedCall)
    // Only react to rejections by the rate limiter
    .onFailure(RequestNotPermitted.class, throwable -> LOG.info("Wait before call it again :)"));
```

You can change a RateLimiter's parameters at runtime with `changeTimeoutDuration` and `changeLimitForPeriod`. A new timeout does not affect threads that are already waiting for permissions. A new limit does not affect the permissions of the current period; it takes effect from the next period:

```java
// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
    .decorateCheckedRunnable(rateLimiter, backendService::doSomething);

// during second refresh cycle limiter will get 100 permissions
rateLimiter.changeLimitForPeriod(100);
```

### Consume emitted RegistryEvents

You can register an event consumer on a `RateLimiterRegistry` to listen for RateLimiter creation, replacement, and deletion:

```java
RateLimiterRegistry registry = RateLimiterRegistry.ofDefaults();
registry.getEventPublisher()
  .onEntryAdded(entryAddedEvent -> {
    RateLimiter addedRateLimiter = entryAddedEvent.getAddedEntry();
    LOG.info("RateLimiter {} added", addedRateLimiter.getName());
  })
  .onEntryRemoved(entryRemovedEvent -> {
    RateLimiter removedRateLimiter = entryRemovedEvent.getRemovedEntry();
    LOG.info("RateLimiter {} removed", removedRateLimiter.getName());
  });
```

### Consume emitted RateLimiterEvents

A RateLimiter emits events on:

- a successful permission acquire
- an acquire failure

All events carry additional information, such as the event creation time and the rate limiter name. To consume these events, register an event consumer on the RateLimiter:

```java
rateLimiter.getEventPublisher()
    .onSuccess(event -> logger.info("{}", event))
    .onFailure(event -> logger.info("{}", event));
```

If you use Project Reactor, you can consume the events as a `Flux`:

```java
ReactorAdapter.toFlux(rateLimiter.getEventPublisher())
    .filter(event -> event.getEventType() == FAILED_ACQUIRE)
    .subscribe(event -> logger.info("{}", event));
```

## Retry

### Create a RetryRegistry

Like the CircuitBreaker module, this module provides an in-memory `RetryRegistry` for managing Retry instances.

```java
RetryRegistry retryRegistry = RetryRegistry.ofDefaults();
```

### Create and configure Retry

You can provide a custom global `RetryConfig`, built with its builder.

`RetryConfig` supports the following properties:

- `maxAttempts`:
  - `default value`: 3
  - `description`: the maximum number of attempts, including the initial call as the first attempt
- `waitDuration`:
  - `default value`: 500 [ms]
  - `description`: a fixed wait duration between retry attempts
- `intervalFunction`:
  - `default value`: `numOfAttempts -> waitDuration`
  - `description`: a function that modifies the wait interval after a failure. By default, the wait after every failure is the same fixed `waitDuration`
`intervalBiFunction`: - `default value`: `(int numOfAttempts, Either result)->waitDuration` - `description`: 该function用于在failure后修改等待时间间隔,即基于attempt number和result/exception来计算等待时间间隔。当同时使用`intervalFunction`和`intervalBiFunction`时,会抛出异常 - `retryOnResultPredicate`: - `default value`: `result -> false` - `description`: 该参数用于配置predicate,用于判断result是否应该被重试。如果result应当被重试,那么返回true,否则返回false - `retryOnExceptionPredicate`: - `default`: `throwable -> true` - `description`: 该参数用于判断是否exception应当被重试。如果exception应当被重试,predicate返回true,否则返回false - `retryExceptions`: - `default value`: empty - `description`: 配置exception list,其将被记录为failure并且应当被重试。 - `ignoreExceptions`: - `default value`: empty - `descritpion`: 配置exception list,该列表中的异常会被ignore并且不会重试 - `failAfterMaxAttempts`: - `default value`: false - `description`: 该参数用于启用和关闭`MaxRetriesExceededException`的抛出,当Retry达到配置的maxAttempts后,若result没有通过`retryOnResultPredicate`,则会根据`failAfterMaxAttempts`来重试 > 在抛出异常后,是否重试的逻辑如下: > - 根据predicate判断该异常是否应该重试,predicate逻辑判断流程如下 > - 如果异常位于ignoreExceptions中,则不应重试 > - 如果异常位于retryExceptions中,则predicate返回为true > - 如果异常不位于retryExceptions中,则根据retryOnExceptionPredicate来判断是否异常应当触发重试 > - 如果上述的predicate判断异常应该被重试,那么再递增重试次数,判断当前重试是否超过maxAttempts > - 如果没有超过,则在等待interval后触发重试 > - 如果超过maxAttempts规定的上限,则不再重试直接抛出异常 > 在未抛出异常时,判断是否重试的逻辑如下: > - 首先,根据`retryOnResultPredicate`判断当前返回结果是否应当触发重试,如果不应触发重试,则流程结束 > - 如果应当触发重试,则增加当前的重试次数,并和maxAttempts进行比较 > - 如果当前重试次数未超过maxAttempts,则在等待interval后触发重试 > - 如果重试次数超过maxAttempts规定的值,那么将根据failAfterMaxAttempts来决定是否抛出异常。当failAfterMaxAttempts为true时,抛出异常;当为false时,不跑出异常。默认不会抛出异常。 创建RetryConfig的默认示例如下: ```java RetryConfig config = RetryConfig.custom() .maxAttempts(2) .waitDuration(Duration.ofMillis(1000)) .retryOnResult(response -> response.getStatus() == 500) .retryOnException(e -> e instanceof WebServiceException) .retryExceptions(IOException.class, TimeoutException.class) .ignoreExceptions(BusinessException.class, OtherBusinessException.class) .failAfterMaxAttempts(true) .build(); // Create a 
RetryRegistry with a custom global configuration RetryRegistry registry = RetryRegistry.of(config); // Get or create a Retry from the registry - // Retry will be backed by the default config Retry retryWithDefaultConfig = registry.retry("name1"); // Get or create a Retry from the registry, // use a custom configuration when creating the retry RetryConfig custom = RetryConfig.custom() .waitDuration(Duration.ofMillis(100)) .build(); Retry retryWithCustomConfig = registry.retry("name2", custom); ``` ### Decorate and execute a functional interface Retry可以针对`callable, supplier, runnable, consumer, checkedrunnable, checkedsupplier, checkedconsumer, completionstage`进行decorate,使用示例如下: ```java // Given I have a HelloWorldService which throws an exception HelloWorldService helloWorldService = mock(HelloWorldService.class); given(helloWorldService.sayHelloWorld()) .willThrow(new WebServiceException("BAM!")); // Create a Retry with default configuration Retry retry = Retry.ofDefaults("id"); // Decorate the invocation of the HelloWorldService CheckedFunction0 retryableSupplier = Retry .decorateCheckedSupplier(retry, helloWorldService::sayHelloWorld); // When I invoke the function Try result = Try.of(retryableSupplier) .recover((throwable) -> "Hello world from recovery function"); // Then the helloWorldService should be invoked 3 times BDDMockito.then(helloWorldService).should(times(3)).sayHelloWorld(); // and the exception should be handled by the recovery function assertThat(result.get()).isEqualTo("Hello world from recovery function"); ``` ### consume emitted RegistryEvents 可以向RetryRegistry注册监听,消费Retry的create, replace, delete事件 ```java RetryRegistry registry = RetryRegistry.ofDefaults(); registry.getEventPublisher() .onEntryAdded(entryAddedEvent -> { Retry addedRetry = entryAddedEvent.getAddedEntry(); LOG.info("Retry {} added", addedRetry.getName()); }) .onEntryRemoved(entryRemovedEvent -> { Retry removedRetry = entryRemovedEvent.getRemovedEntry(); LOG.info("Retry {} 
removed", removedRetry.getName()); }); ``` ### use custom IntervalFunction 如果不想使用fixed wait duration,可以自定义`IntervalFunction`,该函数可以在每次attempt时独立计算wait duration。resilience4j支持一些工厂方法用于创建IntervalFunction,示例如下 ```java IntervalFunction defaultWaitInterval = IntervalFunction .ofDefaults(); // This interval function is used internally // when you only configure waitDuration IntervalFunction fixedWaitInterval = IntervalFunction .of(Duration.ofSeconds(5)); IntervalFunction intervalWithExponentialBackoff = IntervalFunction .ofExponentialBackoff(); IntervalFunction intervalWithCustomExponentialBackoff = IntervalFunction .ofExponentialBackoff(IntervalFunction.DEFAULT_INITIAL_INTERVAL, 2d); IntervalFunction randomWaitInterval = IntervalFunction .ofRandomized(); // Overwrite the default intervalFunction with your custom one RetryConfig retryConfig = RetryConfig.custom() .intervalFunction(intervalWithExponentialBackoff) .build(); ``` > intervalFunction和intervalBiFunction不能同时指定,同时指定时会抛出异常。 > > 如果指定了intervalFunction,那么在通过builder创建RetryConfig时,会自动通过intervalFunction给intervalBiFunction也赋值。 > > 如果指定了intervalFunction或intervalBiFunction中任一,则使用指定的函数来计算waitDuration,当二者都没有指定时,则waitDuration固定为waitDuration ## TimeLimiter ### Create a TimeLimiterRegistry 和`CircuitBreaker` module类似,TimeLimiter module支持提供in-memory TimeLimiterRegistry,可用于管理TimeLimiter实例。 ```java TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.ofDefaults(); ``` ### Create and configure TimeLimiter 可以提供一个全局的自定义TimeLimiterConfig,支持通过builder创建。 TimeLimiterConfig支持配置如下两个参数: - the timeout duration - whether cancel should be called on the running future 使用示例如下所示: ```java TimeLimiterConfig config = TimeLimiterConfig.custom() .cancelRunningFuture(true) .timeoutDuration(Duration.ofMillis(500)) .build(); // Create a TimeLimiterRegistry with a custom global configuration TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.of(config); // Get or create a TimeLimiter from the registry - // TimeLimiter will be backed by 
// the registry's global (default) config
TimeLimiter timeLimiterWithDefaultConfig = timeLimiterRegistry.timeLimiter("name1");

// Get or create a TimeLimiter from the registry,
// use a custom configuration when creating the TimeLimiter
TimeLimiterConfig customConfig = TimeLimiterConfig.custom()
    .cancelRunningFuture(false)
    .timeoutDuration(Duration.ofMillis(1000))
    .build();

TimeLimiter timeLimiterWithCustomConfig = timeLimiterRegistry.timeLimiter("name2", customConfig);
```

### decorate and execute a functional interface

TimeLimiter can decorate suppliers of `CompletionStage` and `Future`, for example:

```java
// Given I have a helloWorldService.sayHelloWorld() method which takes too long
HelloWorldService helloWorldService = mock(HelloWorldService.class);

// Create a TimeLimiter
TimeLimiter timeLimiter = TimeLimiter.of(Duration.ofSeconds(1));
// The Scheduler is needed to schedule a timeout on a non-blocking CompletableFuture
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);

// The non-blocking variant with a CompletableFuture
CompletableFuture<String> nonBlockingResult = timeLimiter.executeCompletionStage(
        scheduler, () -> CompletableFuture.supplyAsync(helloWorldService::sayHelloWorld))
    .toCompletableFuture();

// The blocking variant which is basically future.get(timeoutDuration, MILLISECONDS);
// executeFutureSupplier declares `throws Exception` (a TimeoutException on timeout)
String blockingResult = timeLimiter.executeFutureSupplier(
    () -> CompletableFuture.supplyAsync(helloWorldService::sayHelloWorld));
```

In the TimeLimiter implementation, `timeoutDuration` sets the maximum time to wait for the future to complete. If that time elapses, a `TimeoutException` is thrown.

When the wait times out, the `cancelRunningFuture` setting (read back via `shouldCancelRunningFuture()`) decides whether `cancel` is called on the future that has not yet completed.

## Spring Cloud CircuitBreaker

Spring Cloud Circuit Breaker provides an abstraction over circuit breakers, so developers can choose whichever circuit breaker implementation they prefer. Spring Cloud currently supports the following implementations:

- Resilience4j
- Sentinel
- Spring Retry

### core concepts

You create circuit breakers through the `CircuitBreakerFactory` API. When a Spring Cloud Circuit Breaker starter is on the classpath, a bean implementing `CircuitBreakerFactory` is created automatically. The following example shows how to use this API:

```java
@Service
public static class DemoControllerService {
    private RestTemplate rest;
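    private CircuitBreakerFactory cbFactory;

    public DemoControllerService(RestTemplate rest, CircuitBreakerFactory cbFactory) {
        this.rest = rest;
        this.cbFactory = cbFactory;
    }

    public String slow() {
        // Wrap the remote call; "fallback" is returned when the call fails or the breaker is open
        return cbFactory.create("slow").run(
            () -> rest.getForObject("/slow", String.class),
            throwable -> "fallback");
    }
}
```

`CircuitBreakerFactory.create` creates an instance of Spring Cloud's `CircuitBreaker` abstraction (`org.springframework.cloud.client.circuitbreaker.CircuitBreaker`, not the Resilience4j class). Its `run` method takes a `Supplier` and a `Function`:

- the `Supplier` is the code that is actually wrapped in the circuit breaker
- the `Function` is the fallback that runs when the circuit breaker is tripped
- the `Function` receives the `Throwable` that caused the fallback to be triggered

The fallback is optional: `run` can also be called with only the `Supplier`.

### Configuration

You configure circuit breakers by creating beans of type `Customizer`.

In the `Resilience4JCircuitBreaker` implementation, `Customizer#customize` is called every time `CircuitBreaker#run` is called, which can be inefficient. In that case, create the Customizer with `Customizer#once`, which ensures the customization logic runs at most once per key produced by its second argument (here `CircuitBreaker::getName`, i.e. once per circuit breaker name). This is useful when repeating the customization makes no sense, for example when registering consumers for Resilience4j events.

For example:

```java
Customizer.once(circuitBreaker -> {
    circuitBreaker.getEventPublisher()
        .onStateTransition(event -> log.info("{}: {}", event.getCircuitBreakerName(), event.getStateTransition()));
}, CircuitBreaker::getName)
```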
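The "at most once per key" behaviour can be illustrated with a small, self-contained sketch that does not depend on Spring Cloud. `SimpleCustomizer`, `FakeBreaker`, `once` and `countCustomizations` are hypothetical names. Remembering already-customized keys with `ConcurrentHashMap.computeIfAbsent` is an assumption about how such a wrapper can be built, not a copy of Spring Cloud's implementation. The loops simulate `run` being called repeatedly on two breakers named `slow` and `fast`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class OnceCustomizerSketch {

    /** Hypothetical stand-in for a customizer that configures an object of type T. */
    interface SimpleCustomizer<T> {
        void customize(T target);
    }

    /** Hypothetical stand-in for a circuit breaker that only carries a name. */
    static final class FakeBreaker {
        private final String name;

        FakeBreaker(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    /**
     * Wraps a customizer so its logic runs at most once per key,
     * where the key is derived from the target by keyMapper.
     */
    static <T, K> SimpleCustomizer<T> once(SimpleCustomizer<T> delegate,
                                           Function<? super T, ? extends K> keyMapper) {
        ConcurrentMap<K, Boolean> applied = new ConcurrentHashMap<>();
        // computeIfAbsent only invokes the mapping function (and thus the delegate)
        // when the key has not been recorded yet
        return target -> applied.computeIfAbsent(keyMapper.apply(target), key -> {
            delegate.customize(target);
            return Boolean.TRUE;
        });
    }

    /** Simulates repeated run() calls and returns how often the real logic executed. */
    static int countCustomizations() {
        AtomicInteger calls = new AtomicInteger();
        SimpleCustomizer<FakeBreaker> customizer =
                once(breaker -> calls.incrementAndGet(), FakeBreaker::getName);
        for (int i = 0; i < 3; i++) {
            customizer.customize(new FakeBreaker("slow"));
        }
        for (int i = 0; i < 2; i++) {
            customizer.customize(new FakeBreaker("fast"));
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("customizations: " + countCustomizations()); // prints "customizations: 2"
    }
}
```

Five simulated calls across two names run the wrapped logic only twice, once per name. Deduplication is decided by the extracted key rather than by object identity, which is why a key mapper such as `CircuitBreaker::getName` is passed alongside the customizer.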