1、数据流向:被动接收数据
特点:及时性高,数据延迟小,Kafka的数据发送和接收都是毫秒级的。
2、接入参数
# Kafka client settings for an SSL-secured broker.
# NOTE(review): the key names match Spring Boot's `spring.kafka.*` properties, so this
# block presumably sits under a top-level `spring:` key in application.yml — confirm.
kafka:
  security.protocol: SSL
  ssl.endpoint.identification.algorithm:
  ssl:
    protocol: SSL
    key-store-type: JKS
    trust-store-type: JKS
    #key-store-location: file:/root/kafka/ssl/client.keystore.jks
    key-store-location: file:/Users/Downloads/kafka/client.keystore.jks
    # Sample credentials only; supply real passwords from a secure source rather than committing them.
    key-store-password: kafka-12345
    key-password: kafka-12345
    trust-store-location: file:/Users/Downloads/kafka/client.truststore.jks
    #trust-store-location: file:/root/kafka/ssl/client.truststore.jks
    trust-store-password: kafka-12345
  properties:
    ssl:
      endpoint:
        identification:
          # An empty algorithm turns off server hostname verification in the Kafka client.
          algorithm: ''
    security:
      protocol: SSL
  bootstrap-servers: iot.test.com:9002
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
  consumer:
    group-id: kafka-demo-kafka-group
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3、认证文件
client.keystore.jks
client.truststore.jks
4、依赖坐标
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
5、主题订阅
主题名称:XXX-Product_event
确认硬件设备对应的主题消息,订阅该主题以接收消息。
6、监听服务-代码示例
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.cscecnf.trolley.entity.PersonnelIdentification;
import com.cscecnf.trolley.util.DateUtils;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
/**
* Kafka监听服务
*/
@Log4j2
@Service
public class KafkaListenerService {
/**
* 消费端:指定监听主题
* @param consumerRecord 监听消息
*/
@KafkaListener(topics = {"XXX-Product_event"})
public void handlerMsg(ConsumerRecord<String, String> consumerRecord) {
log.info("接收到消息:消息主机Key:" + consumerRecord.key() );
log.info("接收到消息:消息值Value:" + consumerRecord.value() + ",消息偏移量:" + consumerRecord.offset());
getXxxMessage(consumerRecord.value());
}
/**
* 获取XX信息并入库
* @param record record
*/
private void getXxxMessage(String record){
if (StringUtils.isEmpty(record)){
return ;
}
JSONObject jsonObject = JSON.parseObject(record);
// 根据实际业务返回的数据结构进行解析;
}
}
标签:对接,kafka,jks,key,org,import,Kafka,数据,store From: https://www.cnblogs.com/heyi-77/p/17614316.html