1 引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2 配置Kafka
配置server、consumer
默认组id在config/consumer.properties
文件下
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# consumer group id
group.id=test-consumer-group
application.yml
spring:
# kafaka
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test-consumer-group
enable-auto-commit: true # 自动提交
auto-commit-interval: 3000 # 自动提交的频率(ms)
3 访问Kafaka
生产者主动发送消息,消费者被动处理消息。
KafkaTests.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@SpringBootTest
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka() {
kafkaProducer.sendMessage("test", "你好");
kafkaProducer.sendMessage("test", "在吗");
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 生产者
@Component
class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
}
// 消费者
@Component
class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
}
标签:二十六,Spring,springframework,kafka,Kafaka,import,test,org,consumer
From: https://www.cnblogs.com/dalelee/p/16837582.html