阅读spring kafka关于创建topic和发送消息的文档
This commit is contained in:
@@ -1,701 +0,0 @@
|
|||||||
# Spring for Apache Kafka
|
|
||||||
## Introduce
|
|
||||||
### 依赖
|
|
||||||
可以通过如下方式来添加Kafka的依赖包:
|
|
||||||
```xml
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework.kafka</groupId>
|
|
||||||
<artifactId>spring-kafka</artifactId>
|
|
||||||
</dependency>
|
|
||||||
```
|
|
||||||
### Consumer Demo
|
|
||||||
```java
|
|
||||||
@SpringBootApplication
|
|
||||||
public class Application {
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
SpringApplication.run(Application.class, args);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public NewTopic topic() {
|
|
||||||
return TopicBuilder.name("topic1")
|
|
||||||
.partitions(10)
|
|
||||||
.replicas(1)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@KafkaListener(id = "myId", topics = "topic1")
|
|
||||||
public void listen(String in) {
|
|
||||||
System.out.println(in);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
```
|
|
||||||
application.properties文件配置示例如下:
|
|
||||||
```properties
|
|
||||||
spring.kafka.consumer.auto-offset-reset=earliest
|
|
||||||
```
|
|
||||||
NewTopic的bean对象会导致broker中创建该topic,如果该topic已经存在则该topic不需要。
|
|
||||||
### Producer Demo
|
|
||||||
```java
|
|
||||||
@SpringBootApplication
|
|
||||||
public class Application {
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
SpringApplication.run(Application.class, args);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public NewTopic topic() {
|
|
||||||
return TopicBuilder.name("topic1")
|
|
||||||
.partitions(10)
|
|
||||||
.replicas(1)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
|
|
||||||
return args -> {
|
|
||||||
template.send("topic1", "test");
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
```
|
|
||||||
## Reference
|
|
||||||
### Using Spring for Apache Kafka
|
|
||||||
#### 连接到Kafka
|
|
||||||
- Kafka Admin
|
|
||||||
- ProducerFactory
|
|
||||||
- ConsumerFactory
|
|
||||||
|
|
||||||
上述所有都继承了KafkaResourceFactory类。这允许在运行时动态改变bootstrap servers(同Kafka集群建立连接的ip:port),通过`setBootstrapServersSupplier(() → …)`向其传递一个`Supplier<String>`,传递的Supplier将会被所有新建连接调用来获取bootstrap servers的ip和port。
|
|
||||||
消费者和生产者通常是长期活跃的,为了关闭已经存在的所有生产者,可以调用`DefaultKafkaProducerFactory`的`reset()`方法;如果要关闭现存的所有消费者,可以调用`KafkaListenerEndpointRegistry`的`stop()`方法(调用stop方法之后再调用start方法),或者再任何其他的listener容器bean中调用stop和start方法。
|
|
||||||
为了方便,框架提供了`ABSwitchCluster`来支持两套bootstrap servers集合,再同一时间只能由一套处于活跃状态。当想要切换bootstrap servers集群时,可以调用`primary()`方法或`secondary()`方法,并且在producer factory上调用`reset`方法用于建立新的连接;对于消费者,在所有listener容器中调用`stop`和`start`方法。当使用`@KafkaListener`注解时,在`KafkaListenerEndpointRegistry`上调用stop和start方法。
|
|
||||||
##### Factory Listener
|
|
||||||
`DefaultKafkaProducerFactory`和`DefaultKafkaConsumerFactory`可以配置`Listener`来接收通知,当生产者或消费者被创建或关闭时,Listener接收到提醒。
|
|
||||||
```java
|
|
||||||
// Producer Factory Listener
|
|
||||||
interface Listener<K, V> {
|
|
||||||
|
|
||||||
default void producerAdded(String id, Producer<K, V> producer) {
|
|
||||||
}
|
|
||||||
|
|
||||||
default void producerRemoved(String id, Producer<K, V> producer) {
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
```
|
|
||||||
```java
|
|
||||||
// Consumer Factory Listener
|
|
||||||
interface Listener<K, V> {
|
|
||||||
|
|
||||||
default void consumerAdded(String id, Consumer<K, V> consumer) {
|
|
||||||
}
|
|
||||||
|
|
||||||
default void consumerRemoved(String id, Consumer<K, V> consumer) {
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
```
|
|
||||||
在上述场景中,`id`创建时都是将`client-id`添加到factory bean-name之后,之间用`.`隔开。
|
|
||||||
#### 设置Topic
|
|
||||||
如果你定义了一个`KafkaAdmin`的bean对象,那么其就能够自动添加topic到broker中。为了创建topic,可以为每个topic在应用容器中创建一个`NewTopic`bean对象。可以通过`TopicBuilder`工具类来使创建topic bean对象的过程更加简单。
|
|
||||||
创建Topic的示例如下所示:
|
|
||||||
```java
|
|
||||||
@Bean
|
|
||||||
public KafkaAdmin admin() {
|
|
||||||
Map<String, Object> configs = new HashMap<>();
|
|
||||||
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
|
||||||
return new KafkaAdmin(configs);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public NewTopic topic1() {
|
|
||||||
return TopicBuilder.name("thing1")
|
|
||||||
.partitions(10)
|
|
||||||
.replicas(3)
|
|
||||||
.compact()
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public NewTopic topic2() {
|
|
||||||
return TopicBuilder.name("thing2")
|
|
||||||
.partitions(10)
|
|
||||||
.replicas(3)
|
|
||||||
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public NewTopic topic3() {
|
|
||||||
return TopicBuilder.name("thing3")
|
|
||||||
.assignReplicas(0, Arrays.asList(0, 1))
|
|
||||||
.assignReplicas(1, Arrays.asList(1, 2))
|
|
||||||
.assignReplicas(2, Arrays.asList(2, 0))
|
|
||||||
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
```
|
|
||||||
可以省略partitions()方法和replicas()方法,在省略的情况下broker默认设置值将会被采用:
|
|
||||||
```java
|
|
||||||
@Bean
|
|
||||||
public NewTopic topic4() {
|
|
||||||
return TopicBuilder.name("defaultBoth")
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public NewTopic topic5() {
|
|
||||||
return TopicBuilder.name("defaultPart")
|
|
||||||
.replicas(1)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public NewTopic topic6() {
|
|
||||||
return TopicBuilder.name("defaultRepl")
|
|
||||||
.partitions(3)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
```
|
|
||||||
可以定义一个NewTopics类型的bean对象,NewTopics对象中包含多个NewTopic对象:
|
|
||||||
```java
|
|
||||||
@Bean
|
|
||||||
public KafkaAdmin.NewTopics topics456() {
|
|
||||||
return new NewTopics(
|
|
||||||
TopicBuilder.name("defaultBoth")
|
|
||||||
.build(),
|
|
||||||
TopicBuilder.name("defaultPart")
|
|
||||||
.replicas(1)
|
|
||||||
.build(),
|
|
||||||
TopicBuilder.name("defaultRepl")
|
|
||||||
.partitions(3)
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
```
|
|
||||||
> 当使用spring boot时,KafkaAdmin对象是自动注册的,故而只需要注册NewTopic类型的bean对象即可。
|
|
||||||
|
|
||||||
默认情况下,如果broker不可访问,那么只会打出日志信息,会继续加载上下文。如果想要将这种情况标记为fatal,需要设置admin的`fatalIfBrokerNotAvailable`属性为true,设置完属性后如果broker不可访问,上下文加载失败。
|
|
||||||
KafkaAdmin提供了方法在运行时动态创建topic,
|
|
||||||
- `createOrModifyTopics`
|
|
||||||
- `describeTopics`
|
|
||||||
|
|
||||||
如果想要使用更多特性,可以直接使用AdminClient:
|
|
||||||
```java
|
|
||||||
@Autowired
|
|
||||||
private KafkaAdmin admin;
|
|
||||||
|
|
||||||
...
|
|
||||||
|
|
||||||
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
|
|
||||||
...
|
|
||||||
client.close();
|
|
||||||
```
|
|
||||||
#### Sending Messages
|
|
||||||
##### KafkaTemplate
|
|
||||||
`KafkaTemplate`类包装了生产者,并且提供方法向Topic发送消息。如下显示了KafkaTemplate相关的方法:
|
|
||||||
```java
|
|
||||||
CompletableFuture<SendResult<K, V>> sendDefault(V data);
|
|
||||||
|
|
||||||
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
|
|
||||||
|
|
||||||
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
|
|
||||||
|
|
||||||
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
|
|
||||||
|
|
||||||
CompletableFuture<SendResult<K, V>> send(String topic, V data);
|
|
||||||
|
|
||||||
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
|
|
||||||
|
|
||||||
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
|
|
||||||
|
|
||||||
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
|
|
||||||
|
|
||||||
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
|
|
||||||
|
|
||||||
CompletableFuture<SendResult<K, V>> send(Message<?> message);
|
|
||||||
|
|
||||||
Map<MetricName, ? extends Metric> metrics();
|
|
||||||
|
|
||||||
List<PartitionInfo> partitionsFor(String topic);
|
|
||||||
|
|
||||||
<T> T execute(ProducerCallback<K, V, T> callback);
|
|
||||||
|
|
||||||
// Flush the producer.
|
|
||||||
|
|
||||||
void flush();
|
|
||||||
|
|
||||||
interface ProducerCallback<K, V, T> {
|
|
||||||
|
|
||||||
T doInKafka(Producer<K, V> producer);
|
|
||||||
|
|
||||||
}
|
|
||||||
```
|
|
||||||
`sendDefault`方法需要一个提供给KafkaTemplate的默认Topic。
|
|
||||||
上述API会接收一个时间戳参数(如果时间戳参数没有指定,则自动产生一个时间戳),并且会将该时间戳存储到记录中。用户指定的时间戳如何存储取决于Kafka topic中关于时间戳类型的配置。如果topic的时间戳类型被配置为`CREATE_TIME`,那么用户指定的时间戳将会被记录到record中。如果时间戳类型被配置为`LOG_APPEND_TIME`,那么用户指定的时间戳将会被忽略,并且由broker进行添加(broker会添加时间戳为local broker time)。
|
|
||||||
`metrics`方法和`partitionsFor`方法将会被委托给Producer中的同名方法`execute`方法则是会提供对底层Producer的直接访问。
|
|
||||||
如果要使用KafkaTemplate,需要配置一个Producer并且将其传递给KafkaTemplate的构造函数:
|
|
||||||
```java
|
|
||||||
@Bean
|
|
||||||
public ProducerFactory<Integer, String> producerFactory() {
|
|
||||||
return new DefaultKafkaProducerFactory<>(producerConfigs());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public Map<String, Object> producerConfigs() {
|
|
||||||
Map<String, Object> props = new HashMap<>();
|
|
||||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
|
||||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
|
|
||||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
|
||||||
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
|
|
||||||
return props;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public KafkaTemplate<Integer, String> kafkaTemplate() {
|
|
||||||
return new KafkaTemplate<Integer, String>(producerFactory());
|
|
||||||
}
|
|
||||||
```
|
|
||||||
可以通过如下方式覆盖factory的ProducerConfig,根据相同的factory创建不同Producer config的template对象:
|
|
||||||
```java
|
|
||||||
@Bean
|
|
||||||
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
|
|
||||||
return new KafkaTemplate<>(pf);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
|
|
||||||
return new KafkaTemplate<>(pf,
|
|
||||||
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
|
|
||||||
}
|
|
||||||
```
|
|
||||||
一个类型为`ProducerFactory<?, ?>`的bean对象可以被任何泛型类型引用。(该bean对象已经被spring boot自动配置)
|
|
||||||
当使用带`Message<?>`参数的方法时,topic、partition、key information都会在message header中被提供:
|
|
||||||
- KafkaHeaders.TOPIC
|
|
||||||
- KafkaHeaders.PARTITION
|
|
||||||
- KafkaHeaders.KEY
|
|
||||||
- KafkaHeaders.TIMESTAMP
|
|
||||||
|
|
||||||
该消息的载荷是data。
|
|
||||||
可以为KafkaTemplate配置ProducerListener来为发送的结果(成功或失败)提供一个异步回调,而不是通过CompletableFuture。如下是`ProducerListener`接口的定义:
|
|
||||||
```java
|
|
||||||
public interface ProducerListener<K, V> {
|
|
||||||
|
|
||||||
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
|
|
||||||
|
|
||||||
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
|
|
||||||
Exception exception);
|
|
||||||
|
|
||||||
}
|
|
||||||
```
|
|
||||||
默认情况下,template配置了一个`LoggingProducerListener`,会打印发送失败到日志,但是发送成功时不会做任何事。
|
|
||||||
为了方便,默认的方法实现已经在接口中提供了提供了,如果只需要覆盖一个方法,只需要对onSuccess或onError进行Override即可。
|
|
||||||
发送消息会返回一个`CompletableFuture<SendResult>`对象。可以为异步操作注册一个listener:
|
|
||||||
```java
|
|
||||||
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
|
|
||||||
future.whenComplete((result, ex) -> {
|
|
||||||
...
|
|
||||||
});
|
|
||||||
```
|
|
||||||
SendResult拥有两个属性,ProducerRecord和RecordMetaData。
|
|
||||||
Throwable可以被转化为KafkaProducerException,该异常的failedProducerRecord可以包含失败的record。
|
|
||||||
当为producer config设置了`linger.ms`时,如果待发送的消息没有达到`batch.size`,会延迟发送消息等待更多的消息出现并被批量发送。默认情况下,linger.ms为0,不会有延迟,但是如果linger.ms有值,那么在发送消息之后如果希望消息立马发送,需要手动调用flush方法。
|
|
||||||
如下示例展示了如何通过kafkaTemplate向broker发送消息:
|
|
||||||
```java
|
|
||||||
public void sendToKafka(final MyOutputData data) {
|
|
||||||
final ProducerRecord<String, String> record = createRecord(data);
|
|
||||||
|
|
||||||
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
|
|
||||||
future.whenComplete((result, ex) -> {
|
|
||||||
if (ex == null) {
|
|
||||||
handleSuccess(data);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
handleFailure(data, record, ex);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Blocking (Sync)
|
|
||||||
public void sendToKafka(final MyOutputData data) {
|
|
||||||
final ProducerRecord<String, String> record = createRecord(data);
|
|
||||||
|
|
||||||
try {
|
|
||||||
template.send(record).get(10, TimeUnit.SECONDS);
|
|
||||||
handleSuccess(data);
|
|
||||||
}
|
|
||||||
catch (ExecutionException e) {
|
|
||||||
handleFailure(data, record, e.getCause());
|
|
||||||
}
|
|
||||||
catch (TimeoutException | InterruptedException e) {
|
|
||||||
handleFailure(data, record, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
上述示例中,ex为KafkaProducerException类型,并且有failedProducerRecord属性。
|
|
||||||
##### RoutingKafkaTemplate
|
|
||||||
可以通过RoutingKafkaTemplate在运行时根据目标topic的名称选定producer。
|
|
||||||
> routing template不支持事务、execute、flush、metrics等操作,因为这些操作不知道topic名称
|
|
||||||
|
|
||||||
routing template需要一个key和value分别为`java.util.regex.Pattern`和`ProducerFactory<Object, Object>`的map实例。该map应该是有序的,因为map会按顺序遍历(例如LinkedHashMap集合)。并且,应该在map最开始的时候指定更加具体的pattern。
|
|
||||||
如下示例会展示如何通过一个template向不同的topic发送消息,并且值的序列化会采用不同的序列化器:
|
|
||||||
```java
|
|
||||||
@SpringBootApplication
|
|
||||||
public class Application {
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
SpringApplication.run(Application.class, args);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
|
|
||||||
ProducerFactory<Object, Object> pf) {
|
|
||||||
|
|
||||||
// Clone the PF with a different Serializer, register with Spring for shutdown
|
|
||||||
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
|
|
||||||
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
|
||||||
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
|
|
||||||
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
|
|
||||||
|
|
||||||
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
|
|
||||||
map.put(Pattern.compile("two"), bytesPF);
|
|
||||||
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
|
|
||||||
return new RoutingKafkaTemplate(map);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
|
|
||||||
return args -> {
|
|
||||||
routingTemplate.send("one", "thing1");
|
|
||||||
routingTemplate.send("two", "thing2".getBytes());
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
```
|
|
||||||
##### DefaultKafkaProducerFactory
|
|
||||||
在上述template的使用示例中,创建一个KafkaTemplate需要使用ProducerFactory。
|
|
||||||
当不使用transaction时,默认情况下,DefaultKafkaProducerFactory创建一个单例producer,单例producer由所有客户端使用。但是,如果在template上调用了flush方法,可能会造成其使用同一producer的其他线程的延迟。
|
|
||||||
> DefaultKafkaProducerFactory有一个属性producerPerThread,当该属性被设置为true时,Kafka会对每一个线程都创建一个producer,以此来避免这个问题。
|
|
||||||
|
|
||||||
> 当producerPerThread被设置为true时,若producer不再需要,用户代码必须在factory上调用closeThreadBoundProducer方法,这回物理的关闭producer并且将其中ThreadLocal中移除。调用reset或destroy并不会清除这些producer
|
|
||||||
|
|
||||||
当创建DefaultKafkaProducerFactory时,key和value的serializer class可以通过配置来制定,配置的信息会通过接收一个Map的构造函数传递给DefaultKafkaProducerFactory。
|
|
||||||
serializer实例也可以作为参数传递给DefaultKafkaProducerFactory构造函数,此时所有生产者都会共享同一个serializer实例。 可选的,也可以提供一个`Supplier<Serializer>`给构造函数,此时每个生产者都会调用该Supplier获取一个独立的Serializer。
|
|
||||||
```java
|
|
||||||
@Bean
|
|
||||||
public ProducerFactory<Integer, CustomValue> producerFactory() {
|
|
||||||
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
|
|
||||||
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
|
|
||||||
}
|
|
||||||
```
|
|
||||||
可以在factory创建之后,对producer properties进行更新。这些更新并不会影响现存的生产者,可以调用reset方法来关闭所有现存的生产者,新的生产者会根据新配置项来创建。
|
|
||||||
> 但是,无法将事务的producer factory修改为非事务的,反之亦然,无法将非事务的producer修改为事物的
|
|
||||||
|
|
||||||
目前提供如下两个方法对producer properties进行更新:
|
|
||||||
```java
|
|
||||||
void updateConfigs(Map<String, Object> updates);
|
|
||||||
|
|
||||||
void removeConfig(String configKey);
|
|
||||||
```
|
|
||||||
##### ReplyingKafkaTemplate
|
|
||||||
ReplyingKafkaTemplate作为KafkaTemplate的子类,提供了请求、回复的语义。相对于KafkaTemplate,ReplyingKafkaTemplate具有两个额外的方法:
|
|
||||||
```java
|
|
||||||
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
|
|
||||||
|
|
||||||
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
|
|
||||||
Duration replyTimeout);
|
|
||||||
```
|
|
||||||
|
|
||||||
方法的返回结果是一个CompletableFuture,实际结果以异步的方式填充到其中。结果含有一个sendFuture属性,是调用kafkaTemplate.send方法的结果。可以用该future对象来获知send操作的返回结果。
|
|
||||||
如果使用第一个方法,或是replyTimeout传递参数为null,那么会使用默认的replyTimeout,默认值为5s。
|
|
||||||
该template含有一个新的方法`waitForAssignment`,如果reply container通过`auto.offset.reset=latest`来进行配置时,可以避免发送了一个请求并且结果被返回,但是container还尚未被初始化。
|
|
||||||
如下实例展示了如何使用ReplyingKafkaTemplate:
|
|
||||||
```java
|
|
||||||
@SpringBootApplication
|
|
||||||
public class KRequestingApplication {
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
SpringApplication.run(KRequestingApplication.class, args).close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
|
|
||||||
return args -> {
|
|
||||||
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
|
|
||||||
throw new IllegalStateException("Reply container did not initialize");
|
|
||||||
}
|
|
||||||
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
|
|
||||||
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
|
|
||||||
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
|
|
||||||
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
|
|
||||||
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
|
|
||||||
System.out.println("Return value: " + consumerRecord.value());
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
|
|
||||||
ProducerFactory<String, String> pf,
|
|
||||||
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
|
|
||||||
|
|
||||||
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
|
|
||||||
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
|
|
||||||
|
|
||||||
ConcurrentMessageListenerContainer<String, String> repliesContainer =
|
|
||||||
containerFactory.createContainer("kReplies");
|
|
||||||
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
|
|
||||||
repliesContainer.setAutoStartup(false);
|
|
||||||
return repliesContainer;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public NewTopic kRequests() {
|
|
||||||
return TopicBuilder.name("kRequests")
|
|
||||||
.partitions(10)
|
|
||||||
.replicas(2)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public NewTopic kReplies() {
|
|
||||||
return TopicBuilder.name("kReplies")
|
|
||||||
.partitions(10)
|
|
||||||
.replicas(2)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
```
|
|
||||||
上述通过spring boot自动配置的container factory来创建了一个reply container。
|
|
||||||
ReplyingKafkaTemplate会设置一个name为KafkaHeaders.CORRELATION_ID的header,并且该header必须被server端(消费者端)返回。
|
|
||||||
在这种情况下,如果`@KafkaListener`应用会返回:
|
|
||||||
```java
|
|
||||||
@SpringBootApplication
|
|
||||||
public class KReplyingApplication {
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
SpringApplication.run(KReplyingApplication.class, args);
|
|
||||||
}
|
|
||||||
|
|
||||||
@KafkaListener(id="server", topics = "kRequests")
|
|
||||||
@SendTo // use default replyTo expression
|
|
||||||
public String listen(String in) {
|
|
||||||
System.out.println("Server received: " + in);
|
|
||||||
return in.toUpperCase();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public NewTopic kRequests() {
|
|
||||||
return TopicBuilder.name("kRequests")
|
|
||||||
.partitions(10)
|
|
||||||
.replicas(2)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean // not required if Jackson is on the classpath
|
|
||||||
public MessagingMessageConverter simpleMapperConverter() {
|
|
||||||
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
|
|
||||||
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
|
|
||||||
return messagingMessageConverter;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
```
|
|
||||||
上述`@KafkaListener`结构会回应correlation id并且决定reply topic。template会使用默认header `KafKaHeaders.REPLY_TOPIC`来告知消费者应该将回复发送到哪个topic中。
|
|
||||||
template会根据配置的reply container来探知reply topic或是分区。如果容器被配置监听单个topic或是单个TopicPartitionOffset,会将监听的topic或是分区设置到reply header中。如果容器通过其他方式配置(如监听多个topic),那么用户必须显式设置reply header。
|
|
||||||
如下展示了用户如何设置`KafkaHeaders.REPLY_TOPIC`:
|
|
||||||
```java
|
|
||||||
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
|
|
||||||
```
|
|
||||||
如果只设置了一个reply TopicPartitionOffset,如果每个实例监听一个不同的分区,那么可以多个template共用一个reply topic。
|
|
||||||
如果只配置了一个reply topic,每个实例必须要有不同的`group.id`。在这种情况下,每个实例都会接收到每个请求,但是只有发送请求的实例能够找到correlation id。这种情况能够自动扩容,但是会带来额外的网络负载,每个实例接收到不想要消息时的丢弃操作也会带来开销。在这种情况下,推荐将template的sharedReplyTopic设置为true,将非预期reply的日志级别从info降低为debug。
|
|
||||||
如下为配置一个shared reply topic容器的示例:
|
|
||||||
```java
|
|
||||||
@Bean
|
|
||||||
public ConcurrentMessageListenerContainer<String, String> replyContainer(
|
|
||||||
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
|
|
||||||
|
|
||||||
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
|
|
||||||
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
|
|
||||||
Properties props = new Properties();
|
|
||||||
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
|
|
||||||
container.getContainerProperties().setKafkaConsumerProperties(props);
|
|
||||||
return container;
|
|
||||||
}
|
|
||||||
```
|
|
||||||
> 如果没有按照上述的方法设置template,那么当存在多个template实例时,每个实例都需要一个确定的reply topic。
|
|
||||||
> 一个可选的替代方案是显式设置`KafkaHeaders.REPLY_PARTITION`,并且对每个实例使用特定的分区。此时server必须显式使用该header将reply路由到正确的分区中(@KafkaListener会做)。在这种情况下,reply container必须不使用kafka group特性,并且被配置监听一个固定的分区。
|
|
||||||
|
|
||||||
默认情况下,会使用3个header:
|
|
||||||
- KafkaHeaders.CORRELATION_ID:用于将reply关联到请求
|
|
||||||
- KafkaHeaders.REPLY_TOPIC:用于告知server写入哪个topic
|
|
||||||
- KafkaHeaders.REPLY_PARTITION:该header是可选的,用于告知reply会写入到哪个分区
|
|
||||||
|
|
||||||
上述的header会被@KafkaListener使用,用于路由reply。
|
|
||||||
|
|
||||||
#### Receiving Messages
|
|
||||||
|
|
||||||
可以通过配置MessageListenerContainer并提供一个message listener来接收消息,也可以通过使用@KafkaListener注解来监听消息。
|
|
||||||
当使用MessageListenerContainer时,可以提供一个listener来接收数据。message listener目前有如下接口:
|
|
||||||
```java
|
|
||||||
// 该接口用于处理单独的ConsumerRecord实例,该实例从Kafka消费者的poll
|
|
||||||
// 操作获取,当使用自动提交或一个由容器管理的提交方法
|
|
||||||
public interface MessageListener<K, V> { (1)
|
|
||||||
|
|
||||||
void onMessage(ConsumerRecord<K, V> data);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// 该接口用于处理单独的ConsumerRecord实例,该实例从Kafka消费者的poll
|
|
||||||
// 操作获取,当使用一个手动提交方法
|
|
||||||
public interface AcknowledgingMessageListener<K, V> { (2)
|
|
||||||
|
|
||||||
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// 该接口和(1)类似,但是该接口可以访问提供的consumer对象
|
|
||||||
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (3)
|
|
||||||
|
|
||||||
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// 该接口和(2)类似,但是该接口可以访问提供的consumer对象
|
|
||||||
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (4)
|
|
||||||
|
|
||||||
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// 该接口用于处理所有由kafka消费者的poll操作接收的ConsumerRecord,当使用自动
|
|
||||||
// 提交或容器管理的提交方法时。当使用该接口时,不支持`ACKMODE.RECORD`,该传递
|
|
||||||
// 给该listener的batch数据是已完成的
|
|
||||||
public interface BatchMessageListener<K, V> { (5)
|
|
||||||
|
|
||||||
void onMessage(List<ConsumerRecord<K, V>> data);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// 该接口用于处理所有由kafka消费者的poll操作接收到的ConsumerRecord,当使用手动
|
|
||||||
// 提交时
|
|
||||||
public interface BatchAcknowledgingMessageListener<K, V> { (6)
|
|
||||||
|
|
||||||
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// 该接口和(5)类似,但是可以访问consumer对象
|
|
||||||
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (7)
|
|
||||||
|
|
||||||
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// 该接口和(6)类似,但是可以访问consumer对象
|
|
||||||
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (8)
|
|
||||||
|
|
||||||
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
|
|
||||||
|
|
||||||
}
|
|
||||||
```
|
|
||||||
> Consumer对象并不是线程安全的,故而只能够在调用该listener的线程中调用consumer的方法
|
|
||||||
|
|
||||||
##### Message Listener Container
|
|
||||||
为MessageListenerContainer提供了两个实现:
|
|
||||||
- KafkaMessageListenerContainer
|
|
||||||
- ConcurrentMessageListenerContainer
|
|
||||||
|
|
||||||
KafkaMessageListenerContainer会在单线程中接收所有来自topic和分区的消息。ConcurrentMessageListenerContainer会委托给一个或多个KafkaMessageListenerContainer实例,从而提供多线程的消费。
|
|
||||||
|
|
||||||
**RecordInterceptor**
|
|
||||||
|
|
||||||
可以添加一个RecordInterceptor到ListenerContainer,在调用listener之前,interceptor会被调用。record拦截器允许对record进行查看和修改。如果interceptor返回结果为null,那么listener则不会被调用。而且,其还有一个额外的方法,可以在listener退出之后被调用(退出之后,指正常退出或以抛出异常的形式退出)。
|
|
||||||
还有一个BatchInterceptor,提供了和BatchListener类似的功能。ConsumerAwareRecordInterceptor则是提供了访问consumer对象的功能。
|
|
||||||
如果想要调用多个Interceptor,则是可以使用CompositeRecordInterceptor和CompositeBatchInterceptor。
|
|
||||||
默认情况下,当使用事务时,interceptor会在事务开始之前被调用。可以设置listener container的interceptBeforeTx属性为false来令interceptor的调用时机为事务开始之后。
|
|
||||||
ConcurrentMessageListenerContainer支持“静态成员”(即固定消费者,即使消费者实例重启,如此可以降低事件在消费者之间重新负载均衡的开销),当并发量大于1时。`group.instance.id`的后缀为`-n`,其中n从1开始。
|
|
||||||
|
|
||||||
**KafkaMessageListenerContainer**
|
|
||||||
|
|
||||||
可以通过如下构造函数来使用MessageListenerContainer:
|
|
||||||
```java
|
|
||||||
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
|
|
||||||
```
|
|
||||||
该构造方法接收一个ConsumerFactory,并通过ContainerProperties实例来接收其他配置。ContainerProperties拥有如下的构造函数:
|
|
||||||
```java
|
|
||||||
public ContainerProperties(TopicPartitionOffset... topicPartitions)
|
|
||||||
|
|
||||||
public ContainerProperties(String... topics)
|
|
||||||
|
|
||||||
public ContainerProperties(Pattern topicPattern)
|
|
||||||
```
|
|
||||||
- 第一个构造方法接收一个TopicPartitionOffset的数组作为参数,来显式指定container使用哪些分区(通过consumer的assign方法),并可以附带一个可选的初始offset。默认情况下,如果offset为正值,代表其是绝对的offset。若offset为负值,则offset代表在默认分区中相对于current last offset的相对位置。并且,对于TopicPartitionOffset类,其提供了一个接收额外boolean参数的构造方法,如果该值设置为true,无论init offset为正值或者负值,都是相对于consumer当前位置的相对值。当容器启动时,offset将会被使用。
|
|
||||||
- 第二个构造方法接收一个topic数组,并且kafka根据`group.id`属性来分配分区(在group中对分区进行分配)。
|
|
||||||
- 第三个构造方法接收一个正则表达式,根据正则表达式来选中topic
|
|
||||||
|
|
||||||
想要为容器指定一个MessageListener,可以在创建容器时使用ContainerProps.setMessageListener方法。如下是创建容器时设置MessageListener的示例:
|
|
||||||
```java
|
|
||||||
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
|
|
||||||
containerProps.setMessageListener(new MessageListener<Integer, String>() {
|
|
||||||
...
|
|
||||||
});
|
|
||||||
DefaultKafkaConsumerFactory<Integer, String> cf =
|
|
||||||
new DefaultKafkaConsumerFactory<>(consumerProps());
|
|
||||||
KafkaMessageListenerContainer<Integer, String> container =
|
|
||||||
new KafkaMessageListenerContainer<>(cf, containerProps);
|
|
||||||
return container;
|
|
||||||
```
|
|
||||||
`missingTopicsFatal`可以控制topic不存在时container是否启动(`missingTopicsFatal`的值默认为false)。如果任一topic在broker中不存在,那么container的启动会被终止。
|
|
||||||
|
|
||||||
**ConcurrentMessageListenerContainer**
|
|
||||||
|
|
||||||
`ConcurrentMessageListenerContainer`唯一的构造方法和`KafkaListenerContainer`类似,如下显示了构造方法的签名:
|
|
||||||
```java
|
|
||||||
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
|
|
||||||
```
|
|
||||||
ConcurrentMessageListenerContainer还有一个concurrency属性,例如,`container.setConcurrency(3)`会创建3个`KafkaMessageListenerContainer`实例。
|
|
||||||
kafka会根据其组管理功能再消费者之间分配分区。
|
|
||||||
> **kafka分配策略**
|
|
||||||
> 当监听多个topic时,默认的分区分配策略可能并不是想要的。例如有三个topic,每个topic有5个分区,那么将concurrency设置为15时,只会看到5个活跃的consumer
|
|
||||||
> **RangeAssignor**
|
|
||||||
> 默认情况下,kafka的分区分配策略是通过RangeAssignor来进行分配的,其会将每个topic的分区在消费者group中所有消费者实例之间进行分配,如果消费者实例数大于分区数,则是会将消费者按字典顺序排序且分配分区给排序靠前的人。
|
|
||||||
> 故而,在RangeAssignor策略下,只有字典排序靠前的消费者实例才能在每个topic中分配到一个分区,前5个消费者每个实例3个分区,后面的10个消费者完全空闲。
|
|
||||||
> **RoundRobinAssignor**
|
|
||||||
> 相对于RangeAssignor,RoundRobinAssingor则是会基于轮询在所有消费者之间均匀的分配所有的分区,在上述情况下,15个消费者实例,每个都会分配到一个分区。
|
|
||||||
>
|
|
||||||
> 想要改变`PartitionAssignor`,可以再提供给DefaultKafkaConsumerFactory的参数中设置`ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG`属性。
|
|
||||||
> 如果使用Spring Boot,可以按如下方式来配置:
|
|
||||||
> ```properties
|
|
||||||
> spring.kafka.consumer.properties.partition.assignment.strategy=
|
|
||||||
> org.apache.kafka.clients.consumer.RoundRobinAssignor
|
|
||||||
> ```
|
|
||||||
|
|
||||||
如果container属性通过`TopicPartitionOffset`配置,`ConcurrentMessageListenerContainer`将会把TopicPartitionOffset分发给所有其委托的KafkaMessageListenerContainer实例。
|
|
||||||
> 如果提供了六个TopicPartitionOffset实例,并且concurrency被设置为3,那么每个实例都会获取到2个分区。
|
|
||||||
> 如果5个TopicPartitionOffset实例被提供,且concurrency被设置为2,那么两个实例会获取到2个分区,一个实例会获取到一个分区。
|
|
||||||
> 如果concurrency比TopicPartitionOffset的数量更大,那么concurrency会向下进行调整,让每个实例都获取一个分区。
|
|
||||||
|
|
||||||
**Committing Offsets**
|
|
||||||
如果消费者属性`enable.auto.commit`被设置为true,Kafka会对offset进行自动提交。如果该选项被设置为false,container支持一些`AckMode`设置。默认的`AckMode`值为`Batch`。`enable.auto.commit`被默认设置为false,如果想要开启自动提交,可以手动将该属性设置为true。
|
|
||||||
消费者的poll方法会返回一个或者多个ConsumerRecord。MessageListener对于每条record都会被调用。
|
|
||||||
根据`AckMode`的值,container会执行如下操作:
|
|
||||||
- RECORD:当listener处理完record返回之后,会提交Offset
|
|
||||||
- BATCH:当poll方法返回的所有record都被处理之后,提交Offset
|
|
||||||
- TIME:当poll方法返回的所有record都被处理之后,提交Offset,或者自上次提交之后已经超过了ackTime,也会提交Offset
|
|
||||||
- COUNT:当poll方法返回的所有record都被处理之后,提交Offset,或者自从上次提交之后已经接收了超过ackCount条record,也会被提交
|
|
||||||
- COUNT_TIME:和TIME和COUNT类似,但是只要TIME和COUNT任一满足,也会提交Offset
|
|
||||||
- MANNUAL:message listener负责调用`Acknowledgment.acknowledge()`方法。除此之外,和BATCH语义相同
|
|
||||||
- MANNUAL_IMMEDIATE:当listener调用`Acknowledgment.acknowledge()`之后,对Offset进行提交
|
|
||||||
246
spring/spring kafka/spring kafka.md
Normal file
246
spring/spring kafka/spring kafka.md
Normal file
@@ -0,0 +1,246 @@
|
|||||||
|
# Spring Kafka
|
||||||
|
## 连接到kafka
|
||||||
|
### 运行时切换bootstrap servers
|
||||||
|
从2.5版本开始,KafkaAdmin、ProducerFactory、ConsumerFactory都继承于`KafkaResourceFactory`抽象类。通过调用`KafkaResourceFactory`抽象类的`setBootstrapServersSupplier(Supplier<String> bootstrapServersSupplier)`方法,可以在运行时动态的切换bootstrap servers。该Supplier将会在新建连接获取bootstrap servers时被调用。
|
||||||
|
|
||||||
|
> #### 切换bootstrap后关闭旧consumer和producer
|
||||||
|
> kafka consumer和producer通常都是基于长连接的,在调用setBootstrapServersSupplier在运行时切换bootstrap servers后,如果想要关闭现存的producer,可以调用`DefaultKafkaProducerFactory`的`reset`方法。如果想要关闭现存的consumer,可以调用`KafkaListenerEndpointRegistry`的`close`方法(调用close后再调用start),或是调用其他listener container的close和start方法。
|
||||||
|
|
||||||
|
#### ABSwitchCluster
|
||||||
|
为了方便起见,framework提供了`ABSwitchCluster`类,该类支持两套bootstrap servers集合,在任一时刻,只有其中一套bootstrap servers起作用。ABSwitchCluster类继承Supplier\<String\>接口,将`ABSwitchCluster`对象提供给consumer factory, producer factory, KafkaAdmin后,如果想要切换bootstrap servers,可以调用ABSwitchCluster类的`primary`和`secondary`方法,并关闭生产者和消费者的旧实例(关闭生产者旧实例,在producer factory上调用reset方法,用于创建到新bootstrap servers的连接;对于消费者实例,可以对所有listener container先调用close方法再调用start方法,当使用@KafkaListener注解时,需要对`KafkaListenerEndpointRegistry`bean对象调用close和start方法。
|
||||||
|
|
||||||
|
### Factory Listener
|
||||||
|
从2.5版本开始,`DefaultKafkaProducerFactory`和`DefaultKafkaConsumerFactory`都可以配置Listener,通过配置Listener可以监听生产者或消费者实例的创建和关闭。
|
||||||
|
|
||||||
|
```java
|
||||||
|
// producer listener
|
||||||
|
interface Listener<K, V> {
|
||||||
|
|
||||||
|
default void producerAdded(String id, Producer<K, V> producer) {
|
||||||
|
}
|
||||||
|
|
||||||
|
default void producerRemoved(String id, Producer<K, V> producer) {
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
```
|
||||||
|
```java
|
||||||
|
// consumer listener
|
||||||
|
interface Listener<K, V> {
|
||||||
|
|
||||||
|
default void consumerAdded(String id, Consumer<K, V> consumer) {
|
||||||
|
}
|
||||||
|
|
||||||
|
default void consumerRemoved(String id, Consumer<K, V> consumer) {
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
```
|
||||||
|
再上述接口中,id代表再factory bean对象名称后追加client-id属性,二者通过`.`分隔。
|
||||||
|
|
||||||
|
## 配置Topic
|
||||||
|
如果在当前应用上下文中定义了KafkaAdmin bean对象,kafkaAdmin可以自动的添加topic到broker。为了实现topic的自动添加,可以定义一个`NewTopic`类型的bean对象,kafkaAdmin会自动将该topic添加到broker中。
|
||||||
|
|
||||||
|
为了方便topic的创建,2.3版本中引入了TopicBuilder类。
|
||||||
|
```java
|
||||||
|
@Bean
|
||||||
|
public KafkaAdmin admin() {
|
||||||
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||||
|
return new KafkaAdmin(configs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic1() {
|
||||||
|
return TopicBuilder.name("thing1")
|
||||||
|
.partitions(10)
|
||||||
|
.replicas(3)
|
||||||
|
.compact()
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic2() {
|
||||||
|
return TopicBuilder.name("thing2")
|
||||||
|
.partitions(10)
|
||||||
|
.replicas(3)
|
||||||
|
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic3() {
|
||||||
|
return TopicBuilder.name("thing3")
|
||||||
|
.assignReplicas(0, List.of(0, 1))
|
||||||
|
.assignReplicas(1, List.of(1, 2))
|
||||||
|
.assignReplicas(2, List.of(2, 0))
|
||||||
|
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
从2.6版本开始,创建NewTopic时可以省略partitions()和replicas()方法的调用,此时创建的topic将会使用broker中默认的配置。支持该特性要求broker版本至少为2.4.0。
|
||||||
|
|
||||||
|
```java
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic4() {
|
||||||
|
return TopicBuilder.name("defaultBoth")
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic5() {
|
||||||
|
return TopicBuilder.name("defaultPart")
|
||||||
|
.replicas(1)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic topic6() {
|
||||||
|
return TopicBuilder.name("defaultRepl")
|
||||||
|
.partitions(3)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
从版本2.7开始,可以在`KafkaAdmin.NewTopics`的bean对象中声明多个NewTopic对象:
|
||||||
|
```java
|
||||||
|
@Bean
|
||||||
|
public KafkaAdmin.NewTopics topics456() {
|
||||||
|
return new NewTopics(
|
||||||
|
TopicBuilder.name("defaultBoth")
|
||||||
|
.build(),
|
||||||
|
TopicBuilder.name("defaultPart")
|
||||||
|
.replicas(1)
|
||||||
|
.build(),
|
||||||
|
TopicBuilder.name("defaultRepl")
|
||||||
|
.partitions(3)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
```
|
||||||
|
> 当使用spring boot时,KafkaAdmin对象将会被自动注册,故而只需要定义NewTopic bean对象即可。
|
||||||
|
|
||||||
|
默认情况下,如果kafka broker不可用,会输出日志进行记录,但是此时context的载入还会继续,后续可以手动调用KafkaAdmin的`initalize`方法和进行重试。如果想要在kafka broker不可用时,停止context的载入,可以将kafka Admin`fatalIfBrokerNotAvailable`属性设置为true,此时context会初始化失败。
|
||||||
|
|
||||||
|
从版本2.7开始,KafkaAdmin提供了两个方法用于在运行时动态创建和检测Topic:
|
||||||
|
- `createOrModifyTopics`
|
||||||
|
- `describeTopics`
|
||||||
|
|
||||||
|
从版本2.9.10、3.0.9开始,KafkaAdmin提供了`setCreateOrModifyTopic(Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic)`接口,该接口接收一个Predicate\<NewTopic\>参数,通过该predicate可以判断是否一个NewTopic bean应该被该kafkaAdmin创建或修改。该方法通常用于上下文中含有多个KafkaAdmin bena对象,每个kafkaAdmin对应不同的broker集群,在上下文中含有多个NewTopic对象时,可以通过predicate判断每个topic应该属性哪个amdin。
|
||||||
|
|
||||||
|
## 发送消息
|
||||||
|
KafkaTemplate类对KafkaProducer进行了包装,提供了如下接口用于向kafka topic发送消息。
|
||||||
|
```java
|
||||||
|
CompletableFuture<SendResult<K, V>> sendDefault(V data);
|
||||||
|
|
||||||
|
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
|
||||||
|
|
||||||
|
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
|
||||||
|
|
||||||
|
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
|
||||||
|
|
||||||
|
CompletableFuture<SendResult<K, V>> send(String topic, V data);
|
||||||
|
|
||||||
|
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
|
||||||
|
|
||||||
|
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
|
||||||
|
|
||||||
|
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
|
||||||
|
|
||||||
|
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
|
||||||
|
|
||||||
|
CompletableFuture<SendResult<K, V>> send(Message<?> message);
|
||||||
|
|
||||||
|
Map<MetricName, ? extends Metric> metrics();
|
||||||
|
|
||||||
|
List<PartitionInfo> partitionsFor(String topic);
|
||||||
|
|
||||||
|
<T> T execute(ProducerCallback<K, V, T> callback);
|
||||||
|
|
||||||
|
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
|
||||||
|
|
||||||
|
// Flush the producer.
|
||||||
|
void flush();
|
||||||
|
|
||||||
|
interface ProducerCallback<K, V, T> {
|
||||||
|
|
||||||
|
T doInKafka(Producer<K, V> producer);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
interface OperationsCallback<K, V, T> {
|
||||||
|
|
||||||
|
T doInOperations(KafkaOperations<K, V> operations);
|
||||||
|
|
||||||
|
}
|
||||||
|
```
|
||||||
|
其中,sendDefault接口需要向KafkaTemplate提供一个默认的topic。
|
||||||
|
|
||||||
|
kafkaTemplate中部分api接收timestamp作为参数,并且将timestamp存储到record中。接口中指定的timestamp参数如何存储,取决于kafka topic中配置的timestamp类型。如果topic中timestamp类型被配置为`CREATE_TIME`,那么用户指定的timestamp参数将会被使用(如果用户没有指定timestamp,那么会自动创建timestamp,producer会在发送时将timestamp指定为System.currentTimeMillis())。如果topic中timstamp类型被配置为`LOG_APPEND_TIME`,那么用户指定的timestamp将会被丢弃,而broker则会负责为timestamp赋值。
|
||||||
|
|
||||||
|
mertics和partitions方法则会被委派给了底层KafkaProducer的同名方法,execute接口则是提供了对底层KafkaProducer的直接访问。
|
||||||
|
|
||||||
|
要使用KafkaTemplate,可以配置一个producer factory并将其提供给KafkaTemplate的构造方法。如下展示了如何配置一个KafkaTemplate:
|
||||||
|
```java
|
||||||
|
@Bean
|
||||||
|
public ProducerFactory<Integer, String> producerFactory() {
|
||||||
|
return new DefaultKafkaProducerFactory<>(producerConfigs());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public Map<String, Object> producerConfigs() {
|
||||||
|
Map<String, Object> props = new HashMap<>();
|
||||||
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||||
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
|
||||||
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
|
||||||
|
return props;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<Integer, String> kafkaTemplate() {
|
||||||
|
return new KafkaTemplate<Integer, String>(producerFactory());
|
||||||
|
}
|
||||||
|
```
|
||||||
|
从2.5开始,创建KafkaTemplate时可以基于factory进行创建,但是覆盖factory中的配置属性,具体示例如下:
|
||||||
|
```java
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
|
||||||
|
return new KafkaTemplate<>(pf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
|
||||||
|
return new KafkaTemplate<>(pf,
|
||||||
|
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
当使用KafkaTemplate接收`Message\<?\>`类型的参数时,可以将topic、partition、key和timestamp参数指定在Message的header中,header中包含如下条目:
|
||||||
|
- KafkaHeaders.TOPIC
|
||||||
|
- KafkaHeaders.PARTITION
|
||||||
|
- KafkaHeaders.KEY
|
||||||
|
- KafkaHeaders.TIMESTAMP
|
||||||
|
|
||||||
|
除了调用发送方法获取CompletableFuture外,还可以为KafkaTemplate配置一个ProducerListener,从而在消息发送完成(成功或失败)后执行一个异步的回调。如下是ProducerListener接口的定义:
|
||||||
|
```java
|
||||||
|
public interface ProducerListener<K, V> {
|
||||||
|
|
||||||
|
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
|
||||||
|
|
||||||
|
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
|
||||||
|
Exception exception);
|
||||||
|
|
||||||
|
}
|
||||||
|
```
|
||||||
|
默认情况下,KafkaTemplate配置了一个LoggingProducerListener,会在发送失败时打印失败日志,在发送成功时并不做任何事。并且为了方便起见,方法的默认实现已经被提供,可以只覆盖其中一个方法。
|
||||||
|
|
||||||
|
send方法默认返回的是CompletableFuture类型,可以在发送完成之后为future注册一个回调:
|
||||||
|
```java
|
||||||
|
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
|
||||||
|
future.whenComplete((result, ex) -> {
|
||||||
|
...
|
||||||
|
});
|
||||||
|
```
|
||||||
|
其中,Throwable类型的ex可以被转化为`KafkaProducerException`,该类型的failedProducerRecord属性可以获取发送失败的record。
|
||||||
|
|
||||||
|
如果想要同步调用KafkaTemplate的发送方法并且等待返回结果,可以调用返回值CompletableFuture类型的get方法来同步等待。通常情况下,调用`CompletableFuture.get`时,推荐使用带超时参数的方法。如果在Producer配置中指定了`linger.ms`,那么在等待返回结果之前需要调用KafkaTemplate的flush方法。为了方便,KafkaTemplate提供了带autoFlush参数的构造器版本,如果设置autoFlush为true,kafkaTemplate在每次发送消息时都会调用flush方法。
|
||||||
Reference in New Issue
Block a user