参考文献:
基本概念:https://zhuanlan.zhihu.com/p/392568942
可靠传输:https://www.zhihu.com/question/483747691/answer/2392949203
kafka基本概念?
kafka有逻辑分区叫做topic,每个topic可以设置多个partition物理分区,每个物理分区可以设置一个或多个副本。生产者producer将数据推送到topic,topic默认轮询分配给某一个partition中,该partition(即partition-leader)将数据复制给相应的副本(partition-follower)。
消费者consumer都有隶属指定的消费者组consumer-group,一个消费者组只允许消费指定topic中的数据,一个topic中的一条数据在一个消费者组中只允许被消费一次,但是可以被多个消费者组消费。
一条数据并不是被消费后立马删除的,而是会留在队列(意思还在topic中)里面,可以设置一定的策略删除消息。
kafka集群是借助zookeeper实现的,一个节点就是一个broker,节点信息在zookeeper中记录。
消费者获取消息是pull的形式,而不是broker推送的。好处就是consumer根据自身消费能力进行消费,如果采用推送的形式,在压力过大的情况下会使consumer崩溃。
如何保证消费的可靠传输?
生产者端,可以设置ack确认回执,默认ack = 0 不回执,当ack = 1就是leader分区将消息写入本地日志文件了然后回执, 当ack = all 就是leader和follower都记录下消息了后回执。
在集群中,即便有哪个节点宕机了,只要是给对应的partition分区创建副本,就可以正常被消费。
在消费者端,修改成手动提交offset偏移量(记录消费到哪一条消息了),消息被正确处理才进行提交,而不是一拉到消息就响应提交。
如何避免重复?
保证消息消费的幂等性,比如给消息创建唯一id,消费者接收到并正确处理的消息存到redis中,每次新的消息进来都先去redis查一下,已经存在的就不做业务处理了。
保证消息消费的顺序
一般我们都是设定多个partition,如果某类消息要保证顺序消费,那必须保证消息都发到了同一个partition(同一个分区就是存到同一个队列了,肯定保证顺序性的),因为多个分区之间是无法保证顺序性的。
但是我们又不能只设置一个partition,所以我们在生产者往topic中发消息的时候指定一个partition或者指定同一个key,因为同一个key的消息可以保证发送到同一个partition。
java中使用kafka?
配置文件配置kafka地址,消费者组
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: Qgchun// 设置默认的生产者消费者所属组id
生产者用kafkatemplate.send("主题","msg")发送消息
@Service
public class MessageServiceKafkaImpl implements MessageService {
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列(kafka),id:"+id);
kafkaTemplate.send("qil0820",id);//使用send方法发送消息,需要传入topic名称
}
@Override
public String doMessage() {
return null;
}
}
消费者在@Component类中用@KafkaListener(topics = "topicA")作用到方法上监听并拉去消息进行消费。
@Component
public class MessageListener {
@KafkaListener(topics = "qil0820")
public void onMessage(ConsumerRecord<String,String> record){
System.out.println("已完成短信发送业务(kafka),id:"+record.value());
}
}
总结
kafka就是一个分布式的消息订阅的中间件,kafka的消息存储在硬盘上,可以重复消费。
有 主题、分区、消息、副本、生产者、消费者等概念;
主题只是消息的逻辑容器,分区才是消息的物理容器;一个主题可以分出多个分区,分区可以分布在多个节点上;一个leader分区可以有0或多个副本分区follower,follower只是负责数据冗余(备份),读写操作都在leader上。
生产者推送消息需要指定主题,分区可以不指定,具体落到哪个分区上就看用的什么策略了;比如推送的消息带key的就对key进行hash计算看到哪个分区上,不带key的消息就轮询到某个分区上;生产者可以设置ack确认回执,默认ack = 0 不回执,当ack = 1就是leader分区将消息写入本地日志文件了然后回执, 当ack = all 就是leader和follower都记录下消息了后回执。
消费者手动拉取消息,需要指定要消费的topic主题。消费完消息后可以指定offset偏移量,一般就是消费的最后一条消息的offset + 1, 下次消费就从这个新的偏移量开始,避免重复消费。有个消费者组的概念,一般消费者都会指定消费者组,多个消费者可拥有同一个groupId消费者组id.其中一个消费组内的消费者不能消费同一个分区,所以一个消费者组对同一消息只能消费一次。而不同消费者组可对一条消息进行不同次消费。
springboot整合kafka需要导入依赖spring-kafka,生产者推送消息用的是kafkatemplate.send()方法,消费者拉取消息需要在方法上用注解@KafkaListener(topics = "test1", groupId = "groupA")并在参数ConsumerRecord中获取消息内容。