首页 > 其他分享 >使用kafka完成数据的实时同步,同步到es中。(使用kafka实现自动上下架 upper、lower)

使用kafka完成数据的实时同步,同步到es中。(使用kafka实现自动上下架 upper、lower)

时间:2024-08-29 13:24:40浏览次数:5  
标签:upper 同步 Autowired 专辑 private kafka albumInfo albumId new

文章目录

  • 上架:新增专辑到 es
  • 下架:删除专辑
  1. 新增:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步新增数据
  2. 更新:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步更新数据
            如果是私有的专辑则发送消息给kafka,search通过监听器获取消息es删除数据
  3. 删除:发送消息给kafka,search通过监听器获取消息es删除数据

1、发送消息 KafkaService

package com.atguigu.tingshu.common.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
public class KafkaService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 向指定主题发送消息
     * 此方法通过调用重载的sendMsg方法,向指定主题发送消息,使用默认的消息标签和消息键
     *
     * @param topic 发送消息的主题
     * @param msg   需要发送的消息内容
     */
    public void sendMsg(String topic, String msg){
        // 调用重载的sendMsg方法,传入默认值以简化调用
        this.sendMsg(topic, null, null, msg);
    }

    /**
     * 发送消息到指定的Kafka主题
     *
     * @param topic 消息主题
     * @param partition 分区编号
     * @param key 消息键值
     * @param msg 消息内容
     */
    public void sendMsg(String topic, Integer partition, String key, String msg){
        // 发生消息并返回异步结果
        CompletableFuture<SendResult> future = this.kafkaTemplate.send(topic, partition, key, msg);

        // 异步处理发送结果
        future.whenCompleteAsync((result, ex) -> {
            if (ex != null){
                // 如果发送过程中出现异常
                logger.error("生产者发送消息失败!原因:{}", ex.getMessage());
            }
        });
    }

}

  • whenCompleteAsync:异步完成时的处理、当异步操作完成时

在这里插入图片描述

2、生产者 service-album -> AlbumInfoServiceImpl

在这里插入图片描述

2.1、新增 saveAlbumInfo()

  • 新增:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步新增数据
    在这里插入图片描述
    在这里插入图片描述

在这里插入图片描述

@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {
    @Autowired
    private AlbumAttributeValueMapper attributeValueMapper;

    @Autowired
    private AlbumStatService albumStatService;

    @Autowired
    private KafkaService kafkaService;

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void saveAlbumInfo(AlbumInfoVo albumInfoVo) throws FileNotFoundException {
        // 1.保存专辑信息表
        AlbumInfo albumInfo = new AlbumInfo();
        BeanUtils.copyProperties(albumInfoVo, albumInfo);
        // 设置当前用户的id
        Long userId = AuthContextHolder.getUserId();
        albumInfo.setUserId(userId == null ? 1 : userId);
        albumInfo.setTracksForFree(5);
        albumInfo.setSecondsForFree(30);
        albumInfo.setStatus(SystemConstant.ALBUM_STATUS_PASS);
        this.save(albumInfo);
        // 主键回写获取专辑id
        Long albumInfoId = albumInfo.getId();

        // 2.保存专辑标签值表
        List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();
        if (!CollectionUtils.isEmpty(albumAttributeValueVoList)) {
            albumAttributeValueVoList.forEach(albumAttributeValueVo -> {
                AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue();
                BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);
                albumAttributeValue.setAlbumId(albumInfoId);
                this.attributeValueMapper.insert(albumAttributeValue);
            });
        }

//		new FileInputStream("xxx");

//		try {
//			TimeUnit.SECONDS.sleep(3);
//		} catch (InterruptedException e) {
//			throw new RuntimeException(e);
//		}

        // 3.保存统计信息:专辑状态表
        // this.saveAlbumStat(albumInfoId);
        this.albumStatService.saveAlbumStat(albumInfoId);

        if (StringUtils.equals(albumInfo.getIsOpen(), "1")) {
            this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumInfoId.toString());
        }

//		int i = 1/0;
    }
}

在这里插入图片描述

2.2、更新 updateAlbumInfo()

  • 更新:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步更新数据
            如果是私有的专辑则发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {

    @Autowired
    private AlbumAttributeValueMapper attributeValueMapper;

    @Autowired
    private KafkaService kafkaService;

    @Transactional
    @Override
    public void updateAlbumInfo(Long albumId, AlbumInfoVo albumInfoVo) {
        AlbumInfo albumInfo = new AlbumInfo();
        BeanUtils.copyProperties(albumInfoVo, albumInfo);
        albumInfo.setId(albumId);
        this.updateById(albumInfo);

        // 更新专辑标签值表:先删除该专辑所有的标签及值 再去新增
        this.attributeValueMapper.delete(new LambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));
        List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();
        if (!CollectionUtils.isEmpty(albumAttributeValueVoList)) {
            albumAttributeValueVoList.forEach(albumAttributeValueVo -> {
                AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue();
                BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);
                albumAttributeValue.setAlbumId(albumId);
                this.attributeValueMapper.insert(albumAttributeValue);
            });
        }

        if (StringUtils.equals(albumInfoVo.getIsOpen(), "1")) {
            this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumId.toString());
        } else {
            this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());
        }
    }
}

在这里插入图片描述

2.3、删除 removeAlbumInfo()

  • 删除:发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {

    @Autowired
    private AlbumAttributeValueMapper attributeValueMapper;

    @Autowired
    private AlbumStatMapper albumStatMapper;

    @Autowired
    private KafkaService kafkaService;

    @Transactional
    @Override
    public void removeAlbumInfo(Long albumId) {
        this.removeById(albumId);

        this.albumStatMapper.delete(new LambdaUpdateWrapper<AlbumStat>().eq(AlbumStat::getAlbumId, albumId));

        this.attributeValueMapper.delete(new LambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));

        this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());
    }
}

在这里插入图片描述

3、消费者 service-search - > AlbumListener.java

在这里插入图片描述

package com.atguigu.tingshu.search.listener;

@Component
public class AlbumListener {

    @Autowired
    private AlbumInfoFeignClient albumInfoFeignClient;

    @Autowired
    private UserInfoFeignClient userInfoFeignClient;

    @Autowired
    private CategoryFeignClient categoryFeignClient;

    @Autowired
    private ElasticsearchTemplate elasticsearchTemplate;

    @KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_UPPER)
    public void upper(String albumId){
        if (StringUtils.isBlank(albumId)){
            return;
        }

        // 根据专辑id查询专辑
        Result<AlbumInfo> albumInfoResult = this.albumInfoFeignClient.getAlbumInfo(Long.valueOf(albumId));
        Assert.notNull(albumInfoResult, "同步数据时,获取专辑信息失败!");
        AlbumInfo albumInfo = albumInfoResult.getData();
        Assert.notNull(albumInfo, "同步数据时,没有对应的专辑!");

        AlbumInfoIndex albumInfoIndex = new AlbumInfoIndex();
        // 把专辑信息中的数据复制到index对象
        BeanUtils.copyProperties(albumInfo, albumInfoIndex);

        // 查询主播获取主播信息
        Result<UserInfoVo> userInfoVoResult = this.userInfoFeignClient.getUserById(albumInfo.getUserId());
        Assert.notNull(userInfoVoResult, "数据导入时,获取主播信息失败!");
        UserInfoVo userInfoVo = userInfoVoResult.getData();
        if (userInfoVo != null){
            albumInfoIndex.setAnnouncerId(userInfoVo.getId());
            albumInfoIndex.setAnnouncerName(userInfoVo.getNickname());
        }

        // 根据三级分类id查询一二三级分类
        Result<BaseCategoryView> categoryResult = this.categoryFeignClient.getAllLevelCategories(albumInfo.getCategory3Id());
        Assert.notNull(categoryResult, "数据导入时,获取分类信息失败!");
        BaseCategoryView baseCategoryView = categoryResult.getData();
        if (baseCategoryView != null) {
            albumInfoIndex.setCategory1Id(baseCategoryView.getCategory1Id());
            albumInfoIndex.setCategory2Id(baseCategoryView.getCategory2Id());
        }

        // 查询专辑统计信息
//                Result<AlbumStatVo> albumStatesResult = this.albumInfoFeignClient.getAlbumStates(albumInfo.getId());
//                Assert.notNull(albumStatesResult, "数据导入时,获取专辑统计信息失败!");
//                AlbumStatVo albumStatVo = albumStatesResult.getData();
//                if (albumStatVo != null) {
//                    BeanUtils.copyProperties(albumStatVo, albumInfoIndex);
//                }
        // 假数据:
        int playNum = (new Random().nextInt(100) + 1) * 10000;
        albumInfoIndex.setPlayStatNum(playNum);
        int subscribeNum = (new Random().nextInt(100) + 1) * 10000;
        albumInfoIndex.setSubscribeStatNum(subscribeNum);
        int buyNum = (new Random().nextInt(100) + 1) * 10000;
        albumInfoIndex.setBuyStatNum(buyNum);
        int commentNum = (new Random().nextInt(100) + 1) * 10000;
        albumInfoIndex.setCommentStatNum(commentNum);
        // 热度
        albumInfoIndex.setHotScore(playNum * 0.1 + commentNum * 0.2 + subscribeNum * 0.3 + buyNum * 0.4);

        // 标签
        Result<List<AlbumAttributeValue>> albumAttributeValueResult = this.albumInfoFeignClient.getAlbumAttributeValue(albumInfo.getId());
        Assert.notNull(albumAttributeValueResult, "数据导入时,获取标签及值失败!");
        List<AlbumAttributeValue> albumAttributeValues = albumAttributeValueResult.getData();
        if (!CollectionUtils.isEmpty(albumAttributeValues)){
            // 把List<AlbumAttributeValue> 转化成  List<AttributeValueIndex>
            albumInfoIndex.setAttributeValueIndexList(albumAttributeValues.stream().map(albumAttributeValue -> {
                AttributeValueIndex attributeValueIndex = new AttributeValueIndex();
                BeanUtils.copyProperties(albumAttributeValue, attributeValueIndex);
                return attributeValueIndex;
            }).collect(Collectors.toList()));
        }

        this.elasticsearchTemplate.save(albumInfoIndex);
    }

    @KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_LOWER)
    public void lower(String albumId){
        if (StringUtils.isBlank(albumId)){
            return;
        }
        this.elasticsearchTemplate.delete(albumId, AlbumInfoIndex.class);
    }
}

标签:upper,同步,Autowired,专辑,private,kafka,albumInfo,albumId,new
From: https://blog.csdn.net/m0_65152767/article/details/141610060

相关文章

  • 生物实验室远程文件同步,怎么做才能既安全又高效?
    在生物实验室中,有许多数据需要及时地同步,这些数据的同步对于确保实验结果的准确性、保障实验人员的安全、提高实验室管理效率以及满足外部合规性要求至关重要。以下是一些具体需要同步的文件类型:实验原始数据:包括实验过程中收集的所有直接测量结果,如基因序列、蛋白质表达水平、......
  • kafka ---- producer与broker配置详解以及ack机制详解
    一、producer配置1、bootstrap.serverskafkabroker集群的ip列表,格式为:host1:port1,host2:port2,…2、client.id用于追踪消息的源头3、retries当发送失败时客户端会进行重试,重试的次数由retries指定,默认值是2147483647,即Integer.MAX_VALUE;在重试次数耗尽和delivery.......
  • Apache Kafka 简介、使用场景及特点
    ApacheKafka简介、使用场景及特点1.什么是ApacheKafka?ApacheKafka是一种开源的分布式流处理平台,最初由LinkedIn开发,并在2011年成为Apache软件基金会的顶级项目。Kafka专为高吞吐量、低延迟的实时数据处理设计,广泛应用于各种数据流处理场景。Kafka的核心组......
  • window下kafka3启动多个
    准备工作我们先安装好kafka,并保证启动成功,可参考文章Windows下安装Kafka3-CSDN博客复制kafka安装文件kafka3已经内置了zookeeper,所以直接复制就行了修改zookeeper配置文件这里我们修改zookeeper配置文件,主要是快照地址和端口号,并且端口号与第一个不同修改kafka服......
  • SpringBoot配置多个kafka配置
    引入依赖<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.14</version></dependency>yml配置有几个就配置几个......
  • Windows下安装Kafka3
    参考文章:Windows下安装Kafka3_kafka3window-CSDN博客安装配置Kafka首先在官网 ApacheKafka 下载Kafka二进制压缩包。无论是在CentOS还是在Windows下都是下载该压缩包,里面已经包含了KafkaLinux和windows平台下的可执行文件了。选择目前比较新的3.6.1版本下载将之前下载......
  • c#关于同步 /异常/多线程/事件 事例
    sync同步async异步,要与await成对使用Thread //计算程序执行时间StopWatch sw=StopWatch.Start();转自:https://codeload.github.com/zhaoxueliang86/WinFormsAsyncAwait/zip/refs/heads/BilibiliB站UP主:银色 usingSystem.Diagnostics;usingSystem.Text;na......
  • Spring Boot 整合 Kafka
    项目目录结构pom.xml<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--Spr......
  • kafka如何合理设置broker、partition、consumer数量
    目录1.broker的数量最好大于等于partition数量2.consumer数量最好和partition数量一致3.总结1.broker的数量最好大于等于partition数量一个partition最好对应一个硬盘,这样能最大限度发挥顺序写的优势。一个broker如果对应多个partition,需要随机分发,顺序IO会退化成随机IO。实......
  • 面试官:Kafka中的key有什么用?
    我们在使用Kafka时,最简单、最常用的方式是只设置topic(主题)和value(消息体),如下所示:这样的话获取消息的代码也很简单,如下所示:@KafkaListener(topics="mytopic",groupId="my-group")publicvoidlisten(Stringdata){System.out.println("监听到消息:"+data);}......