流程
kafka事务使用的5个API
// 1. 初始化事务
void initTransactions();
// 2. 开启事务
void beginTransaction() throws ProducerFencedException;
// 3. 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
// 4. 提交事务
void commitTransaction() throws ProducerFencedException;
// 5. 放弃事务(类似回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
(1)初始化事务
生产者带着自定义的transactional.id发起initTransactions(),事务协调器会分配一个Pid给生产者,同幂等性一样;
(2)开启事务
生产者调用beginTransaction()开启事务,事务协调器会记录事务状态并保存在事务日志中,事务日志是存储与内部topic __transaction_state里的,对消费者不可见;
(3)提交位移Offset
生产者发送OffsetCommitRequest 请求到消费组协调者,消费组协调者会把 offset 存储到 Kafka 内部主题 __consumer_offsets 中。
(4)提交事务or回滚事务
生产者在完成消息发送后,决定提交事务
调用commitTransaction()或者abortTransaction()
事务协调器会向每个分区Leader写入一个控制消息(abort或者commit),会先发送EndTransaction请求给事务协调器
TC会向内部主题写入prepare_commit或者prepare_abort消息,后会发送给主题分区Leader一个控制消息commit或者abort,告知消息是接受还是丢弃;
如果事务成功提交,事务的消息将对消费者可见,而中止的消息会不可见,消费者在read_committed隔离级别下对于中止消息是不可见的。
标签:事务,流程,throws,void,消息,提交,kafka,ProducerFencedException
From: https://www.cnblogs.com/iamxiaofu/p/18253181