Files
rikako-note/mq/kafka/Spring for Apache Kafka.md

389 lines
16 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Spring for Apache Kafka
## Introduce
### 依赖
可以通过如下方式来添加Kafka的依赖包
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
```
### Consumer Demo
```java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
}
```
application.properties文件配置示例如下
```properties
spring.kafka.consumer.auto-offset-reset=earliest
```
NewTopic的bean对象会导致broker中创建该topic如果该topic已经存在则该topic不需要。
### Producer Demo
```java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}
}
```
## Reference
### Using Spring for Apache Kafka
#### 连接到Kafka
- Kafka Admin
- ProducerFactory
- ConsumerFactory
上述所有都继承了KafkaResourceFactory类。这允许在运行时动态改变bootstrap servers同Kafka集群建立连接的ip:port通过`setBootstrapServersSupplier(() → …​)`向其传递一个`Supplier<String>`传递的Supplier将会被所有新建连接调用来获取bootstrap servers的ip和port。
消费者和生产者通常是长期活跃的,为了关闭已经存在的所有生产者,可以调用`DefaultKafkaProducerFactory``reset()`方法;如果要关闭现存的所有消费者,可以调用`KafkaListenerEndpointRegistry``stop()`方法调用stop方法之后再调用start方法或者再任何其他的listener容器bean中调用stop和start方法。
为了方便,框架提供了`ABSwitchCluster`来支持两套bootstrap servers集合再同一时间只能由一套处于活跃状态。当想要切换bootstrap servers集群时可以调用`primary()`方法或`secondary()`方法并且在producer factory上调用`reset`方法用于建立新的连接对于消费者在所有listener容器中调用`stop``start`方法。当使用`@KafkaListener`注解时,在`KafkaListenerEndpointRegistry`上调用stop和start方法。
##### Factory Listener
`DefaultKafkaProducerFactory``DefaultKafkaConsumerFactory`可以配置`Listener`来接收通知当生产者或消费者被创建或关闭时Listener接收到提醒。
```java
// Producer Factory Listener
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
```
```java
// Consumer Factory Listener
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
```
在上述场景中,`id`创建时都是将`client-id`添加到factory bean-name之后之间用`.`隔开。
#### 设置Topic
如果你定义了一个`KafkaAdmin`的bean对象那么其就能够自动添加topic到broker中。为了创建topic可以为每个topic在应用容器中创建一个`NewTopic`bean对象。可以通过`TopicBuilder`工具类来使创建topic bean对象的过程更加简单。
创建Topic的示例如下所示
```java
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
```
可以省略partitions()方法和replicas()方法在省略的情况下broker默认设置值将会被采用
```java
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
```
可以定义一个NewTopics类型的bean对象NewTopics对象中包含多个NewTopic对象
```java
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
```
> 当使用spring boot时KafkaAdmin对象是自动注册的故而只需要注册NewTopic类型的bean对象即可。
默认情况下如果broker不可访问那么只会打出日志信息会继续加载上下文。如果想要将这种情况标记为fatal需要设置admin的`fatalIfBrokerNotAvailable`属性为true设置完属性后如果broker不可访问上下文加载失败。
KafkaAdmin提供了方法在运行时动态创建topic
- `createOrModifyTopics`
- `describeTopics`
如果想要使用更多特性可以直接使用AdminClient
```java
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
```
#### Sending Messages
##### KafkaTemplate
`KafkaTemplate`类包装了生产者并且提供方法向Topic发送消息。如下显示了KafkaTemplate相关的方法
```java
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
```
`sendDefault`方法需要一个提供给KafkaTemplate的默认Topic。
上述API会接收一个时间戳参数如果时间戳参数没有指定则自动产生一个时间戳并且会将该时间戳存储到记录中。用户指定的时间戳如何存储取决于Kafka topic中关于时间戳类型的配置。如果topic的时间戳类型被配置为`CREATE_TIME`那么用户指定的时间戳将会被记录到record中。如果时间戳类型被配置为`LOG_APPEND_TIME`那么用户指定的时间戳将会被忽略并且由broker进行添加broker会添加时间戳为local broker time
`metrics`方法和`partitionsFor`方法将会被委托给Producer中的同名方法`execute`方法则是会提供对底层Producer的直接访问。
如果要使用KafkaTemplate需要配置一个Producer并且将其传递给KafkaTemplate的构造函数
```java
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
```
可以通过如下方式覆盖factory的ProducerConfig根据相同的factory创建不同Producer config的template对象
```java
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
```
一个类型为`ProducerFactory<?, ?>`的bean对象可以被任何泛型类型引用。该bean对象已经被spring boot自动配置
当使用带`Message<?>`参数的方法时topic、partition、key information都会在message header中被提供
- KafkaHeaders.TOPIC
- KafkaHeaders.PARTITION
- KafkaHeaders.KEY
- KafkaHeaders.TIMESTAMP
该消息的载荷是data。
可以为KafkaTemplate配置ProducerListener来为发送的结果成功或失败提供一个异步回调而不是通过CompletableFuture。如下是`ProducerListener`接口的定义:
```java
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
```
默认情况下template配置了一个`LoggingProducerListener`,会打印发送失败到日志,但是发送成功时不会做任何事。
为了方便默认的方法实现已经在接口中提供了提供了如果只需要覆盖一个方法只需要对onSuccess或onError进行Override即可。
发送消息会返回一个`CompletableFuture<SendResult>`对象。可以为异步操作注册一个listener
```java
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
```
SendResult拥有两个属性ProducerRecord和RecordMetaData。
Throwable可以被转化为KafkaProducerException该异常的failedProducerRecord可以包含失败的record。
当为producer config设置了`linger.ms`时,如果待发送的消息没有达到`batch.size`会延迟发送消息等待更多的消息出现并被批量发送。默认情况下linger.ms为0不会有延迟但是如果linger.ms有值那么在发送消息之后如果希望消息立马发送需要手动调用flush方法。
如下示例展示了如何通过kafkaTemplate向broker发送消息
```java
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
// Blocking (Sync)
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
```
上述示例中ex为KafkaProducerException类型并且有failedProducerRecord属性。
##### RoutingKafkaTemplate
可以通过RoutingKafkaTemplate在运行时根据目标topic的名称选定producer。
> routing template不支持事务、execute、flush、metrics等操作因为这些操作不知道topic名称
routing template需要一个key和value分别为`java.util.regex.Pattern``ProducerFactory<Object, Object>`的map实例。该map应该是有序的因为map会按顺序遍历例如LinkedHashMap集合。并且应该在map最开始的时候指定更加具体的pattern。
如下示例会展示如何通过一个template向不同的topic发送消息并且值的序列化会采用不同的序列化器
```java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
```
##### DefaultKafkaProducerFactory
在上述template的使用示例中创建一个KafkaTemplate需要使用ProducerFactory。
当不使用transaction时默认情况下DefaultKafkaProducerFactory创建一个单例producer单例producer由所有客户端使用。但是如果在template上调用了flush方法可能会造成其使用同一producer的其他线程的延迟。
> DefaultKafkaProducerFactory有一个属性producerPerThread当该属性被设置为true时Kafka会对每一个线程都创建一个producer以此来避免这个问题。
> 当producerPerThread被设置为true时若producer不再需要用户代码必须在factory上调用closeThreadBoundProducer方法这回物理的关闭producer并且将其中ThreadLocal中移除。调用reset或destroy并不会清除这些producer
可以在factory创建之后对producer properties进行更新。