Files
rikako-note/spring/AMQP/Spring AMQP RabbitMQ.md
2023-04-27 16:57:44 +08:00

346 lines
19 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Spring AMQP
Spring AMQP将spring项目中的核心理念应用到了基于AMQP的消息解决方案的开发。项目中提供了一个高级别抽象的“template”来发送和接收消息。
## 示例
### Spring配置
如下示例展示了Spring项目中消息的接收和发送
```java
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
........
@Configuration
public class RabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
}
```
### SpringBoot自动装配配置
SpringBoot会自动配置bean对象的结构按如下样例所示
```java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ApplicationRunner runner(AmqpTemplate template) {
return args -> template.convertAndSend("myqueue", "foo");
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
@RabbitListener(queues = "myqueue")
public void listen(String in) {
System.out.println(in);
}
}
```
## AMQP抽象
Spring AMQP由两部分组成`spring-amqp``spring-rabbit`。其中`spring-amqp`为抽象层不依赖于任何AMQP Broker实现或client library。因此开发者只需要针对抽象层进行开发开发的代码也能够跨amqp实现进行移植。该抽象层通过`spring-rabbit`来进行实现目前只有RABBITMQ的实现。
AMQP在协议层进行操作可以通过RabbitMQ的客户端与任何实现了AMQP相同版本协议的broker进行通信。
### Message
Spring AMQP定义了Message类作为AMQP模型中对消息的抽象Message中既包含了消息的内容也包含了消息的属性如下展示了Message类的定义
```java
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
```
`MessageProperties`类中定义了通用的属性,例如'messageId', 'timestamp', 'contentType'等。除了这些属性外,还可以通过`setHeader(String key, Object value)`方法来自定义一些header。
### Exchange
Exchange接口代表了AMQP Exchange消息生产者将消息发送到Exchange。每个Exchange都位于broker的virtual host中并含有一个唯一的name并且Exchange还有其他属性。
```java
public interface Exchange {
String getName();
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
```
#### Exchange参数
- isDurable当isDurable参数被设置为true时该交换机即使当broker重启仍然会存在如果isDurable参数被设置为false当broker重启之后交换机必须重新declare
- autoDelete当最后一个queue从交换机解绑定时交换机将会被删除
> 默认情况下Spring Exchange中创建的交换机isDurable属性为trueautoDelete属性为false
#### Exchange类型
交换机类型通过`ExchangeTypes`中的常量进行表示常用的类型如下direct, topic, fanout, and headers。
不同类型交换机之间的区别,在于它们如何处理与队列之间的绑定:
- directdirect exchange会让一个queue被绑定到一个固定的routing key
- topictopic exchange支持通过routing pattern来进行绑定routing pattern中包含`*``#`通配符,其中`*` 代表一个任意word`#`代表0个或多个任意word
> ### routing_key格式
> routging_key组成格式为一个由`.`符号分隔的word列表形式如下
> `stock.usd.nyse`,`nyse.vmw`,`quick.orange.rabbit`word的数量任意但是大小需要控制在256字节以内
> topic exchange中的通配符如下`#`和`*`,其中`*`可以匹配一个word而`#`则是可以匹配0个或多个word
> 例如`*.orange.*``*.*.rabbit``lazy.#`
- fanoutfanout exchange会将消息发布到所有绑定到交换机上的队列而不会考虑是否routing key
#### default Exchange
AMQP协议要求所有broker实现需要提供一个默认的exchange该默认交换机没有名称name为空字符串且类型为direct。所有创建的queue都会自动绑定到该交换机并且routing_key和queueName相同。
### Queue
Queue代表消息消费者接收消息的组件。如下展示了Queue类结构
```java
public class Queue {
private final String name;
private volatile boolean durable;
private volatile boolean exclusive;
private volatile boolean autoDelete;
private volatile Map<String, Object> arguments;
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}
// Getters and Setters omitted for brevity
}
```
Queue构造器接收queue name取决于实现adminTemplate会提供方法来产生唯一的named queue。这些队列可以用于reply address故而这些队列的exclusive和autodelete属性都应该被设置为true。
#### Queue的参数
- durable该队列当broker重启时是否仍会存在如果设置为falsebroker重启之后queue必须被重新decalre
- exclusive只能够被一个连接使用当该连接断开之后queue也会被删除
- auto-delete当最后一个consumer不再订阅该queue时该queue会被自动删除
> 当声明一个queue时如果该queue不存在那么queue会被创建如果该queue已经存在并且声明的属性和已存在队列的属性相同那么不会发生任何事如果queue已经存在并且声明属性和已存在队列不同那么会抛出异常并且406的异常codePRECONDITION_FAILED会被返回
### Binding
生产者向exchange发送消息而消费者会从queue接收消息而exchange和queue之间的关系通过binding表示。
#### 将queue绑定到DirectExchange
```java
// routing_key为固定的"foo.bar"
new Binding(someQueue, someDirectExchange, "foo.bar");
```
#### 将queue绑定到TopicExchange
```java
// routing_key为pattern “foo.*"
new Binding(someQueue, someTopicExchange, "foo.*");
```
#### 将queue绑定到FanoutExchange
```java
// 绑定到fanout交换机时无需routing_key
new Binding(someQueue, someFanoutExchange);
```
#### 通过BindingBuilder来创建Binding
```java
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
```
## 连接和资源管理
管理到RabbitMQ broker连接的是`ConnectionFactory`接口,该接口的职责是提供`org.springframework.amqp.rabbit.connection.Connection`连接
### 选择ConnectionFactory
有三种connectionFactory可供选择
- PooledChannelConnectionFactory
- ThreadChannelConnectionFactory
- CachingConnectionFactory
在大多数情况下应该使用PooledChannelConnectionFactory。
#### PooledChannelConnectionFactory
该factory管理一个连接和两个channel pool其中一个channel pool是事务channel另一个channel pool用于非事务的channel。
```java
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
```
#### ThreadChannelConnectionFactory
该factory管理一个连接和两个ThreadLocal一个用于事务channel一个用于非事务channel。该factory会保证位于同一线程的操作会使用同一个channel。
该特性允许在不借助Scoped操作的前提下进行严格的消息排序。为了避免内存泄漏如果你你的应用使用了很多生命周期很短的线程必须手动调用factory的`closeThreadChannel()`方法来释放channel资源。
#### CachingConnectionFactory
CachingConnectionFactory会建立一个在应用程序范围内共享的connection proxy。共享连接是可能的因为与AMQP进行消息传递的其实是Channel而connection实例提供了createChnanel方法。
`CachingConnectionFactory`实现支持缓存channel对象并且对事务channel和非事务channel进行分别缓存。
在创建CachingConnectionFactory时可以通过构造器提供hostname参数还可以提供usernmae和password参数。
如果要设置channel cache size默认情况下为25可以调用`setChannelCacheSize()`方法。
也可以设置CachingConnectionFactory来缓存connection每次调用`createConnection()`方法都会创建一个新的连接关闭一个连接则是会将其返还到connection cache中。在这些connection创建的channel也会被缓存。如果要缓存这些连接需要将`cacheMode`设置为`CacheMode.CONNECTION`
#### cache size并不是limit
cache size并不会限制应用中使用channel的数量但是限制channel被缓存的数量。当cache size被设置为10时应用中可以使用任意多数量的channel。如果由超过10个channel被使用那么当这些channel被返还到cache时只会有10个channel被缓存其他的channel都会被物理关闭。
默认情况下cache size被设置为25在多线程环境下如果cache size被设置的过小那么会频繁的对channel进行创建和关闭。可以在RabbitMQ Admin UI界面实时监控channel数量并且对cache size进行调整。
#### 通过CachingConnectionFactory来创建来连接
```java
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
```
### 连接到集群
如果要连接到一个集群,需要设置`CachingConnectionFactory``addresses`属性:
```java
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
```
默认情况下当新连接创建时会随机连接到集群中的一个host如果想要从第一个尝试到最后一个可以设置`ddressShuffleMode`属性为`AddressShuffleMode.NONE`
### Publisher Confirm and Returns
confirm和返回消息可以通过设置`CachingConnectionFactory`来支持,将`CachingConnectionFactory``publisherConfirmType`属性设置为`ConfirmType.CORRELATED`并将`publisherReturns`的属性设置为true。
当这些选项设置后被factory创建的Channel实例将会被包装在`PublisherCallbackChannel`中。当这些channel对象被获取后使用者可以向channel中注册一个`PublisherCallbackChannel.Listener`回调。`PublisherCallbackChannel`实现中含有逻辑来将confirm或return路由到特定的listener中。
#### Publisher Confirm
由于网络故障或是其他原因client向socket中写入的数据无法保证被传输到server端并被处理可能会出现数据丢失的情况。
在使用标准AMQP 0-9-1情况下唯一保证消息不被丢失的方法是使用事务令channel是事务的并且对消息进行发布和提交。在这种情况下不必要的事务会极大的影响吞吐量和效率会将吞吐量降低250倍左右。为了解决这种情况引入了确认机制。
要启用确认机制client会发送`confirm.select`broker则会回复`confirm.select-ok`。一旦`confirm.select`在channel中被使用则意味着confirm mode已经开启一个事务channel无法开启确认机制且一个确认机制开启的channel也无法将其设置为事务的。
一旦channel处于确认模式client和broker都会计算消息数量从1开始即从confirm.select开始broker在确认消息时会向channel中发送`basic.ack`,而`delivery-tag`则是包含了确认消息的序列号。broker也会在`basic.ack`中设置`multiple`属性,代表该消息和位于该消息之前的消息都被确认。
## Amqp Template
Spring AMQP提供了高层抽象的template在该tempalte中定义了接收消息和发送消息的主要操作。
### 添加重试功能
可以配置RabbitTemplate使用RetryTemplate如下样例展示了exponential back off policy和默认的SimpleRetryPolicy该policy会在抛异常之前重试3次。
```java
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
```
### 异步发布
发布消息是一个异步的机制默认情况下rabbitMQ会将不能路由的消息丢弃。为了成功的发布需要获取异步的confirm。考虑如下两种失败场景
- 发送到一个exchange但是没有匹配的queue
- 发送到一个不存在的exchange
第一种场景由publisher returns涵盖。
对于第二种场景消息将会被丢弃并且没有任何返回channel将会被关闭并且抛出异常。默认情况下异常将会在日志中打印但是可以注册一个ChannelListener到CachingConnectionFactory用于获取如下事件。
```java
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
```
可以检查signal的reason属性从而确定产生的问题。
### Correlated Publisher Confirms and Returns
RabbitTemplate中实现了AmqpTemplate中对于发布确认和Returns的支持。
#### Rabbitmq Returns
通过Rabbitmq完成RPC调用消息中需要含有如下属性
- deliveryMode 将消息标识为持久化的或者暂时的如果该属性值为2则消息是持久化的任何其他值消息都是暂时的
- contentType用于表示消息的mime-type例如可以是application/json
- replyTo通常用于命名一个callback queue
- correlationId该属性用于关联rpc响应和请求
##### Correlation Id
如果对来自所有客户端的rpc请求都只创建一个callback queue该方案效率会很低。因此应该为每一个客户端都创建一个callback queue。
> 因为如果所有客户端使用一条callback queue那么传递给callback queue的rpc响应会广播给所有订阅的客户端然后再用客户端丢弃或匹配。这样会极大影响吞吐量。
在为每一个client创建一个callback queue后从callback queue中接收到的响应再通过`correlationId`属性同请求关联到一起。如果correlationid关联的请求不存在那么只需要安全的丢弃该请求即可。
##### Rabitmq实现RPC流程
- 对于rpc请求客户端发送消息时消息应该携带两个属性`replyTo`replyTo是一个匿名的独占队列exclusive为true当该队列只能由一个连接使用并且当连接断开时会自动删除`correleationId`则是一个唯一的值,用于将请求和响应关联起来
- 请求将会被发送到rpc_queue
- rpc worker将会从rpc_queue中拉取消息并对拉取数据进行处理。处理完成之后会将结果封装在消息中并且发送给客户端消息会被发送到`replyTo`属性指定的queue
- client等待reply_queue中的响应消息当拉取到响应消息之后其会检查correlationId属性并根据其来匹配响应对应的请求
#### Spring AMQP Returns
对于返回的消息, template的`mandatory`属性必须要被设置为true`mandatory-expression`表达式必须要为true。该特性要求CachingConnectionFactory的`publisherReturns`属性被设置为true。返回将会通过`setReturnsCallback(ReturnsCallback callback)`方法注册`RabbitTemplate.ReturnsCallback`回调来发送到客户端,该回调必须实现如下方法:
```java
void returnedMessage(ReturnedMessage returned);
```
> 在通过setReturnsCallback来注册回调时每个rabbitTemplate只能够注册一个ReturnsCallback
ReturnedMessage必须含有如下属性
- message返回消息本身
- replyCode代表return reason的code
- replyText一个文本的return reason例如`NO_ROUTE`
- exchange消息发送到的交换机
- routing key使用的routing key
#### 发布确认
对于发布确认该template需要一个CachingConnectionFactory并且CachingConnectionFactory的`publisherConfirm`属性需要被设置为`ConfirmType.CORRELATED`。确认将会被发送给客户端,可以通过`setConfirmCallback(ConfirmCallback callback)`注册一个`RabbitTemplate.ConfirmCallback`方法。该回调需要实现如下方法:
```java
void confirm(CorrelationData correlationData, boolean ack, String cause);
```
其中correlationData是客户端在发送消息时提供的原始消息对象。`ack`参数如果为`true`代表`ack`,为`false`则是代表`nack`。对于`nack``cause`参数将会包含`nack`的原因。
> 例如如果将消息发送给一个不存在的交换机在这种情况下broker将会关闭channel关闭的原因则会包含在`cause`中。
ConfirmCallback只在RabbitTemplate中被支持。
如果同时启用了发布确认和返回且消息无法被路由到任何队列中那么CorrelationData的return属性将会被注入到返回的消息中。其会保证返回消息的属性会在
### Scoped Operations
通常情况下在使用template时Channel从cache中获取使用完毕之后再被返还到cache中以便重用。在多线程环境下并不会保证线程在下次使用Channel时会使用一样的Channel。