Spring AMQP文档阅读
This commit is contained in:
@@ -294,3 +294,53 @@ this.connectionFactory.addConnectionListener(new ConnectionListener() {
|
||||
```
|
||||
可以检查signal的reason属性,从而确定产生的问题。
|
||||
|
||||
### Correlated Publisher Confirms and Returns
|
||||
RabbitTemplate中实现了AmqpTemplate中对于发布确认和Returns的支持。
|
||||
#### Rabbitmq Returns
|
||||
通过Rabbitmq完成RPC调用,消息中需要含有如下属性:
|
||||
- deliveryMode: 将消息标识为持久化的或者暂时的,如果该属性值为2,则消息是持久化的,任何其他值消息都是暂时的
|
||||
- contentType:用于表示消息的mime-type,例如可以是application/json
|
||||
- replyTo:通常用于命名一个callback queue
|
||||
- correlationId:该属性用于关联rpc响应和请求
|
||||
|
||||
##### Correlation Id
|
||||
如果对来自所有客户端的rpc请求,都只创建一个callback queue,该方案效率会很低。因此,应该为每一个客户端都创建一个callback queue。
|
||||
> 因为如果所有客户端使用一条callback queue,那么传递给callback queue的rpc响应会广播给所有订阅的客户端,然后再用客户端丢弃或匹配。这样会极大影响吞吐量。
|
||||
|
||||
在为每一个client创建一个callback queue后,从callback queue中接收到的响应再通过`correlationId`属性同请求关联到一起。如果correlationid关联的请求不存在,那么只需要安全的丢弃该请求即可。
|
||||
|
||||
##### Rabitmq实现RPC流程
|
||||
- 对于rpc请求,客户端发送消息时消息应该携带两个属性:`replyTo`:replyTo是一个匿名的独占队列(exclusive为true,当该队列只能由一个连接使用,并且当连接断开时会自动删除),`correleationId`则是一个唯一的值,用于将请求和响应关联起来
|
||||
- 请求将会被发送到rpc_queue
|
||||
- rpc worker将会从rpc_queue中拉取消息,并对拉取数据进行处理。处理完成之后,会将结果封装在消息中并且发送给客户端,消息会被发送到`replyTo`属性指定的queue
|
||||
- client等待reply_queue中的响应消息,当拉取到响应消息之后,其会检查correlationId属性,并根据其来匹配响应对应的请求
|
||||
|
||||
|
||||
#### Spring AMQP Returns
|
||||
对于返回的消息, template的`mandatory`属性必须要被设置为true,或`mandatory-expression`表达式必须要为true。该特性要求CachingConnectionFactory的`publisherReturns`属性被设置为true。返回将会通过`setReturnsCallback(ReturnsCallback callback)`方法注册`RabbitTemplate.ReturnsCallback`回调来发送到客户端,该回调必须实现如下方法:
|
||||
```java
|
||||
void returnedMessage(ReturnedMessage returned);
|
||||
```
|
||||
> 在通过setReturnsCallback来注册回调时,每个rabbitTemplate只能够注册一个ReturnsCallback
|
||||
|
||||
ReturnedMessage必须含有如下属性:
|
||||
- message:返回消息本身
|
||||
- replyCode:代表return reason的code
|
||||
- replyText:一个文本的return reason,例如`NO_ROUTE`
|
||||
- exchange:消息发送到的交换机
|
||||
- routing key:使用的routing key
|
||||
|
||||
#### 发布确认
|
||||
对于发布确认,该template需要一个CachingConnectionFactory,并且CachingConnectionFactory的`publisherConfirm`属性需要被设置为`ConfirmType.CORRELATED`。确认将会被发送给客户端,可以通过`setConfirmCallback(ConfirmCallback callback)`注册一个`RabbitTemplate.ConfirmCallback`方法。该回调需要实现如下方法:
|
||||
```java
|
||||
void confirm(CorrelationData correlationData, boolean ack, String cause);
|
||||
```
|
||||
其中correlationData是客户端在发送消息时提供的原始消息对象。`ack`参数如果为`true`代表`ack`,为`false`则是代表`nack`。对于`nack`,`cause`参数将会包含`nack`的原因。
|
||||
> 例如,如果将消息发送给一个不存在的交换机,在这种情况下,broker将会关闭channel,关闭的原因则会包含在`cause`中。
|
||||
|
||||
ConfirmCallback只在RabbitTemplate中被支持。
|
||||
|
||||
如果同时启用了发布确认和返回,且消息无法被路由到任何队列中,那么CorrelationData的return属性将会被注入到返回的消息中。其会保证返回消息的属性会在
|
||||
|
||||
### Scoped Operations
|
||||
通常情况下,在使用template时,Channel从cache中获取,使用完毕之后再被返还到cache中以便重用。在多线程环境下,并不会保证线程在下次使用Channel时会使用一样的Channel。
|
||||
Reference in New Issue
Block a user