28 KiB
CircuitBreaker
resilience4j是一个轻量级的fault tolerance library,其针对函数式编程进行设计。resilence4j提供了更高阶的函数(decorator)来对function interface, lambda expression, method reference等内容进行增强。
decorators包含如下分类:
- CircuitBreaker
- Rate Limiter
- Retry
- Bulkhead
对于任何function interface, lambda expression, method reference,都可以使用多个decorators进行装饰。
Introduction
在如下示例中,会展示如何通过CircuitBreaker和Retry来对lambda expression进行装饰,令lambda在发生异常时最多重试3次。
可以针对多次retry之间的interval进行配置,也支持自定义的backoff algorithm。
// Create a CircuitBreaker with default configuration
CircuitBreaker circuitBreaker = CircuitBreaker
.ofDefaults("backendService");
// Create a Retry with default configuration
// 3 retry attempts and a fixed time interval between retries of 500ms
Retry retry = Retry
.ofDefaults("backendService");
// Create a Bulkhead with default configuration
Bulkhead bulkhead = Bulkhead
.ofDefaults("backendService");
Supplier<String> supplier = () -> backendService
.doSomething(param1, param2)
// Decorate your call to backendService.doSomething()
// with a Bulkhead, CircuitBreaker and Retry
// **note: you will need the resilience4j-all dependency for this
Supplier<String> decoratedSupplier = Decorators.ofSupplier(supplier)
.withCircuitBreaker(circuitBreaker)
.withBulkhead(bulkhead)
.withRetry(retry)
.decorate();
// When you don't want to decorate your lambda expression,
// but just execute it and protect the call by a CircuitBreaker.
String result = circuitBreaker
.executeSupplier(backendService::doSomething);
// You can also run the supplier asynchronously in a ThreadPoolBulkhead
ThreadPoolBulkhead threadPoolBulkhead = ThreadPoolBulkhead
.ofDefaults("backendService");
// The Scheduler is needed to schedule a timeout
// on a non-blocking CompletableFuture
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
TimeLimiter timeLimiter = TimeLimiter.of(Duration.ofSeconds(1));
CompletableFuture<String> future = Decorators.ofSupplier(supplier)
.withThreadPoolBulkhead(threadPoolBulkhead)
.withTimeLimiter(timeLimiter, scheduledExecutorService)
.withCircuitBreaker(circuitBreaker)
.withFallback(asList(TimeoutException.class,
CallNotPermittedException.class,
BulkheadFullException.class),
throwable -> "Hello from Recovery")
.get().toCompletableFuture();
maven
resilence4j需要jdk17及以上,如果使用maven,可以按照如下方式来引入
引入所有包的方式如下
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-all</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
按需引入方式如下
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-cache</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-timelimiter</artifactId>
<version>${resilience4jVersion}</version>
</dependency>
CircuitBreaker
State Machine
CircuitBreaker通过有限状态机实现,其拥有如下状态:
CLOSEDOPENHALF_OPENMETRICS_ONLYDISABLEDFORCED_OPEN
其中,前三个状态为正常状态,后三个状态为特殊状态。
上述circuitbreaker状态转换逻辑如下所示:
- 处于
CLOSED状态时,如果实际接口的失败率超过上限后,会从CLOSED状态转换为OPEN状态 - 处于
OPEN状态下经过一段时间后,会从OPEN状态转换为HALF_OPEN状态 - 处于
HALF_OPEN状态下,如果失败率小于上限,会从HALF_OPEN状态重新变为CLOSED状态 - 如果
HALF_OPEN状态下,失败率仍然超过上限,则会从HALF_OPEN状态重新变为OPEN状态
Sliding Window
CircuitBreaker会使用滑动窗口来存储和聚合调用结果。在使用CircuitBreaker时,可以选择count-based的滑动窗口还是time-based滑动窗口。
count-based:count-based滑动窗口会对最近N次调用的结果进行聚合time-based:time-based滑动窗口将会对最近N秒的调用结果进行聚合
Count-based sliding window
count-based sliding window是通过循环数组来实现的,循环数组中包含了n个measurements。如果count window的大小为10,那么circular array一直都会有10个measurements。
count-based的滑动窗口实现会total aggregation结果进行更新,更新逻辑如下:
- 当一个新的调用返回结果后,其结果将会被记录,并且total aggregation也会被更新,将新调用的结果加到total aggregation中
- 发生新调用时,循环数组中最老(oldest)的measurement将会被淘汰,并且measurement也会从total aggregation中被减去,bucket也会被重置(bucket即measurement,bucket被重置即代表oldest measurement会被重置)
对于聚合结果的检查的开销是O(1)的,因为其是pre-aggregated的,并且和window size无关。
Time-based sliding window
Time-based sliding window其也是通过循环数组实现,数组中含有N个partial aggregation(bucket)。
如果time window大小是10秒,那么circular array一直都会有10的buckets。每个bucket都对应了一个epoch second,bucket会对该epoch second内发生的调用结果进行聚合。(Partial aggregation)。
在循环数组中,head buket中存储了当前epoch second中发生的调用结果,而其他的partial aggregation则存储的是之前second发生的调用结果。在Time-based的滑动窗口实现中,并不会像Count-based那样独立的存储调用结果,而是增量的对partial aggregation进行更新。
除了更新Partial aggregation外,time-based滑动窗口还会在新请求结果返回时,对total aggregation进行更新。当oldest bucket被淘汰时,该bucket的partial aggregation也会从total aggregation中被减去,并且bucket也会被重置。
检查聚合结果的开销也是O(1)的,Time-based滑动窗口也是pre-aggregated的。
partial aggregation中包含了3个integer,用于记录如下信息:
- failed calls次数
- slow calls次数
- 总共的call次数
除此之外,partial aggregation中还会包含一个long,用于存储所有请求的总耗时
Failure rate and slow call rate thresholds
Failure rate & exception list
当failure rate大于等于配置的threshold时,CircuitBreaker的状态将会从CLOSED变为OPEN。
默认情况下,所有的抛出的异常都会被统计为failure,在使用时也可以指定一个exception list,在exception list中的异常才会被统计为failure,而不在exception list中的异常会被视为success。除此之外,还可以对异常进行ignored,被忽视的异常既不会被统计为success,也不会被统计为failure。
Slow call rate
当slow call rate大于或等于配置的threshold时,CircuitBreaker的状态也会从CLOSED变为OPEN。通过slow call rate,可以降低外部系统的负载。
只有当记录的call数量达到最小数量时,failure rate和slow call rate才能被计算。例如,minimum number of required calls为10,只有当被记录的calls次数达到10时,failure rate和slow call rate才能被计算。如果当前只记录了9个calls,即使9次调用全部都失败,circuitbreaker也不会变为open状态。
CircuitBreaker in OPEN/HALF_OPEN state
circuitbreaker在OPEN状态时,会拒绝所有的调用,并且抛出CallNotPermittedException。在等待一段时间后,CircuitBreaker将会从OPEN状态转为HALF_OPEN状态,并允许一个configurable number数量的请求进行实际调用,从而检测是否backend已经恢复并且可以再次访问。
处于HALF_OPEN状态的circuitbreaker,假设permittedNumberOfCalls的数量为10,此时存在20个调用,那么前10个调用都能正常调用,而后10个调用将会被拒绝,并且抛出CallNotPermittedException。
在HALF_OPEN状态下,如果failure rate或是slow call rate大于等于配置的threshold,那么circuitbreaker状态将会转为OPEN。如果failure rate和slow call rate小于threshold,那么circuitbreaker状态将变为CLOSED。
Special States
CircuitBreaker支持3个特殊状态:
METRICS_ONLY:处于该状态时,其行为如下- 所有
circuit breaker events都会正常生成(除了state transition外),并且metrics会正常记录 - 该状态和
CLOSED状态类似,但是circuitbreaker在threshold达到时,不会切换为OPEN状态
- 所有
DISABLED:- 没有
CircuitBreakerEvent会被产生,metrics也不会被记录 - 会允许所有的访问
- 没有
FORCED_OPEN:- 没有
CircuitBreakerEvent会被产生,metrics也不会被记录 - 会拒绝所有的访问
- 没有
退出这些特殊状态的方式如下:
- 触发state transition
- 对CircuitBreaker执行reset
thread-safe
CircuitBreaker线程安全,但是CircuitBreaker并不会对function call进行串行化,故而在使用CircuitBreaker时,function call可能会并行执行。
对于Closed状态的CircuitBreaker而言,如果20个线程同时对cirbuitbreaker进行访问,那么所有的方法调用都能同时并行执行,即使滑动窗口的大小为15小于并行数量。滑动窗口的大小不会对方法的并行程度造成影响。
如果想要对并行程度做出限制,可以使用Bulkhead。
CircuitBreakerRegistry
resilence4j附带了一个in-memory的CircuitBreakerRegistry,基于ConcurrentHashMap实现。可以通过CircuitBreakerRegistry来管理(创建和获取)CircuitBreaker实例。可以按照如下示例,根据默认的CircuitBreakerConfig来创建CircuitBreakerRegistry:
CircuitBreakerRegistry circuitBreakerRegistry =
CircuitBreakerRegistry.ofDefaults();
Create and configure CircuitBreakerConfig
除了使用默认的CircuitBreakerConfig外,还可以提供自定义的CircuitBreakerConfig,对象可以通过builder来构建。
CircuitBreakerConfig的可配置属性如下:
failureRateThreshold:配置failure rate threshold的默认百分比default value: 50description:当failure rate大于等于该threshold值时,CircuitBreaker会切为OPEN状态,并开始short-circuiting calls
slowCallRateThreshold:配置threshold百分比default value: 100description: 当slow calls的百分比等于或超过该threshold时,CircuitBreaker会切换到OPEN状态,并且开始short-circuiting calls
slowCallDurationThreshold: 配置slow calls的duration thresholddefault value: 60000 [ms]description: 当call的耗时超过该duration threshold限制时,会被认定为slow call,并且会增加slow call rate
permittedNumberOfCallsInHalfOpenState:default value: 10description:配置circuitbreaker切换到half open状态时,permitted calls的数量
maxWairDurationInfHalfOpenState:default value: 0 [ms]description:配置在CircuitBreaker从Half Open状态切换回Open状态前,其可以处于Half Open抓过你太的最长时间。默认值为0,代表其等待时间没有上限,直到所有的permitted calls都执行完成
slidingWindowType:配置滑动窗口的类型default value:COUNT_BASEDdesceiption:在CircuitBreaker处于closed状态时,滑动窗口用于记录调用的结果。滑动窗口类型可以是count-based和time-basedcounted_based:会记录最后slidingWindowSize个请求的结果并对其进行聚合time_based:会记录最后slidingWindowSize秒的调用结果,并且会对其进行聚合
slidingWindowSize:default value: 100description:用于配置滑动窗口的大小
minimumNumberOfCalls:default value:100description: 在可以计算error rate和slow call rate之前,至少需要记录minimum number个调用。例如,如果minimumNumberOfCalls为10,那么在可以计算failure rate前,必须至少记录10个calls(范围是整个滑动窗口期间范围内)。如果已经计算了9个calls,即使9个calls都调用失败,在记录的请求数达到minimumNumberOfCalls之前,也不会切换到open状态
waitDurationInOpenState:default value: 60000 [ms]description:该值代表处于OPEN状态的CircuitBreaker在切换为HALF-OPEN之前,会等待的时间长度
automaticTransitionFromOpenToHalfOpenEnabled:default value: falsedescription:如果该值设置为true,会创建一个线程来对所有CircuitBreakers对象进行监控,并在waitDurationInOpenState设置的时间达到后将CircuitBreaker从Open切换到HALF_OPEN状态如果该值设置为false(默认),那么CircuitBreaker从OPEN状态切换到HALF_OPEN状态的过程并不由专门的线程来触发,而是由请求来触发。如果该值设置为false(默认),即使waitDurationInOpenState设置的时间达到,如果没有后续请求,那么从OPEN到HALF_OPEN的变化也不会自动被触发
recordExceptions:default value: emptydescription: 该属性用于配置exception list,位于该exception list中的异常类型将会被视为failure,并增加failure rate。- 任何匹配或者继承exception list中的异常将会被视为failure,除非通过
ignoreExceptions显式的忽略了该异常 - 如果通过
recordExcepions指定了exception list,那么所有其他的异常都会被视为success,除非显式被ignoreExceptions指定
- 任何匹配或者继承exception list中的异常将会被视为failure,除非通过
ignoreExceptions:default value: emptydescription: 用于配置exception list,该list中配置的异常类型将会被忽略,既不会被记录为failure也不会被记录为success- 任何匹配或者继承了该list中异常类型的异常将会被忽略,即使该异常在
recordExceptions中被指定
- 任何匹配或者继承了该list中异常类型的异常将会被忽略,即使该异常在
recordFailurePredicate:default value: throwable -> truedescription: 一个自定义的predicate,用于评估一个异常是否应该被记录为failure。如果该异常应当被记录为failure,那么该predicate应当返回true;如果该异常应当被记录为failure,那么该predicate应当返回false
ignoreExceptionPredicate:default value: throwable -> falsedescription: 一个自定义的predicate,用于评估一个异常是否应当被忽略或是记录为failure/success。- 如果异常应当被忽略,那么该predicate应当返回true
- 如果异常不应该被忽略,predicate应当返回为false,该异常应该被视为failure/success
success/failure/ignore判断流程
在实际调用发生异常时,决定将异常视为success/failure/ignore的判断流程如下:
- 如果实际调用未抛出异常,则记录为调用成功,否则继续
- 如果抛出异常,首先会检查异常是否位于
Ignored Exceptions中,如果位于其中,则忽略该异常,否则继续 - 如果ignoreExceptionPredicate不为空,根据该predicate进行判断,如果返回为true,则忽略该异常,否则继续
- 校验异常是否位于
recordExceptions中,如果位于其中,则将其视为failure,否则继续 - 如果recordFailurePredicate不为空,根据
recordFailurePredicate判断是否该异常应当被视为failure,如果返回为true,将其视为failure,否则继续 - 如果上述都不满足,那么将其视为success
创建自定义CircuitBreakerConfig的示例如下所示:
// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.slowCallRateThreshold(50)
.waitDurationInOpenState(Duration.ofMillis(1000))
.slowCallDurationThreshold(Duration.ofSeconds(2))
.permittedNumberOfCallsInHalfOpenState(3)
.minimumNumberOfCalls(10)
.slidingWindowType(SlidingWindowType.TIME_BASED)
.slidingWindowSize(5)
.recordException(e -> INTERNAL_SERVER_ERROR
.equals(getResponse().getStatus()))
.recordExceptions(IOException.class, TimeoutException.class)
.ignoreExceptions(BusinessException.class, OtherBusinessException.class)
.build();
// Create a CircuitBreakerRegistry with a custom global configuration
CircuitBreakerRegistry circuitBreakerRegistry =
CircuitBreakerRegistry.of(circuitBreakerConfig);
// Get or create a CircuitBreaker from the CircuitBreakerRegistry
// with the global default configuration
CircuitBreaker circuitBreakerWithDefaultConfig =
circuitBreakerRegistry.circuitBreaker("name1");
// Get or create a CircuitBreaker from the CircuitBreakerRegistry
// with a custom configuration
CircuitBreaker circuitBreakerWithCustomConfig = circuitBreakerRegistry
.circuitBreaker("name2", circuitBreakerConfig);
除此之外,还可以在circuitRegistry中添加配置,该配置可以被多个CircuitBreaker实例共享
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(70)
.build();
circuitBreakerRegistry.addConfiguration("someSharedConfig", config);
CircuitBreaker circuitBreaker = circuitBreakerRegistry
.circuitBreaker("name", "someSharedConfig");
并且,可以针对默认配置进行overwrite
CircuitBreakerConfig defaultConfig = circuitBreakerRegistry
.getDefaultConfig();
CircuitBreakerConfig overwrittenConfig = CircuitBreakerConfig
.from(defaultConfig)
.waitDurationInOpenState(Duration.ofSeconds(20))
.build();
如果不想使用CircuitBreakerRegistry来管理CircuitBreaker实例,也可以自己直接创建CircuitBreaker实例:
// Create a custom configuration for a CircuitBreaker
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.recordExceptions(IOException.class, TimeoutException.class)
.ignoreExceptions(BusinessException.class, OtherBusinessException.class)
.build();
CircuitBreaker customCircuitBreaker = CircuitBreaker
.of("testName", circuitBreakerConfig);
如果想要plugin in自己的registry实现,可以提供一个自定义的RegistryStore实现,并且通过Builder方法来plug in
CircuitBreakerRegistry registry = CircuitBreakerRegistry.custom()
.withRegistryStore(new YourRegistryStoreImplementation())
.withCircuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
.build();
Decorate and execute a functional interface
CircuitBreaker可以针对callable, supplier, runnable, consumer, checkedrunnable, checkedsupplier, checkedconsumer, completionStage来进行decorate。
可以通过Try.of或Try.run来调用decorated function,这样允许链式调用,使用示例如下:
// Given
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
// When I decorate my function
CheckedFunction0<String> decoratedSupplier = CircuitBreaker
.decorateCheckedSupplier(circuitBreaker, () -> "This can be any method which returns: 'Hello");
// and chain an other function with map
Try<String> result = Try.of(decoratedSupplier)
.map(value -> value + " world'");
// Then the Try Monad returns a Success<String>, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
Consume emitted RegistryEvents
可以向CircuitBreakerRegistry注册一个event consumer,并且在CircuitBreaker被创建、替换、删除时执行对应的action
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
circuitBreakerRegistry.getEventPublisher()
.onEntryAdded(entryAddedEvent -> {
CircuitBreaker addedCircuitBreaker = entryAddedEvent.getAddedEntry();
LOG.info("CircuitBreaker {} added", addedCircuitBreaker.getName());
})
.onEntryRemoved(entryRemovedEvent -> {
CircuitBreaker removedCircuitBreaker = entryRemovedEvent.getRemovedEntry();
LOG.info("CircuitBreaker {} removed", removedCircuitBreaker.getName());
});
consume emitted CircuitBreakerEvents
CircuitBreakerEvent在如下场景下会被发出:
- state transition
- circuit breaker reset
- successful call
- recorded error
- ignored error
所有的event都包含额外的信息,例如事件创建时间、call的处理时长等。如果想要消费该类事件,需要向CircuitBreaker注册事件:
circuitBreaker.getEventPublisher()
.onSuccess(event -> logger.info(...))
.onError(event -> logger.info(...))
.onIgnoredError(event -> logger.info(...))
.onReset(event -> logger.info(...))
.onStateTransition(event -> logger.info(...));
// Or if you want to register a consumer listening
// to all events, you can do:
circuitBreaker.getEventPublisher()
.onEvent(event -> logger.info(...));
可以使用CircularEventConsumer来对事件进行存储,事件会被存储在一个固定容量的circular buffer中
CircularEventConsumer<CircuitBreakerEvent> ringBuffer =
new CircularEventConsumer<>(10);
circuitBreaker.getEventPublisher().onEvent(ringBuffer);
List<CircuitBreakerEvent> bufferedEvents = ringBuffer.getBufferedEvents()
Bulkhead
resilence4j提供了两种bulkhead的实现,bukhead可以用于限制并发执行的数量:
SemaphoreBulkhead: 该bulkhead实现基于信号量FixedThreadPoolBulkhead: 该实现使用了a bounded queue and a fixed thread pool
create a BulkheadRegistry
和CircuitBreaker module类似,Bulkhead module也提供了in-memory的BulkheadRegistry和ThreadPoolBulkheadRegistry,用于管理Bulkhead实例,示例如下:
BulkheadRegistry bulkheadRegistry = BulkheadRegistry.ofDefaults();
ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry =
ThreadPoolBulkheadRegistry.ofDefaults();
Create and configure a Bulkhead
可以自己提供一个全局的BulkheadConfig,可通过BulkheadConfig builder来创建config对象,该config对象支持如下属性配置:
maxConcurentCalls:default value:25description:代表bulkhead允许的最大并行执行数量
maxWaitDuration:default value: 0description: 代表一个线程尝试进入饱和的bulkhead时,该线程的最长等待时间
示例如下所示:
// Create a custom configuration for a Bulkhead
BulkheadConfig config = BulkheadConfig.custom()
.maxConcurrentCalls(150)
.maxWaitDuration(Duration.ofMillis(500))
.build();
// Create a BulkheadRegistry with a custom global configuration
BulkheadRegistry registry = BulkheadRegistry.of(config);
// Get or create a Bulkhead from the registry -
// bulkhead will be backed by the default config
Bulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");
// Get or create a Bulkhead from the registry,
// use a custom configuration when creating the bulkhead
Bulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);
Create and configure a ThreadPoolBulkhead
可以提供一个自定义的global ThreadPoolBulkheadConfig,支持通过builder创建。
ThreadPoolBulkheadConfig支持如下配置属性:
maxThreadPoolSize:default value:Runtime.getRuntime().availableProcessors()description: 配置线程池的最大线程数
coreThreadPoolSize:default value:Runtime.getRuntime().availableProcessors()-1description: 配置线程池的核心线程数
queueCapacity:default value: 100descritpion: 配置queue的容量
keepAliveDuration:default value: 20 [ms]description: 当线程池中线程数量大于coreSize时,该值代表非核心线程在销毁前空闲的最大时长
writableStackTraceEnabled:default value: truedescritpion: 当bulkhead抛出异常时,是否打印除Stack Trace,当该值设置为false时,仅打印单行
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(10)
.coreThreadPoolSize(2)
.queueCapacity(20)
.build();
// Create a BulkheadRegistry with a custom global configuration
ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);
// Get or create a ThreadPoolBulkhead from the registry -
// bulkhead will be backed by the default config
ThreadPoolBulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1");
// Get or create a Bulkhead from the registry,
// use a custom configuration when creating the bulkhead
ThreadPoolBulkheadConfig custom = ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(5)
.build();
ThreadPoolBulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom);
decorate and execute a functional interface
可以使用Bulkhead对callable, supplier, runnable, consumer, checkedrunnable, checkedsupplier, checkedconsumer, completionStage来进行decorate,示例如下:
// Given
Bulkhead bulkhead = Bulkhead.of("name", config);
// When I decorate my function
CheckedFunction0<String> decoratedSupplier = Bulkhead
.decorateCheckedSupplier(bulkhead, () -> "This can be any method which returns: 'Hello");
// and chain an other function with map
Try<String> result = Try.of(decoratedSupplier)
.map(value -> value + " world'");
// Then the Try Monad returns a Success<String>, if all functions ran successfully.
assertThat(result.isSuccess()).isTrue();
assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'");
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1);
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(10)
.coreThreadPoolSize(2)
.queueCapacity(20)
.build();
ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("name", config);
CompletionStage<String> supplier = ThreadPoolBulkhead
.executeSupplier(bulkhead, backendService::doSomething);
consume emitted RegistryEvents
可以向BulkheadRegistry注册event consumer,用于监听Bulkhead的创建、替换和删除事件
BulkheadRegistry registry = BulkheadRegistry.ofDefaults();
registry.getEventPublisher()
.onEntryAdded(entryAddedEvent -> {
Bulkhead addedBulkhead = entryAddedEvent.getAddedEntry();
LOG.info("Bulkhead {} added", addedBulkhead.getName());
})
.onEntryRemoved(entryRemovedEvent -> {
Bulkhead removedBulkhead = entryRemovedEvent.getRemovedEntry();
LOG.info("Bulkhead {} removed", removedBulkhead.getName());
});
consume emitted BulkheadEvents
BulkHead会发送BulkHeadEvent事件,发送的事件包含如下类型:
- permitted execution
- rejected execution
- finished execution
如果想要消费上述事件,可以按照如下示例注册event consumer
bulkhead.getEventPublisher()
.onCallPermitted(event -> logger.info(...))
.onCallRejected(event -> logger.info(...))
.onCallFinished(event -> logger.info(...));