From 8254dd60a5b6e0e4e40d748da4803ed116b2ba48 Mon Sep 17 00:00:00 2001 From: Rikako Wu <496063163@qq.com> Date: Tue, 21 Feb 2023 19:03:30 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=A7=E7=BB=AD=E5=85=B3=E4=BA=8EKafka?= =?UTF-8?q?=E6=96=87=E6=A1=A3=E7=9A=84=E9=98=85=E8=AF=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mq/kafka/Spring for Apache Kafka.md | 345 ++++++++++++++++++++++++++++ 1 file changed, 345 insertions(+) create mode 100644 mq/kafka/Spring for Apache Kafka.md diff --git a/mq/kafka/Spring for Apache Kafka.md b/mq/kafka/Spring for Apache Kafka.md new file mode 100644 index 0000000..a8dccfe --- /dev/null +++ b/mq/kafka/Spring for Apache Kafka.md @@ -0,0 +1,345 @@ +# Spring for Apache Kafka +## Introduce +### 依赖 +可以通过如下方式来添加Kafka的依赖包: +```xml + + org.springframework.kafka + spring-kafka + +``` +### Consumer Demo +```java +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + @Bean + public NewTopic topic() { + return TopicBuilder.name("topic1") + .partitions(10) + .replicas(1) + .build(); + } + + @KafkaListener(id = "myId", topics = "topic1") + public void listen(String in) { + System.out.println(in); + } + +} +``` +application.properties文件配置示例如下: +```properties +spring.kafka.consumer.auto-offset-reset=earliest +``` +NewTopic的bean对象会导致broker中创建该topic,如果该topic已经存在则该topic不需要。 +### Producer Demo +```java +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + @Bean + public NewTopic topic() { + return TopicBuilder.name("topic1") + .partitions(10) + .replicas(1) + .build(); + } + + @Bean + public ApplicationRunner runner(KafkaTemplate template) { + return args -> { + template.send("topic1", "test"); + }; + } + +} +``` +## Reference +### Using Spring for Apache Kafka +#### 连接到Kafka +- Kafka Admin +- ProducerFactory +- ConsumerFactory + +上述所有都继承了KafkaResourceFactory类。这允许在运行时动态改变bootstrap servers(同Kafka集群建立连接的ip:port),通过`setBootstrapServersSupplier(() → …​)`向其传递一个`Supplier`,传递的Supplier将会被所有新建连接调用来获取bootstrap servers的ip和port。 +消费者和生产者通常是长期活跃的,为了关闭已经存在的所有生产者,可以调用`DefaultKafkaProducerFactory`的`reset()`方法;如果要关闭现存的所有消费者,可以调用`KafkaListenerEndpointRegistry`的`stop()`方法(调用stop方法之后再调用start方法),或者再任何其他的listener容器bean中调用stop和start方法。 +为了方便,框架提供了`ABSwitchCluster`来支持两套bootstrap servers集合,再同一时间只能由一套处于活跃状态。当想要切换bootstrap servers集群时,可以调用`primary()`方法或`secondary()`方法,并且在producer factory上调用`reset`方法用于建立新的连接;对于消费者,在所有listener容器中调用`stop`和`start`方法。当使用`@KafkaListener`注解时,在`KafkaListenerEndpointRegistry`上调用stop和start方法。 +##### Factory Listener +`DefaultKafkaProducerFactory`和`DefaultKafkaConsumerFactory`可以配置`Listener`来接收通知,当生产者或消费者被创建或关闭时,Listener接收到提醒。 +```java +// Producer Factory Listener +interface Listener { + + default void producerAdded(String id, Producer producer) { + } + + default void producerRemoved(String id, Producer producer) { + } + +} +``` +```java +// Consumer Factory Listener +interface Listener { + + default void consumerAdded(String id, Consumer consumer) { + } + + default void consumerRemoved(String id, Consumer consumer) { + } + +} +``` +在上述场景中,`id`创建时都是将`client-id`添加到factory bean-name之后,之间用`.`隔开。 +#### 设置Topic +如果你定义了一个`KafkaAdmin`的bean对象,那么其就能够自动添加topic到broker中。为了创建topic,可以为每个topic在应用容器中创建一个`NewTopic`bean对象。可以通过`TopicBuilder`工具类来使创建topic bean对象的过程更加简单。 +创建Topic的示例如下所示: +```java +@Bean +public KafkaAdmin admin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + return new KafkaAdmin(configs); +} + +@Bean +public NewTopic topic1() { + return TopicBuilder.name("thing1") + .partitions(10) + .replicas(3) + .compact() + .build(); +} + +@Bean +public NewTopic topic2() { + return TopicBuilder.name("thing2") + .partitions(10) + .replicas(3) + .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd") + .build(); +} + +@Bean +public NewTopic topic3() { + return TopicBuilder.name("thing3") + .assignReplicas(0, Arrays.asList(0, 1)) + .assignReplicas(1, Arrays.asList(1, 2)) + .assignReplicas(2, Arrays.asList(2, 0)) + .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd") + .build(); +} +``` +可以省略partitions()方法和replicas()方法,在省略的情况下broker默认设置值将会被采用: +```java +@Bean +public NewTopic topic4() { + return TopicBuilder.name("defaultBoth") + .build(); +} + +@Bean +public NewTopic topic5() { + return TopicBuilder.name("defaultPart") + .replicas(1) + .build(); +} + +@Bean +public NewTopic topic6() { + return TopicBuilder.name("defaultRepl") + .partitions(3) + .build(); +} +``` +可以定义一个NewTopics类型的bean对象,NewTopics对象中包含多个NewTopic对象: +```java +@Bean +public KafkaAdmin.NewTopics topics456() { + return new NewTopics( + TopicBuilder.name("defaultBoth") + .build(), + TopicBuilder.name("defaultPart") + .replicas(1) + .build(), + TopicBuilder.name("defaultRepl") + .partitions(3) + .build()); +} +``` +> 当使用spring boot时,KafkaAdmin对象是自动注册的,故而只需要注册NewTopic类型的bean对象即可。 + +默认情况下,如果broker不可访问,那么只会打出日志信息,会继续加载上下文。如果想要将这种情况标记为fatal,需要设置admin的`fatalIfBrokerNotAvailable`属性为true,设置完属性后如果broker不可访问,上下文加载失败。 +KafkaAdmin提供了方法在运行时动态创建topic, +- `createOrModifyTopics` +- `describeTopics` + +如果想要使用更多特性,可以直接使用AdminClient: +```java +@Autowired +private KafkaAdmin admin; + +... + + AdminClient client = AdminClient.create(admin.getConfigurationProperties()); + ... + client.close(); +``` +#### Sending Messages +##### KafkaTemplate +`KafkaTemplate`类包装了生产者,并且提供方法向Topic发送消息。如下显示了KafkaTemplate相关的方法: +```java +CompletableFuture> sendDefault(V data); + +CompletableFuture> sendDefault(K key, V data); + +CompletableFuture> sendDefault(Integer partition, K key, V data); + +CompletableFuture> sendDefault(Integer partition, Long timestamp, K key, V data); + +CompletableFuture> send(String topic, V data); + +CompletableFuture> send(String topic, K key, V data); + +CompletableFuture> send(String topic, Integer partition, K key, V data); + +CompletableFuture> send(String topic, Integer partition, Long timestamp, K key, V data); + +CompletableFuture> send(ProducerRecord record); + +CompletableFuture> send(Message message); + +Map metrics(); + +List partitionsFor(String topic); + + T execute(ProducerCallback callback); + +// Flush the producer. + +void flush(); + +interface ProducerCallback { + + T doInKafka(Producer producer); + +} +``` +`sendDefault`方法需要一个提供给KafkaTemplate的默认Topic。 +上述API会接收一个时间戳参数(如果时间戳参数没有指定,则自动产生一个时间戳),并且会将该时间戳存储到记录中。用户指定的时间戳如何存储取决于Kafka topic中关于时间戳类型的配置。如果topic的时间戳类型被配置为`CREATE_TIME`,那么用户指定的时间戳将会被记录到record中。如果时间戳类型被配置为`LOG_APPEND_TIME`,那么用户指定的时间戳将会被忽略,并且由broker进行添加(broker会添加时间戳为local broker time)。 +`metrics`方法和`partitionsFor`方法将会被委托给Producer中的同名方法`execute`方法则是会提供对底层Producer的直接访问。 +如果要使用KafkaTemplate,需要配置一个Producer并且将其传递给KafkaTemplate的构造函数: +```java +@Bean +public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); +} + +@Bean +public Map producerConfigs() { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + // See https://kafka.apache.org/documentation/#producerconfigs for more properties + return props; +} + +@Bean +public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate(producerFactory()); +} +``` +可以通过如下方式覆盖factory的ProducerConfig,根据相同的factory创建不同Producer config的template对象: +```java +@Bean +public KafkaTemplate stringTemplate(ProducerFactory pf) { + return new KafkaTemplate<>(pf); +} + +@Bean +public KafkaTemplate bytesTemplate(ProducerFactory pf) { + return new KafkaTemplate<>(pf, + Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class)); +} +``` +一个类型为`ProducerFactory`的bean对象可以被任何泛型类型引用。(该bean对象已经被spring boot自动配置) +当使用带`Message`参数的方法时,topic、partition、key information都会在message header中被提供: +- KafkaHeaders.TOPIC +- KafkaHeaders.PARTITION +- KafkaHeaders.KEY +- KafkaHeaders.TIMESTAMP + +该消息的载荷是data。 +可以为KafkaTemplate配置ProducerListener来为发送的结果(成功或失败)提供一个异步回调,而不是通过CompletableFuture。如下是`ProducerListener`接口的定义: +```java +public interface ProducerListener { + + void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata); + + void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, + Exception exception); + +} +``` +默认情况下,template配置了一个`LoggingProducerListener`,会打印发送失败到日志,但是发送成功时不会做任何事。 +为了方便,默认的方法实现已经在接口中提供了提供了,如果只需要覆盖一个方法,只需要对onSuccess或onError进行Override即可。 +发送消息会返回一个`CompletableFuture`对象。可以为异步操作注册一个listener: +```java +CompletableFuture> future = template.send("myTopic", "something"); +future.whenComplete((result, ex) -> { + ... +}); +``` +SendResult拥有两个属性,ProducerRecord和RecordMetaData。 +Throwable可以被转化为KafkaProducerException,该异常的failedProducerRecord可以包含失败的record。 +当为producer config设置了`linger.ms`时,如果待发送的消息没有达到`batch.size`,会延迟发送消息等待更多的消息出现并被批量发送。默认情况下,linger.ms为0,不会有延迟,但是如果linger.ms有值,那么在发送消息之后如果希望消息立马发送,需要手动调用flush方法。 +如下示例展示了如何通过kafkaTemplate向broker发送消息: +```java +public void sendToKafka(final MyOutputData data) { + final ProducerRecord record = createRecord(data); + + CompletableFuture> future = template.send(record); + future.whenComplete((result, ex) -> { + if (ex == null) { + handleSuccess(data); + } + else { + handleFailure(data, record, ex); + } + }); +} + +// Blocking (Sync) +public void sendToKafka(final MyOutputData data) { + final ProducerRecord record = createRecord(data); + + try { + template.send(record).get(10, TimeUnit.SECONDS); + handleSuccess(data); + } + catch (ExecutionException e) { + handleFailure(data, record, e.getCause()); + } + catch (TimeoutException | InterruptedException e) { + handleFailure(data, record, e); + } +} +``` +上述示例中,ex为KafkaProducerException类型,并且有failedProducerRecord属性。 +##### RoutingKafkaTemplate +可以通过RoutingKafkaTemplate在运行时根据目标topic的名称选定producer。 +> routing template不支持事务、execute、flush、metrics等操作,因为这些操作不知道topic名称 + +routing template需要一个key和value分别为`java.util.regex.Pattern`和`ProducerFactory`的map实例。 + +