diff --git a/spring/spring kafka/spring kafka.md b/spring/spring kafka/spring kafka.md index 45548ee..f9bf4ae 100644 --- a/spring/spring kafka/spring kafka.md +++ b/spring/spring kafka/spring kafka.md @@ -50,6 +50,17 @@ - [seek到指定offset](#seek到指定offset) - [Container Factory](#container-factory) - [线程安全](#线程安全) + - [监控](#监控) + - [监控listener性能](#监控listener性能) + - [监控KafkaTemplate性能](#监控kafkatemplate性能) + - [Micrometer Native Metrics](#micrometer-native-metrics) + - [事务](#事务) + - [使用KafkaTransactionManager](#使用kafkatransactionmanager) + - [事务同步](#事务同步) + - [使用由Consumer发起的事务](#使用由consumer发起的事务) + - [KafkaTemplate本地事务](#kafkatemplate本地事务) + - [KafkaTemplate事务发布和非事务发布](#kafkatemplate事务发布和非事务发布) + - [事务结合BatchListener使用](#事务结合batchlistener使用) # Spring Kafka @@ -2003,6 +2014,236 @@ public KafkaListenerContainerFactory kafkaListenerContainerFactory() { > 默认情况下,spring将会在事件触发的线程中调用event listener。如果将multicaster改为async executor,那么清理操作将没有作用。 +## 监控 +### 监控listener性能 +从2.3版本开始,如果在classpath中检测到`Micrometer`,并且spring容器中只有一个`MeterRegistry`实例,listener container会为listener自动创建和更新Micrometer timer。如果想要禁用micrometer timer,可以将ContainerProperties中的`micrometerEnabled`设置为false。 + +container会为listener维护两个timer,一个记录成功的调用,一个记录失败的调用。 + +timer根据`spring.kafka.listener`命名,并且含有如下的tag: +- name : (container bean name) +- result : success or failure(两个timer分开统计) +- exception: none or ListenerExecutionFailedException + +可以使用`ContainerProperties`的`micrometerTags`属性来添加额外的tag。 + +从2.9.8、3.0.6版本开始,可以为`ContainerProperties`中的`micrometerTagsProvider`属性中提供一个方法,改方法获取`ConsumerRecord`并且基于record返回tags,并且将tag与`micrometerTags`属性中的任意static tags合并。 + +### 监控KafkaTemplate性能 +从2.5版本开始,如果在classpath中检测到`Micrometer`,并且spring容器中只有一个`MeterRegistry`实例,template会自动的创建并且更新`Micrometer Timer`。如果想要禁用micrometer timer,可以将template的`micrometerEnabled`设置为false。 + +同样会为template维护两个timer,一个记录成功的调用,一个记录失败的调用。 + +timer根据`spring.kafka.template`命名,并且含有如下tag: +- name : (template bean name) +- result : success or failure(两个timer分开统计) +- exception: none or ListenerExecutionFailedException + +### Micrometer Native Metrics +从2.5版本开始,spring kafka框架提供了`Factory Listeners`用于管理Micrometer `KafkaClientMetrics`实例,用于监听producer或consumer的创建或关闭。 + +用于启用该属性,只需要为producer或consumer factory添加listener即可: +```java +@Bean +public ConsumerFactory myConsumerFactory() { + Map configs = consumerConfigs(); + ... + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(configs); + ... + cf.addListener(new MicrometerConsumerListener(meterRegistry(), + Collections.singletonList(new ImmutableTag("customTag", "customTagValue")))); + ... + return cf; +} + +@Bean +public ProducerFactory myProducerFactory() { + Map configs = producerConfigs(); + configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId"); + ... + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(configs); + ... + pf.addListener(new MicrometerProducerListener(meterRegistry(), + Collections.singletonList(new ImmutableTag("customTag", "customTagValue")))); + ... + return pf; +} +``` +consumer/producer id将会作为`spring.id`被传递到tag中。 + +获取metrics的示例如下: +```java +double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total") + .tag("customTag", "customTagValue") + .tag("spring.id", "myProducerFactory.myClientId-1") + .functionCounter() + .count(); +``` +## 事务 +spring kafka通过如下方式添加了对事务的支持: +- KafkaTransactionManager:和spring transaction支持类似(@Transaction、TransactionTemplate) +- 使用Transactional KafkaMessageListenerContainer +- kafkaTemplate支持本地事务 +- 和其他transactionManager同步 + +当为DefaultKafkaProducerFactory提供了`transactionPrefixId`属性时,事务是默认启用的。在指定了事务前缀id的情况下,DefaultKafkaProducerFactory并不是只维护一个transactional producer,而是维护了一个transactional producers cache。当在producer上调用close时,producer将会被归还到cache中并在后续操作中被重用。对于每个producer来说,`transaction.id`属性为`transactionPrefixId + n`,n从0开始并且对每个producer依次递增。 + +> 如果当前服务存在多个实例,那么各个实例的`transactionPrefixId`属性必须都不同。 + +对于使用了springboot的程序,只需要为producer factory设置`spring.kafka.producer.transaction-id-prefix`属性即可,spring boot会自动装配`KafkaTransactionManager`bean 对象并将其注入到listener container。 + +### 使用KafkaTransactionManager +`KafkaTransactionManager`是`PlatformTransactionManager`的一个实现,其构造器中提供了一个到producer factory的引用参数。如果为该参数指定了自定义的producer factory,其必须支持事务。 + +可以将KafkaTransactionManager和spring transaction support结合使用,如果事务被启用,那么在事务范围内的KafkaTemplate操作将会使用transactional producer。取决事务执行成功或者失败,transaction manager将会被事务进行提交或回滚。KafkaTemplate必须和ProducerFactory使用相同的事务管理器。 + +### 事务同步 +本节只涉及到由producer发起的事务(不包含由listener container发起的事务)。 + +如果想要在发送消息到kafka的同时执行一些数据库更新操作,可以使用正常的spring transaction management,例如DataSourceTransactionManager。 +```java +@Transactional +public void process(List things) { + things.forEach(thing -> this.kafkaTemplate.send("topic", thing)); + updateDb(things); +} +``` +@Transactional注解的拦截器将会开启一个事务,并且KafkaTemplate将会与该transaction manager同步一个事务,每次发送record的操作都会加入到该事务。当方法退出时,database transaction提交之后,kafka transaction才会被提交。 + +如果想要让kafka事务先被提交,应该嵌套使用@Transactional注解。外部方法的@Transactional其transactionManager属性应该被配置为DataSourceTransactionManager,内部方法的@Transactional注解其transactionManager属性被配置为kafkaTransactionManager。 + +### 使用由Consumer发起的事务 +从2.7版本开始,`ChainedKafkaTransactionManager`被废弃。container中会通过KafkaTransactionMananger来启用一个事务,并且在listener method上可以加上@Transactional来开启其他非kafka事务。 + +在spring boot中listener container会自动注入kafkaTransactionManager。listener container会开启kafka transaction,并且可以通过@Transactional注解来开启db transaction。 + +db事务会在kafka事务之前提交,如果kafka事务提交失败,那么该recrod会被重新传递,故而数据库操作必须是幂等的。 + +使用示例如下: +```java +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + @Bean + public ApplicationRunner runner(KafkaTemplate template) { + return args -> template.executeInTransaction(t -> t.send("topic1", "test")); + } + + @Bean + public DataSourceTransactionManager dstm(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Component + public static class Listener { + + private final JdbcTemplate jdbcTemplate; + + private final KafkaTemplate kafkaTemplate; + + public Listener(JdbcTemplate jdbcTemplate, KafkaTemplate kafkaTemplate) { + this.jdbcTemplate = jdbcTemplate; + this.kafkaTemplate = kafkaTemplate; + } + + @KafkaListener(id = "group1", topics = "topic1") + @Transactional("dstm") + public void listen1(String in) { + this.kafkaTemplate.send("topic2", in.toUpperCase()); + this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')"); + } + + @KafkaListener(id = "group2", topics = "topic2") + public void listen2(String in) { + System.out.println(in); + } + + } + + @Bean + public NewTopic topic1() { + return TopicBuilder.name("topic1").build(); + } + + @Bean + public NewTopic topic2() { + return TopicBuilder.name("topic2").build(); + } + +} +``` + +```properties +spring.datasource.url=jdbc:mysql://localhost/integration?serverTimezone=UTC +spring.datasource.username=root +spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver + +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.enable-auto-commit=false +spring.kafka.consumer.properties.isolation.level=read_committed + +spring.kafka.producer.transaction-id-prefix=tx- + +#logging.level.org.springframework.transaction=trace +#logging.level.org.springframework.kafka.transaction=debug +#logging.level.org.springframework.jdbc=debug +``` + +对于只存在于producer中的transaction,可以使用transaction synchronize: +```java +@Transactional("dstm") +public void someMethod(String in) { + this.kafkaTemplate.send("topic2", in.toUpperCase()); + this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')"); +} +``` +kafkaTemplate将会将kafka transaction和db transaction进行同步,并且kafka事务在db事务之后才执行commit或rollback。 + +如果想要先提交kafka transaction,再提交db transaction,可以使用嵌套@Transactional,示例如下所示: +```java +@Transactional("dstm") +public void someMethod(String in) { + this.jdbcTemplate.execute("insert into mytable (data) values ('" + in + "')"); + sendToKafka(in); +} + +@Transactional("kafkaTransactionManager") +public void sendToKafka(String in) { + this.kafkaTemplate.send("topic2", in.toUpperCase()); +} +``` +### KafkaTemplate本地事务 +可以通过kafkaTemplate在local transaction中执行一系列操作。如下为使用示例: +```java +boolean result = template.executeInTransaction(t -> { + t.sendDefault("thing1", "thing2"); + t.sendDefault("cat", "hat"); + return true; +}); +``` +上述示例中,callback的参数代表template本身。如果该callback正常退出,那么事务则执行commit操作;如果callback抛出异常,那么transaction将会回滚。 + +> 如果在调用executeInTransaction时,已经存在KafkaTransactionManager事务或synchronized事务,那么kafkaTransactionManager事务不会被使用,而会使用由executeInTransaction方法开启的事务 + +### KafkaTemplate事务发布和非事务发布 +一般来说,当KafkaTemplate是事务的(producer factory配置了transactionIdPrefix),那么提交record的操作应该是事务的。开启事务可以通过transactionManager、@Transactional方法,executeInTransaction调用,或由listener container开启。任何在事务范围内使用template的行为将会导致抛出`IllegalStateException`。从2.4.3版本开始,可以设置template的`allowNonTransactional`属性从而允许kafkaTemplate执行非事务操作。 + +在allowNonTransactional属性被设置为true时,将会调用producer container的createNonTransactionalProducer方法创建非事务producer,该nontransactional producer创建后也会被缓存,或与线程相绑定,从而进行重用。 + +### 事务结合BatchListener使用 +当listener在事务存在时执行失败,rollback触发之后会调用AfterRollbackProcessor来执行一些操作。在recordListener使用默认的AfterRollbackProcessor时,会执行seek操作,故而失败的消息将会被重新传递。 + +在使用batch listener时,如果处理失败,那么整个batch中的消息都会被重新传递,因为framwork不知道在batch中具体那条record处理失败。 + + + + + +