系列文章目录
SpringBoot3-第一篇(快速入门)
SpringBoot3-第二篇(Web开发)
SpringBoot3-第三篇(数据访问)
SpringBoot3-第四篇(基础特性)
SpringBoot3-第五篇(核心原理)
SpringBoot3-第六篇(整合NoSQL)
SpringBoot3-第七篇(整合接口文档)
SpringBoot3-第八篇(整合远程调用)
SpringBoot3-第九篇(整合消息服务)
文章目录
https://kafka.apache.org/documentation/
1. 消息队列-场景
1.1 异步
1.2 解耦
1.3 削峰
1.4 缓冲
2. 消息队列-Kafka
2.1 消息模式
2.2 Kafka工作原理
2.3 SpringBoot整合
参照:https://docs.spring.io/spring-kafka/reference/
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置
spring.kafka.bootstrap-servers=172.20.128.1:9092
修改C:\Windows\System32\drivers\etc\hosts
文件,配置8.130.32.70 kafka
2.4 消息发送
@SpringBootTest
class Boot07KafkaApplicationTests {
@Autowired
KafkaTemplate kafkaTemplate;
@Test
void contextLoads() throws ExecutionException, InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
CompletableFuture[] futures = new CompletableFuture[10000];
for (int i = 0; i < 10000; i++) {
CompletableFuture send = kafkaTemplate.send("order", "order.create."+i, "订单创建了:"+i);
futures[i]=send;
}
CompletableFuture.allOf(futures).join();
watch.stop();
System.out.println("总耗时:"+watch.getTotalTimeMillis());
}
}
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void someMethod() {
this.kafkaTemplate.send("someTopic", "Hello");
}
}
2.5 消息监听
@Component
public class OrderMsgListener {
@KafkaListener(topics = "order",groupId = "order-service")
public void listen(ConsumerRecord record){
System.out.println("收到消息:"+record); //可以监听到发给kafka的新消息,以前的拿不到
}
@KafkaListener(groupId = "order-service-2",topicPartitions = {
@TopicPartition(topic = "order",partitionOffsets = {
@PartitionOffset(partition = "0",initialOffset = "0")
})
})
public void listenAll(ConsumerRecord record){
System.out.println("收到partion-0消息:"+record);
}
}
2.6 参数配置
消费者
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
生产者
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
2.7 自动配置原理
kafka 自动配置在KafkaAutoConfiguration
- 容器中放了
KafkaTemplate
可以进行消息收发 - 容器中放了
KafkaAdmin
可以进行 Kafka 的管理,比如创建 topic 等 - kafka 的配置在
KafkaProperties
中 @EnableKafka
可以开启基于注解的模式