# Spring AMQP Spring AMQP将spring项目中的核心理念应用到了基于AMQP的消息解决方案的开发。项目中提供了一个高级别抽象的“template”来发送和接收消息。 ## 示例 ### Spring配置 如下示例展示了Spring项目中消息的接收和发送: ```java ApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfiguration.class); AmqpTemplate template = context.getBean(AmqpTemplate.class); template.convertAndSend("myqueue", "foo"); String foo = (String) template.receiveAndConvert("myqueue"); ........ @Configuration public class RabbitConfiguration { @Bean public CachingConnectionFactory connectionFactory() { return new CachingConnectionFactory("localhost"); } @Bean public RabbitAdmin amqpAdmin() { return new RabbitAdmin(connectionFactory()); } @Bean public RabbitTemplate rabbitTemplate() { return new RabbitTemplate(connectionFactory()); } @Bean public Queue myQueue() { return new Queue("myqueue"); } } ``` ### SpringBoot自动装配配置 SpringBoot会自动配置bean对象的结构,按如下样例所示: ```java @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Bean public ApplicationRunner runner(AmqpTemplate template) { return args -> template.convertAndSend("myqueue", "foo"); } @Bean public Queue myQueue() { return new Queue("myqueue"); } @RabbitListener(queues = "myqueue") public void listen(String in) { System.out.println(in); } } ``` ## AMQP抽象 Spring AMQP由两部分组成:`spring-amqp`和`spring-rabbit`。其中`spring-amqp`为抽象层,不依赖于任何AMQP Broker实现或client library。因此开发者只需要针对抽象层进行开发,开发的代码也能够跨amqp实现进行移植。该抽象层通过`spring-rabbit`来进行实现,目前只有RABBITMQ的实现。 AMQP在协议层进行操作,可以通过RabbitMQ的客户端与任何实现了AMQP相同版本协议的broker进行通信。 ### Message Spring AMQP定义了Message类作为AMQP模型中对消息的抽象,Message中既包含了消息的内容,也包含了消息的属性,如下展示了Message类的定义: ```java public class Message { private final MessageProperties messageProperties; private final byte[] body; public Message(byte[] body, MessageProperties messageProperties) { this.body = body; this.messageProperties = messageProperties; } public byte[] getBody() { return this.body; } public MessageProperties getMessageProperties() { return this.messageProperties; } } ``` `MessageProperties`类中定义了通用的属性,例如'messageId', 'timestamp', 'contentType'等。除了这些属性外,还可以通过`setHeader(String key, Object value)`方法来自定义一些header。 ### Exchange Exchange接口代表了AMQP Exchange,消息生产者将消息发送到Exchange。每个Exchange都位于broker的virtual host中,并含有一个唯一的name,并且Exchange还有其他属性。 ```java public interface Exchange { String getName(); String getExchangeType(); boolean isDurable(); boolean isAutoDelete(); Map getArguments(); } ``` #### Exchange参数 - isDurable:当isDurable参数被设置为true时,该交换机即使当broker重启仍然会存在,如果isDurable参数被设置为false,当broker重启之后,交换机必须重新declare - autoDelete:当最后一个queue从交换机解绑定时,交换机将会被删除 > 默认情况下,Spring Exchange中创建的交换机,isDurable属性为true,autoDelete属性为false #### Exchange类型 交换机类型通过`ExchangeTypes`中的常量进行表示,常用的类型如下:direct, topic, fanout, and headers。 不同类型交换机之间的区别,在于它们如何处理与队列之间的绑定: - direct:direct exchange会让一个queue被绑定到一个固定的routing key - topic:topic exchange支持通过routing pattern来进行绑定,routing pattern中包含`*`和`#`通配符,其中`*` 代表一个任意word,而`#`代表0个或多个任意word > ### routing_key格式 > routging_key组成格式为一个由`.`符号分隔的word列表,形式如下: > `stock.usd.nyse`,`nyse.vmw`,`quick.orange.rabbit`,word的数量任意,但是大小需要控制在256字节以内 > topic exchange中的通配符如下:`#`和`*`,其中`*`可以匹配一个word,而`#`则是可以匹配0个或多个word > 例如`*.orange.*`,`*.*.rabbit`,`lazy.#` - fanout:fanout exchange会将消息发布到所有绑定到交换机上的队列,而不会考虑是否routing key #### default Exchange AMQP协议要求所有broker实现需要提供一个默认的exchange,该默认交换机没有名称(name为空字符串),且类型为direct。所有创建的queue都会自动绑定到该交换机,并且routing_key和queueName相同。 ### Queue Queue代表消息消费者接收消息的组件。如下展示了Queue类结构: ```java public class Queue { private final String name; private volatile boolean durable; private volatile boolean exclusive; private volatile boolean autoDelete; private volatile Map arguments; /** * The queue is durable, non-exclusive and non auto-delete. * * @param name the name of the queue. */ public Queue(String name) { this(name, true, false, false); } // Getters and Setters omitted for brevity } ``` Queue构造器接收queue name,取决于实现,adminTemplate会提供方法来产生唯一的named queue。这些队列可以用于reply address,故而这些队列的exclusive和autodelete属性都应该被设置为true。 #### Queue的参数 - durable:该队列当broker重启时是否仍会存在,如果设置为false,broker重启之后,queue必须被重新decalre - exclusive:只能够被一个连接使用,当该连接断开之后,queue也会被删除 - auto-delete:当最后一个consumer不再订阅该queue时,该queue会被自动删除 > 当声明一个queue时,如果该queue不存在,那么queue会被创建;如果该queue已经存在,并且声明的属性和已存在队列的属性相同,那么不会发生任何事;如果queue已经存在,并且声明属性和已存在队列不同,那么会抛出异常,并且406的异常code(PRECONDITION_FAILED)会被返回 ### Binding 生产者向exchange发送消息,而消费者会从queue接收消息,而exchange和queue之间的关系通过binding表示。 #### 将queue绑定到DirectExchange ```java // routing_key为固定的"foo.bar" new Binding(someQueue, someDirectExchange, "foo.bar"); ``` #### 将queue绑定到TopicExchange ```java // routing_key为pattern “foo.*" new Binding(someQueue, someTopicExchange, "foo.*"); ``` #### 将queue绑定到FanoutExchange ```java // 绑定到fanout交换机时,无需routing_key new Binding(someQueue, someFanoutExchange); ``` #### 通过BindingBuilder来创建Binding ```java Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*"); ``` ## 连接和资源管理 管理到RabbitMQ broker连接的是`ConnectionFactory`接口,该接口的职责是提供`org.springframework.amqp.rabbit.connection.Connection`连接 ### 选择ConnectionFactory 有三种connectionFactory可供选择: - PooledChannelConnectionFactory - ThreadChannelConnectionFactory - CachingConnectionFactory 在大多数情况下,应该使用PooledChannelConnectionFactory。 #### PooledChannelConnectionFactory 该factory管理一个连接和两个channel pool,其中一个channel pool是事务channel,另一个channel pool用于非事务的channel。 ```java @Bean PooledChannelConnectionFactory pcf() throws Exception { ConnectionFactory rabbitConnectionFactory = new ConnectionFactory(); rabbitConnectionFactory.setHost("localhost"); PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory); pcf.setPoolConfigurer((pool, tx) -> { if (tx) { // configure the transactional pool } else { // configure the non-transactional pool } }); return pcf; } ``` #### ThreadChannelConnectionFactory 该factory管理一个连接和两个ThreadLocal,一个用于事务channel,一个用于非事务channel。该factory会保证位于同一线程的操作会使用同一个channel。 该特性允许在不借助Scoped操作的前提下进行严格的消息排序。为了避免内存泄漏,如果你你的应用使用了很多生命周期很短的线程,必须手动调用factory的`closeThreadChannel()`方法来释放channel资源。 #### CachingConnectionFactory CachingConnectionFactory会建立一个在应用程序范围内共享的connection proxy。共享连接是可能的,因为与AMQP进行消息传递的其实是Channel,而connection实例提供了createChnanel方法。 `CachingConnectionFactory`实现支持缓存channel对象,并且对事务channel和非事务channel进行分别缓存。 在创建CachingConnectionFactory时,可以通过构造器提供hostname参数,还可以提供usernmae和password参数。 如果要设置channel cache size(默认情况下为25),可以调用`setChannelCacheSize()`方法。 也可以设置CachingConnectionFactory来缓存connection,每次调用`createConnection()`方法都会创建一个新的连接,关闭一个连接则是会将其返还到connection cache中。在这些connection创建的channel也会被缓存。如果要缓存这些连接,需要将`cacheMode`设置为`CacheMode.CONNECTION`。 #### cache size并不是limit cache size并不会限制应用中使用channel的数量,但是限制channel被缓存的数量。当cache size被设置为10时,应用中可以使用任意多数量的channel。如果由超过10个channel被使用,那么当这些channel被返还到cache时,只会有10个channel被缓存,其他的channel都会被物理关闭。 默认情况下,cache size被设置为25,在多线程环境下,如果cache size被设置的过小,那么会频繁的对channel进行创建和关闭。可以在RabbitMQ Admin UI界面实时监控channel数量,并且对cache size进行调整。 #### 通过CachingConnectionFactory来创建来连接 ```java CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection = connectionFactory.createConnection(); ``` ### 连接到集群 如果要连接到一个集群,需要设置`CachingConnectionFactory`的`addresses`属性: ```java @Bean public CachingConnectionFactory ccf() { CachingConnectionFactory ccf = new CachingConnectionFactory(); ccf.setAddresses("host1:5672,host2:5672,host3:5672"); return ccf; } ``` 默认情况下,当新连接创建时会随机连接到集群中的一个host,如果想要从第一个尝试到最后一个,可以设置`ddressShuffleMode`属性为`AddressShuffleMode.NONE`。 ### Publisher Confirm and Returns confirm和返回消息可以通过设置`CachingConnectionFactory`来支持,将`CachingConnectionFactory`的`publisherConfirmType`属性设置为`ConfirmType.CORRELATED`并将`publisherReturns`的属性设置为true。 当这些选项设置后,被factory创建的Channel实例将会被包装在`PublisherCallbackChannel`中。当这些channel对象被获取后,使用者可以向channel中注册一个`PublisherCallbackChannel.Listener`回调。`PublisherCallbackChannel`实现中含有逻辑来将confirm或return路由到特定的listener中。 #### Publisher Confirm 由于网络故障或是其他原因,client向socket中写入的数据无法保证被传输到server端并被处理,可能会出现数据丢失的情况。 在使用标准AMQP 0-9-1情况下,唯一保证消息不被丢失的方法是使用事务:令channel是事务的,并且对消息进行发布和提交。在这种情况下,不必要的事务会极大的影响吞吐量和效率,会将吞吐量降低250倍左右。为了解决这种情况,引入了确认机制。 要启用确认机制,client会发送`confirm.select`,broker则会回复`confirm.select-ok`。一旦`confirm.select`在channel中被使用,则意味着confirm mode已经开启,一个事务channel无法开启确认机制,且一个确认机制开启的channel也无法将其设置为事务的。 一旦channel处于确认模式,client和broker都会计算消息数量(从1开始,即从confirm.select开始),broker在确认消息时,会向channel中发送`basic.ack`,而`delivery-tag`则是包含了确认消息的序列号。broker也会在`basic.ack`中设置`multiple`属性,代表该消息和位于该消息之前的消息都被确认。 ## Amqp Template Spring AMQP提供了高层抽象的template,在该tempalte中定义了接收消息和发送消息的主要操作。 ### 添加重试功能 可以配置RabbitTemplate使用RetryTemplate,如下样例展示了exponential back off policy和默认的SimpleRetryPolicy,该policy会在抛异常之前重试3次。 ```java @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); RetryTemplate retryTemplate = new RetryTemplate(); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(500); backOffPolicy.setMultiplier(10.0); backOffPolicy.setMaxInterval(10000); retryTemplate.setBackOffPolicy(backOffPolicy); template.setRetryTemplate(retryTemplate); return template; } ``` ### 异步发布 发布消息是一个异步的机制,默认情况下,rabbitMQ会将不能路由的消息丢弃。为了成功的发布,需要获取异步的confirm。考虑如下两种失败场景: - 发送到一个exchange,但是没有匹配的queue - 发送到一个不存在的exchange 第一种场景由publisher returns涵盖。 对于第二种场景,消息将会被丢弃并且没有任何返回,channel将会被关闭并且抛出异常。默认情况下,异常将会在日志中打印,但是可以注册一个ChannelListener到CachingConnectionFactory,用于获取如下事件。 ```java this.connectionFactory.addConnectionListener(new ConnectionListener() { @Override public void onCreate(Connection connection) { } @Override public void onShutDown(ShutdownSignalException signal) { ... } }); ``` 可以检查signal的reason属性,从而确定产生的问题。 ### Correlated Publisher Confirms and Returns RabbitTemplate中实现了AmqpTemplate中对于发布确认和Returns的支持。 #### Rabbitmq Returns 通过Rabbitmq完成RPC调用,消息中需要含有如下属性: - deliveryMode: 将消息标识为持久化的或者暂时的,如果该属性值为2,则消息是持久化的,任何其他值消息都是暂时的 - contentType:用于表示消息的mime-type,例如可以是application/json - replyTo:通常用于命名一个callback queue - correlationId:该属性用于关联rpc响应和请求 ##### Correlation Id 如果对来自所有客户端的rpc请求,都只创建一个callback queue,该方案效率会很低。因此,应该为每一个客户端都创建一个callback queue。 > 因为如果所有客户端使用一条callback queue,那么传递给callback queue的rpc响应会广播给所有订阅的客户端,然后再用客户端丢弃或匹配。这样会极大影响吞吐量。 在为每一个client创建一个callback queue后,从callback queue中接收到的响应再通过`correlationId`属性同请求关联到一起。如果correlationid关联的请求不存在,那么只需要安全的丢弃该请求即可。 ##### Rabitmq实现RPC流程 - 对于rpc请求,客户端发送消息时消息应该携带两个属性:`replyTo`:replyTo是一个匿名的独占队列(exclusive为true,当该队列只能由一个连接使用,并且当连接断开时会自动删除),`correleationId`则是一个唯一的值,用于将请求和响应关联起来 - 请求将会被发送到rpc_queue - rpc worker将会从rpc_queue中拉取消息,并对拉取数据进行处理。处理完成之后,会将结果封装在消息中并且发送给客户端,消息会被发送到`replyTo`属性指定的queue - client等待reply_queue中的响应消息,当拉取到响应消息之后,其会检查correlationId属性,并根据其来匹配响应对应的请求 #### Spring AMQP Returns 对于返回的消息, template的`mandatory`属性必须要被设置为true,或`mandatory-expression`表达式必须要为true。该特性要求CachingConnectionFactory的`publisherReturns`属性被设置为true。返回将会通过`setReturnsCallback(ReturnsCallback callback)`方法注册`RabbitTemplate.ReturnsCallback`回调来发送到客户端,该回调必须实现如下方法: ```java void returnedMessage(ReturnedMessage returned); ``` > 在通过setReturnsCallback来注册回调时,每个rabbitTemplate只能够注册一个ReturnsCallback ReturnedMessage必须含有如下属性: - message:返回消息本身 - replyCode:代表return reason的code - replyText:一个文本的return reason,例如`NO_ROUTE` - exchange:消息发送到的交换机 - routing key:使用的routing key #### 发布确认 对于发布确认,该template需要一个CachingConnectionFactory,并且CachingConnectionFactory的`publisherConfirm`属性需要被设置为`ConfirmType.CORRELATED`。确认将会被发送给客户端,可以通过`setConfirmCallback(ConfirmCallback callback)`注册一个`RabbitTemplate.ConfirmCallback`方法。该回调需要实现如下方法: ```java void confirm(CorrelationData correlationData, boolean ack, String cause); ``` 其中correlationData是客户端在发送消息时提供的原始消息对象。`ack`参数如果为`true`代表`ack`,为`false`则是代表`nack`。对于`nack`,`cause`参数将会包含`nack`的原因。 > 例如,如果将消息发送给一个不存在的交换机,在这种情况下,broker将会关闭channel,关闭的原因则会包含在`cause`中。 ConfirmCallback只在RabbitTemplate中被支持。 如果同时启用了发布确认和返回,且消息无法被路由到任何队列中,那么CorrelationData的return属性将会被注入到返回的消息中。其会保证返回消息的属性会在 ### Scoped Operations 通常情况下,在使用template时,Channel从cache中获取,使用完毕之后再被返还到cache中以便重用。在多线程环境下,并不会保证线程在下次使用Channel时会使用一样的Channel。