首页 > 其他分享 >stream kafka 的使用记录

stream kafka 的使用记录

时间:2023-01-01 09:55:23浏览次数:43  
标签:stream 记录 messaging springframework kafka org import

有一个单体项目使用 Spring Cloud Stream(Kafka binder)已经有几天时间了,
这里记录一下使用心得。

其中加入的依赖是:

<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka -->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream-binder-kafka</artifactId>
	<version>3.2.4</version>
</dependency>

使用起来也比较简单:
分别创建一个 Consumer 和一个 Producer 即可,
可能还需要多定义一个 Payload 类作为消息体。

package fun.implementsstudio.mathforhim.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.MimeTypeUtils;

@Component
@Slf4j
@EnableBinding(Source.class)
public class Producer {

    @Autowired
    private Source source;

    public void send(Payload payload) {
        source.output().send(MessageBuilder.withPayload(payload)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE)
                .build());
    }

}

package fun.implementsstudio.mathforhim.kafka;

import com.alibaba.fastjson2.JSONObject;
import fun.implementsstudio.mathforhim.dao.MathQuestionRepository;
import fun.implementsstudio.mathforhim.entity.MathQuestion;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * Listens on the {@link Sink#INPUT} binding and persists incoming question payloads.
 *
 * <p>NOTE(review): the annotation-based model ({@code @EnableBinding}, {@code @StreamListener})
 * is deprecated in Spring Cloud Stream 3.1+ in favor of functional {@code Consumer} beans.
 */
@Slf4j
@EnableBinding(Sink.class)
@Component
public class Consumer {

    /** Value of {@link Payload#getType()} that marks a payload carrying a math question. */
    private static final String TYPE_QUESTION = "question";

    @Autowired
    private MathQuestionRepository mathQuestionRepository;


    /**
     * Handles one incoming message: payloads of type {@code "question"} are deserialized from
     * their JSON data into a {@link MathQuestion} and saved; every other type is ignored.
     *
     * @param message   the received message wrapping the {@link Payload}
     * @param partition the Kafka partition the message was read from (used for logging only)
     */
    @StreamListener(Sink.INPUT)
    public void consume(Message<Payload> message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        log.info("partition [{}], receive [{}]", partition, message);
        Payload payload = message.getPayload();
        // Constant-first comparison: a payload without a type is ignored instead of raising an NPE.
        if (!TYPE_QUESTION.equals(payload.getType())) {
            return;
        }
        MathQuestion question = JSONObject.parseObject(payload.getDataJson(), MathQuestion.class);
        if (question == null) {
            // parseObject yields null for missing/empty JSON; saving null would only fail later.
            log.warn("kafka skip question without data, partition [{}]", partition);
            return;
        }
        MathQuestion saved = mathQuestionRepository.save(question);
        log.info("kafka save one question id:{}", saved.getId());
    }

}

package fun.implementsstudio.mathforhim.kafka;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Message body exchanged between {@code Producer} and {@code Consumer}.
 *
 * <p>Accessors, the builder and both constructors are generated by Lombok.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Payload {

    /** Discriminator for the kind of data carried, e.g. {@code "question"}. */
    private String type;

    /** The carried data, serialized as a JSON string. */
    private String dataJson;
}

在以上示例中,
用户点击创建题目时,可能一次生成上百上千道题;
每道题在保存时都通过 Kafka 发送消息,由消费者异步完成保存,
从而实现接口异步化,同时也能有效保证用户大批量创建数据时的最终完整性。

标签:stream,记录,messaging,springframework,kafka,org,import
From: https://www.cnblogs.com/ukzq/p/17017749.html

相关文章