Spring Boot can be integrated with Apache Kafka through the following steps:
Add the Maven dependency:
Add the following Maven dependency to the pom.xml file of your Spring Boot project:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.1</version> <!-- use the latest version that fits your project -->
</dependency>
Configure the Kafka connection:
Configure the Kafka connection settings in application.properties or application.yml, for example:
application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group-id
spring.kafka.consumer.auto-offset-reset=earliest
application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group-id
      auto-offset-reset: earliest
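To make the three settings above more concrete, here is a minimal sketch of the Kafka client properties they correspond to. The class `KafkaClientPropsSketch` and its method are hypothetical helpers written only for illustration; they are not part of Spring or Kafka, and Spring Boot builds the real configuration for you. With `auto.offset.reset` set to `earliest`, a consumer group that has no committed offset starts reading from the beginning of each partition.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (an assumption for illustration, not a Spring or Kafka class):
// shows which Kafka client property each Spring Boot key maps to.
final class KafkaClientPropsSketch {

    static Map<String, Object> consumerProps(String bootstrapServers,
                                             String groupId,
                                             String autoOffsetReset) {
        Map<String, Object> props = new LinkedHashMap<>();
        // spring.kafka.bootstrap-servers -> bootstrap.servers
        props.put("bootstrap.servers", bootstrapServers);
        // spring.kafka.consumer.group-id -> group.id
        props.put("group.id", groupId);
        // spring.kafka.consumer.auto-offset-reset -> auto.offset.reset
        props.put("auto.offset.reset", autoOffsetReset);
        return props;
    }

    public static void main(String[] args) {
        // Prints each key=value pair built from the values used in this article.
        consumerProps("localhost:9092", "my-group-id", "earliest")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```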
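Create the Kafka producer:
Create a service class that injects org.springframework.kafka.core.KafkaTemplate and calls its send() method to publish messages to Kafka. There is no need to extend KafkaTemplate or override send(). For example: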
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    // Auto-configured by Spring Boot from the spring.kafka.* properties.
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Sends the message to the given topic; the send is asynchronous.
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
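As a usage illustration, the sketch below shows one way calling code might depend on the producer's `sendMessage(topic, message)` shape without needing a running broker. Everything here is an assumption made for the example: the `MessageSender` interface, the `OrderNotifier` class, and the `orders` topic do not appear in the original setup. It is a sketch of the calling pattern, not a definitive design.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface with the same shape as KafkaProducer.sendMessage(topic, message).
@FunctionalInterface
interface MessageSender {
    void send(String topic, String message);
}

// Hypothetical caller; the topic name "orders" is an assumption for illustration.
final class OrderNotifier {
    private final MessageSender sender;

    OrderNotifier(MessageSender sender) {
        this.sender = sender;
    }

    // Publishes a simple text message describing the new order.
    void orderCreated(String orderId) {
        sender.send("orders", "created:" + orderId);
    }
}

final class OrderNotifierDemo {
    public static void main(String[] args) {
        // A recording stand-in for the real producer, so no broker is required.
        List<String> sent = new ArrayList<>();
        OrderNotifier notifier =
                new OrderNotifier((topic, message) -> sent.add(topic + " <- " + message));
        notifier.orderCreated("42");
        System.out.println(sent);
    }
}
```

In the Spring application, the same caller could be wired to the real service with `new OrderNotifier(kafkaProducer::sendMessage)`, since the method reference matches the interface's two-String signature.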
Create the Kafka consumer:
Create a Spring bean with a handle() method that processes the Kafka records it receives. The record type is org.apache.kafka.clients.consumer.ConsumerRecord from the Kafka client library. spring-kafka has no org.springframework.kafka.core.ConsumerRecordHandler interface to implement, so a plain class is enough. For example:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    // Handles one record received from Kafka.
    public void handle(ConsumerRecord<String, String> record) {
        System.out.println("Received message: " + record.value());
    }
}
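Configure the Kafka listener:
Wherever messages need to be received, such as the service layer or controller layer, inject your Kafka consumer and annotate a method with @KafkaListener so that it is called for each incoming message. For example: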
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    private final KafkaConsumer kafkaConsumer;

    @Autowired
    public MyService(KafkaConsumer kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    // Invoked by the listener container for each record on "my-topic".
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void listenMessage(ConsumerRecord<String, String> record) {
        kafkaConsumer.handle(record);
    }
}
From: https://www.cnblogs.com/ArthurHenry/p/17672356.html