From 80b25ede3d211ef17977842390f8330286279cfd Mon Sep 17 00:00:00 2001 From: asahi Date: Wed, 3 Sep 2025 10:20:44 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20=E9=98=85=E8=AF=BBBulkhead=E6=96=87?= =?UTF-8?q?=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CircuitBreaker/CircuitBreaker.md | 254 ++++++++++++++++++ 1 file changed, 254 insertions(+) diff --git a/spring/Spring Cloud/CircuitBreaker/CircuitBreaker.md b/spring/Spring Cloud/CircuitBreaker/CircuitBreaker.md index f00c989..102672f 100644 --- a/spring/Spring Cloud/CircuitBreaker/CircuitBreaker.md +++ b/spring/Spring Cloud/CircuitBreaker/CircuitBreaker.md @@ -309,4 +309,258 @@ CircuitBreaker circuitBreakerWithCustomConfig = circuitBreakerRegistry .circuitBreaker("name2", circuitBreakerConfig); ``` +除此之外,还可以在circuitRegistry中添加配置,该配置可以被多个CircuitBreaker实例共享 +```java +CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom() + .failureRateThreshold(70) + .build(); +circuitBreakerRegistry.addConfiguration("someSharedConfig", config); + +CircuitBreaker circuitBreaker = circuitBreakerRegistry + .circuitBreaker("name", "someSharedConfig"); +``` + +并且,可以针对默认配置进行overwrite +```java +CircuitBreakerConfig defaultConfig = circuitBreakerRegistry + .getDefaultConfig(); + +CircuitBreakerConfig overwrittenConfig = CircuitBreakerConfig + .from(defaultConfig) + .waitDurationInOpenState(Duration.ofSeconds(20)) + .build(); +``` + +如果不想使用CircuitBreakerRegistry来管理CircuitBreaker实例,也可以自己直接创建CircuitBreaker实例: +```java +// Create a custom configuration for a CircuitBreaker +CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom() + .recordExceptions(IOException.class, TimeoutException.class) + .ignoreExceptions(BusinessException.class, OtherBusinessException.class) + .build(); + +CircuitBreaker customCircuitBreaker = CircuitBreaker + .of("testName", circuitBreakerConfig); +``` + +如果想要plugin in自己的registry实现,可以提供一个自定义的`RegistryStore`实现,并且通过Builder方法来plug in +```java +CircuitBreakerRegistry registry = CircuitBreakerRegistry.custom() + .withRegistryStore(new YourRegistryStoreImplementation()) + .withCircuitBreakerConfig(CircuitBreakerConfig.ofDefaults()) + .build(); +``` + +### Decorate and execute a functional interface +CircuitBreaker可以针对`callable, supplier, runnable, consumer, checkedrunnable, checkedsupplier, checkedconsumer, completionStage`来进行decorate。 + +可以通过`Try.of`或`Try.run`来调用decorated function,这样允许链式调用,使用示例如下: +```java +// Given +CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName"); + +// When I decorate my function +CheckedFunction0 decoratedSupplier = CircuitBreaker + .decorateCheckedSupplier(circuitBreaker, () -> "This can be any method which returns: 'Hello"); + +// and chain an other function with map +Try result = Try.of(decoratedSupplier) + .map(value -> value + " world'"); + +// Then the Try Monad returns a Success, if all functions ran successfully. +assertThat(result.isSuccess()).isTrue(); +assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'"); +``` + +### Consume emitted RegistryEvents +可以向CircuitBreakerRegistry注册一个event consumer,并且在`CircuitBreaker`被创建、替换、删除时执行对应的action +```java +CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults(); +circuitBreakerRegistry.getEventPublisher() + .onEntryAdded(entryAddedEvent -> { + CircuitBreaker addedCircuitBreaker = entryAddedEvent.getAddedEntry(); + LOG.info("CircuitBreaker {} added", addedCircuitBreaker.getName()); + }) + .onEntryRemoved(entryRemovedEvent -> { + CircuitBreaker removedCircuitBreaker = entryRemovedEvent.getRemovedEntry(); + LOG.info("CircuitBreaker {} removed", removedCircuitBreaker.getName()); + }); +``` + +### consume emitted CircuitBreakerEvents +CircuitBreakerEvent在如下场景下会被发出: +- state transition +- circuit breaker reset +- successful call +- recorded error +- ignored error + +所有的event都包含额外的信息,例如事件创建时间、call的处理时长等。如果想要消费该类事件,需要向CircuitBreaker注册事件: +```java +circuitBreaker.getEventPublisher() + .onSuccess(event -> logger.info(...)) + .onError(event -> logger.info(...)) + .onIgnoredError(event -> logger.info(...)) + .onReset(event -> logger.info(...)) + .onStateTransition(event -> logger.info(...)); +// Or if you want to register a consumer listening +// to all events, you can do: +circuitBreaker.getEventPublisher() + .onEvent(event -> logger.info(...)); +``` + +可以使用`CircularEventConsumer`来对事件进行存储,事件会被存储在一个固定容量的circular buffer中 +```java +CircularEventConsumer ringBuffer = + new CircularEventConsumer<>(10); +circuitBreaker.getEventPublisher().onEvent(ringBuffer); +List bufferedEvents = ringBuffer.getBufferedEvents() +``` + +## Bulkhead +resilence4j提供了两种bulkhead的实现,bukhead可以用于限制并发执行的数量: +- `SemaphoreBulkhead`: 该bulkhead实现基于信号量 +- `FixedThreadPoolBulkhead`: 该实现使用了`a bounded queue and a fixed thread pool` + +### create a BulkheadRegistry +和CircuitBreaker module类似,Bulkhead module也提供了in-memory的`BulkheadRegistry和ThreadPoolBulkheadRegistry`,用于管理Bulkhead实例,示例如下: +```java +BulkheadRegistry bulkheadRegistry = BulkheadRegistry.ofDefaults(); + +ThreadPoolBulkheadRegistry threadPoolBulkheadRegistry = + ThreadPoolBulkheadRegistry.ofDefaults(); +``` + +### Create and configure a Bulkhead +可以自己提供一个全局的`BulkheadConfig`,可通过`BulkheadConfig` builder来创建config对象,该config对象支持如下属性配置: +- `maxConcurentCalls`: + - `default value`:25 + - `description`:代表bulkhead允许的最大并行执行数量 +- `maxWaitDuration`: + - `default value`: 0 + - `description`: 代表一个线程尝试进入饱和的bulkhead时,该线程的最长等待时间 + +示例如下所示: +```java +// Create a custom configuration for a Bulkhead +BulkheadConfig config = BulkheadConfig.custom() + .maxConcurrentCalls(150) + .maxWaitDuration(Duration.ofMillis(500)) + .build(); + +// Create a BulkheadRegistry with a custom global configuration +BulkheadRegistry registry = BulkheadRegistry.of(config); + +// Get or create a Bulkhead from the registry - +// bulkhead will be backed by the default config +Bulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1"); + +// Get or create a Bulkhead from the registry, +// use a custom configuration when creating the bulkhead +Bulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom); +``` + +### Create and configure a ThreadPoolBulkhead +可以提供一个自定义的global `ThreadPoolBulkheadConfig`,支持通过builder创建。 + +`ThreadPoolBulkheadConfig`支持如下配置属性: +- `maxThreadPoolSize`: + - `default value`: `Runtime.getRuntime().availableProcessors()` + - `description`: 配置线程池的最大线程数 +- `coreThreadPoolSize`: + - `default value`: `Runtime.getRuntime().availableProcessors()-1` + - `description`: 配置线程池的核心线程数 +- `queueCapacity`: + - `default value`: 100 + - `descritpion`: 配置queue的容量 +- `keepAliveDuration`: + - `default value`: 20 [ms] + - `description`: 当线程池中线程数量大于coreSize时,该值代表非核心线程在销毁前空闲的最大时长 +- `writableStackTraceEnabled`: + - `default value`: true + - `descritpion`: 当bulkhead抛出异常时,是否打印除Stack Trace,当该值设置为false时,仅打印单行 + +```java +ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom() + .maxThreadPoolSize(10) + .coreThreadPoolSize(2) + .queueCapacity(20) + .build(); + +// Create a BulkheadRegistry with a custom global configuration +ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config); + +// Get or create a ThreadPoolBulkhead from the registry - +// bulkhead will be backed by the default config +ThreadPoolBulkhead bulkheadWithDefaultConfig = registry.bulkhead("name1"); + +// Get or create a Bulkhead from the registry, +// use a custom configuration when creating the bulkhead +ThreadPoolBulkheadConfig custom = ThreadPoolBulkheadConfig.custom() + .maxThreadPoolSize(5) + .build(); + +ThreadPoolBulkhead bulkheadWithCustomConfig = registry.bulkhead("name2", custom); +``` + +### decorate and execute a functional interface +可以使用Bulkhead对`callable, supplier, runnable, consumer, checkedrunnable, checkedsupplier, checkedconsumer, completionStage`来进行decorate,示例如下: +```java +// Given +Bulkhead bulkhead = Bulkhead.of("name", config); + +// When I decorate my function +CheckedFunction0 decoratedSupplier = Bulkhead + .decorateCheckedSupplier(bulkhead, () -> "This can be any method which returns: 'Hello"); + +// and chain an other function with map +Try result = Try.of(decoratedSupplier) + .map(value -> value + " world'"); + +// Then the Try Monad returns a Success, if all functions ran successfully. +assertThat(result.isSuccess()).isTrue(); +assertThat(result.get()).isEqualTo("This can be any method which returns: 'Hello world'"); +assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(1); +``` +```java +ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom() + .maxThreadPoolSize(10) + .coreThreadPoolSize(2) + .queueCapacity(20) + .build(); + +ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("name", config); + +CompletionStage supplier = ThreadPoolBulkhead + .executeSupplier(bulkhead, backendService::doSomething); +``` + +### consume emitted RegistryEvents +可以向`BulkheadRegistry`注册event consumer,用于监听Bulkhead的创建、替换和删除事件 +```java +BulkheadRegistry registry = BulkheadRegistry.ofDefaults(); +registry.getEventPublisher() + .onEntryAdded(entryAddedEvent -> { + Bulkhead addedBulkhead = entryAddedEvent.getAddedEntry(); + LOG.info("Bulkhead {} added", addedBulkhead.getName()); + }) + .onEntryRemoved(entryRemovedEvent -> { + Bulkhead removedBulkhead = entryRemovedEvent.getRemovedEntry(); + LOG.info("Bulkhead {} removed", removedBulkhead.getName()); + }); +``` + +### consume emitted BulkheadEvents +BulkHead会发送BulkHeadEvent事件,发送的事件包含如下类型: +- permitted execution +- rejected execution +- finished execution + +如果想要消费上述事件,可以按照如下示例注册event consumer +```java +bulkhead.getEventPublisher() + .onCallPermitted(event -> logger.info(...)) + .onCallRejected(event -> logger.info(...)) + .onCallFinished(event -> logger.info(...)); +```