From 2034b8f8c7f7d993021f04273876a97f9451d609 Mon Sep 17 00:00:00 2001 From: Rikako Wu <496063163@qq.com> Date: Thu, 27 Apr 2023 16:57:44 +0800 Subject: [PATCH] =?UTF-8?q?Spring=20AMQP=E6=96=87=E6=A1=A3=E9=98=85?= =?UTF-8?q?=E8=AF=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/AMQP/Spring AMQP RabbitMQ.md | 50 +++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/spring/AMQP/Spring AMQP RabbitMQ.md b/spring/AMQP/Spring AMQP RabbitMQ.md index faaeb25..8dc5c52 100644 --- a/spring/AMQP/Spring AMQP RabbitMQ.md +++ b/spring/AMQP/Spring AMQP RabbitMQ.md @@ -294,3 +294,53 @@ this.connectionFactory.addConnectionListener(new ConnectionListener() { ``` 可以检查signal的reason属性,从而确定产生的问题。 +### Correlated Publisher Confirms and Returns +RabbitTemplate中实现了AmqpTemplate中对于发布确认和Returns的支持。 +#### Rabbitmq Returns +通过Rabbitmq完成RPC调用,消息中需要含有如下属性: +- deliveryMode: 将消息标识为持久化的或者暂时的,如果该属性值为2,则消息是持久化的,任何其他值消息都是暂时的 +- contentType:用于表示消息的mime-type,例如可以是application/json +- replyTo:通常用于命名一个callback queue +- correlationId:该属性用于关联rpc响应和请求 + +##### Correlation Id +如果对来自所有客户端的rpc请求,都只创建一个callback queue,该方案效率会很低。因此,应该为每一个客户端都创建一个callback queue。 +> 因为如果所有客户端使用一条callback queue,那么传递给callback queue的rpc响应会广播给所有订阅的客户端,然后再用客户端丢弃或匹配。这样会极大影响吞吐量。 + +在为每一个client创建一个callback queue后,从callback queue中接收到的响应再通过`correlationId`属性同请求关联到一起。如果correlationid关联的请求不存在,那么只需要安全的丢弃该请求即可。 + +##### Rabitmq实现RPC流程 +- 对于rpc请求,客户端发送消息时消息应该携带两个属性:`replyTo`:replyTo是一个匿名的独占队列(exclusive为true,当该队列只能由一个连接使用,并且当连接断开时会自动删除),`correleationId`则是一个唯一的值,用于将请求和响应关联起来 +- 请求将会被发送到rpc_queue +- rpc worker将会从rpc_queue中拉取消息,并对拉取数据进行处理。处理完成之后,会将结果封装在消息中并且发送给客户端,消息会被发送到`replyTo`属性指定的queue +- client等待reply_queue中的响应消息,当拉取到响应消息之后,其会检查correlationId属性,并根据其来匹配响应对应的请求 + + +#### Spring AMQP Returns +对于返回的消息, template的`mandatory`属性必须要被设置为true,或`mandatory-expression`表达式必须要为true。该特性要求CachingConnectionFactory的`publisherReturns`属性被设置为true。返回将会通过`setReturnsCallback(ReturnsCallback callback)`方法注册`RabbitTemplate.ReturnsCallback`回调来发送到客户端,该回调必须实现如下方法: +```java +void returnedMessage(ReturnedMessage returned); +``` +> 在通过setReturnsCallback来注册回调时,每个rabbitTemplate只能够注册一个ReturnsCallback + +ReturnedMessage必须含有如下属性: +- message:返回消息本身 +- replyCode:代表return reason的code +- replyText:一个文本的return reason,例如`NO_ROUTE` +- exchange:消息发送到的交换机 +- routing key:使用的routing key + +#### 发布确认 +对于发布确认,该template需要一个CachingConnectionFactory,并且CachingConnectionFactory的`publisherConfirm`属性需要被设置为`ConfirmType.CORRELATED`。确认将会被发送给客户端,可以通过`setConfirmCallback(ConfirmCallback callback)`注册一个`RabbitTemplate.ConfirmCallback`方法。该回调需要实现如下方法: +```java +void confirm(CorrelationData correlationData, boolean ack, String cause); +``` +其中correlationData是客户端在发送消息时提供的原始消息对象。`ack`参数如果为`true`代表`ack`,为`false`则是代表`nack`。对于`nack`,`cause`参数将会包含`nack`的原因。 +> 例如,如果将消息发送给一个不存在的交换机,在这种情况下,broker将会关闭channel,关闭的原因则会包含在`cause`中。 + +ConfirmCallback只在RabbitTemplate中被支持。 + +如果同时启用了发布确认和返回,且消息无法被路由到任何队列中,那么CorrelationData的return属性将会被注入到返回的消息中。其会保证返回消息的属性会在 + +### Scoped Operations +通常情况下,在使用template时,Channel从cache中获取,使用完毕之后再被返还到cache中以便重用。在多线程环境下,并不会保证线程在下次使用Channel时会使用一样的Channel。 \ No newline at end of file