diff --git a/mq/kafka/Spring for Apache Kafka.md b/mq/kafka/Spring for Apache Kafka.md
deleted file mode 100644
index 2c994a1..0000000
--- a/mq/kafka/Spring for Apache Kafka.md
+++ /dev/null
@@ -1,701 +0,0 @@
-# Spring for Apache Kafka
-## Introduce
-### 依赖
-可以通过如下方式来添加Kafka的依赖包:
-```xml
-
- org.springframework.kafka
- spring-kafka
-
-```
-### Consumer Demo
-```java
-@SpringBootApplication
-public class Application {
-
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
-
- @Bean
- public NewTopic topic() {
- return TopicBuilder.name("topic1")
- .partitions(10)
- .replicas(1)
- .build();
- }
-
- @KafkaListener(id = "myId", topics = "topic1")
- public void listen(String in) {
- System.out.println(in);
- }
-
-}
-```
-application.properties文件配置示例如下:
-```properties
-spring.kafka.consumer.auto-offset-reset=earliest
-```
-NewTopic的bean对象会导致broker中创建该topic,如果该topic已经存在则该topic不需要。
-### Producer Demo
-```java
-@SpringBootApplication
-public class Application {
-
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
-
- @Bean
- public NewTopic topic() {
- return TopicBuilder.name("topic1")
- .partitions(10)
- .replicas(1)
- .build();
- }
-
- @Bean
- public ApplicationRunner runner(KafkaTemplate template) {
- return args -> {
- template.send("topic1", "test");
- };
- }
-
-}
-```
-## Reference
-### Using Spring for Apache Kafka
-#### 连接到Kafka
-- Kafka Admin
-- ProducerFactory
-- ConsumerFactory
-
-上述所有都继承了KafkaResourceFactory类。这允许在运行时动态改变bootstrap servers(同Kafka集群建立连接的ip:port),通过`setBootstrapServersSupplier(() → …)`向其传递一个`Supplier`,传递的Supplier将会被所有新建连接调用来获取bootstrap servers的ip和port。
-消费者和生产者通常是长期活跃的,为了关闭已经存在的所有生产者,可以调用`DefaultKafkaProducerFactory`的`reset()`方法;如果要关闭现存的所有消费者,可以调用`KafkaListenerEndpointRegistry`的`stop()`方法(调用stop方法之后再调用start方法),或者再任何其他的listener容器bean中调用stop和start方法。
-为了方便,框架提供了`ABSwitchCluster`来支持两套bootstrap servers集合,再同一时间只能由一套处于活跃状态。当想要切换bootstrap servers集群时,可以调用`primary()`方法或`secondary()`方法,并且在producer factory上调用`reset`方法用于建立新的连接;对于消费者,在所有listener容器中调用`stop`和`start`方法。当使用`@KafkaListener`注解时,在`KafkaListenerEndpointRegistry`上调用stop和start方法。
-##### Factory Listener
-`DefaultKafkaProducerFactory`和`DefaultKafkaConsumerFactory`可以配置`Listener`来接收通知,当生产者或消费者被创建或关闭时,Listener接收到提醒。
-```java
-// Producer Factory Listener
-interface Listener {
-
- default void producerAdded(String id, Producer producer) {
- }
-
- default void producerRemoved(String id, Producer producer) {
- }
-
-}
-```
-```java
-// Consumer Factory Listener
-interface Listener {
-
- default void consumerAdded(String id, Consumer consumer) {
- }
-
- default void consumerRemoved(String id, Consumer consumer) {
- }
-
-}
-```
-在上述场景中,`id`创建时都是将`client-id`添加到factory bean-name之后,之间用`.`隔开。
-#### 设置Topic
-如果你定义了一个`KafkaAdmin`的bean对象,那么其就能够自动添加topic到broker中。为了创建topic,可以为每个topic在应用容器中创建一个`NewTopic`bean对象。可以通过`TopicBuilder`工具类来使创建topic bean对象的过程更加简单。
-创建Topic的示例如下所示:
-```java
-@Bean
-public KafkaAdmin admin() {
- Map configs = new HashMap<>();
- configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- return new KafkaAdmin(configs);
-}
-
-@Bean
-public NewTopic topic1() {
- return TopicBuilder.name("thing1")
- .partitions(10)
- .replicas(3)
- .compact()
- .build();
-}
-
-@Bean
-public NewTopic topic2() {
- return TopicBuilder.name("thing2")
- .partitions(10)
- .replicas(3)
- .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
- .build();
-}
-
-@Bean
-public NewTopic topic3() {
- return TopicBuilder.name("thing3")
- .assignReplicas(0, Arrays.asList(0, 1))
- .assignReplicas(1, Arrays.asList(1, 2))
- .assignReplicas(2, Arrays.asList(2, 0))
- .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
- .build();
-}
-```
-可以省略partitions()方法和replicas()方法,在省略的情况下broker默认设置值将会被采用:
-```java
-@Bean
-public NewTopic topic4() {
- return TopicBuilder.name("defaultBoth")
- .build();
-}
-
-@Bean
-public NewTopic topic5() {
- return TopicBuilder.name("defaultPart")
- .replicas(1)
- .build();
-}
-
-@Bean
-public NewTopic topic6() {
- return TopicBuilder.name("defaultRepl")
- .partitions(3)
- .build();
-}
-```
-可以定义一个NewTopics类型的bean对象,NewTopics对象中包含多个NewTopic对象:
-```java
-@Bean
-public KafkaAdmin.NewTopics topics456() {
- return new NewTopics(
- TopicBuilder.name("defaultBoth")
- .build(),
- TopicBuilder.name("defaultPart")
- .replicas(1)
- .build(),
- TopicBuilder.name("defaultRepl")
- .partitions(3)
- .build());
-}
-```
-> 当使用spring boot时,KafkaAdmin对象是自动注册的,故而只需要注册NewTopic类型的bean对象即可。
-
-默认情况下,如果broker不可访问,那么只会打出日志信息,会继续加载上下文。如果想要将这种情况标记为fatal,需要设置admin的`fatalIfBrokerNotAvailable`属性为true,设置完属性后如果broker不可访问,上下文加载失败。
-KafkaAdmin提供了方法在运行时动态创建topic,
-- `createOrModifyTopics`
-- `describeTopics`
-
-如果想要使用更多特性,可以直接使用AdminClient:
-```java
-@Autowired
-private KafkaAdmin admin;
-
-...
-
- AdminClient client = AdminClient.create(admin.getConfigurationProperties());
- ...
- client.close();
-```
-#### Sending Messages
-##### KafkaTemplate
-`KafkaTemplate`类包装了生产者,并且提供方法向Topic发送消息。如下显示了KafkaTemplate相关的方法:
-```java
-CompletableFuture> sendDefault(V data);
-
-CompletableFuture> sendDefault(K key, V data);
-
-CompletableFuture> sendDefault(Integer partition, K key, V data);
-
-CompletableFuture> sendDefault(Integer partition, Long timestamp, K key, V data);
-
-CompletableFuture> send(String topic, V data);
-
-CompletableFuture> send(String topic, K key, V data);
-
-CompletableFuture> send(String topic, Integer partition, K key, V data);
-
-CompletableFuture> send(String topic, Integer partition, Long timestamp, K key, V data);
-
-CompletableFuture> send(ProducerRecord record);
-
-CompletableFuture> send(Message> message);
-
-Map metrics();
-
-List partitionsFor(String topic);
-
- T execute(ProducerCallback callback);
-
-// Flush the producer.
-
-void flush();
-
-interface ProducerCallback {
-
- T doInKafka(Producer producer);
-
-}
-```
-`sendDefault`方法需要一个提供给KafkaTemplate的默认Topic。
-上述API会接收一个时间戳参数(如果时间戳参数没有指定,则自动产生一个时间戳),并且会将该时间戳存储到记录中。用户指定的时间戳如何存储取决于Kafka topic中关于时间戳类型的配置。如果topic的时间戳类型被配置为`CREATE_TIME`,那么用户指定的时间戳将会被记录到record中。如果时间戳类型被配置为`LOG_APPEND_TIME`,那么用户指定的时间戳将会被忽略,并且由broker进行添加(broker会添加时间戳为local broker time)。
-`metrics`方法和`partitionsFor`方法将会被委托给Producer中的同名方法`execute`方法则是会提供对底层Producer的直接访问。
-如果要使用KafkaTemplate,需要配置一个Producer并且将其传递给KafkaTemplate的构造函数:
-```java
-@Bean
-public ProducerFactory producerFactory() {
- return new DefaultKafkaProducerFactory<>(producerConfigs());
-}
-
-@Bean
-public Map producerConfigs() {
- Map props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- // See https://kafka.apache.org/documentation/#producerconfigs for more properties
- return props;
-}
-
-@Bean
-public KafkaTemplate kafkaTemplate() {
- return new KafkaTemplate(producerFactory());
-}
-```
-可以通过如下方式覆盖factory的ProducerConfig,根据相同的factory创建不同Producer config的template对象:
-```java
-@Bean
-public KafkaTemplate stringTemplate(ProducerFactory pf) {
- return new KafkaTemplate<>(pf);
-}
-
-@Bean
-public KafkaTemplate bytesTemplate(ProducerFactory pf) {
- return new KafkaTemplate<>(pf,
- Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
-}
-```
-一个类型为`ProducerFactory, ?>`的bean对象可以被任何泛型类型引用。(该bean对象已经被spring boot自动配置)
-当使用带`Message>`参数的方法时,topic、partition、key information都会在message header中被提供:
-- KafkaHeaders.TOPIC
-- KafkaHeaders.PARTITION
-- KafkaHeaders.KEY
-- KafkaHeaders.TIMESTAMP
-
-该消息的载荷是data。
-可以为KafkaTemplate配置ProducerListener来为发送的结果(成功或失败)提供一个异步回调,而不是通过CompletableFuture。如下是`ProducerListener`接口的定义:
-```java
-public interface ProducerListener {
-
- void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata);
-
- void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata,
- Exception exception);
-
-}
-```
-默认情况下,template配置了一个`LoggingProducerListener`,会打印发送失败到日志,但是发送成功时不会做任何事。
-为了方便,默认的方法实现已经在接口中提供了提供了,如果只需要覆盖一个方法,只需要对onSuccess或onError进行Override即可。
-发送消息会返回一个`CompletableFuture`对象。可以为异步操作注册一个listener:
-```java
-CompletableFuture> future = template.send("myTopic", "something");
-future.whenComplete((result, ex) -> {
- ...
-});
-```
-SendResult拥有两个属性,ProducerRecord和RecordMetaData。
-Throwable可以被转化为KafkaProducerException,该异常的failedProducerRecord可以包含失败的record。
-当为producer config设置了`linger.ms`时,如果待发送的消息没有达到`batch.size`,会延迟发送消息等待更多的消息出现并被批量发送。默认情况下,linger.ms为0,不会有延迟,但是如果linger.ms有值,那么在发送消息之后如果希望消息立马发送,需要手动调用flush方法。
-如下示例展示了如何通过kafkaTemplate向broker发送消息:
-```java
-public void sendToKafka(final MyOutputData data) {
- final ProducerRecord record = createRecord(data);
-
- CompletableFuture> future = template.send(record);
- future.whenComplete((result, ex) -> {
- if (ex == null) {
- handleSuccess(data);
- }
- else {
- handleFailure(data, record, ex);
- }
- });
-}
-
-// Blocking (Sync)
-public void sendToKafka(final MyOutputData data) {
- final ProducerRecord record = createRecord(data);
-
- try {
- template.send(record).get(10, TimeUnit.SECONDS);
- handleSuccess(data);
- }
- catch (ExecutionException e) {
- handleFailure(data, record, e.getCause());
- }
- catch (TimeoutException | InterruptedException e) {
- handleFailure(data, record, e);
- }
-}
-```
-上述示例中,ex为KafkaProducerException类型,并且有failedProducerRecord属性。
-##### RoutingKafkaTemplate
-可以通过RoutingKafkaTemplate在运行时根据目标topic的名称选定producer。
-> routing template不支持事务、execute、flush、metrics等操作,因为这些操作不知道topic名称
-
-routing template需要一个key和value分别为`java.util.regex.Pattern`和`ProducerFactory