rikako-note/spring/Spring Cloud/CircuitBreaker/CircuitBreaker.md
2025-09-03 11:07:26 +08:00


CircuitBreaker

Resilience4j is a lightweight fault tolerance library designed for functional programming. It provides higher-order functions (decorators) that enhance any functional interface, lambda expression or method reference.

The decorators fall into the following categories:

  • CircuitBreaker
  • Rate Limiter
  • Retry
  • Bulkhead

Any functional interface, lambda expression or method reference can be decorated with more than one decorator.

Introduction

The following example shows how to decorate a lambda expression with a CircuitBreaker and a Retry so that the call is retried at most 3 times when an exception occurs.

The wait interval between retries is configurable, and custom backoff algorithms are supported as well.

// Create a CircuitBreaker with default configuration
CircuitBreaker circuitBreaker = CircuitBreaker
  .ofDefaults("backendService");

// Create a Retry with default configuration
// 3 retry attempts and a fixed time interval between retries of 500ms
Retry retry = Retry
  .ofDefaults("backendService");

// Create a Bulkhead with default configuration
Bulkhead bulkhead = Bulkhead
  .ofDefaults("backendService");

Supplier<String> supplier = () -> backendService
  .doSomething(param1, param2);

// Decorate your call to backendService.doSomething() 
// with a Bulkhead, CircuitBreaker and Retry
// **note: you will need the resilience4j-all dependency for this
Supplier<String> decoratedSupplier = Decorators.ofSupplier(supplier)
  .withCircuitBreaker(circuitBreaker)
  .withBulkhead(bulkhead)
  .withRetry(retry)  
  .decorate();

// When you don't want to decorate your lambda expression,
// but just execute it and protect the call by a CircuitBreaker.
String result = circuitBreaker
  .executeSupplier(backendService::doSomething);

// You can also run the supplier asynchronously in a ThreadPoolBulkhead
ThreadPoolBulkhead threadPoolBulkhead = ThreadPoolBulkhead
  .ofDefaults("backendService");

// The Scheduler is needed to schedule a timeout 
// on a non-blocking CompletableFuture
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
TimeLimiter timeLimiter = TimeLimiter.of(Duration.ofSeconds(1));

CompletableFuture<String> future = Decorators.ofSupplier(supplier)
    .withThreadPoolBulkhead(threadPoolBulkhead)
    .withTimeLimiter(timeLimiter, scheduledExecutorService)
    .withCircuitBreaker(circuitBreaker)
    .withFallback(asList(TimeoutException.class, 
                         CallNotPermittedException.class, 
                         BulkheadFullException.class),  
                  throwable -> "Hello from Recovery")
    .get().toCompletableFuture();

Maven

Resilience4j requires JDK 17 or later. With Maven, the modules can be added as follows.

To add all modules:

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-all</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>

To add only the modules you need:

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-circuitbreaker</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-retry</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-bulkhead</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-cache</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-timelimiter</artifactId>
    <version>${resilience4jVersion}</version>
</dependency>

CircuitBreaker

State Machine

The CircuitBreaker is implemented as a finite state machine with the following states:

  • CLOSED
  • OPEN
  • HALF_OPEN
  • METRICS_ONLY
  • DISABLED
  • FORCED_OPEN

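The first three are the normal states; the last three are special states.

The state transitions work as follows:

  • In CLOSED, when the failure rate reaches or exceeds the configured threshold, the CircuitBreaker transitions from CLOSED to OPEN.
  • In OPEN, once a configured wait duration has elapsed, it transitions from OPEN to HALF_OPEN.
  • In HALF_OPEN, if the failure rate stays below the threshold, it transitions back from HALF_OPEN to CLOSED.
  • In HALF_OPEN, if the failure rate still reaches or exceeds the threshold, it transitions back from HALF_OPEN to OPEN.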
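The four transitions can be illustrated with a toy state machine. This is a minimal sketch, not Resilience4j's implementation. The names `BreakerState`, `ToyStateMachine`, `onFailureRateEvaluated` and `tryAcquirePermission` are hypothetical, and time is passed in as milliseconds. The sketch leaves out the slow call rate, the HALF_OPEN permit limit and the special states. The OPEN to HALF_OPEN transition happens lazily on the next call, which corresponds to the default `automaticTransitionFromOpenToHalfOpenEnabled = false`.

```java
import java.time.Duration;

// Only the three normal states are modelled here.
enum BreakerState { CLOSED, OPEN, HALF_OPEN }

// Hypothetical toy model of the CircuitBreaker state transitions; not the Resilience4j implementation.
final class ToyStateMachine {
    private final float failureRateThreshold;    // percentage, e.g. 50
    private final long waitDurationInOpenMillis;
    private BreakerState state = BreakerState.CLOSED;
    private long openedAtMillis;

    ToyStateMachine(float failureRateThreshold, Duration waitDurationInOpenState) {
        this.failureRateThreshold = failureRateThreshold;
        this.waitDurationInOpenMillis = waitDurationInOpenState.toMillis();
    }

    BreakerState state() {
        return state;
    }

    // Called once a failure rate is available: for the sliding window in CLOSED,
    // or for the permitted trial calls in HALF_OPEN.
    void onFailureRateEvaluated(float failureRate, long nowMillis) {
        boolean thresholdReached = failureRate >= failureRateThreshold;
        switch (state) {
            case CLOSED -> {
                if (thresholdReached) open(nowMillis);
            }
            case HALF_OPEN -> {
                if (thresholdReached) open(nowMillis);
                else state = BreakerState.CLOSED;
            }
            case OPEN -> { /* calls are rejected while OPEN, so nothing is evaluated */ }
        }
    }

    // Returns false while OPEN and the wait duration has not elapsed yet.
    // The OPEN -> HALF_OPEN transition is triggered here by a caller, not by a background thread.
    boolean tryAcquirePermission(long nowMillis) {
        if (state == BreakerState.OPEN) {
            if (nowMillis - openedAtMillis < waitDurationInOpenMillis) return false;
            state = BreakerState.HALF_OPEN;
        }
        return true;
    }

    private void open(long nowMillis) {
        state = BreakerState.OPEN;
        openedAtMillis = nowMillis;
    }
}
```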

Sliding Window

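The CircuitBreaker uses a sliding window to store and aggregate the outcome of calls. You can choose between a count-based and a time-based sliding window.

  • count-based: aggregates the outcome of the last N calls
  • time-based: aggregates the outcome of the calls made in the last N seconds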

Count-based sliding window

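The count-based sliding window is implemented with a circular array of N measurements. If the count window size is 10, the circular array always holds 10 measurements.

The count-based implementation updates a total aggregation incrementally:

  • When a call completes, its outcome is recorded and added to the total aggregation.
  • Once the array is full, recording a new call evicts the oldest measurement in the circular array. Its outcome is subtracted from the total aggregation and its bucket is reset. (Each bucket holds exactly one measurement, so resetting the bucket resets the oldest measurement.)

Checking the aggregation costs O(1), because the result is pre-aggregated and independent of the window size.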
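A minimal sketch of the count-based window follows. It tracks only failures; slow calls would be a second counter handled the same way. `CountBasedWindow` and its methods are hypothetical names. The point is the subtract-on-evict update, which keeps reading the failure rate O(1).

```java
// Hypothetical sketch of a count-based sliding window with subtract-on-evict; not the library's class.
final class CountBasedWindow {
    private final boolean[] failed; // circular array: one measurement (failed or not) per slot
    private int next = 0;           // slot written next; once the array is full it holds the oldest measurement
    private int totalCalls = 0;     // total aggregation, updated incrementally
    private int failedCalls = 0;

    CountBasedWindow(int windowSize) {
        if (windowSize <= 0) throw new IllegalArgumentException("windowSize must be positive");
        failed = new boolean[windowSize];
    }

    void record(boolean callFailed) {
        if (totalCalls == failed.length) {
            // Window is full: subtract the oldest measurement before its slot is reused.
            if (failed[next]) failedCalls--;
        } else {
            totalCalls++;
        }
        failed[next] = callFailed;
        if (callFailed) failedCalls++;
        next = (next + 1) % failed.length;
    }

    int totalCalls() {
        return totalCalls;
    }

    // O(1): reads the pre-aggregated counters, independent of the window size.
    float failureRatePercent() {
        return totalCalls == 0 ? 0f : failedCalls * 100f / totalCalls;
    }
}
```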

Time-based sliding window

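The time-based sliding window is also implemented with a circular array, which holds N partial aggregations (buckets).

If the time window size is 10 seconds, the circular array always holds 10 buckets. Each bucket corresponds to one epoch second and aggregates the outcome of all calls made during that second (a partial aggregation).

The head bucket of the circular array stores the outcomes of calls made in the current epoch second. The other partial aggregations store the outcomes of calls made in previous seconds. Unlike the count-based window, the time-based window does not store call outcomes individually; it updates the partial aggregations incrementally.

Besides updating the partial aggregation, the time-based window also updates the total aggregation whenever a new call outcome is recorded. When the oldest bucket is evicted, its partial aggregation is subtracted from the total aggregation and the bucket is reset.

Checking the aggregation also costs O(1), because the time-based window is pre-aggregated as well.

A partial aggregation consists of three integers, which count:

  • failed calls
  • slow calls
  • total calls

In addition, a partial aggregation holds a long that stores the total duration of all calls.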
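The following sketch applies the same idea to the time-based window. It is an illustration under stated assumptions, not the library's class:

  • `TimeBasedWindow`, `PartialAggregation` and `Snapshot` are hypothetical names.
  • The current epoch second is passed in explicitly instead of read from a clock.
  • Timestamps are assumed to be non-decreasing.
  • Durations are in milliseconds.

Moving the head evicts at most one bucket per elapsed second, never more than the window size. After that, reading the totals is O(1).

```java
// Hypothetical sketch of a time-based sliding window; not the library's class.
// One bucket (partial aggregation) per epoch second.
final class TimeBasedWindow {
    record Snapshot(int totalCalls, int failedCalls, int slowCalls, long totalDurationMillis) {}

    private static final class PartialAggregation {
        int failedCalls, slowCalls, totalCalls; // the three integers
        long totalDurationMillis;               // the long holding the summed call duration

        void reset() {
            failedCalls = slowCalls = totalCalls = 0;
            totalDurationMillis = 0;
        }
    }

    private final PartialAggregation[] buckets;
    private long headEpochSecond;                   // epoch second of the head bucket
    private int failedCalls, slowCalls, totalCalls; // total aggregation
    private long totalDurationMillis;

    TimeBasedWindow(int windowSizeSeconds, long startEpochSecond) {
        buckets = new PartialAggregation[windowSizeSeconds];
        for (int i = 0; i < windowSizeSeconds; i++) buckets[i] = new PartialAggregation();
        headEpochSecond = startEpochSecond;
    }

    private PartialAggregation bucketFor(long epochSecond) {
        return buckets[(int) Math.floorMod(epochSecond, (long) buckets.length)];
    }

    // Moves the head to the current second; each bucket that falls out of the window
    // is subtracted from the total aggregation and reset.
    private void advanceTo(long nowEpochSecond) {
        long steps = Math.min(nowEpochSecond - headEpochSecond, buckets.length);
        for (long i = 1; i <= steps; i++) {
            PartialAggregation evicted = bucketFor(headEpochSecond + i);
            failedCalls -= evicted.failedCalls;
            slowCalls -= evicted.slowCalls;
            totalCalls -= evicted.totalCalls;
            totalDurationMillis -= evicted.totalDurationMillis;
            evicted.reset();
        }
        headEpochSecond = Math.max(headEpochSecond, nowEpochSecond);
    }

    void record(long nowEpochSecond, long durationMillis, boolean failed, boolean slow) {
        advanceTo(nowEpochSecond);
        PartialAggregation head = bucketFor(headEpochSecond);
        head.totalCalls++; totalCalls++;
        head.totalDurationMillis += durationMillis; totalDurationMillis += durationMillis;
        if (failed) { head.failedCalls++; failedCalls++; }
        if (slow) { head.slowCalls++; slowCalls++; }
    }

    Snapshot snapshot(long nowEpochSecond) {
        advanceTo(nowEpochSecond);
        return new Snapshot(totalCalls, failedCalls, slowCalls, totalDurationMillis);
    }
}
```

For example, with a 3-second window, calls recorded at second 100 drop out of the totals once the window has moved to second 103, which covers seconds 101 to 103.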

Failure rate and slow call rate thresholds

Failure rate & exception list

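When the failure rate is equal to or greater than the configured threshold, the CircuitBreaker transitions from CLOSED to OPEN.

By default, every exception counts as a failure. You can also specify a list of exceptions. In that case, only exceptions in the list count as failures, and all other exceptions count as successes. Exceptions can also be ignored: an ignored exception counts as neither a success nor a failure.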

Slow call rate

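When the slow call rate is equal to or greater than the configured threshold, the CircuitBreaker also transitions from CLOSED to OPEN. The slow call rate helps reduce the load on an external system.

The failure rate and the slow call rate can only be calculated once a minimum number of calls has been recorded. For example, if the minimum number of required calls is 10, both rates are calculated only after at least 10 calls have been recorded. If only 9 calls have been recorded, the CircuitBreaker does not transition to OPEN, even if all 9 calls failed.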
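The opening rule, including the minimum number of calls, fits in a few lines. `ThresholdCheck.shouldOpen` is a hypothetical helper, not a Resilience4j API. Rates are expressed as percentages, as in the configuration.

```java
// Hypothetical helper illustrating the opening rule; not a Resilience4j API.
final class ThresholdCheck {
    private ThresholdCheck() {}

    // Returns true if the recorded calls should move the breaker from CLOSED to OPEN.
    static boolean shouldOpen(int totalCalls, int failedCalls, int slowCalls,
                              int minimumNumberOfCalls,
                              float failureRateThreshold, float slowCallRateThreshold) {
        // Below the minimum, the rates are not calculated at all, so the breaker stays CLOSED.
        if (totalCalls == 0 || totalCalls < minimumNumberOfCalls) return false;
        float failureRate = failedCalls * 100f / totalCalls;
        float slowCallRate = slowCalls * 100f / totalCalls;
        // Either rate reaching its threshold (>=) is enough.
        return failureRate >= failureRateThreshold || slowCallRate >= slowCallRateThreshold;
    }
}
```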

CircuitBreaker in OPEN/HALF_OPEN state

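In the OPEN state, the CircuitBreaker rejects all calls by throwing a CallNotPermittedException. After a wait duration, it transitions from OPEN to HALF_OPEN. It then permits a configurable number of calls to check whether the backend has recovered and is reachable again.

In the HALF_OPEN state, calls beyond the permitted number are rejected. For example, if permittedNumberOfCallsInHalfOpenState is 10 and 20 calls arrive, the first 10 calls are executed. The remaining 10 are rejected with a CallNotPermittedException.

In HALF_OPEN, if the failure rate or the slow call rate is equal to or greater than the configured threshold, the CircuitBreaker transitions back to OPEN. If both rates are below their thresholds, it transitions to CLOSED.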
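Limiting the HALF_OPEN trial calls comes down to an atomic counter. The sketch below is hypothetical: `HalfOpenPermits` is not a library class, and it returns false where the real CircuitBreaker would throw a CallNotPermittedException.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of counting HALF_OPEN trial permits; not the library's implementation.
final class HalfOpenPermits {
    private final AtomicInteger remaining;

    HalfOpenPermits(int permittedNumberOfCallsInHalfOpenState) {
        remaining = new AtomicInteger(permittedNumberOfCallsInHalfOpenState);
    }

    // Atomically takes one permit if any is left, so concurrent callers can never over-admit.
    boolean tryAcquire() {
        return remaining.getAndUpdate(p -> p > 0 ? p - 1 : p) > 0;
    }
}
```

With 10 permits and 20 callers, exactly 10 calls are admitted, whether the callers arrive one after another or in parallel.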

Special States

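The CircuitBreaker supports three special states:

  • METRICS_ONLY:
    • all circuit breaker events are generated (except state transitions) and metrics are recorded as usual
    • the behavior is similar to CLOSED, but the CircuitBreaker does not transition to OPEN when a threshold is reached
  • DISABLED:
    • no CircuitBreakerEvents are generated and no metrics are recorded
    • all calls are permitted
  • FORCED_OPEN:
    • no CircuitBreakerEvents are generated and no metrics are recorded
    • all calls are rejected

A CircuitBreaker leaves these special states only when:

  • a state transition is triggered, or
  • the CircuitBreaker is reset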

thread-safe

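The CircuitBreaker is thread-safe, but it does not serialize function calls. Calls made through a CircuitBreaker may therefore execute in parallel.

For example, if 20 threads call a CLOSED CircuitBreaker at the same time, all 20 calls can execute concurrently. This holds even if the sliding window size is 15, which is smaller than the number of concurrent callers. The sliding window size does not limit concurrency.

To limit concurrency, use a Bulkhead.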

CircuitBreakerRegistry

Resilience4j ships with an in-memory CircuitBreakerRegistry based on a ConcurrentHashMap. You can use it to create and retrieve CircuitBreaker instances. The following example creates a CircuitBreakerRegistry with the default CircuitBreakerConfig:

CircuitBreakerRegistry circuitBreakerRegistry = 
  CircuitBreakerRegistry.ofDefaults();
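The registry's get-or-create behavior can be pictured with `ConcurrentHashMap.computeIfAbsent`. `ToyRegistry` is a hypothetical class used only to illustrate the idea. It is an assumption that the real CircuitBreakerRegistry works exactly like this internally.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical sketch of get-or-create semantics; not Resilience4j's registry implementation.
final class ToyRegistry<T> {
    private final ConcurrentMap<String, T> entries = new ConcurrentHashMap<>();
    private final Function<String, T> factory;

    ToyRegistry(Function<String, T> factory) {
        this.factory = factory;
    }

    // computeIfAbsent creates at most one instance per name, even when threads race on the same name.
    T getOrCreate(String name) {
        return entries.computeIfAbsent(name, factory);
    }

    int size() {
        return entries.size();
    }
}
```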

Create and configure CircuitBreakerConfig

Instead of the default CircuitBreakerConfig, you can provide a custom CircuitBreakerConfig, built with a builder.

CircuitBreakerConfig supports the following properties:

  • failureRateThreshold: configures the failure rate threshold as a percentage
    • default value: 50
    • description: when the failure rate is equal to or greater than this threshold, the CircuitBreaker transitions to OPEN and starts short-circuiting calls
  • slowCallRateThreshold: configures the slow call rate threshold as a percentage
    • default value: 100
    • description: when the percentage of slow calls is equal to or greater than this threshold, the CircuitBreaker transitions to OPEN and starts short-circuiting calls
  • slowCallDurationThreshold: configures the duration threshold for slow calls
    • default value: 60000 [ms]
    • description: calls that take longer than this threshold are counted as slow calls and increase the slow call rate
  • permittedNumberOfCallsInHalfOpenState:
    • default value: 10
    • description: the number of calls permitted while the CircuitBreaker is HALF_OPEN
  • maxWaitDurationInHalfOpenState:
    • default value: 0 [ms]
    • description: the maximum time the CircuitBreaker may stay in HALF_OPEN before it switches back to OPEN. The default of 0 means it waits without limit until all permitted calls have completed
  • slidingWindowType: configures the type of the sliding window
    • default value: COUNT_BASED
    • description: the sliding window records the outcome of calls while the CircuitBreaker is CLOSED. It can be count-based or time-based:
      • COUNT_BASED: records and aggregates the outcome of the last slidingWindowSize calls
      • TIME_BASED: records and aggregates the outcome of the calls made in the last slidingWindowSize seconds
  • slidingWindowSize:
    • default value: 100
    • description: the size of the sliding window
  • minimumNumberOfCalls:
    • default value: 100
    • description: the minimum number of calls that must be recorded (within the sliding window) before the failure rate and slow call rate can be calculated. For example, if minimumNumberOfCalls is 10, at least 10 calls must be recorded before the failure rate is calculated. If only 9 calls have been recorded, the CircuitBreaker does not transition to OPEN, even if all 9 calls failed
  • waitDurationInOpenState:
    • default value: 60000 [ms]
    • description: how long the CircuitBreaker waits in OPEN before transitioning to HALF_OPEN
  • automaticTransitionFromOpenToHalfOpenEnabled:
    • default value: false
    • description:
      • if true, a thread monitors all CircuitBreaker instances and transitions each one from OPEN to HALF_OPEN once waitDurationInOpenState has elapsed
      • if false (the default), the transition from OPEN to HALF_OPEN is not triggered by a dedicated thread but by incoming calls. Even after waitDurationInOpenState has elapsed, the transition only happens when the next call arrives
  • recordExceptions:
    • default value: empty
    • description: a list of exception types that are recorded as failures and thus increase the failure rate
      • any exception that matches or inherits from a type in the list counts as a failure, unless it is explicitly ignored via ignoreExceptions
      • if recordExceptions is specified, all other exceptions count as successes, unless they are explicitly ignored via ignoreExceptions
  • ignoreExceptions:
    • default value: empty
    • description: a list of exception types that are ignored, i.e. counted as neither failures nor successes
      • any exception that matches or inherits from a type in this list is ignored, even if it is also listed in recordExceptions
  • recordFailurePredicate:
    • default value: throwable -> true
    • description: a custom predicate that decides whether an exception should be recorded as a failure. It should return true if the exception should count as a failure, and false otherwise
  • ignoreExceptionPredicate:
    • default value: throwable -> false
    • description: a custom predicate that decides whether an exception should be ignored rather than counted as a failure or success
      • it should return true if the exception should be ignored
      • it should return false if the exception should not be ignored, in which case it is counted as a failure or success
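
How a call outcome is classified as success, failure or ignored

When a call completes, its outcome is classified in the following order:

  • If the call did not throw an exception, it is recorded as a success; otherwise continue.
  • If the exception matches ignoreExceptions, it is ignored; otherwise continue.
  • If ignoreExceptionPredicate is set and returns true for the exception, it is ignored; otherwise continue.
  • If the exception matches recordExceptions, it is recorded as a failure; otherwise continue.
  • If recordFailurePredicate is set and returns true for the exception, it is recorded as a failure; otherwise continue.
  • If none of the above applies, the outcome is recorded as a success. When neither recordExceptions nor recordFailurePredicate is configured, the default predicate (throwable -> true) applies, so every exception is recorded as a failure.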
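The decision order above can be written as a small classifier. This is a hypothetical sketch, not Resilience4j's internal code; `Outcome` and `OutcomeClassifier` are invented names. A `null` predicate stands for "not configured". When neither recordExceptions nor recordFailurePredicate is configured, every exception counts as a failure, matching the default `throwable -> true`.

```java
import java.util.List;
import java.util.function.Predicate;

enum Outcome { SUCCESS, FAILURE, IGNORED }

// Hypothetical sketch of the success/failure/ignore decision order; not Resilience4j's internal code.
final class OutcomeClassifier {
    private final List<Class<? extends Throwable>> ignoreExceptions;
    private final Predicate<Throwable> ignoreExceptionPredicate; // null = not configured
    private final List<Class<? extends Throwable>> recordExceptions;
    private final Predicate<Throwable> recordFailurePredicate;   // null = not configured

    OutcomeClassifier(List<Class<? extends Throwable>> ignoreExceptions,
                      Predicate<Throwable> ignoreExceptionPredicate,
                      List<Class<? extends Throwable>> recordExceptions,
                      Predicate<Throwable> recordFailurePredicate) {
        this.ignoreExceptions = ignoreExceptions;
        this.ignoreExceptionPredicate = ignoreExceptionPredicate;
        this.recordExceptions = recordExceptions;
        this.recordFailurePredicate = recordFailurePredicate;
    }

    // isInstance matches the listed type and all of its subclasses.
    private static boolean matches(List<Class<? extends Throwable>> types, Throwable t) {
        return types.stream().anyMatch(type -> type.isInstance(t));
    }

    Outcome classify(Throwable thrown) {
        if (thrown == null) return Outcome.SUCCESS;                    // the call did not throw
        if (matches(ignoreExceptions, thrown)) return Outcome.IGNORED; // ignoring takes precedence
        if (ignoreExceptionPredicate != null && ignoreExceptionPredicate.test(thrown)) return Outcome.IGNORED;
        if (recordExceptions.isEmpty() && recordFailurePredicate == null) return Outcome.FAILURE; // default
        if (matches(recordExceptions, thrown)) return Outcome.FAILURE;
        if (recordFailurePredicate != null && recordFailurePredicate.test(thrown)) return Outcome.FAILURE;
        return Outcome.SUCCESS;                                        // nothing matched
    }
}
```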

The following example creates a custom CircuitBreakerConfig:

// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
  .failureRateThreshold(50)
  .slowCallRateThreshold(50)
  .waitDurationInOpenState(Duration.ofMillis(1000))
  .slowCallDurationThreshold(Duration.ofSeconds(2))
  .permittedNumberOfCallsInHalfOpenState(3)
  .minimumNumberOfCalls(10)
  .slidingWindowType(SlidingWindowType.TIME_BASED)
  .slidingWindowSize(5)
  .recordException(e -> INTERNAL_SERVER_ERROR
                 .equals(getResponse().getStatus()))
  .recordExceptions(IOException.class, TimeoutException.class)
  .ignoreExceptions(BusinessException.class, OtherBusinessException.class)
  .build();

// Create a CircuitBreakerRegistry with a custom global configuration
CircuitBreakerRegistry circuitBreakerRegistry = 
  CircuitBreakerRegistry.of(circuitBreakerConfig);

// Get or create a CircuitBreaker from the CircuitBreakerRegistry 
// with the global default configuration
CircuitBreaker circuitBreakerWithDefaultConfig = 
  circuitBreakerRegistry.circuitBreaker("name1");

// Get or create a CircuitBreaker from the CircuitBreakerRegistry 
// with a custom configuration
CircuitBreaker circuitBreakerWithCustomConfig = circuitBreakerRegistry
  .circuitBreaker("name2", circuitBreakerConfig);

You can also add configurations to the CircuitBreakerRegistry and share them across multiple CircuitBreaker instances:

CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
  .failureRateThreshold(70)
  .build();

circuitBreakerRegistry.addConfiguration("someSharedConfig", circuitBreakerConfig);

CircuitBreaker circuitBreaker = circuitBreakerRegistry
  .circuitBreaker("name", "someSharedConfig");

You can also override the default configuration:

CircuitBreakerConfig defaultConfig = circuitBreakerRegistry
   .getDefaultConfig();

CircuitBreakerConfig overwrittenConfig = CircuitBreakerConfig
  .from(defaultConfig)
  .waitDurationInOpenState(Duration.ofSeconds(20))
  .build();

If you don't want to manage CircuitBreaker instances with a CircuitBreakerRegistry, you can create them directly:

// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
  .recordExceptions(IOException.class, TimeoutException.class)
  .ignoreExceptions(BusinessException.class, OtherBusinessException.class)
  .build();

CircuitBreaker customCircuitBreaker = CircuitBreaker
  .of("testName", circuitBreakerConfig);

To plug in your own registry implementation, provide a custom implementation of RegistryStore and register it via the builder:

CircuitBreakerRegistry registry = CircuitBreakerRegistry.custom()
    .withRegistryStore(new YourRegistryStoreImplementation())
    .withCircuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
    .build();

Decorate and execute a functional interface

A CircuitBreaker can decorate any Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer or CompletionStage.

You can invoke the decorated function with Try.of or Try.run, which allows further functions to be chained. For example:

// Given
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");

// When I decorate my function
CheckedFunction0<String> decoratedSupplier = CircuitBreaker
        .decorateCheckedSupplier(circuitBreaker, () -> "This can be any method which returns: 'Hello");

// and chain an other function with map
Try<String> result = Try.of(decoratedSupplier)
                .map(value -> value + " world'");

// Then the Try Monad returns a Success<String>, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");

Consume emitted RegistryEvents

You can register an event consumer on a CircuitBreakerRegistry to take action whenever a CircuitBreaker is created, replaced or removed:

CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
circuitBreakerRegistry.getEventPublisher()
  .onEntryAdded(entryAddedEvent -> {
    CircuitBreaker addedCircuitBreaker = entryAddedEvent.getAddedEntry();
    LOG.info("CircuitBreaker {} added", addedCircuitBreaker.getName());
  })
  .onEntryRemoved(entryRemovedEvent -> {
    CircuitBreaker removedCircuitBreaker = entryRemovedEvent.getRemovedEntry();
    LOG.info("CircuitBreaker {} removed", removedCircuitBreaker.getName());
  });

consume emitted CircuitBreakerEvents

A CircuitBreakerEvent is emitted on:

  • state transition
  • circuit breaker reset
  • successful call
  • recorded error
  • ignored error

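All events carry additional information, such as the event creation time and the call's processing duration. To consume these events, register an event consumer on the CircuitBreaker: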

circuitBreaker.getEventPublisher()
    .onSuccess(event -> logger.info(...))
    .onError(event -> logger.info(...))
    .onIgnoredError(event -> logger.info(...))
    .onReset(event -> logger.info(...))
    .onStateTransition(event -> logger.info(...));
// Or if you want to register a consumer listening
// to all events, you can do:
circuitBreaker.getEventPublisher()
    .onEvent(event -> logger.info(...));

You can use a CircularEventConsumer to store events in a circular buffer with a fixed capacity:

CircularEventConsumer<CircuitBreakerEvent> ringBuffer = 
  new CircularEventConsumer<>(10);
circuitBreaker.getEventPublisher().onEvent(ringBuffer);
List<CircuitBreakerEvent> bufferedEvents = ringBuffer.getBufferedEvents();
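A fixed-capacity circular buffer drops the oldest event when a new one arrives and the buffer is full. `ToyCircularEventConsumer` below is a hypothetical stand-in built on `ArrayDeque`, not the library's CircularEventConsumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a fixed-capacity event buffer; not the library's CircularEventConsumer.
final class ToyCircularEventConsumer<E> implements Consumer<E> {
    private final int capacity;
    private final Deque<E> buffer;

    ToyCircularEventConsumer(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
        this.buffer = new ArrayDeque<>(capacity);
    }

    @Override
    public synchronized void accept(E event) {
        if (buffer.size() == capacity) {
            buffer.removeFirst(); // drop the oldest event to make room
        }
        buffer.addLast(event);
    }

    // Returns a copy of the buffered events, oldest first.
    synchronized List<E> getBufferedEvents() {
        return new ArrayList<>(buffer);
    }
}
```

With a capacity of 3, consuming five events leaves only the last three in the buffer.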

Bulkhead

Resilience4j provides two bulkhead implementations. Both limit the number of concurrent executions:

  • SemaphoreBulkhead: based on semaphores
  • FixedThreadPoolBulkhead: uses a bounded queue and a fixed thread pool
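The semaphore-based variant can be sketched with `java.util.concurrent.Semaphore`. `ToySemaphoreBulkhead` is hypothetical. Its `maxConcurrentCalls` and `maxWaitDuration` parameters mirror the BulkheadConfig properties. It throws `RejectedExecutionException` where Resilience4j would throw a `BulkheadFullException`.

```java
import java.time.Duration;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of a semaphore-based bulkhead; not Resilience4j's SemaphoreBulkhead.
final class ToySemaphoreBulkhead {
    private final Semaphore permits;
    private final Duration maxWaitDuration;

    ToySemaphoreBulkhead(int maxConcurrentCalls, Duration maxWaitDuration) {
        this.permits = new Semaphore(maxConcurrentCalls, true); // fair: waiting threads are served in order
        this.maxWaitDuration = maxWaitDuration;
    }

    <T> T execute(Supplier<T> call) {
        boolean acquired;
        try {
            // Waits at most maxWaitDuration for a free slot; a zero duration means no waiting at all.
            acquired = permits.tryAcquire(maxWaitDuration.toNanos(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting for the bulkhead", e);
        }
        if (!acquired) {
            throw new RejectedExecutionException("bulkhead is full");
        }
        try {
            return call.get();
        } finally {
            permits.release(); // free the slot even if the call throws
        }
    }

    int availableConcurrentCalls() {
        return permits.availablePermits();
    }
}
```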

create a BulkheadRegistry

Like the CircuitBreaker module, the Bulkhead module provides an in-memory BulkheadRegistry and ThreadPoolBulkheadRegistry for managing Bulkhead instances:

BulkheadRegistry bulkheadRegistry = BulkheadRegistry.ofDefaults();

ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = 
  ThreadPoolBulkheadRegistry.ofDefaults();

Create and configure a Bulkhead

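You can provide a custom global BulkheadConfig, created with the BulkheadConfig builder. It supports the following properties:

  • maxConcurrentCalls:
    • default value: 25
    • description: the maximum number of parallel executions the bulkhead allows
  • maxWaitDuration:
    • default value: 0
    • description: the maximum time a thread waits when it tries to enter a saturated bulkhead

For example: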

// Create a custom configuration for a Bulkhead
BulkheadConfig config = BulkheadConfig.custom()
    .maxConcurrentCalls(150)
    .maxWaitDuration(Duration.ofMillis(500))
    .build();

// Create a BulkheadRegistry with a custom global configuration
BulkheadRegistry registry = BulkheadRegistry.of(config);

// Get or create a Bulkhead from the registry - 
// bulkhead will be backed by the default config
Bulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");

// Get or create a Bulkhead from the registry, 
// use a custom configuration when creating the bulkhead
BulkheadConfig custom = BulkheadConfig.custom()
    .maxWaitDuration(Duration.ofMillis(100))
    .build();

Bulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);

Create and configure a ThreadPoolBulkhead

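You can provide a custom global ThreadPoolBulkheadConfig, created with a builder.

ThreadPoolBulkheadConfig supports the following properties:

  • maxThreadPoolSize:
    • default value: Runtime.getRuntime().availableProcessors()
    • description: the maximum number of threads in the pool
  • coreThreadPoolSize:
    • default value: Runtime.getRuntime().availableProcessors() - 1
    • description: the number of core threads in the pool
  • queueCapacity:
    • default value: 100
    • description: the capacity of the queue
  • keepAliveDuration:
    • default value: 20 [ms]
    • description: when the number of threads exceeds the core size, the maximum time excess idle threads wait for new tasks before they terminate
  • writableStackTraceEnabled:
    • default value: true
    • description: whether the full stack trace is printed when the bulkhead throws an exception; when set to false, only a single line is printed
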
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
  .maxThreadPoolSize(10)
  .coreThreadPoolSize(2)
  .queueCapacity(20)
  .build();
        
// Create a BulkheadRegistry with a custom global configuration
ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);

// Get or create a ThreadPoolBulkhead from the registry - 
// bulkhead will be backed by the default config
ThreadPoolBulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");

// Get or create a Bulkhead from the registry, 
// use a custom configuration when creating the bulkhead
ThreadPoolBulkheadConfig custom = ThreadPoolBulkheadConfig.custom()
  .maxThreadPoolSize(5)
  .build();

ThreadPoolBulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);
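Conceptually, a ThreadPoolBulkhead behaves like a `ThreadPoolExecutor` with a bounded queue:

  • calls run on the pool
  • extra calls wait in the queue
  • calls are rejected once all threads are busy and the queue is full

The sketch below assumes that mapping. `ToyThreadPoolBulkhead` is hypothetical, and its constructor parameters mirror the ThreadPoolBulkheadConfig properties. Rejection surfaces as the JDK's `RejectedExecutionException` rather than Resilience4j's own exception.

```java
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch: a bounded ThreadPoolExecutor playing the role of a ThreadPoolBulkhead.
final class ToyThreadPoolBulkhead {
    private final ThreadPoolExecutor executor;

    ToyThreadPoolBulkhead(int coreThreadPoolSize, int maxThreadPoolSize,
                          int queueCapacity, Duration keepAliveDuration) {
        this.executor = new ThreadPoolExecutor(
                coreThreadPoolSize, maxThreadPoolSize,
                keepAliveDuration.toMillis(), TimeUnit.MILLISECONDS, // idle time before excess threads exit
                new ArrayBlockingQueue<>(queueCapacity),             // bounded queue for waiting calls
                new ThreadPoolExecutor.AbortPolicy());               // reject when threads and queue are full
    }

    // Runs the call asynchronously; throws RejectedExecutionException if the bulkhead is saturated.
    <T> CompletableFuture<T> submit(Supplier<T> call) {
        return CompletableFuture.supplyAsync(call, executor);
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Each instance owns its own executor. This is why creating many thread-pool bulkheads multiplies the number of threads in the application.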

decorate and execute a functional interface

A Bulkhead can decorate any Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer or CompletionStage. For example:

// Given
Bulkhead bulkhead = Bulkhead.of("name", config);

// When I decorate my function
CheckedFunction0<String> decoratedSupplier = Bulkhead
  .decorateCheckedSupplier(bulkhead, () -> "This can be any method which returns: 'Hello");

// and chain an other function with map
Try<String> result = Try.of(decoratedSupplier)
  .map(value -> value + " world'");

// Then the Try Monad returns a Success<String>, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);

// A ThreadPoolBulkhead runs the call asynchronously on its own thread pool
ThreadPoolBulkheadConfig threadPoolConfig = ThreadPoolBulkheadConfig.custom()
    .maxThreadPoolSize(10)
    .coreThreadPoolSize(2)
    .queueCapacity(20)
    .build();

ThreadPoolBulkhead threadPoolBulkhead = ThreadPoolBulkhead.of("name", threadPoolConfig);

CompletionStage<String> supplier = threadPoolBulkhead
    .executeSupplier(backendService::doSomething);

consume emitted RegistryEvents

You can register an event consumer on a BulkheadRegistry to listen for Bulkhead creation, replacement and removal events:

BulkheadRegistry registry = BulkheadRegistry.ofDefaults();
registry.getEventPublisher()
  .onEntryAdded(entryAddedEvent -> {
    Bulkhead addedBulkhead = entryAddedEvent.getAddedEntry();
    LOG.info("Bulkhead {} added", addedBulkhead.getName());
  })
  .onEntryRemoved(entryRemovedEvent -> {
    Bulkhead removedBulkhead = entryRemovedEvent.getRemovedEntry();
    LOG.info("Bulkhead {} removed", removedBulkhead.getName());
  });

consume emitted BulkheadEvents

A Bulkhead emits BulkheadEvents of the following types:

  • permitted execution
  • rejected execution
  • finished execution

To consume these events, register an event consumer:

bulkhead.getEventPublisher()
    .onCallPermitted(event -> logger.info(...))
    .onCallRejected(event -> logger.info(...))
    .onCallFinished(event -> logger.info(...));

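ThreadPoolBulkhead drawbacks

The ThreadPoolBulkhead implementation creates a dedicated thread pool for every bulkhead. Avoid creating a large number of ThreadPoolBulkheads in one application. Otherwise, the number of threads grows quickly and context switching becomes expensive.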

RateLimiter

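Resilience4j provides a RateLimiter that splits all nanoseconds since a fixed starting point (JVM start) into a series of cycles. The duration of each cycle is configured with RateLimiterConfig.limitRefreshPeriod. At the start of each cycle, the RateLimiter sets the number of active permissions to RateLimiterConfig.limitForPeriod.

The default RateLimiter implementation is AtomicRateLimiter, which manages its state through an AtomicReference. AtomicRateLimiter.State is immutable and has the following fields:

  • activeCycle: the cycle number used by the last call
  • activePermissions: the number of permissions available after the last call; it can be negative if some permissions were reserved
  • nanosToWait: the number of nanoseconds the last call had to wait for its permission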
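The cycle arithmetic can be illustrated with a simplified sketch. `ToyAtomicRateLimiter` is hypothetical and is not the AtomicRateLimiter source. It makes the following assumptions:

  • An immutable `State` with the three fields listed above is kept in an `AtomicReference`.
  • The current time is passed in as nanoseconds since a fixed start.
  • The timeoutDuration check is omitted: every caller reserves a permission and is told how long to wait.
  • Permissions are refilled by limitForPeriod per elapsed cycle, up to at most limitForPeriod.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified sketch of cycle-based permission accounting; not Resilience4j's AtomicRateLimiter.
final class ToyAtomicRateLimiter {
    // Immutable state, replaced atomically as a whole.
    record State(long activeCycle, int activePermissions, long nanosToWait) {}

    private final long limitRefreshPeriodNanos;
    private final int limitForPeriod;
    private final AtomicReference<State> state;

    ToyAtomicRateLimiter(long limitRefreshPeriodNanos, int limitForPeriod) {
        this.limitRefreshPeriodNanos = limitRefreshPeriodNanos;
        this.limitForPeriod = limitForPeriod;
        // Cycle 0 starts with a full set of permissions.
        this.state = new AtomicReference<>(new State(0, limitForPeriod, 0));
    }

    // Reserves one permission at nowNanos and returns how many nanoseconds the caller
    // must wait before using it; 0 means it can proceed immediately.
    long reservePermission(long nowNanos) {
        State next = state.updateAndGet(prev -> {
            long currentCycle = nowNanos / limitRefreshPeriodNanos;
            long cycle = prev.activeCycle();
            int permissions = prev.activePermissions();
            if (currentCycle > cycle) {
                // Refill limitForPeriod permissions per elapsed cycle, capped at limitForPeriod.
                long refilled = permissions + (currentCycle - cycle) * limitForPeriod;
                permissions = (int) Math.min(refilled, limitForPeriod);
                cycle = currentCycle;
            }
            permissions--; // may go negative: the permission is borrowed from a future cycle
            long nanosToWait = 0;
            if (permissions < 0) {
                long nextCycleStart = (currentCycle + 1) * limitRefreshPeriodNanos;
                long extraCycles = (-permissions - 1) / limitForPeriod; // cycles beyond the next one
                nanosToWait = nextCycleStart + extraCycles * limitRefreshPeriodNanos - nowNanos;
            }
            return new State(cycle, permissions, nanosToWait);
        });
        return next.nanosToWait();
    }
}
```

With 2 permissions per 1 ms cycle, the first two calls in cycle 0 proceed immediately. A third call at 100 ns must wait 999,900 ns for cycle 1, and it uses up one of cycle 1's two permissions.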

Besides AtomicRateLimiter, there is also a SemaphoreBasedRateLimiter. It uses a semaphore and a scheduler that refreshes the permissions after each RateLimiterConfig#limitRefreshPeriod.

Create RateLimiterRegistry

Like the CircuitBreaker module, the RateLimiter module provides an in-memory RateLimiterRegistry for managing RateLimiter instances.

RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.ofDefaults();

Create and configure a RateLimiter

You can provide a custom RateLimiterConfig, built with a builder. RateLimiterConfig supports the following properties:

  • timeoutDuration:
    • default value: 5 [s]
    • description: the default time a thread waits for a permission
  • limitRefreshPeriod:
    • default value: 500 [ns]
    • description: the period of a limit refresh. After each period, the rate limiter resets its permission count to limitForPeriod
  • limitForPeriod:
    • default value: 50
    • description: the number of permissions available during one refresh period

The following example uses a RateLimiter to limit calls to at most 10 req/ms:

RateLimiterConfig config = RateLimiterConfig.custom()
  .limitRefreshPeriod(Duration.ofMillis(1))
  .limitForPeriod(10)
  .timeoutDuration(Duration.ofMillis(25))
  .build();

// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);

// Use registry
RateLimiter rateLimiterWithDefaultConfig = rateLimiterRegistry
  .rateLimiter("name1");

RateLimiter rateLimiterWithCustomConfig = rateLimiterRegistry
  .rateLimiter("name2", config);

decorate and execute a functional interface

A RateLimiter can decorate any Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer or CompletionStage:

// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
    .decorateCheckedRunnable(rateLimiter, backendService::doSomething);

Try.run(restrictedCall)
    .andThenTry(restrictedCall)
    .onFailure((RequestNotPermitted throwable) -> LOG.info("Wait before call it again :)"));

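You can change RateLimiter parameters at runtime with changeTimeoutDuration and changeLimitForPeriod. A new timeout does not affect threads that are already waiting for a permission. A new limit does not affect the permissions of the current period; it takes effect from the next period.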

// Decorate your call to BackendService.doSomething()
CheckedRunnable restrictedCall = RateLimiter
    .decorateCheckedRunnable(rateLimiter, backendService::doSomething);

// during second refresh cycle limiter will get 100 permissions
rateLimiter.changeLimitForPeriod(100);

consume emitted RegistryEvents

You can register an event consumer on a RateLimiterRegistry to listen for RateLimiter creation, replacement and removal events:

RateLimiterRegistry registry = RateLimiterRegistry.ofDefaults();
registry.getEventPublisher()
  .onEntryAdded(entryAddedEvent -> {
    RateLimiter addedRateLimiter = entryAddedEvent.getAddedEntry();
    LOG.info("RateLimiter {} added", addedRateLimiter.getName());
  })
  .onEntryRemoved(entryRemovedEvent -> {
    RateLimiter removedRateLimiter = entryRemovedEvent.getRemovedEntry();
    LOG.info("RateLimiter {} removed", removedRateLimiter.getName());
  });

consume emitted RateLimiterEvents

A RateLimiter emits events on:

  • a successful permission acquire
  • acquire failure

All events carry additional information, such as the event creation time and the rate limiter name. To consume these events, register an event consumer on the RateLimiter:

rateLimiter.getEventPublisher()
    .onSuccess(event -> logger.info(...))
    .onFailure(event -> logger.info(...));

If you use Project Reactor, you can subscribe to the events as follows:

ReactorAdapter.toFlux(rateLimiter.getEventPublisher())
    .filter(event -> event.getEventType() == FAILED_ACQUIRE)
    .subscribe(event -> logger.info(...));