# Spring Kafka ## 连接到kafka ### 运行时切换bootstrap servers 从2.5版本开始,KafkaAdmin、ProducerFactory、ConsumerFactory都继承于`KafkaResourceFactory`抽象类。通过调用`KafkaResourceFactory`抽象类的`setBootstrapServersSupplier(Supplier bootstrapServersSupplier)`方法,可以在运行时动态的切换bootstrap servers。该Supplier将会在新建连接获取bootstrap servers时被调用。 > #### 切换bootstrap后关闭旧consumer和producer > kafka consumer和producer通常都是基于长连接的,在调用setBootstrapServersSupplier在运行时切换bootstrap servers后,如果想要关闭现存的producer,可以调用`DefaultKafkaProducerFactory`的`reset`方法。如果想要关闭现存的consumer,可以调用`KafkaListenerEndpointRegistry`的`close`方法(调用close后再调用start),或是调用其他listener container的close和start方法。 #### ABSwitchCluster 为了方便起见,framework提供了`ABSwitchCluster`类,该类支持两套bootstrap servers集合,在任一时刻,只有其中一套bootstrap servers起作用。ABSwitchCluster类继承Supplier\接口,将`ABSwitchCluster`对象提供给consumer factory, producer factory, KafkaAdmin后,如果想要切换bootstrap servers,可以调用ABSwitchCluster类的`primary`和`secondary`方法,并关闭生产者和消费者的旧实例(关闭生产者旧实例,在producer factory上调用reset方法,用于创建到新bootstrap servers的连接;对于消费者实例,可以对所有listener container先调用close方法再调用start方法,当使用@KafkaListener注解时,需要对`KafkaListenerEndpointRegistry`bean对象调用close和start方法。 ### Factory Listener 从2.5版本开始,`DefaultKafkaProducerFactory`和`DefaultKafkaConsumerFactory`都可以配置Listener,通过配置Listener可以监听生产者或消费者实例的创建和关闭。 ```java // producer listener interface Listener { default void producerAdded(String id, Producer producer) { } default void producerRemoved(String id, Producer producer) { } } ``` ```java // consumer listener interface Listener { default void consumerAdded(String id, Consumer consumer) { } default void consumerRemoved(String id, Consumer consumer) { } } ``` 再上述接口中,id代表再factory bean对象名称后追加client-id属性,二者通过`.`分隔。 ## 配置Topic 如果在当前应用上下文中定义了KafkaAdmin bean对象,kafkaAdmin可以自动的添加topic到broker。为了实现topic的自动添加,可以定义一个`NewTopic`类型的bean对象,kafkaAdmin会自动将该topic添加到broker中。 为了方便topic的创建,2.3版本中引入了TopicBuilder类。 ```java @Bean public KafkaAdmin admin() { Map configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); return new KafkaAdmin(configs); } @Bean public NewTopic topic1() { return TopicBuilder.name("thing1") .partitions(10) .replicas(3) .compact() .build(); } @Bean public NewTopic topic2() { return TopicBuilder.name("thing2") .partitions(10) .replicas(3) .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd") .build(); } @Bean public NewTopic topic3() { return TopicBuilder.name("thing3") .assignReplicas(0, List.of(0, 1)) .assignReplicas(1, List.of(1, 2)) .assignReplicas(2, List.of(2, 0)) .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd") .build(); } ``` 从2.6版本开始,创建NewTopic时可以省略partitions()和replicas()方法的调用,此时创建的topic将会使用broker中默认的配置。支持该特性要求broker版本至少为2.4.0。 ```java @Bean public NewTopic topic4() { return TopicBuilder.name("defaultBoth") .build(); } @Bean public NewTopic topic5() { return TopicBuilder.name("defaultPart") .replicas(1) .build(); } @Bean public NewTopic topic6() { return TopicBuilder.name("defaultRepl") .partitions(3) .build(); } ``` 从版本2.7开始,可以在`KafkaAdmin.NewTopics`的bean对象中声明多个NewTopic对象: ```java @Bean public KafkaAdmin.NewTopics topics456() { return new NewTopics( TopicBuilder.name("defaultBoth") .build(), TopicBuilder.name("defaultPart") .replicas(1) .build(), TopicBuilder.name("defaultRepl") .partitions(3) .build()); } ``` > 当使用spring boot时,KafkaAdmin对象将会被自动注册,故而只需要定义NewTopic bean对象即可。 默认情况下,如果kafka broker不可用,会输出日志进行记录,但是此时context的载入还会继续,后续可以手动调用KafkaAdmin的`initalize`方法和进行重试。如果想要在kafka broker不可用时,停止context的载入,可以将kafka Admin`fatalIfBrokerNotAvailable`属性设置为true,此时context会初始化失败。 从版本2.7开始,KafkaAdmin提供了两个方法用于在运行时动态创建和检测Topic: - `createOrModifyTopics` - `describeTopics` 从版本2.9.10、3.0.9开始,KafkaAdmin提供了`setCreateOrModifyTopic(Predicate createOrModifyTopic)`接口,该接口接收一个Predicate\参数,通过该predicate可以判断是否一个NewTopic bean应该被该kafkaAdmin创建或修改。该方法通常用于上下文中含有多个KafkaAdmin bena对象,每个kafkaAdmin对应不同的broker集群,在上下文中含有多个NewTopic对象时,可以通过predicate判断每个topic应该属性哪个amdin。 ## 发送消息 KafkaTemplate类对KafkaProducer进行了包装,提供了如下接口用于向kafka topic发送消息。 ```java CompletableFuture> sendDefault(V data); CompletableFuture> sendDefault(K key, V data); CompletableFuture> sendDefault(Integer partition, K key, V data); CompletableFuture> sendDefault(Integer partition, Long timestamp, K key, V data); CompletableFuture> send(String topic, V data); CompletableFuture> send(String topic, K key, V data); CompletableFuture> send(String topic, Integer partition, K key, V data); CompletableFuture> send(String topic, Integer partition, Long timestamp, K key, V data); CompletableFuture> send(ProducerRecord record); CompletableFuture> send(Message message); Map metrics(); List partitionsFor(String topic); T execute(ProducerCallback callback); T executeInTransaction(OperationsCallback callback); // Flush the producer. void flush(); interface ProducerCallback { T doInKafka(Producer producer); } interface OperationsCallback { T doInOperations(KafkaOperations operations); } ``` 其中,sendDefault接口需要向KafkaTemplate提供一个默认的topic。 kafkaTemplate中部分api接收timestamp作为参数,并且将timestamp存储到record中。接口中指定的timestamp参数如何存储,取决于kafka topic中配置的timestamp类型。如果topic中timestamp类型被配置为`CREATE_TIME`,那么用户指定的timestamp参数将会被使用(如果用户没有指定timestamp,那么会自动创建timestamp,producer会在发送时将timestamp指定为System.currentTimeMillis())。如果topic中timstamp类型被配置为`LOG_APPEND_TIME`,那么用户指定的timestamp将会被丢弃,而broker则会负责为timestamp赋值。 mertics和partitions方法则会被委派给了底层KafkaProducer的同名方法,execute接口则是提供了对底层KafkaProducer的直接访问。 要使用KafkaTemplate,可以配置一个producer factory并将其提供给KafkaTemplate的构造方法。如下展示了如何配置一个KafkaTemplate: ```java @Bean public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map producerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // See https://kafka.apache.org/documentation/#producerconfigs for more properties return props; } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate(producerFactory()); } ``` 从2.5开始,创建KafkaTemplate时可以基于factory进行创建,但是覆盖factory中的配置属性,具体示例如下: ```java @Bean public KafkaTemplate stringTemplate(ProducerFactory pf) { return new KafkaTemplate<>(pf); } @Bean public KafkaTemplate bytesTemplate(ProducerFactory pf) { return new KafkaTemplate<>(pf, Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class)); } ``` 当使用KafkaTemplate接收`Message\`类型的参数时,可以将topic、partition、key和timestamp参数指定在Message的header中,header中包含如下条目: - KafkaHeaders.TOPIC - KafkaHeaders.PARTITION - KafkaHeaders.KEY - KafkaHeaders.TIMESTAMP 除了调用发送方法获取CompletableFuture外,还可以为KafkaTemplate配置一个ProducerListener,从而在消息发送完成(成功或失败)后执行一个异步的回调。如下是ProducerListener接口的定义: ```java public interface ProducerListener { void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata); void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception); } ``` 默认情况下,KafkaTemplate配置了一个LoggingProducerListener,会在发送失败时打印失败日志,在发送成功时并不做任何事。并且为了方便起见,方法的默认实现已经被提供,可以只覆盖其中一个方法。 send方法默认返回的是CompletableFuture类型,可以在发送完成之后为future注册一个回调: ```java CompletableFuture> future = template.send("myTopic", "something"); future.whenComplete((result, ex) -> { ... }); ``` 其中,Throwable类型的ex可以被转化为`KafkaProducerException`,该类型的failedProducerRecord属性可以获取发送失败的record。 如果想要同步调用KafkaTemplate的发送方法并且等待返回结果,可以调用返回值CompletableFuture类型的get方法来同步等待。通常情况下,调用`CompletableFuture.get`时,推荐使用带超时参数的方法。如果在Producer配置中指定了`linger.ms`,那么在等待返回结果之前需要调用KafkaTemplate的flush方法。为了方便,KafkaTemplate提供了带autoFlush参数的构造器版本,如果设置autoFlush为true,kafkaTemplate在每次发送消息时都会调用flush方法。 ### 发送示例 如下展示了通过KafkaTemplate向broker发送消息的示例: ```java // async public void sendToKafka(final MyOutputData data) { final ProducerRecord record = createRecord(data); try { template.send(record).get(10, TimeUnit.SECONDS); handleSuccess(data); } catch (ExecutionException e) { handleFailure(data, record, e.getCause()); } catch (TimeoutException | InterruptedException e) { handleFailure(data, record, e); } } ``` ```java // sync public void sendToKafka(final MyOutputData data) { final ProducerRecord record = createRecord(data); try { template.send(record).get(10, TimeUnit.SECONDS); handleSuccess(data); } catch (ExecutionException e) { handleFailure(data, record, e.getCause()); } catch (TimeoutException | InterruptedException e) { handleFailure(data, record, e); } } ``` ### RoutingKafkaTemplate 从2.5版本开始,额可以通过RoutingKafkaTemplate在运行时选择producer实例,选择过程基于topic名称。 > RoutingKafkaTemplate不支持事务,也不支持execute、flush、metrics等方法,因为RoutingKafkaTemplate根据topic来选择producer,但是在执行这些操作时并不知道操作所属topic。 RoutingKafkaTemplate需要一个map,map的key为`java.util.regex.Pattern`,而value则是`ProducerFactory`实例。该map必须是有序的(例如LinkedHashMap),该map需要按顺序遍历,顺序在前的key-value对会优先匹配。 如下示例展示了如何通过一个RoutingKafkaTemplate向不同的topic发送消息,实例中每个topic都使用不同的序列化方式。 ```java @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Bean public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context, ProducerFactory pf) { // Clone the PF with a different Serializer, register with Spring for shutdown Map configs = new HashMap<>(pf.getConfigurationProperties()); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); DefaultKafkaProducerFactory bytesPF = new DefaultKafkaProducerFactory<>(configs); context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF); Map> map = new LinkedHashMap<>(); map.put(Pattern.compile("two"), bytesPF); map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer return new RoutingKafkaTemplate(map); } @Bean public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) { return args -> { routingTemplate.send("one", "thing1"); routingTemplate.send("two", "thing2".getBytes()); }; } } ``` ### 使用DefaultKafkaProducerFactory ProducerFactory是用于创建生产者实例的。当没有使用事务时,默认情况下,`DefaultKafkaFactory`会创建一个单例的生产者实例,所有客户端都会使用生产者实例。但是,如果在template中调用了flush方法,这将会对其他同样使用该生产者实例的client操作造成阻塞。从2.3版本开始,DefaultKafkaFactory有了新的`producerPerThread`属性,当该属性设置为true时,factory会针对每个线程都创建并缓存一个producer实例。 > 当`producerPerThread`被设置为true时,若线程中的producer不再被需要,那么对factory必须手动调用`closeThreadBoundProducer()`。这将会物理上对producer进行关闭,并且从ThreadLocal中移除producer实例。单纯调用close或是destroy方法并不会清除这些producer实例。 当创建DefaultKafkaFactory时,key serializer或是value serializer可以通过DefaultKafkaFactory的构造函数单独指定。在通过构造函数指定factory的key serializer/value serializer时,可以选择向构造函数中传入serializer实例或是传入serializer supplier对象: - 当传入serializer实例时,通过该factory创建的所有生产者实例都共享该serializer实例 - 当传入的是返回一个serializer的supplier时,可令通过该factory创建的producer实例都拥有属于自己的serializer 如下是创建DefaultKafkaProducerFactory bean对象并且制定serializer的示例: ```java @Bean public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer()); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate(producerFactory()); } ``` 从2.5.10版本开始,可以在factory创建之后再更新factory的producer config属性。例如,可以在运行时更新ssl key/trust的存储路径。该更新操作并不会影响到已经被创建的producer实例,故而需要调用factory的reset方法,在调用reset后所有现存producer实例都会被关闭,而之后新创建的producer都会使用新的属性配置。 > 在运行时更新生产者属性时,无法将事务的生产者变为非事务的,也无法将非事务的生产者变为事务的。 为了更新producer属性配置,factory提供了如下两个接口: ```java void updateConfigs(Map updates); void removeConfig(String configKey); ``` ### ReplyingKafkaTemplate 从2.1.3版本开始,kafka引入了ReplyingKafkaTemplate,其是KafkaTemplate的一个子类,用于提供request/reply语义。该类相比父类含有两个额外的方法: ```java RequestReplyFuture sendAndReceive(ProducerRecord record); RequestReplyFuture sendAndReceive(ProducerRecord record, Duration replyTimeout); ``` 该方法的返回类型RequestReplyFuture继承了CompletableFuture,RequestReplyFuture会异步的注入该future的结果(可能正常返回,也可能是一个exception或者timeout)。 RequestReplyFuture含有一个sendFuture属性,该属性是调用kafkaTemplate的send方法发送消息的结果,类型为`CompletableFuture>`,可以通过该属性future来判断发送消息操作的结果。 如果在调用sendAndReceive方法时没有传递replyTimeout参数,或是指定replyTimeout参数为null,那么该template的`defaultReplyTimeout`属性将会被用作超时时间。默认情况下,该超时属性为5s。 从2.8.8版本开始,该template还有一个`waitForAssingment`方法。当reply container被配置为`auto.offset.reset=latest`时waitForAssingment方法相当有用,避免当reply container尚未初始化完成时,发送消息对应的reply已经返回了。 如下展示了如何使用ReplyingKafkaTemplate: ```java @SpringBootApplication public class KRequestingApplication { public static void main(String[] args) { SpringApplication.run(KRequestingApplication.class, args).close(); } @Bean public ApplicationRunner runner(ReplyingKafkaTemplate template) { return args -> { if (!template.waitForAssignment(Duration.ofSeconds(10))) { throw new IllegalStateException("Reply container did not initialize"); } ProducerRecord record = new ProducerRecord<>("kRequests", "foo"); RequestReplyFuture replyFuture = template.sendAndReceive(record); SendResult sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS); System.out.println("Sent ok: " + sendResult.getRecordMetadata()); ConsumerRecord consumerRecord = replyFuture.get(10, TimeUnit.SECONDS); System.out.println("Return value: " + consumerRecord.value()); }; } @Bean public ReplyingKafkaTemplate replyingTemplate( ProducerFactory pf, ConcurrentMessageListenerContainer repliesContainer) { return new ReplyingKafkaTemplate<>(pf, repliesContainer); } @Bean public ConcurrentMessageListenerContainer repliesContainer( ConcurrentKafkaListenerContainerFactory containerFactory) { ConcurrentMessageListenerContainer repliesContainer = containerFactory.createContainer("kReplies"); repliesContainer.getContainerProperties().setGroupId("repliesGroup"); repliesContainer.setAutoStartup(false); return repliesContainer; } @Bean public NewTopic kRequests() { return TopicBuilder.name("kRequests") .partitions(10) .replicas(2) .build(); } @Bean public NewTopic kReplies() { return TopicBuilder.name("kReplies") .partitions(10) .replicas(2) .build(); } } ``` 在上述示例中采用了spring自动注入的containerFactory来创建reply container。 > #### ErrorHandlingDeserializer > 可以考虑在reply container中使用ErrorHandlingDeserializer,如果反序列化失败,RequestReplyFuture将会以异常状态完成,可以访问获取到的ExecutionException,其cause属性中包含DeserializationException。 ### kafka poison pill & ErrorHandlingDeserializer poison pill在kafka中是指一条被发送到kafka topic中的消息始终被消费失败,不管重试过多少次之后仍然无法成功被消费。 poison pill可能在如下场景下产生: - 该记录被损坏 - 该记录发序列化失败 在生产场景中,consumer应该配置正确的deserializer来对生产者示例序列化的记录进行反序列化操作。但如果生产者的serializer和消费者的deserializer不兼容,将会进入到poison pill的场景。该不兼容情况对key和value的序列化->反序列化场景都有可能发生。 在现实场景中,可能因为如下缘故而遭遇poison pill: - 生产者改变了key或value的serializer并且持续向先前的topic中发送消息,这将会导致反序列化问题 - consumer的key或value deserializer配置错误 - 不同的生产者实例,使用不同的key或value serializer向topic中发送消息 在发生poison后,consumer在调用poll拉取数据时将无法反序列化record,调用poll时会一直抛出反序列化异常。并且消费者也无法针对posion pill进行处理,针对该topic分区的消费会被阻塞(因为consumer offset一直无法向前移动)。并且,在consumer不停重试针对该消息的反序列化时,大量的反序列化失败日志将会被追加到日志文件中,磁盘占用量将会急剧增大。 #### ErrorHandlingDeserializer 为了解决poison pill问题,spring引入了ErrorHandlingDeserializer,该deserializer将反序列化工作委托给了一个真实的deserializer。如果底层受托的deserializer反序列化失败,那么ErrorHandlingDeserializer将会返回一个null,并且在传入的headers中设置DeserializationException对象。DeserializationException对象中包含cause和raw bytes。