diff --git a/spring/AMQP/Spring AMQP RabbitMQ.md b/spring/AMQP/Spring AMQP RabbitMQ.md new file mode 100644 index 0000000..faaeb25 --- /dev/null +++ b/spring/AMQP/Spring AMQP RabbitMQ.md @@ -0,0 +1,296 @@ +# Spring AMQP +Spring AMQP将spring项目中的核心理念应用到了基于AMQP的消息解决方案的开发。项目中提供了一个高级别抽象的“template”来发送和接收消息。 +## 示例 +### Spring配置 +如下示例展示了Spring项目中消息的接收和发送: +```java +ApplicationContext context = + new AnnotationConfigApplicationContext(RabbitConfiguration.class); +AmqpTemplate template = context.getBean(AmqpTemplate.class); +template.convertAndSend("myqueue", "foo"); +String foo = (String) template.receiveAndConvert("myqueue"); + +........ + +@Configuration +public class RabbitConfiguration { + + @Bean + public CachingConnectionFactory connectionFactory() { + return new CachingConnectionFactory("localhost"); + } + + @Bean + public RabbitAdmin amqpAdmin() { + return new RabbitAdmin(connectionFactory()); + } + + @Bean + public RabbitTemplate rabbitTemplate() { + return new RabbitTemplate(connectionFactory()); + } + + @Bean + public Queue myQueue() { + return new Queue("myqueue"); + } +} + + +``` +### SpringBoot自动装配配置 +SpringBoot会自动配置bean对象的结构,按如下样例所示: +```java +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + @Bean + public ApplicationRunner runner(AmqpTemplate template) { + return args -> template.convertAndSend("myqueue", "foo"); + } + + @Bean + public Queue myQueue() { + return new Queue("myqueue"); + } + + @RabbitListener(queues = "myqueue") + public void listen(String in) { + System.out.println(in); + } + +} +``` +## AMQP抽象 +Spring AMQP由两部分组成:`spring-amqp`和`spring-rabbit`。其中`spring-amqp`为抽象层,不依赖于任何AMQP Broker实现或client library。因此开发者只需要针对抽象层进行开发,开发的代码也能够跨amqp实现进行移植。该抽象层通过`spring-rabbit`来进行实现,目前只有RABBITMQ的实现。 +AMQP在协议层进行操作,可以通过RabbitMQ的客户端与任何实现了AMQP相同版本协议的broker进行通信。 +### Message +Spring AMQP定义了Message类作为AMQP模型中对消息的抽象,Message中既包含了消息的内容,也包含了消息的属性,如下展示了Message类的定义: +```java +public class Message { + + private final MessageProperties messageProperties; + + private final byte[] body; + + public Message(byte[] body, MessageProperties messageProperties) { + this.body = body; + this.messageProperties = messageProperties; + } + + public byte[] getBody() { + return this.body; + } + + public MessageProperties getMessageProperties() { + return this.messageProperties; + } +} +``` +`MessageProperties`类中定义了通用的属性,例如'messageId', 'timestamp', 'contentType'等。除了这些属性外,还可以通过`setHeader(String key, Object value)`方法来自定义一些header。 +### Exchange +Exchange接口代表了AMQP Exchange,消息生产者将消息发送到Exchange。每个Exchange都位于broker的virtual host中,并含有一个唯一的name,并且Exchange还有其他属性。 +```java +public interface Exchange { + + String getName(); + + String getExchangeType(); + + boolean isDurable(); + + boolean isAutoDelete(); + + Map getArguments(); + +} +``` +#### Exchange参数 +- isDurable:当isDurable参数被设置为true时,该交换机即使当broker重启仍然会存在,如果isDurable参数被设置为false,当broker重启之后,交换机必须重新declare +- autoDelete:当最后一个queue从交换机解绑定时,交换机将会被删除 +> 默认情况下,Spring Exchange中创建的交换机,isDurable属性为true,autoDelete属性为false + +#### Exchange类型 +交换机类型通过`ExchangeTypes`中的常量进行表示,常用的类型如下:direct, topic, fanout, and headers。 +不同类型交换机之间的区别,在于它们如何处理与队列之间的绑定: +- direct:direct exchange会让一个queue被绑定到一个固定的routing key +- topic:topic exchange支持通过routing pattern来进行绑定,routing pattern中包含`*`和`#`通配符,其中`*` 代表一个任意word,而`#`代表0个或多个任意word + > ### routing_key格式 + > routging_key组成格式为一个由`.`符号分隔的word列表,形式如下: + > `stock.usd.nyse`,`nyse.vmw`,`quick.orange.rabbit`,word的数量任意,但是大小需要控制在256字节以内 + > topic exchange中的通配符如下:`#`和`*`,其中`*`可以匹配一个word,而`#`则是可以匹配0个或多个word + > 例如`*.orange.*`,`*.*.rabbit`,`lazy.#` +- fanout:fanout exchange会将消息发布到所有绑定到交换机上的队列,而不会考虑是否routing key +#### default Exchange +AMQP协议要求所有broker实现需要提供一个默认的exchange,该默认交换机没有名称(name为空字符串),且类型为direct。所有创建的queue都会自动绑定到该交换机,并且routing_key和queueName相同。 + +### Queue +Queue代表消息消费者接收消息的组件。如下展示了Queue类结构: +```java +public class Queue { + + private final String name; + + private volatile boolean durable; + + private volatile boolean exclusive; + + private volatile boolean autoDelete; + + private volatile Map arguments; + + /** + * The queue is durable, non-exclusive and non auto-delete. + * + * @param name the name of the queue. + */ + public Queue(String name) { + this(name, true, false, false); + } + + // Getters and Setters omitted for brevity + +} +``` +Queue构造器接收queue name,取决于实现,adminTemplate会提供方法来产生唯一的named queue。这些队列可以用于reply address,故而这些队列的exclusive和autodelete属性都应该被设置为true。 +#### Queue的参数 +- durable:该队列当broker重启时是否仍会存在,如果设置为false,broker重启之后,queue必须被重新decalre +- exclusive:只能够被一个连接使用,当该连接断开之后,queue也会被删除 +- auto-delete:当最后一个consumer不再订阅该queue时,该queue会被自动删除 + +> 当声明一个queue时,如果该queue不存在,那么queue会被创建;如果该queue已经存在,并且声明的属性和已存在队列的属性相同,那么不会发生任何事;如果queue已经存在,并且声明属性和已存在队列不同,那么会抛出异常,并且406的异常code(PRECONDITION_FAILED)会被返回 + +### Binding +生产者向exchange发送消息,而消费者会从queue接收消息,而exchange和queue之间的关系通过binding表示。 +#### 将queue绑定到DirectExchange +```java +// routing_key为固定的"foo.bar" +new Binding(someQueue, someDirectExchange, "foo.bar"); +``` +#### 将queue绑定到TopicExchange +```java +// routing_key为pattern “foo.*" +new Binding(someQueue, someTopicExchange, "foo.*"); +``` +#### 将queue绑定到FanoutExchange +```java +// 绑定到fanout交换机时,无需routing_key +new Binding(someQueue, someFanoutExchange); +``` +#### 通过BindingBuilder来创建Binding +```java +Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*"); +``` +## 连接和资源管理 +管理到RabbitMQ broker连接的是`ConnectionFactory`接口,该接口的职责是提供`org.springframework.amqp.rabbit.connection.Connection`连接 +### 选择ConnectionFactory +有三种connectionFactory可供选择: +- PooledChannelConnectionFactory +- ThreadChannelConnectionFactory +- CachingConnectionFactory + +在大多数情况下,应该使用PooledChannelConnectionFactory。 +#### PooledChannelConnectionFactory +该factory管理一个连接和两个channel pool,其中一个channel pool是事务channel,另一个channel pool用于非事务的channel。 +```java +@Bean +PooledChannelConnectionFactory pcf() throws Exception { + ConnectionFactory rabbitConnectionFactory = new ConnectionFactory(); + rabbitConnectionFactory.setHost("localhost"); + PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory); + pcf.setPoolConfigurer((pool, tx) -> { + if (tx) { + // configure the transactional pool + } + else { + // configure the non-transactional pool + } + }); + return pcf; +} + +``` +#### ThreadChannelConnectionFactory +该factory管理一个连接和两个ThreadLocal,一个用于事务channel,一个用于非事务channel。该factory会保证位于同一线程的操作会使用同一个channel。 +该特性允许在不借助Scoped操作的前提下进行严格的消息排序。为了避免内存泄漏,如果你你的应用使用了很多生命周期很短的线程,必须手动调用factory的`closeThreadChannel()`方法来释放channel资源。 +#### CachingConnectionFactory +CachingConnectionFactory会建立一个在应用程序范围内共享的connection proxy。共享连接是可能的,因为与AMQP进行消息传递的其实是Channel,而connection实例提供了createChnanel方法。 +`CachingConnectionFactory`实现支持缓存channel对象,并且对事务channel和非事务channel进行分别缓存。 +在创建CachingConnectionFactory时,可以通过构造器提供hostname参数,还可以提供usernmae和password参数。 +如果要设置channel cache size(默认情况下为25),可以调用`setChannelCacheSize()`方法。 +也可以设置CachingConnectionFactory来缓存connection,每次调用`createConnection()`方法都会创建一个新的连接,关闭一个连接则是会将其返还到connection cache中。在这些connection创建的channel也会被缓存。如果要缓存这些连接,需要将`cacheMode`设置为`CacheMode.CONNECTION`。 +#### cache size并不是limit +cache size并不会限制应用中使用channel的数量,但是限制channel被缓存的数量。当cache size被设置为10时,应用中可以使用任意多数量的channel。如果由超过10个channel被使用,那么当这些channel被返还到cache时,只会有10个channel被缓存,其他的channel都会被物理关闭。 +默认情况下,cache size被设置为25,在多线程环境下,如果cache size被设置的过小,那么会频繁的对channel进行创建和关闭。可以在RabbitMQ Admin UI界面实时监控channel数量,并且对cache size进行调整。 +#### 通过CachingConnectionFactory来创建来连接 +```java +CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost"); +connectionFactory.setUsername("guest"); +connectionFactory.setPassword("guest"); + +Connection connection = connectionFactory.createConnection(); +``` +### 连接到集群 +如果要连接到一个集群,需要设置`CachingConnectionFactory`的`addresses`属性: +```java +@Bean +public CachingConnectionFactory ccf() { + CachingConnectionFactory ccf = new CachingConnectionFactory(); + ccf.setAddresses("host1:5672,host2:5672,host3:5672"); + return ccf; +} +``` +默认情况下,当新连接创建时会随机连接到集群中的一个host,如果想要从第一个尝试到最后一个,可以设置`ddressShuffleMode`属性为`AddressShuffleMode.NONE`。 +### Publisher Confirm and Returns +confirm和返回消息可以通过设置`CachingConnectionFactory`来支持,将`CachingConnectionFactory`的`publisherConfirmType`属性设置为`ConfirmType.CORRELATED`并将`publisherReturns`的属性设置为true。 +当这些选项设置后,被factory创建的Channel实例将会被包装在`PublisherCallbackChannel`中。当这些channel对象被获取后,使用者可以向channel中注册一个`PublisherCallbackChannel.Listener`回调。`PublisherCallbackChannel`实现中含有逻辑来将confirm或return路由到特定的listener中。 +#### Publisher Confirm +由于网络故障或是其他原因,client向socket中写入的数据无法保证被传输到server端并被处理,可能会出现数据丢失的情况。 +在使用标准AMQP 0-9-1情况下,唯一保证消息不被丢失的方法是使用事务:令channel是事务的,并且对消息进行发布和提交。在这种情况下,不必要的事务会极大的影响吞吐量和效率,会将吞吐量降低250倍左右。为了解决这种情况,引入了确认机制。 +要启用确认机制,client会发送`confirm.select`,broker则会回复`confirm.select-ok`。一旦`confirm.select`在channel中被使用,则意味着confirm mode已经开启,一个事务channel无法开启确认机制,且一个确认机制开启的channel也无法将其设置为事务的。 +一旦channel处于确认模式,client和broker都会计算消息数量(从1开始,即从confirm.select开始),broker在确认消息时,会向channel中发送`basic.ack`,而`delivery-tag`则是包含了确认消息的序列号。broker也会在`basic.ack`中设置`multiple`属性,代表该消息和位于该消息之前的消息都被确认。 +## Amqp Template +Spring AMQP提供了高层抽象的template,在该tempalte中定义了接收消息和发送消息的主要操作。 +### 添加重试功能 +可以配置RabbitTemplate使用RetryTemplate,如下样例展示了exponential back off policy和默认的SimpleRetryPolicy,该policy会在抛异常之前重试3次。 +```java +@Bean +public RabbitTemplate rabbitTemplate() { + RabbitTemplate template = new RabbitTemplate(connectionFactory()); + RetryTemplate retryTemplate = new RetryTemplate(); + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + backOffPolicy.setInitialInterval(500); + backOffPolicy.setMultiplier(10.0); + backOffPolicy.setMaxInterval(10000); + retryTemplate.setBackOffPolicy(backOffPolicy); + template.setRetryTemplate(retryTemplate); + return template; +} +``` +### 异步发布 +发布消息是一个异步的机制,默认情况下,rabbitMQ会将不能路由的消息丢弃。为了成功的发布,需要获取异步的confirm。考虑如下两种失败场景: +- 发送到一个exchange,但是没有匹配的queue +- 发送到一个不存在的exchange + +第一种场景由publisher returns涵盖。 +对于第二种场景,消息将会被丢弃并且没有任何返回,channel将会被关闭并且抛出异常。默认情况下,异常将会在日志中打印,但是可以注册一个ChannelListener到CachingConnectionFactory,用于获取如下事件。 +```java +this.connectionFactory.addConnectionListener(new ConnectionListener() { + + @Override + public void onCreate(Connection connection) { + } + + @Override + public void onShutDown(ShutdownSignalException signal) { + ... + } + +}); +``` +可以检查signal的reason属性,从而确定产生的问题。 +