19 KiB
Spring AMQP
Spring AMQP将spring项目中的核心理念应用到了基于AMQP的消息解决方案的开发。项目中提供了一个高级别抽象的“template”来发送和接收消息。
示例
Spring配置
如下示例展示了Spring项目中消息的接收和发送:
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
........
@Configuration
public class RabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
}
SpringBoot自动装配配置
SpringBoot会自动配置bean对象的结构,按如下样例所示:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ApplicationRunner runner(AmqpTemplate template) {
return args -> template.convertAndSend("myqueue", "foo");
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
@RabbitListener(queues = "myqueue")
public void listen(String in) {
System.out.println(in);
}
}
AMQP抽象
Spring AMQP由两部分组成:spring-amqp和spring-rabbit。其中spring-amqp为抽象层,不依赖于任何AMQP Broker实现或client library。因此开发者只需要针对抽象层进行开发,开发的代码也能够跨amqp实现进行移植。该抽象层通过spring-rabbit来进行实现,目前只有RABBITMQ的实现。
AMQP在协议层进行操作,可以通过RabbitMQ的客户端与任何实现了AMQP相同版本协议的broker进行通信。
Message
Spring AMQP定义了Message类作为AMQP模型中对消息的抽象,Message中既包含了消息的内容,也包含了消息的属性,如下展示了Message类的定义:
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
MessageProperties类中定义了通用的属性,例如'messageId', 'timestamp', 'contentType'等。除了这些属性外,还可以通过setHeader(String key, Object value)方法来自定义一些header。
Exchange
Exchange接口代表了AMQP Exchange,消息生产者将消息发送到Exchange。每个Exchange都位于broker的virtual host中,并含有一个唯一的name,并且Exchange还有其他属性。
public interface Exchange {
String getName();
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
Exchange参数
- isDurable:当isDurable参数被设置为true时,该交换机即使当broker重启仍然会存在,如果isDurable参数被设置为false,当broker重启之后,交换机必须重新declare
- autoDelete:当最后一个queue从交换机解绑定时,交换机将会被删除
默认情况下,Spring Exchange中创建的交换机,isDurable属性为true,autoDelete属性为false
Exchange类型
交换机类型通过ExchangeTypes中的常量进行表示,常用的类型如下:direct, topic, fanout, and headers。
不同类型交换机之间的区别,在于它们如何处理与队列之间的绑定:
- direct:direct exchange会让一个queue被绑定到一个固定的routing key
- topic:topic exchange支持通过routing pattern来进行绑定,routing pattern中包含
*和#通配符,其中*代表一个任意word,而#代表0个或多个任意wordrouting_key格式
routging_key组成格式为一个由
.符号分隔的word列表,形式如下:stock.usd.nyse,nyse.vmw,quick.orange.rabbit,word的数量任意,但是大小需要控制在256字节以内
topic exchange中的通配符如下:#和*,其中*可以匹配一个word,而#则是可以匹配0个或多个word
例如*.orange.*,*.*.rabbit,lazy.# - fanout:fanout exchange会将消息发布到所有绑定到交换机上的队列,而不会考虑是否routing key
default Exchange
AMQP协议要求所有broker实现需要提供一个默认的exchange,该默认交换机没有名称(name为空字符串),且类型为direct。所有创建的queue都会自动绑定到该交换机,并且routing_key和queueName相同。
Queue
Queue代表消息消费者接收消息的组件。如下展示了Queue类结构:
public class Queue {
private final String name;
private volatile boolean durable;
private volatile boolean exclusive;
private volatile boolean autoDelete;
private volatile Map<String, Object> arguments;
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}
// Getters and Setters omitted for brevity
}
Queue构造器接收queue name,取决于实现,adminTemplate会提供方法来产生唯一的named queue。这些队列可以用于reply address,故而这些队列的exclusive和autodelete属性都应该被设置为true。
Queue的参数
- durable:该队列当broker重启时是否仍会存在,如果设置为false,broker重启之后,queue必须被重新decalre
- exclusive:只能够被一个连接使用,当该连接断开之后,queue也会被删除
- auto-delete:当最后一个consumer不再订阅该queue时,该queue会被自动删除
当声明一个queue时,如果该queue不存在,那么queue会被创建;如果该queue已经存在,并且声明的属性和已存在队列的属性相同,那么不会发生任何事;如果queue已经存在,并且声明属性和已存在队列不同,那么会抛出异常,并且406的异常code(PRECONDITION_FAILED)会被返回
Binding
生产者向exchange发送消息,而消费者会从queue接收消息,而exchange和queue之间的关系通过binding表示。
将queue绑定到DirectExchange
// routing_key为固定的"foo.bar"
new Binding(someQueue, someDirectExchange, "foo.bar");
将queue绑定到TopicExchange
// routing_key为pattern “foo.*"
new Binding(someQueue, someTopicExchange, "foo.*");
将queue绑定到FanoutExchange
// 绑定到fanout交换机时,无需routing_key
new Binding(someQueue, someFanoutExchange);
通过BindingBuilder来创建Binding
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
连接和资源管理
管理到RabbitMQ broker连接的是ConnectionFactory接口,该接口的职责是提供org.springframework.amqp.rabbit.connection.Connection连接
选择ConnectionFactory
有三种connectionFactory可供选择:
- PooledChannelConnectionFactory
- ThreadChannelConnectionFactory
- CachingConnectionFactory
在大多数情况下,应该使用PooledChannelConnectionFactory。
PooledChannelConnectionFactory
该factory管理一个连接和两个channel pool,其中一个channel pool是事务channel,另一个channel pool用于非事务的channel。
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
该factory管理一个连接和两个ThreadLocal,一个用于事务channel,一个用于非事务channel。该factory会保证位于同一线程的操作会使用同一个channel。
该特性允许在不借助Scoped操作的前提下进行严格的消息排序。为了避免内存泄漏,如果你你的应用使用了很多生命周期很短的线程,必须手动调用factory的closeThreadChannel()方法来释放channel资源。
CachingConnectionFactory
CachingConnectionFactory会建立一个在应用程序范围内共享的connection proxy。共享连接是可能的,因为与AMQP进行消息传递的其实是Channel,而connection实例提供了createChnanel方法。
CachingConnectionFactory实现支持缓存channel对象,并且对事务channel和非事务channel进行分别缓存。
在创建CachingConnectionFactory时,可以通过构造器提供hostname参数,还可以提供usernmae和password参数。
如果要设置channel cache size(默认情况下为25),可以调用setChannelCacheSize()方法。
也可以设置CachingConnectionFactory来缓存connection,每次调用createConnection()方法都会创建一个新的连接,关闭一个连接则是会将其返还到connection cache中。在这些connection创建的channel也会被缓存。如果要缓存这些连接,需要将cacheMode设置为CacheMode.CONNECTION。
cache size并不是limit
cache size并不会限制应用中使用channel的数量,但是限制channel被缓存的数量。当cache size被设置为10时,应用中可以使用任意多数量的channel。如果由超过10个channel被使用,那么当这些channel被返还到cache时,只会有10个channel被缓存,其他的channel都会被物理关闭。
默认情况下,cache size被设置为25,在多线程环境下,如果cache size被设置的过小,那么会频繁的对channel进行创建和关闭。可以在RabbitMQ Admin UI界面实时监控channel数量,并且对cache size进行调整。
通过CachingConnectionFactory来创建来连接
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
连接到集群
如果要连接到一个集群,需要设置CachingConnectionFactory的addresses属性:
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
默认情况下,当新连接创建时会随机连接到集群中的一个host,如果想要从第一个尝试到最后一个,可以设置ddressShuffleMode属性为AddressShuffleMode.NONE。
Publisher Confirm and Returns
confirm和返回消息可以通过设置CachingConnectionFactory来支持,将CachingConnectionFactory的publisherConfirmType属性设置为ConfirmType.CORRELATED并将publisherReturns的属性设置为true。
当这些选项设置后,被factory创建的Channel实例将会被包装在PublisherCallbackChannel中。当这些channel对象被获取后,使用者可以向channel中注册一个PublisherCallbackChannel.Listener回调。PublisherCallbackChannel实现中含有逻辑来将confirm或return路由到特定的listener中。
Publisher Confirm
由于网络故障或是其他原因,client向socket中写入的数据无法保证被传输到server端并被处理,可能会出现数据丢失的情况。
在使用标准AMQP 0-9-1情况下,唯一保证消息不被丢失的方法是使用事务:令channel是事务的,并且对消息进行发布和提交。在这种情况下,不必要的事务会极大的影响吞吐量和效率,会将吞吐量降低250倍左右。为了解决这种情况,引入了确认机制。
要启用确认机制,client会发送confirm.select,broker则会回复confirm.select-ok。一旦confirm.select在channel中被使用,则意味着confirm mode已经开启,一个事务channel无法开启确认机制,且一个确认机制开启的channel也无法将其设置为事务的。
一旦channel处于确认模式,client和broker都会计算消息数量(从1开始,即从confirm.select开始),broker在确认消息时,会向channel中发送basic.ack,而delivery-tag则是包含了确认消息的序列号。broker也会在basic.ack中设置multiple属性,代表该消息和位于该消息之前的消息都被确认。
Amqp Template
Spring AMQP提供了高层抽象的template,在该tempalte中定义了接收消息和发送消息的主要操作。
添加重试功能
可以配置RabbitTemplate使用RetryTemplate,如下样例展示了exponential back off policy和默认的SimpleRetryPolicy,该policy会在抛异常之前重试3次。
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
异步发布
发布消息是一个异步的机制,默认情况下,rabbitMQ会将不能路由的消息丢弃。为了成功的发布,需要获取异步的confirm。考虑如下两种失败场景:
- 发送到一个exchange,但是没有匹配的queue
- 发送到一个不存在的exchange
第一种场景由publisher returns涵盖。
对于第二种场景,消息将会被丢弃并且没有任何返回,channel将会被关闭并且抛出异常。默认情况下,异常将会在日志中打印,但是可以注册一个ChannelListener到CachingConnectionFactory,用于获取如下事件。
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
可以检查signal的reason属性,从而确定产生的问题。
Correlated Publisher Confirms and Returns
RabbitTemplate中实现了AmqpTemplate中对于发布确认和Returns的支持。
Rabbitmq Returns
通过Rabbitmq完成RPC调用,消息中需要含有如下属性:
- deliveryMode: 将消息标识为持久化的或者暂时的,如果该属性值为2,则消息是持久化的,任何其他值消息都是暂时的
- contentType:用于表示消息的mime-type,例如可以是application/json
- replyTo:通常用于命名一个callback queue
- correlationId:该属性用于关联rpc响应和请求
Correlation Id
如果对来自所有客户端的rpc请求,都只创建一个callback queue,该方案效率会很低。因此,应该为每一个客户端都创建一个callback queue。
因为如果所有客户端使用一条callback queue,那么传递给callback queue的rpc响应会广播给所有订阅的客户端,然后再用客户端丢弃或匹配。这样会极大影响吞吐量。
在为每一个client创建一个callback queue后,从callback queue中接收到的响应再通过correlationId属性同请求关联到一起。如果correlationid关联的请求不存在,那么只需要安全的丢弃该请求即可。
Rabitmq实现RPC流程
- 对于rpc请求,客户端发送消息时消息应该携带两个属性:
replyTo:replyTo是一个匿名的独占队列(exclusive为true,当该队列只能由一个连接使用,并且当连接断开时会自动删除),correleationId则是一个唯一的值,用于将请求和响应关联起来 - 请求将会被发送到rpc_queue
- rpc worker将会从rpc_queue中拉取消息,并对拉取数据进行处理。处理完成之后,会将结果封装在消息中并且发送给客户端,消息会被发送到
replyTo属性指定的queue - client等待reply_queue中的响应消息,当拉取到响应消息之后,其会检查correlationId属性,并根据其来匹配响应对应的请求
Spring AMQP Returns
对于返回的消息, template的mandatory属性必须要被设置为true,或mandatory-expression表达式必须要为true。该特性要求CachingConnectionFactory的publisherReturns属性被设置为true。返回将会通过setReturnsCallback(ReturnsCallback callback)方法注册RabbitTemplate.ReturnsCallback回调来发送到客户端,该回调必须实现如下方法:
void returnedMessage(ReturnedMessage returned);
在通过setReturnsCallback来注册回调时,每个rabbitTemplate只能够注册一个ReturnsCallback
ReturnedMessage必须含有如下属性:
- message:返回消息本身
- replyCode:代表return reason的code
- replyText:一个文本的return reason,例如
NO_ROUTE - exchange:消息发送到的交换机
- routing key:使用的routing key
发布确认
对于发布确认,该template需要一个CachingConnectionFactory,并且CachingConnectionFactory的publisherConfirm属性需要被设置为ConfirmType.CORRELATED。确认将会被发送给客户端,可以通过setConfirmCallback(ConfirmCallback callback)注册一个RabbitTemplate.ConfirmCallback方法。该回调需要实现如下方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
其中correlationData是客户端在发送消息时提供的原始消息对象。ack参数如果为true代表ack,为false则是代表nack。对于nack,cause参数将会包含nack的原因。
例如,如果将消息发送给一个不存在的交换机,在这种情况下,broker将会关闭channel,关闭的原因则会包含在
cause中。
ConfirmCallback只在RabbitTemplate中被支持。
如果同时启用了发布确认和返回,且消息无法被路由到任何队列中,那么CorrelationData的return属性将会被注入到返回的消息中。其会保证返回消息的属性会在
Scoped Operations
通常情况下,在使用template时,Channel从cache中获取,使用完毕之后再被返还到cache中以便重用。在多线程环境下,并不会保证线程在下次使用Channel时会使用一样的Channel。