Files
rikako-note/spring/spring kafka/spring kafka.md
2023-12-27 00:31:46 +08:00

17 KiB
Raw Blame History

Spring Kafka

连接到kafka

运行时切换bootstrap servers

从2.5版本开始KafkaAdmin、ProducerFactory、ConsumerFactory都继承于KafkaResourceFactory抽象类。通过调用KafkaResourceFactory抽象类的setBootstrapServersSupplier(Supplier<String> bootstrapServersSupplier)方法可以在运行时动态的切换bootstrap servers。该Supplier将会在新建连接获取bootstrap servers时被调用。

切换bootstrap后关闭旧consumer和producer

kafka consumer和producer通常都是基于长连接的在调用setBootstrapServersSupplier在运行时切换bootstrap servers后如果想要关闭现存的producer可以调用DefaultKafkaProducerFactoryreset方法。如果想要关闭现存的consumer可以调用KafkaListenerEndpointRegistryclose方法调用close后再调用start或是调用其他listener container的close和start方法。

ABSwitchCluster

为了方便起见framework提供了ABSwitchCluster该类支持两套bootstrap servers集合在任一时刻只有其中一套bootstrap servers起作用。ABSwitchCluster类继承Supplier<String>接口,将ABSwitchCluster对象提供给consumer factory, producer factory, KafkaAdmin后如果想要切换bootstrap servers可以调用ABSwitchCluster类的primarysecondary方法并关闭生产者和消费者的旧实例关闭生产者旧实例在producer factory上调用reset方法用于创建到新bootstrap servers的连接对于消费者实例可以对所有listener container先调用close方法再调用start方法当使用@KafkaListener注解时需要对KafkaListenerEndpointRegistrybean对象调用close和start方法。

Factory Listener

从2.5版本开始,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory都可以配置Listener通过配置Listener可以监听生产者或消费者实例的创建和关闭。

// producer listener
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
// consumer listener
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

再上述接口中id代表再factory bean对象名称后追加client-id属性二者通过.分隔。

配置Topic

如果在当前应用上下文中定义了KafkaAdmin bean对象kafkaAdmin可以自动的添加topic到broker。为了实现topic的自动添加可以定义一个NewTopic类型的bean对象kafkaAdmin会自动将该topic添加到broker中。

为了方便topic的创建2.3版本中引入了TopicBuilder类。

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, List.of(0, 1))
            .assignReplicas(1, List.of(1, 2))
            .assignReplicas(2, List.of(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

从2.6版本开始创建NewTopic时可以省略partitions()和replicas()方法的调用此时创建的topic将会使用broker中默认的配置。支持该特性要求broker版本至少为2.4.0。

@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}

从版本2.7开始,可以在KafkaAdmin.NewTopics的bean对象中声明多个NewTopic对象

@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}

当使用spring boot时KafkaAdmin对象将会被自动注册故而只需要定义NewTopic bean对象即可。

默认情况下如果kafka broker不可用会输出日志进行记录但是此时context的载入还会继续后续可以手动调用KafkaAdmin的initalize方法和进行重试。如果想要在kafka broker不可用时停止context的载入可以将kafka AdminfatalIfBrokerNotAvailable属性设置为true此时context会初始化失败。

从版本2.7开始KafkaAdmin提供了两个方法用于在运行时动态创建和检测Topic

  • createOrModifyTopics
  • describeTopics

从版本2.9.10、3.0.9开始KafkaAdmin提供了setCreateOrModifyTopic(Predicate<org.apache.kafka.clients.admin.NewTopic> createOrModifyTopic)接口该接口接收一个Predicate<NewTopic>参数通过该predicate可以判断是否一个NewTopic bean应该被该kafkaAdmin创建或修改。该方法通常用于上下文中含有多个KafkaAdmin bena对象每个kafkaAdmin对应不同的broker集群在上下文中含有多个NewTopic对象时可以通过predicate判断每个topic应该属性哪个amdin。

发送消息

KafkaTemplate类对KafkaProducer进行了包装提供了如下接口用于向kafka topic发送消息。

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

其中sendDefault接口需要向KafkaTemplate提供一个默认的topic。

kafkaTemplate中部分api接收timestamp作为参数并且将timestamp存储到record中。接口中指定的timestamp参数如何存储取决于kafka topic中配置的timestamp类型。如果topic中timestamp类型被配置为CREATE_TIME那么用户指定的timestamp参数将会被使用如果用户没有指定timestamp那么会自动创建timestampproducer会在发送时将timestamp指定为System.currentTimeMillis()。如果topic中timstamp类型被配置为LOG_APPEND_TIME那么用户指定的timestamp将会被丢弃而broker则会负责为timestamp赋值。

mertics和partitions方法则会被委派给了底层KafkaProducer的同名方法execute接口则是提供了对底层KafkaProducer的直接访问。

要使用KafkaTemplate可以配置一个producer factory并将其提供给KafkaTemplate的构造方法。如下展示了如何配置一个KafkaTemplate

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

从2.5开始创建KafkaTemplate时可以基于factory进行创建但是覆盖factory中的配置属性具体示例如下

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

当使用KafkaTemplate接收Message\<?\>类型的参数时可以将topic、partition、key和timestamp参数指定在Message的header中header中包含如下条目

  • KafkaHeaders.TOPIC
  • KafkaHeaders.PARTITION
  • KafkaHeaders.KEY
  • KafkaHeaders.TIMESTAMP

除了调用发送方法获取CompletableFuture外还可以为KafkaTemplate配置一个ProducerListener从而在消息发送完成成功或失败后执行一个异步的回调。如下是ProducerListener接口的定义

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

默认情况下KafkaTemplate配置了一个LoggingProducerListener会在发送失败时打印失败日志在发送成功时并不做任何事。并且为了方便起见方法的默认实现已经被提供可以只覆盖其中一个方法。

send方法默认返回的是CompletableFuture类型可以在发送完成之后为future注册一个回调

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

其中Throwable类型的ex可以被转化为KafkaProducerException该类型的failedProducerRecord属性可以获取发送失败的record。

如果想要同步调用KafkaTemplate的发送方法并且等待返回结果可以调用返回值CompletableFuture类型的get方法来同步等待。通常情况下调用CompletableFuture.get推荐使用带超时参数的方法。如果在Producer配置中指定了linger.ms那么在等待返回结果之前需要调用KafkaTemplate的flush方法。为了方便KafkaTemplate提供了带autoFlush参数的构造器版本如果设置autoFlush为truekafkaTemplate在每次发送消息时都会调用flush方法。

发送示例

如下展示了通过KafkaTemplate向broker发送消息的示例

// async
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}
// sync
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

RoutingKafkaTemplate

从2.5版本开始额可以通过RoutingKafkaTemplate在运行时选择producer实例选择过程基于topic名称。

RoutingKafkaTemplate不支持事务也不支持execute、flush、metrics等方法因为RoutingKafkaTemplate根据topic来选择producer但是在执行这些操作时并不知道操作所属topic。

RoutingKafkaTemplate需要一个mapmap的key为java.util.regex.Pattern而value则是ProducerFactory<Object, Object>实例。该map必须是有序的例如LinkedHashMap该map需要按顺序遍历顺序在前的key-value对会优先匹配。

如下示例展示了如何通过一个RoutingKafkaTemplate向不同的topic发送消息实例中每个topic都使用不同的序列化方式。

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

使用DefaultKafkaProducerFactory

ProducerFactory是用于创建生产者实例的。当没有使用事务时默认情况下DefaultKafkaFactory会创建一个单例的生产者实例所有客户端都会使用生产者实例。但是如果在template中调用了flush方法这将会对其他同样使用该生产者实例的client操作造成阻塞。从2.3版本开始DefaultKafkaFactory有了新的producerPerThread属性当该属性设置为true时factory会针对每个线程都创建并缓存一个producer实例。

producerPerThread被设置为true时若线程中的producer不再被需要那么对factory必须手动调用closeThreadBoundProducer()。这将会物理上对producer进行关闭并且从ThreadLocal中移除producer实例。单纯调用close或是destroy方法并不会清除这些producer实例。

当创建DefaultKafkaFactory时key serializer或是value serializer可以通过DefaultKafkaFactory的构造函数单独指定。在通过构造函数指定factory的key serializer/value serializer时可以选择向构造函数中传入serializer实例或是传入serializer supplier对象

  • 当传入serializer实例时通过该factory创建的所有生产者实例都共享该serializer实例
  • 当传入的是返回一个serializer的supplier时可令通过该factory创建的producer实例都拥有属于自己的serializer

如下是创建DefaultKafkaProducerFactory bean对象并且制定serializer的示例

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

从2.5.10版本开始可以在factory创建之后再更新factory的producer config属性。例如可以在运行时更新ssl key/trust的存储路径。该更新操作并不会影响到已经被创建的producer实例故而需要调用factory的reset方法在调用reset后所有现存producer实例都会被关闭而之后新创建的producer都会使用新的属性配置。

在运行时更新生产者属性时,无法将事务的生产者变为非事务的,也无法将非事务的生产者变为事务的。

为了更新producer属性配置factory提供了如下两个接口

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);