From 7f060618a9f1dadf79385539a658931d94e22ee2 Mon Sep 17 00:00:00 2001 From: asahi Date: Wed, 3 Sep 2025 11:07:26 +0800 Subject: [PATCH] =?UTF-8?q?doc:=20=E9=98=85=E8=AF=BBRateLimiter=E6=96=87?= =?UTF-8?q?=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CircuitBreaker/CircuitBreaker.md | 140 ++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/spring/Spring Cloud/CircuitBreaker/CircuitBreaker.md b/spring/Spring Cloud/CircuitBreaker/CircuitBreaker.md index 102672f..6fb40f3 100644 --- a/spring/Spring Cloud/CircuitBreaker/CircuitBreaker.md +++ b/spring/Spring Cloud/CircuitBreaker/CircuitBreaker.md @@ -1,3 +1,39 @@ +- [CircuitBreaker](#circuitbreaker) + - [Introduction](#introduction) + - [maven](#maven) + - [CircuitBreaker](#circuitbreaker-1) + - [State Machine](#state-machine) + - [Sliding Window](#sliding-window) + - [Count-based sliding window](#count-based-sliding-window) + - [Time-based sliding window](#time-based-sliding-window) + - [Failure rate and slow call rate thresholds](#failure-rate-and-slow-call-rate-thresholds) + - [Failure rate \& exception list](#failure-rate--exception-list) + - [Slow call rate](#slow-call-rate) + - [CircuitBreaker in `OPEN`/`HALF_OPEN` state](#circuitbreaker-in-openhalf_open-state) + - [Special States](#special-states) + - [thread-safe](#thread-safe) + - [CircuitBreakerRegistry](#circuitbreakerregistry) + - [Create and configure CircuitBreakerConfig](#create-and-configure-circuitbreakerconfig) + - [success/failure/ignore判断流程](#successfailureignore判断流程) + - [Decorate and execute a functional interface](#decorate-and-execute-a-functional-interface) + - [Consume emitted RegistryEvents](#consume-emitted-registryevents) + - [consume emitted CircuitBreakerEvents](#consume-emitted-circuitbreakerevents) + - [Bulkhead](#bulkhead) + - [create a BulkheadRegistry](#create-a-bulkheadregistry) + - [Create and configure a Bulkhead](#create-and-configure-a-bulkhead) + - [Create and configure a ThreadPoolBulkhead](#create-and-configure-a-threadpoolbulkhead) + - [decorate and execute a functional interface](#decorate-and-execute-a-functional-interface-1) + - [consume emitted RegistryEvents](#consume-emitted-registryevents-1) + - [consume emitted BulkheadEvents](#consume-emitted-bulkheadevents) + - [ThreadPoolBulkhead弊端](#threadpoolbulkhead弊端) + - [RateLimiter](#ratelimiter) + - [Create RateLimiterRegistry](#create-ratelimiterregistry) + - [Create and configure a RateLimiter](#create-and-configure-a-ratelimiter) + - [decorate and execute a functional interface](#decorate-and-execute-a-functional-interface-2) + - [consume emitted RegistryEvents](#consume-emitted-registryevents-2) + - [consume emitted RateLimiterEvents](#consume-emitted-ratelimiterevents) + + # CircuitBreaker resilience4j是一个轻量级的fault tolerance library,其针对函数式编程进行设计。resilence4j提供了更高阶的函数(`decorator`)来对`function interface, lambda expression, method reference`等内容进行增强。 @@ -564,3 +600,107 @@ bulkhead.getEventPublisher() .onCallRejected(event -> logger.info(...)) .onCallFinished(event -> logger.info(...)); ``` + +### ThreadPoolBulkhead弊端 +在ThreadPoolBulkhead的实现中,会为每一个bulkhead都创建独立的线程池,故而应当避免在项目中创建大量的bulkhead,避免项目线程数量的膨胀以及线程切换带来的巨大开销。 + +## RateLimiter +Resilience4j提供了RateLimiter,其将从`epoch`(jvm启动开始)开始的所有nanos划分为了一系列周期,每个周期的时长可以通过`RateLimiterConfig.limitRefreshPeriod`来进行配置。在每个周期的开始,RateLimiter都会将`active permissions number`设置为`RateLimiterConfig.limitForPeriod`。 + +RateLimiter的默认实现为`AtomicRateLimiter`,其通过`AtomicReference`来管理自身的状态。`AtomicRateLimiter.State`其本身是不可变的,并且包含如下fields: +- `activeCycle`: 上次调用所使用的cycle number +- `activePermissions`: 在上次调用之后的available permissions,该field可以为负数 +- `nanosToWait`:为了最后一次调用所需要等待的nanos数 + +除了`AtomicRateLimiter`之外,还存在`SemaphoreBasedRateLimiter`实现,其使用了semaphore和scheduler在每次`RatelImiterConfig#limitRefreshPeriod`之后对permissions进行刷新 + +### Create RateLimiterRegistry +和CircuitBreaker module类似,RateLimiter module也提供了in-memory RateLimiterRegistry用于管理RateLimiter实例。 + +```java +RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.ofDefaults(); +``` + +### Create and configure a RateLimiter +可以提供自定义的`RateLimiterConfig`,可通过builder进行创建。`RateLimiterConfig`支持如下配置属性: +- `timeoutDuration`: + - `default value`: 5 [s] + - `description`: 线程等待permission的默认等待时间 +- `limitRefreshPeriod`: + - `default value`: 500 [ns] + - `description`: limit refresh period。在每个period刷新后,rate limiter都会将其permission count重新设置为`limitForPeriod` +- `limitForPeriod`: + - `default value`: 50 + - `description`: 一个refresh period内的可获取permissions数量 + +通过rateLimiter限制某些接口调用速率不能超过`10 req/ms`的示例如下: +```java +RateLimiterConfig config = RateLimiterConfig.custom() + .limitRefreshPeriod(Duration.ofMillis(1)) + .limitForPeriod(10) + .timeoutDuration(Duration.ofMillis(25)) + .build(); + +// Create registry +RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config); + +// Use registry +RateLimiter rateLimiterWithDefaultConfig = rateLimiterRegistry + .rateLimiter("name1"); + +RateLimiter rateLimiterWithCustomConfig = rateLimiterRegistry + .rateLimiter("name2", config); +``` + +### decorate and execute a functional interface +RateLimiter支持对`callable, supplier, runnable, consumer, checkedRunnalbe, checkedSupplier, checkedConsumer, CompletionStage`进行decorate: +```java +// Decorate your call to BackendService.doSomething() +CheckedRunnable restrictedCall = RateLimiter + .decorateCheckedRunnable(rateLimiter, backendService::doSomething); + +Try.run(restrictedCall) + .andThenTry(restrictedCall) + .onFailure((RequestNotPermitted throwable) -> LOG.info("Wait before call it again :)")); +``` +可以使用`changeTimeoutDuration`和`changeLimitForPeriod`在运行时改变rateLimiter的参数。new timeout并不会影响`正在等待permissions`的线程。并且,new limit不会影响当前period的permissions,new limiter会从下个period开始应用: +```java +// Decorate your call to BackendService.doSomething() +CheckedRunnable restrictedCall = RateLimiter + .decorateCheckedRunnable(rateLimiter, backendService::doSomething); + +// during second refresh cycle limiter will get 100 permissions +rateLimiter.changeLimitForPeriod(100); +``` + +### consume emitted RegistryEvents +可以针对`RateLimiterRegistry`注册event consumer,对rateLimiter的创建、替换和删除事件进行监听: +```java +RateLimiterRegistry registry = RateLimiterRegistry.ofDefaults(); +registry.getEventPublisher() + .onEntryAdded(entryAddedEvent -> { + RateLimiter addedRateLimiter = entryAddedEvent.getAddedEntry(); + LOG.info("RateLimiter {} added", addedRateLimiter.getName()); + }) + .onEntryRemoved(entryRemovedEvent -> { + RateLimiter removedRateLimiter = entryRemovedEvent.getRemovedEntry(); + LOG.info("RateLimiter {} removed", removedRateLimiter.getName()); + }); +``` +### consume emitted RateLimiterEvents +RateLimiter会在如下场景下发送事件: +- a successful permission acquire +- acquire failure + +所有的事件都包含额外信息,例如事件创建事件和rate limiter name等。如果想要消费事件,可以针对rateLimiter注册event consumer: +```java +rateLimiter.getEventPublisher() + .onSuccess(event -> logger.info(...)) + .onFailure(event -> logger.info(...)); +``` +如果使用project reactor,可以使用如下方式进行注册: +```java +ReactorAdapter.toFlux(rateLimiter.getEventPublisher()) + .filter(event -> event.getEventType() == FAILED_ACQUIRE) + .subscribe(event -> logger.info(...)) +```