有一个单体项目通过 Spring Cloud Stream 的 Kafka binder 使用 Kafka 已经有几天时间了,
这里记录一下使用心得。
其中加入的依赖是:
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>3.2.4</version>
</dependency>
使用起来也比较简单:
分别创建一个 Consumer 和一个 Producer 即可,
另外可能还需要一个 Payload 类作为消息体。
package fun.implementsstudio.mathforhim.kafka;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.MimeTypeUtils;
/**
 * Publishes {@link Payload} messages on the output channel of the {@link Source} binding.
 *
 * <p>NOTE(review): the annotation-based binding model ({@code @EnableBinding}) is
 * deprecated in recent Spring Cloud Stream releases in favor of the functional
 * model; confirm against the version in use before extending this class.
 */
@Slf4j
@Component
@EnableBinding(Source.class)
public class Producer {

    /** Binding whose output channel the messages are sent on; injected by Spring. */
    @Autowired
    private Source source;

    /**
     * Wraps the payload in a message carrying an {@code application/json}
     * content-type header and sends it on the output channel.
     *
     * @param payload the message body to publish
     */
    public void send(Payload payload) {
        MessageBuilder<Payload> builder = MessageBuilder.withPayload(payload);
        builder.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE);
        source.output().send(builder.build());
    }
}
package fun.implementsstudio.mathforhim.kafka;
import com.alibaba.fastjson2.JSONObject;
import fun.implementsstudio.mathforhim.dao.MathQuestionRepository;
import fun.implementsstudio.mathforhim.entity.MathQuestion;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * Listens on the {@link Sink} input channel and persists math questions that
 * arrive as {@link Payload} messages.
 *
 * <p>NOTE(review): the annotation-based model ({@code @EnableBinding} /
 * {@code @StreamListener}) is deprecated in recent Spring Cloud Stream releases
 * in favor of the functional model; confirm against the version in use.
 */
@Slf4j
@EnableBinding(Sink.class)
@Component
public class Consumer {

    /** Value of {@code Payload.type} that marks a message carrying a serialized {@link MathQuestion}. */
    private static final String QUESTION_TYPE = "question";

    /** Repository used to store the deserialized questions; injected by Spring. */
    @Autowired
    private MathQuestionRepository mathQuestionRepository;

    /**
     * Handles one incoming message: logs it, and if its payload type is
     * {@code "question"}, parses {@code dataJson} into a {@link MathQuestion}
     * and saves it.
     *
     * <p>Messages with any other (or missing) type are ignored. A question
     * message whose JSON body yields no object is logged and skipped instead
     * of being handed to the repository.
     *
     * @param message   the received message wrapping the {@link Payload}
     * @param partition the Kafka partition the message was received from
     */
    @StreamListener(Sink.INPUT)
    public void consume(Message<Payload> message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        log.info("partition [{}], receive [{}]", partition, message);
        Payload payload = message.getPayload();

        // Constant-first comparison: a payload without a type must not raise a NullPointerException.
        if (!QUESTION_TYPE.equals(payload.getType())) {
            return;
        }

        MathQuestion question = JSONObject.parseObject(payload.getDataJson(), MathQuestion.class);
        if (question == null) {
            // Presumably parseObject returns null for a null/empty JSON string (verify against
            // the fastjson2 version in use); saving null would only fail inside the repository.
            log.warn("partition [{}], question payload has no parsable dataJson, skipped", partition);
            return;
        }

        MathQuestion save = mathQuestionRepository.save(question);
        log.info("kafka save one question id:{}", save.getId());
    }
}
package fun.implementsstudio.mathforhim.kafka;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Message body exchanged between {@code Producer} and {@code Consumer}.
 *
 * <p>Accessors, constructors and the builder are generated by Lombok.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Payload {

    /** Discriminator telling the consumer how to interpret {@link #dataJson}. */
    private String type;

    /** The business object serialized as a JSON string. */
    private String dataJson;
}
以上示例中,
用户点击创建题目时,可能一次生成上百上千道题;
每道题都通过 Kafka 消息异步保存,
从而实现接口异步化,也能有效保证用户大批量创建数据时的最终完整性。