Files
rikako-note/spring/AMQP/Spring AMQP RabbitMQ.md
2023-04-27 11:18:14 +08:00

297 lines
15 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Spring AMQP
Spring AMQP将spring项目中的核心理念应用到了基于AMQP的消息解决方案的开发。项目中提供了一个高级别抽象的“template”来发送和接收消息。
## 示例
### Spring配置
如下示例展示了Spring项目中消息的接收和发送
```java
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
........
@Configuration
public class RabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
}
```
### SpringBoot自动装配配置
SpringBoot会自动配置bean对象的结构按如下样例所示
```java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ApplicationRunner runner(AmqpTemplate template) {
return args -> template.convertAndSend("myqueue", "foo");
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
@RabbitListener(queues = "myqueue")
public void listen(String in) {
System.out.println(in);
}
}
```
## AMQP抽象
Spring AMQP由两部分组成`spring-amqp``spring-rabbit`。其中`spring-amqp`为抽象层不依赖于任何AMQP Broker实现或client library。因此开发者只需要针对抽象层进行开发开发的代码也能够跨amqp实现进行移植。该抽象层通过`spring-rabbit`来进行实现目前只有RABBITMQ的实现。
AMQP在协议层进行操作可以通过RabbitMQ的客户端与任何实现了AMQP相同版本协议的broker进行通信。
### Message
Spring AMQP定义了Message类作为AMQP模型中对消息的抽象Message中既包含了消息的内容也包含了消息的属性如下展示了Message类的定义
```java
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
```
`MessageProperties`类中定义了通用的属性,例如'messageId', 'timestamp', 'contentType'等。除了这些属性外,还可以通过`setHeader(String key, Object value)`方法来自定义一些header。
### Exchange
Exchange接口代表了AMQP Exchange消息生产者将消息发送到Exchange。每个Exchange都位于broker的virtual host中并含有一个唯一的name并且Exchange还有其他属性。
```java
public interface Exchange {
String getName();
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
```
#### Exchange参数
- isDurable当isDurable参数被设置为true时该交换机即使当broker重启仍然会存在如果isDurable参数被设置为false当broker重启之后交换机必须重新declare
- autoDelete当最后一个queue从交换机解绑定时交换机将会被删除
> 默认情况下Spring Exchange中创建的交换机isDurable属性为trueautoDelete属性为false
#### Exchange类型
交换机类型通过`ExchangeTypes`中的常量进行表示常用的类型如下direct, topic, fanout, and headers。
不同类型交换机之间的区别,在于它们如何处理与队列之间的绑定:
- directdirect exchange会让一个queue被绑定到一个固定的routing key
- topictopic exchange支持通过routing pattern来进行绑定routing pattern中包含`*``#`通配符,其中`*` 代表一个任意word`#`代表0个或多个任意word
> ### routing_key格式
> routging_key组成格式为一个由`.`符号分隔的word列表形式如下
> `stock.usd.nyse`,`nyse.vmw`,`quick.orange.rabbit`word的数量任意但是大小需要控制在256字节以内
> topic exchange中的通配符如下`#`和`*`,其中`*`可以匹配一个word而`#`则是可以匹配0个或多个word
> 例如`*.orange.*``*.*.rabbit``lazy.#`
- fanoutfanout exchange会将消息发布到所有绑定到交换机上的队列而不会考虑是否routing key
#### default Exchange
AMQP协议要求所有broker实现需要提供一个默认的exchange该默认交换机没有名称name为空字符串且类型为direct。所有创建的queue都会自动绑定到该交换机并且routing_key和queueName相同。
### Queue
Queue代表消息消费者接收消息的组件。如下展示了Queue类结构
```java
public class Queue {
private final String name;
private volatile boolean durable;
private volatile boolean exclusive;
private volatile boolean autoDelete;
private volatile Map<String, Object> arguments;
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}
// Getters and Setters omitted for brevity
}
```
Queue构造器接收queue name取决于实现adminTemplate会提供方法来产生唯一的named queue。这些队列可以用于reply address故而这些队列的exclusive和autodelete属性都应该被设置为true。
#### Queue的参数
- durable该队列当broker重启时是否仍会存在如果设置为falsebroker重启之后queue必须被重新decalre
- exclusive只能够被一个连接使用当该连接断开之后queue也会被删除
- auto-delete当最后一个consumer不再订阅该queue时该queue会被自动删除
> 当声明一个queue时如果该queue不存在那么queue会被创建如果该queue已经存在并且声明的属性和已存在队列的属性相同那么不会发生任何事如果queue已经存在并且声明属性和已存在队列不同那么会抛出异常并且406的异常codePRECONDITION_FAILED会被返回
### Binding
生产者向exchange发送消息而消费者会从queue接收消息而exchange和queue之间的关系通过binding表示。
#### 将queue绑定到DirectExchange
```java
// routing_key为固定的"foo.bar"
new Binding(someQueue, someDirectExchange, "foo.bar");
```
#### 将queue绑定到TopicExchange
```java
// routing_key为pattern “foo.*"
new Binding(someQueue, someTopicExchange, "foo.*");
```
#### 将queue绑定到FanoutExchange
```java
// 绑定到fanout交换机时无需routing_key
new Binding(someQueue, someFanoutExchange);
```
#### 通过BindingBuilder来创建Binding
```java
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
```
## 连接和资源管理
管理到RabbitMQ broker连接的是`ConnectionFactory`接口,该接口的职责是提供`org.springframework.amqp.rabbit.connection.Connection`连接
### 选择ConnectionFactory
有三种connectionFactory可供选择
- PooledChannelConnectionFactory
- ThreadChannelConnectionFactory
- CachingConnectionFactory
在大多数情况下应该使用PooledChannelConnectionFactory。
#### PooledChannelConnectionFactory
该factory管理一个连接和两个channel pool其中一个channel pool是事务channel另一个channel pool用于非事务的channel。
```java
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
```
#### ThreadChannelConnectionFactory
该factory管理一个连接和两个ThreadLocal一个用于事务channel一个用于非事务channel。该factory会保证位于同一线程的操作会使用同一个channel。
该特性允许在不借助Scoped操作的前提下进行严格的消息排序。为了避免内存泄漏如果你你的应用使用了很多生命周期很短的线程必须手动调用factory的`closeThreadChannel()`方法来释放channel资源。
#### CachingConnectionFactory
CachingConnectionFactory会建立一个在应用程序范围内共享的connection proxy。共享连接是可能的因为与AMQP进行消息传递的其实是Channel而connection实例提供了createChnanel方法。
`CachingConnectionFactory`实现支持缓存channel对象并且对事务channel和非事务channel进行分别缓存。
在创建CachingConnectionFactory时可以通过构造器提供hostname参数还可以提供usernmae和password参数。
如果要设置channel cache size默认情况下为25可以调用`setChannelCacheSize()`方法。
也可以设置CachingConnectionFactory来缓存connection每次调用`createConnection()`方法都会创建一个新的连接关闭一个连接则是会将其返还到connection cache中。在这些connection创建的channel也会被缓存。如果要缓存这些连接需要将`cacheMode`设置为`CacheMode.CONNECTION`
#### cache size并不是limit
cache size并不会限制应用中使用channel的数量但是限制channel被缓存的数量。当cache size被设置为10时应用中可以使用任意多数量的channel。如果由超过10个channel被使用那么当这些channel被返还到cache时只会有10个channel被缓存其他的channel都会被物理关闭。
默认情况下cache size被设置为25在多线程环境下如果cache size被设置的过小那么会频繁的对channel进行创建和关闭。可以在RabbitMQ Admin UI界面实时监控channel数量并且对cache size进行调整。
#### 通过CachingConnectionFactory来创建来连接
```java
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
```
### 连接到集群
如果要连接到一个集群,需要设置`CachingConnectionFactory``addresses`属性:
```java
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
```
默认情况下当新连接创建时会随机连接到集群中的一个host如果想要从第一个尝试到最后一个可以设置`ddressShuffleMode`属性为`AddressShuffleMode.NONE`
### Publisher Confirm and Returns
confirm和返回消息可以通过设置`CachingConnectionFactory`来支持,将`CachingConnectionFactory``publisherConfirmType`属性设置为`ConfirmType.CORRELATED`并将`publisherReturns`的属性设置为true。
当这些选项设置后被factory创建的Channel实例将会被包装在`PublisherCallbackChannel`中。当这些channel对象被获取后使用者可以向channel中注册一个`PublisherCallbackChannel.Listener`回调。`PublisherCallbackChannel`实现中含有逻辑来将confirm或return路由到特定的listener中。
#### Publisher Confirm
由于网络故障或是其他原因client向socket中写入的数据无法保证被传输到server端并被处理可能会出现数据丢失的情况。
在使用标准AMQP 0-9-1情况下唯一保证消息不被丢失的方法是使用事务令channel是事务的并且对消息进行发布和提交。在这种情况下不必要的事务会极大的影响吞吐量和效率会将吞吐量降低250倍左右。为了解决这种情况引入了确认机制。
要启用确认机制client会发送`confirm.select`broker则会回复`confirm.select-ok`。一旦`confirm.select`在channel中被使用则意味着confirm mode已经开启一个事务channel无法开启确认机制且一个确认机制开启的channel也无法将其设置为事务的。
一旦channel处于确认模式client和broker都会计算消息数量从1开始即从confirm.select开始broker在确认消息时会向channel中发送`basic.ack`,而`delivery-tag`则是包含了确认消息的序列号。broker也会在`basic.ack`中设置`multiple`属性,代表该消息和位于该消息之前的消息都被确认。
## Amqp Template
Spring AMQP提供了高层抽象的template在该tempalte中定义了接收消息和发送消息的主要操作。
### 添加重试功能
可以配置RabbitTemplate使用RetryTemplate如下样例展示了exponential back off policy和默认的SimpleRetryPolicy该policy会在抛异常之前重试3次。
```java
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
```
### 异步发布
发布消息是一个异步的机制默认情况下rabbitMQ会将不能路由的消息丢弃。为了成功的发布需要获取异步的confirm。考虑如下两种失败场景
- 发送到一个exchange但是没有匹配的queue
- 发送到一个不存在的exchange
第一种场景由publisher returns涵盖。
对于第二种场景消息将会被丢弃并且没有任何返回channel将会被关闭并且抛出异常。默认情况下异常将会在日志中打印但是可以注册一个ChannelListener到CachingConnectionFactory用于获取如下事件。
```java
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
```
可以检查signal的reason属性从而确定产生的问题。