From 48b5f46d52b41f2797e64daed6d3998f4ee5ac5b Mon Sep 17 00:00:00 2001 From: asahi Date: Wed, 27 Dec 2023 00:31:46 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=85=E8=AF=BBspring=20kafka=E5=8F=91?= =?UTF-8?q?=E9=80=81=E6=B6=88=E6=81=AF=E7=9A=84=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring/spring kafka/spring kafka.md | 111 ++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index 1561071..14e0df0 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -244,3 +244,114 @@ future.whenComplete((result, ex) -> { 其中,Throwable类型的ex可以被转化为`KafkaProducerException`,该类型的failedProducerRecord属性可以获取发送失败的record。 如果想要同步调用KafkaTemplate的发送方法并且等待返回结果,可以调用返回值CompletableFuture类型的get方法来同步等待。通常情况下,调用`CompletableFuture.get`时,推荐使用带超时参数的方法。如果在Producer配置中指定了`linger.ms`,那么在等待返回结果之前需要调用KafkaTemplate的flush方法。为了方便,KafkaTemplate提供了带autoFlush参数的构造器版本,如果设置autoFlush为true,kafkaTemplate在每次发送消息时都会调用flush方法。 + +### 发送示例 +如下展示了通过KafkaTemplate向broker发送消息的示例: +```java +// async +public void sendToKafka(final MyOutputData data) { + final ProducerRecord record = createRecord(data); + + try { + template.send(record).get(10, TimeUnit.SECONDS); + handleSuccess(data); + } + catch (ExecutionException e) { + handleFailure(data, record, e.getCause()); + } + catch (TimeoutException | InterruptedException e) { + handleFailure(data, record, e); + } +} +``` +```java +// sync +public void sendToKafka(final MyOutputData data) { + final ProducerRecord record = createRecord(data); + + try { + template.send(record).get(10, TimeUnit.SECONDS); + handleSuccess(data); + } + catch (ExecutionException e) { + handleFailure(data, record, e.getCause()); + } + catch (TimeoutException | InterruptedException e) { + handleFailure(data, record, e); + } +} +``` + +### RoutingKafkaTemplate +从2.5版本开始,额可以通过RoutingKafkaTemplate在运行时选择producer实例,选择过程基于topic名称。 + +> RoutingKafkaTemplate不支持事务,也不支持execute、flush、metrics等方法,因为RoutingKafkaTemplate根据topic来选择producer,但是在执行这些操作时并不知道操作所属topic。 + +RoutingKafkaTemplate需要一个map,map的key为`java.util.regex.Pattern`,而value则是`ProducerFactory`实例。该map必须是有序的(例如LinkedHashMap),该map需要按顺序遍历,顺序在前的key-value对会优先匹配。 + +如下示例展示了如何通过一个RoutingKafkaTemplate向不同的topic发送消息,实例中每个topic都使用不同的序列化方式。 +```java +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + @Bean + public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context, + ProducerFactory pf) { + + // Clone the PF with a different Serializer, register with Spring for shutdown + Map configs = new HashMap<>(pf.getConfigurationProperties()); + configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + DefaultKafkaProducerFactory bytesPF = new DefaultKafkaProducerFactory<>(configs); + context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF); + + Map> map = new LinkedHashMap<>(); + map.put(Pattern.compile("two"), bytesPF); + map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer + return new RoutingKafkaTemplate(map); + } + + @Bean + public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) { + return args -> { + routingTemplate.send("one", "thing1"); + routingTemplate.send("two", "thing2".getBytes()); + }; + } + +} +``` +### 使用DefaultKafkaProducerFactory +ProducerFactory是用于创建生产者实例的。当没有使用事务时,默认情况下,`DefaultKafkaFactory`会创建一个单例的生产者实例,所有客户端都会使用生产者实例。但是,如果在template中调用了flush方法,这将会对其他同样使用该生产者实例的client操作造成阻塞。从2.3版本开始,DefaultKafkaFactory有了新的`producerPerThread`属性,当该属性设置为true时,factory会针对每个线程都创建并缓存一个producer实例。 +> 当`producerPerThread`被设置为true时,若线程中的producer不再被需要,那么对factory必须手动调用`closeThreadBoundProducer()`。这将会物理上对producer进行关闭,并且从ThreadLocal中移除producer实例。单纯调用close或是destroy方法并不会清除这些producer实例。 + +当创建DefaultKafkaFactory时,key serializer或是value serializer可以通过DefaultKafkaFactory的构造函数单独指定。在通过构造函数指定factory的key serializer/value serializer时,可以选择向构造函数中传入serializer实例或是传入serializer supplier对象: +- 当传入serializer实例时,通过该factory创建的所有生产者实例都共享该serializer实例 +- 当传入的是返回一个serializer的supplier时,可令通过该factory创建的producer实例都拥有属于自己的serializer + +如下是创建DefaultKafkaProducerFactory bean对象并且制定serializer的示例: +```java +@Bean +public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer()); +} + +@Bean +public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate(producerFactory()); +} +``` + +从2.5.10版本开始,可以在factory创建之后再更新factory的producer config属性。例如,可以在运行时更新ssl key/trust的存储路径。该更新操作并不会影响到已经被创建的producer实例,故而需要调用factory的reset方法,在调用reset后所有现存producer实例都会被关闭,而之后新创建的producer都会使用新的属性配置。 +> 在运行时更新生产者属性时,无法将事务的生产者变为非事务的,也无法将非事务的生产者变为事务的。 + +为了更新producer属性配置,factory提供了如下两个接口: +```java +void updateConfigs(Map updates); + +void removeConfig(String configKey); +``` +