Files
rikako-note/mq/kafka/Spring for Apache Kafka.md

13 KiB
Raw Blame History

Spring for Apache Kafka

Introduce

依赖

可以通过如下方式来添加Kafka的依赖包

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

Consumer Demo

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @KafkaListener(id = "myId", topics = "topic1")
    public void listen(String in) {
        System.out.println(in);
    }

}

application.properties文件配置示例如下

spring.kafka.consumer.auto-offset-reset=earliest

NewTopic的bean对象会导致broker中创建该topic如果该topic已经存在则该topic不需要。

Producer Demo

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("topic1")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic1", "test");
        };
    }

}

Reference

Using Spring for Apache Kafka

连接到Kafka

  • Kafka Admin
  • ProducerFactory
  • ConsumerFactory

上述所有都继承了KafkaResourceFactory类。这允许在运行时动态改变bootstrap servers同Kafka集群建立连接的ip:port通过setBootstrapServersSupplier(() → …​)向其传递一个Supplier<String>传递的Supplier将会被所有新建连接调用来获取bootstrap servers的ip和port。
消费者和生产者通常是长期活跃的,为了关闭已经存在的所有生产者,可以调用DefaultKafkaProducerFactoryreset()方法;如果要关闭现存的所有消费者,可以调用KafkaListenerEndpointRegistrystop()方法调用stop方法之后再调用start方法或者再任何其他的listener容器bean中调用stop和start方法。
为了方便,框架提供了ABSwitchCluster来支持两套bootstrap servers集合再同一时间只能由一套处于活跃状态。当想要切换bootstrap servers集群时可以调用primary()方法或secondary()方法并且在producer factory上调用reset方法用于建立新的连接对于消费者在所有listener容器中调用stopstart方法。当使用@KafkaListener注解时,在KafkaListenerEndpointRegistry上调用stop和start方法。

Factory Listener

DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory可以配置Listener来接收通知当生产者或消费者被创建或关闭时Listener接收到提醒。

// Producer Factory Listener
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
// Consumer Factory Listener
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在上述场景中,id创建时都是将client-id添加到factory bean-name之后之间用.隔开。

设置Topic

如果你定义了一个KafkaAdmin的bean对象那么其就能够自动添加topic到broker中。为了创建topic可以为每个topic在应用容器中创建一个NewTopicbean对象。可以通过TopicBuilder工具类来使创建topic bean对象的过程更加简单。
创建Topic的示例如下所示

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, Arrays.asList(0, 1))
            .assignReplicas(1, Arrays.asList(1, 2))
            .assignReplicas(2, Arrays.asList(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

可以省略partitions()方法和replicas()方法在省略的情况下broker默认设置值将会被采用

@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}

可以定义一个NewTopics类型的bean对象NewTopics对象中包含多个NewTopic对象

@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}

当使用spring boot时KafkaAdmin对象是自动注册的故而只需要注册NewTopic类型的bean对象即可。

默认情况下如果broker不可访问那么只会打出日志信息会继续加载上下文。如果想要将这种情况标记为fatal需要设置admin的fatalIfBrokerNotAvailable属性为true设置完属性后如果broker不可访问上下文加载失败。
KafkaAdmin提供了方法在运行时动态创建topic

  • createOrModifyTopics
  • describeTopics

如果想要使用更多特性可以直接使用AdminClient

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

Sending Messages

KafkaTemplate

KafkaTemplate类包装了生产者并且提供方法向Topic发送消息。如下显示了KafkaTemplate相关的方法

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

// Flush the producer.

void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

sendDefault方法需要一个提供给KafkaTemplate的默认Topic。
上述API会接收一个时间戳参数如果时间戳参数没有指定则自动产生一个时间戳并且会将该时间戳存储到记录中。用户指定的时间戳如何存储取决于Kafka topic中关于时间戳类型的配置。如果topic的时间戳类型被配置为CREATE_TIME那么用户指定的时间戳将会被记录到record中。如果时间戳类型被配置为LOG_APPEND_TIME那么用户指定的时间戳将会被忽略并且由broker进行添加broker会添加时间戳为local broker timemetrics方法和partitionsFor方法将会被委托给Producer中的同名方法execute方法则是会提供对底层Producer的直接访问。
如果要使用KafkaTemplate需要配置一个Producer并且将其传递给KafkaTemplate的构造函数

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

可以通过如下方式覆盖factory的ProducerConfig根据相同的factory创建不同Producer config的template对象

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

一个类型为ProducerFactory<?, ?>的bean对象可以被任何泛型类型引用。该bean对象已经被spring boot自动配置
当使用带Message<?>参数的方法时topic、partition、key information都会在message header中被提供

  • KafkaHeaders.TOPIC
  • KafkaHeaders.PARTITION
  • KafkaHeaders.KEY
  • KafkaHeaders.TIMESTAMP

该消息的载荷是data。
可以为KafkaTemplate配置ProducerListener来为发送的结果成功或失败提供一个异步回调而不是通过CompletableFuture。如下是ProducerListener接口的定义:

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

默认情况下template配置了一个LoggingProducerListener,会打印发送失败到日志,但是发送成功时不会做任何事。
为了方便默认的方法实现已经在接口中提供了提供了如果只需要覆盖一个方法只需要对onSuccess或onError进行Override即可。
发送消息会返回一个CompletableFuture<SendResult>对象。可以为异步操作注册一个listener

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult拥有两个属性ProducerRecord和RecordMetaData。
Throwable可以被转化为KafkaProducerException该异常的failedProducerRecord可以包含失败的record。
当为producer config设置了linger.ms时,如果待发送的消息没有达到batch.size会延迟发送消息等待更多的消息出现并被批量发送。默认情况下linger.ms为0不会有延迟但是如果linger.ms有值那么在发送消息之后如果希望消息立马发送需要手动调用flush方法。
如下示例展示了如何通过kafkaTemplate向broker发送消息

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<Integer, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}

// Blocking (Sync)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

上述示例中ex为KafkaProducerException类型并且有failedProducerRecord属性。

RoutingKafkaTemplate

可以通过RoutingKafkaTemplate在运行时根据目标topic的名称选定producer。

routing template不支持事务、execute、flush、metrics等操作因为这些操作不知道topic名称

routing template需要一个key和value分别为java.util.regex.PatternProducerFactory<Object, Object>的map实例。