From 63a9ed1bca7725b8945fc15f0084d3128beaa2e8 Mon Sep 17 00:00:00 2001 From: Rikako Wu <496063163@qq.com> Date: Wed, 22 Feb 2023 21:55:05 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=A7=E7=BB=AD=E5=85=B3=E4=BA=8EKafka?= =?UTF-8?q?=E6=96=87=E6=A1=A3=E7=9A=84=E9=98=85=E8=AF=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mq/kafka/Spring for Apache Kafka.md | 45 ++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/mq/kafka/Spring for Apache Kafka.md b/mq/kafka/Spring for Apache Kafka.md index a8dccfe..ad4b746 100644 --- a/mq/kafka/Spring for Apache Kafka.md +++ b/mq/kafka/Spring for Apache Kafka.md @@ -340,6 +340,49 @@ public void sendToKafka(final MyOutputData data) { 可以通过RoutingKafkaTemplate在运行时根据目标topic的名称选定producer。 > routing template不支持事务、execute、flush、metrics等操作,因为这些操作不知道topic名称 -routing template需要一个key和value分别为`java.util.regex.Pattern`和`ProducerFactory`的map实例。 +routing template需要一个key和value分别为`java.util.regex.Pattern`和`ProducerFactory`的map实例。该map应该是有序的,因为map会按顺序遍历(例如LinkedHashMap集合)。并且,应该在map最开始的时候指定更加具体的pattern。 +如下示例会展示如何通过一个template向不同的topic发送消息,并且值的序列化会采用不同的序列化器: +```java +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + @Bean + public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context, + ProducerFactory pf) { + + // Clone the PF with a different Serializer, register with Spring for shutdown + Map configs = new HashMap<>(pf.getConfigurationProperties()); + configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + DefaultKafkaProducerFactory bytesPF = new DefaultKafkaProducerFactory<>(configs); + context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF); + + Map> map = new LinkedHashMap<>(); + map.put(Pattern.compile("two"), bytesPF); + map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer + return new RoutingKafkaTemplate(map); + } + + @Bean + public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) { + return args -> { + routingTemplate.send("one", "thing1"); + routingTemplate.send("two", "thing2".getBytes()); + }; + } + +} +``` +##### DefaultKafkaProducerFactory +在上述template的使用示例中,创建一个KafkaTemplate需要使用ProducerFactory。 +当不使用transaction时,默认情况下,DefaultKafkaProducerFactory创建一个单例producer,单例producer由所有客户端使用。但是,如果在template上调用了flush方法,可能会造成其使用同一producer的其他线程的延迟。 +> DefaultKafkaProducerFactory有一个属性producerPerThread,当该属性被设置为true时,Kafka会对每一个线程都创建一个producer,以此来避免这个问题。 + +> 当producerPerThread被设置为true时,若producer不再需要,用户代码必须在factory上调用closeThreadBoundProducer方法,这回物理的关闭producer并且将其中ThreadLocal中移除。调用reset或destroy并不会清除这些producer + +可以在factory创建之后,对producer properties进行更新。