首页 > 其他分享 >第10天

第10天

时间:2022-11-18 16:58:19浏览次数:51  
标签:10 heima 文章 import org com public

01、学习目标

  • 能够搭建XXL-Job环境搭建
  • 能够完成热点文章定时缓存

02、热点文章定时计算:查询所有频道Feign接口

1650501193614

计算完成新热数据后,需要给每个频道缓存一份数据,所以需要查询所有频道信息

在heima-leadnews-api定义远程接口

package com.heima.wemedia.feign;

import com.heima.common.dtos.ResponseResult;
import com.heima.model.wemedia.pojos.WmChannel;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

import java.util.List;

@FeignClient(name = "leadnews-wemedia")
public interface WemediaFeign {

    /**
     * 查询所有频道
     */
    @GetMapping("/api/v1/channel/channels")
    public ResponseResult<List<WmChannel>> channels();
}


heima-leadnews-wemedia端提供接口(已提供)

package com.heima.wemedia.controller.v1;

import com.heima.common.dtos.ResponseResult;
import com.heima.wemedia.service.WmChannelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/channel")
public class WmchannelController {

    @Autowired
    private WmChannelService wmChannelService;

    @GetMapping("/channels")
    public ResponseResult findAll(){
        return wmChannelService.findAll();
    }
}

03、热点文章定时计算:功能实现(1)-查询前5天文章

1)Mapper提供查询前5天文章

修改ApArticleMapper类

package com.heima.article.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.article.dtos.ArticleHomeDto;
import com.heima.model.article.pojos.ApArticle;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.simpleframework.xml.Path;

import java.util.Date;
import java.util.List;

public interface ApArticleMapper extends BaseMapper<ApArticle> {

    List<ApArticle> loadApArticle(@Param("dto") ArticleHomeDto dto, @Param("type") int type);

    List<ApArticle> findArticleListByLastDays(@Param("lastDays") Date lastDays);
}

在ApArticleMapper.xml新增方法

<!-- 查询最近发布的文章 -->
    <select id="findArticleListByLastDays" resultType="com.heima.model.article.pojos.ApArticle">
            SELECT
              aa.*
            FROM
             ap_article aa
               INNER JOIN ap_article_config ac
               ON ac.`article_id` = aa.`id`
               WHERE
                ac.`is_down` = 0
                AND ac.`is_delete` = 0
                AND aa.`publish_time` &gt;= #{lastDays}
    </select>

2)编写业务逻辑

新建业务接口

package com.heima.article.service;

/**
 * 热点文章业务
 */
public interface HotArticleService {

    /**
     * 定时计算热点文章,进行缓存
     */
    public void computeHotArticle();
}

实现

package com.heima.article.service.impl;

import com.heima.article.mapper.ApArticleMapper;
import com.heima.article.service.HotArticleService;
import com.heima.model.article.pojos.ApArticle;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;

@Service
public class HotArticleServiceImpl implements HotArticleService {
    @Autowired
    private ApArticleMapper apArticleMapper;
    @Override
    public void computeHotArticle() {

        //查询最近5天的文章(所有频道的文章)
        //获取前第5天的时间
        Date lastDay = DateTime.now().minusDays(5).toDate();
        List<ApArticle> articleList = apArticleMapper.findArticleListByLastDays(lastDay);

        //统计所有文章的分值(阅读,点赞,收藏,评论)


        //按照不同频道筛选出文章,对文章进行分值倒序,截取前30条,存入redis数据库

    }
}



04、热点文章定时计算:功能实现(2)-计算文章分值

1)Vo对象

定义Vo对象,用于存储带分值的文章

package com.heima.model.article.dtos;

import com.heima.model.article.pojos.ApArticle;
import lombok.Data;

@Data
public class HotArticleVo extends ApArticle {
    /**
     * 文章分值
     */
    private Integer score;
}

定义文章分值权限常量

package com.heima.common.constants;

public class ArticleConstants {
    public static final Integer HOT_ARTICLE_LIKE_WEIGHT = 3;
    public static final Integer HOT_ARTICLE_COMMENT_WEIGHT = 5;
    public static final Integer HOT_ARTICLE_COLLECTION_WEIGHT = 8;
}

2)计算分值逻辑

package com.heima.article.service.impl;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.heima.article.mapper.ApArticleMapper;
import com.heima.article.service.HotArticleService;
import com.heima.common.constants.ArticleConstants;
import com.heima.model.article.dtos.HotArticleVo;
import com.heima.model.article.pojos.ApArticle;
import com.heima.utils.common.BeanHelper;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

@Service
public class HotArticleServiceImpl implements HotArticleService {
    @Autowired
    private ApArticleMapper apArticleMapper;
    @Override
    public void computeHotArticle() {

        //查询最近5天的文章(所有频道的文章)
        //获取前第5天的时间
        Date lastDay = DateTime.now().minusDays(5).toDate();
        List<ApArticle> articleList = apArticleMapper.findArticleListByLastDays(lastDay);

        //统计所有文章的分值(阅读,点赞,收藏,评论)
        List<HotArticleVo> hotArticleVoList = computeHotArticleScore(articleList);

        //按照不同频道筛选出文章,对文章进行分值倒序,截取前30条,存入redis数据库

    }

    /**
     * 统计所有文章的分值
     * @param articleList
     * @return
     */
    private List<HotArticleVo> computeHotArticleScore(List<ApArticle> articleList) {
        List<HotArticleVo> hotArticleVoList = new ArrayList<>();

        if(CollectionUtils.isNotEmpty(articleList)){
            for(ApArticle apArticle:articleList){
                HotArticleVo hotArticleVo = BeanHelper.copyProperties(apArticle,HotArticleVo.class);
                //计算分值
                int score = computeScore(apArticle);
                hotArticleVo.setScore(score);
                hotArticleVoList.add(hotArticleVo);
            }
        }

        return hotArticleVoList;
    }

    /**
     * 计算一篇文章的分值
     * @param apArticle
     * @return
     */
    private int computeScore(ApArticle apArticle) {
        int score = 0;

        //阅读分值
        if(apArticle.getViews()!=null){
            score+=apArticle.getViews();
        }
        //点赞
        if(apArticle.getLikes()!=null){
            score+=apArticle.getLikes()* ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT; //加权
        }
        //评论
        if(apArticle.getComment()!=null){
            score+=apArticle.getComment()* ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT; //加权
        }
        //收藏
        if(apArticle.getCollection()!=null){
            score+=apArticle.getCollection()* ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT; //加权
        }
        return score;
    }
}


05、热点文章定时计算:功能实现(3)-缓存文章到Redis

1)article微服务整合Redis

依赖导入

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

添加配置(Nacos中)

spring:
  redis:
    host: 192.168.66.133
    password: leadnews
    port: 6379  
    database: 1

2)article微服务开启Feign

依赖导入

 <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>com.heima</groupId>
            <artifactId>heima-leadnews-api</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

开启Feign

 

3)编写业务逻辑

Redis常量类:

package com.heima.common.constants;

public abstract class RedisConstants {
    public static String TASK_TOPIC_PREFIX = "task_topic_";
    public static final String HOT_ARTICLE_FIRST_PAGE = "hot_article_first_page_";
    public static final String DEFAULT_TAG="__all__";
}

package com.heima.article.service.impl;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.heima.article.mapper.ApArticleMapper;
import com.heima.article.service.HotArticleService;
import com.heima.common.constants.ArticleConstants;
import com.heima.common.constants.RedisConstants;
import com.heima.common.dtos.ResponseResult;
import com.heima.model.article.dtos.HotArticleVo;
import com.heima.model.article.pojos.ApArticle;
import com.heima.model.wemedia.pojos.WmChannel;
import com.heima.utils.common.BeanHelper;
import com.heima.utils.common.JsonUtils;
import com.heima.wemedia.feign.WemediaFeign;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

@Service
public class HotArticleServiceImpl implements HotArticleService {
    @Autowired
    private ApArticleMapper apArticleMapper;
    @Autowired
    private WemediaFeign wemediaFeign;
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Override
    public void computeHotArticle() {

        //查询最近5天的文章(所有频道的文章)
        //获取前第5天的时间
        Date lastDay = DateTime.now().minusDays(5).toDate();
        List<ApArticle> articleList = apArticleMapper.findArticleListByLastDays(lastDay);

        //统计所有文章的分值(阅读,点赞,收藏,评论)
        List<HotArticleVo> hotArticleVoList = computeHotArticleScore(articleList);

        //按照不同频道筛选出文章,对文章进行分值倒序,截取前30条,存入redis数据库
        //查询所有频道
        ResponseResult<List<WmChannel>> responseResult = wemediaFeign.channels();
        if(responseResult.getCode().equals(200)){
            List<WmChannel> channelList = responseResult.getData();

            if(CollectionUtils.isNotEmpty(channelList)){
                //指定频道(有频道)
                for(WmChannel channel:channelList){
                   /* List<HotArticleVo> currentChannelHotArticles = new ArrayList<>();
                    for(HotArticleVo vo:hotArticleVoList){
                        if(vo.getChannelId().equals(channel.getId())){
                            currentChannelHotArticles.add(vo);
                        }
                    }*/

                    List<HotArticleVo> currentChannelHotArticles = hotArticleVoList.stream()
                            .filter(vo->vo.getChannelId().equals(channel.getId()))
                            .collect(Collectors.toList());

                    String key = RedisConstants.HOT_ARTICLE_FIRST_PAGE+channel.getId();
                    sortAndCacheHotArticle(key,currentChannelHotArticles);
                }

                //推荐频道(没有选择频道)
                String key = RedisConstants.HOT_ARTICLE_FIRST_PAGE+RedisConstants.DEFAULT_TAG;
                sortAndCacheHotArticle(key,hotArticleVoList);
            }
        }
    }

    /**
     * 对文章分值倒序,截取前30条,存入Redis
     * @param key
     * @param hotArticleVoList
     */
    private void sortAndCacheHotArticle(String key, List<HotArticleVo> hotArticleVoList) {
        //对文章按分值倒序
        //方式一:使用Collection的sort方法
        /**
         * 升序:o1放前面
         * 降序:o2放前面
         */
        /*hotArticleVoList.sort(new Comparator<HotArticleVo>() {
            @Override
            public int compare(HotArticleVo o1, HotArticleVo o2) {
                return o2.getScore().compareTo(o1.getScore());
            }
        });*/

        //方式二:使用Collection的stream方法
        /**
         * 升序:默认 .sorted(Comparator.comparing(HotArticleVo::getScore))
         * 降序:添加reversed()反转方法  .sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
         */
        hotArticleVoList = hotArticleVoList.stream()
                .sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
                .collect(Collectors.toList());


        //截取前30条
        if(hotArticleVoList.size()>30){
            hotArticleVoList = hotArticleVoList.subList(0,30);
        }

        //存入Redis
        redisTemplate.opsForValue().set(key, JsonUtils.toString(hotArticleVoList));
    }

    /**
     * 统计所有文章的分值
     * @param articleList
     * @return
     */
    private List<HotArticleVo> computeHotArticleScore(List<ApArticle> articleList) {
        List<HotArticleVo> hotArticleVoList = new ArrayList<>();

        if(CollectionUtils.isNotEmpty(articleList)){
            for(ApArticle apArticle:articleList){
                HotArticleVo hotArticleVo = BeanHelper.copyProperties(apArticle,HotArticleVo.class);
                //计算分值
                int score = computeScore(apArticle);
                hotArticleVo.setScore(score);
                hotArticleVoList.add(hotArticleVo);
            }
        }

        return hotArticleVoList;
    }

    /**
     * 计算一篇文章的分值
     * @param apArticle
     * @return
     */
    private int computeScore(ApArticle apArticle) {
        int score = 0;

        //阅读分值
        if(apArticle.getViews()!=null){
            score+=apArticle.getViews();
        }
        //点赞
        if(apArticle.getLikes()!=null){
            score+=apArticle.getLikes()* ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT; //加权
        }
        //评论
        if(apArticle.getComment()!=null){
            score+=apArticle.getComment()* ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT; //加权
        }
        //收藏
        if(apArticle.getCollection()!=null){
            score+=apArticle.getCollection()* ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT; //加权
        }
        return score;
    }
}


06、热点文章定时计算:定时更新热点文章

1)创建任务

新建执行器:leadnews-hot-article-executor

image-20210730000549587

新建任务:路由策略为轮询,Cron表达式:0 0 2 * * ?

1650505455681

2)article微服务整合XXL-Job

在article微服务导入依赖

<!--xxl-job-->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>2.3.0</version>
        </dependency>

在nacos配置新增配置

xxl:
  job:
    admin:
      addresses: http://localhost:8888/xxl-job-admin
    executor:
      appname: leadnews-hot-article-executor
      port: 9999

XxlJobConfig

package com.heima.article.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * xxl-job config
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.port}")
    private int port;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setPort(port);
        return xxlJobSpringExecutor;
    }
    

}

3)编写定时任务类

package com.heima.article.job;

import com.heima.article.service.HotArticleService;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 定时更新首页热点文章
 */
@Component
@Slf4j
public class ComputeHotArticleJob {
    @Autowired
    private HotArticleService hotArticleService;

    @XxlJob("hotArticleComputeJob")
    public void hotArticleComputeJob(){
        log.info("定时更新首页热点文章");
        hotArticleService.computeHotArticle();
    }
}



执行后,效果如下:

 1636689669128

07、热点文章定时计算:查询文章接口改造

1)思路分析

image-20210613110712894

2)功能实现

接口:

public interface ApArticleService extends IService<ApArticle> {
    ResponseResult<List<ApArticle>> loadApArticles(ArticleHomeDto dto, int type);

    ResponseResult saveArticle(ArticleDto dto);

    /**
     * 加载文章列表
     */
    public ResponseResult loadV2(ArticleHomeDto dto,Integer type);
}

实现:

 @Override
    public ResponseResult loadApArticleFromCache(ArticleDto dto, int type) {
        //从缓存查询热点文章
        String key = RedisConstants.HOT_ARTICLE_FIRST_PAGE+dto.getTag();
        String redisData = redisTemplate.opsForValue().get(key);
        List<ApArticle> articleList = null;
        if(StringUtils.isNotEmpty(redisData)){
            List<HotArticleVo> hotArticleVoList = JsonUtils.toList(redisData, HotArticleVo.class);
            articleList = BeanHelper.copyWithCollection(hotArticleVoList,ApArticle.class);
            return ResponseResult.okResult(articleList);
        }else{
            return loadApArticle(dto,type);
        }
    }

修改Controller

    /**
     * 加载首页
     */
    @PostMapping("/load")
    public ResponseResult load(@RequestBody ArticleHomeDto dto){
        //return apArticleService.loadApArticle(dto,1);
        return apArticleService.loadApArticleV2(dto,1);
    }

08、文章点赞行为:需求分析

1)需求分析

image-20210727161838807

  • 当前登录的用户点击了”赞“,就要保存当前行为数据(游客不能点赞)
  • 可以取消点赞

点赞需要存储的数据:

1)绑定用户ID

2)绑定文章ID

一篇文章可以被多个用户点赞(文章ID和用户ID是一对多)

Hash结构:
< 文章ID, <用户ID,点赞记录数据> >

​ 1 1 xxxx

​ 2 xxxx

​ 3 xxxx

电商的购物车:用户ID,商品ID,用户ID和商品ID是一对多

Hash结构:
< 用户ID, <商品ID,购买数据对象> >

​ get(用户ID).get(商品ID)

List结构

​ < 用户ID, <购买数据对象(包含商品ID)> >

购买数据包括:商品购买数量,购买时价格,商品名称

注意:需要记录这个赞谁点的?

注意:点赞数据存在Redis/MongoDB中

2)点赞的数据结构

我们可以使用Redis的Hash结构(双层Map结构)存储点赞数据

Key:文章ID

Value:这里的value就使用hash结构,key为userId,value为Dto对象(Dto为接受点赞请求的对象)10

1001:文章ID

​ 1: xx

​ 2:xxx

​ 3:xxxx

09、文章点赞行为:搭建行为微服务

1)创建项目和导入依赖

 1636942165592

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>heima-leadnews-service</artifactId>
        <groupId>com.heima</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>heima-leadnews-behavior</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.heima</groupId>
            <artifactId>heima-leadnews-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.heima</groupId>
            <artifactId>heima-leadnews-model</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.heima</groupId>
            <artifactId>heima-leadnews-utils</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

    </dependencies>
</project>

2)启动类

package com.heima.behavior;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

/**
 * 行为微服务
 */
@SpringBootApplication
@EnableDiscoveryClient
public class BehaviorApplication {
    public static void main(String[] args) {
        SpringApplication.run(BehaviorApplication.class,args);
    }
}

3)配置bootstrap.yml

server:
  port: 9006
spring:
  application:
    name: leadnews-behavior
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.66.133:8848
      config:
        server-addr: 192.168.66.133:8848
        file-extension: yml

4)在Nacos添加行为微服务配置

spring:
  redis:
    host: 192.168.66.133
    password: leadnews
    port: 6379
    database: 2

5)在App网关添加路由

        # 行为管理
        - id: behavior
          uri: lb://leadnews-behavior
          predicates:
            - Path=/behavior/**
          filters:
            - StripPrefix= 1

10、文章点赞行为:功能实现

1)过滤器获取登录用户信息

package com.heima.behavior.filter;

import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.common.ThreadLocalUtils;
import org.springframework.stereotype.Component;

import javax.servlet.*;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

/**
 * App端用户token获取过滤器
 *   filterName: 过滤器名称
 */
@Component
@WebFilter(filterName = "AppTokenFilter",urlPatterns = "/*")
public class AppTokenFilter extends GenericFilter {

    /**
     * 过滤方法
     * @param servletRequest
     * @param servletResponse
     * @param filterChain
     * @throws IOException
     * @throws ServletException
     */
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest)servletRequest;
        HttpServletResponse response = (HttpServletResponse) servletResponse;

        //从请求头中取出userId
        String userId = request.getHeader("userId");

        //判断userId是否存在
        if(StringUtils.isNotEmpty(userId) && !userId.equals("0")){  //只有登录用户才存入ThreadLocal
            //存入ThreadLocal中
            ApUser user = new ApUser();
            user.setId(Integer.valueOf(userId));
            ThreadLocalUtils.set(user);
        }

        try {
            //放行
            filterChain.doFilter(request,response);
        } finally {
            //手动移除ThreadLocal内存数据,防止内存溢出
            ThreadLocalUtils.remove();
        }
    }
}



2)Dto对象

package com.heima.model.behavior.dtos;

import lombok.Data;

@Data
public class LikesBehaviorDto {


    // 文章、动态、评论等ID
    Long articleId;
    /**
     * 喜欢内容类型
     * 0文章
     * 1动态
     * 2评论
     */
    Short type;

    /**
     * 喜欢操作方式
     * 0 点赞
     * 1 取消点赞
     */
    Short operation;
}

3)Controller

package com.heima.behavior.controller.v1;

import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.dtos.ResponseResult;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 *  点赞
 */
@RestController
public class ApLikesBehaviorController {
    @Autowired
    private ApLikesBehaviorService apLikesBehaviorService;
    /**
     * 用户文章点赞
     */
    @PostMapping("/api/v1/likes_behavior")
    public ResponseResult likesBehavior(@RequestBody LikesBehaviorDto dto){
        return apLikesBehaviorService.likesBehavior(dto);
    }
}


4)Service

Redis常量

package com.heima.common.constants;

public abstract class RedisConstants {
    public static String TASK_TOPIC_PREFIX = "task_topic_";
    public static final String HOT_ARTICLE_FIRST_PAGE = "hot_article_first_page_";//热点文章缓存key
    public static final String DEFAULT_TAG="__all__";//推荐频道
    public static final String LIKE_BEHAVIOR = "likes_behavior_";//点赞行为key
}
package com.heima.behavior.service.impl;

import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.RedisConstants;
import com.heima.common.dtos.AppHttpCodeEnum;
import com.heima.common.dtos.ResponseResult;
import com.heima.common.exception.LeadNewsException;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.common.JsonUtils;
import com.heima.utils.common.ThreadLocalUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Service
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public ResponseResult likesBehavior(LikesBehaviorDto dto) {
        //判断用户是否登录
        ApUser apUser = (ApUser)ThreadLocalUtils.get();
        if(apUser==null){
            //为游客
            throw new LeadNewsException(AppHttpCodeEnum.NEED_LOGIN);
        }

        /**
         * 点赞记录hash结果
         *   key: articleId
         *   value:
         *       hashKey: userId
         *       hashValue: 时间
         */
        String key = RedisConstants.LIKE_BEHAVIOR+dto.getArticleId().toString();
        String hashKey = apUser.getId().toString();
        //判断点赞还是取消点赞
        if(dto.getOperation()==0){
            //点赞

            //添加redis记录
            Map<String,Object> value = new HashMap<>();
            value.put("time",new Date());
            redisTemplate.opsForHash().put(key,hashKey, JsonUtils.toString(value));
        }else{
            //取消点赞

            //删除redis记录
            redisTemplate.opsForHash().delete(key,hashKey);
        }

        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
}

11、文章点赞行为:文章ID精度丢失问题

1654313668415

1)解决方案分析

方案一:将文章的id的由long类型手动改为String类型,可以解决此问题。(需要修改表结构)

方案二:可以使用自定义jackson序列化解决,自行把Long转换为String(本项目采用这种方案)

2)jackson序列化

在ApArticle实体上添加@JsonSerialize注解,指定自定义的序列化器

 1626756008443

    @TableId(value = "id", type = IdType.ID_WORKER)
    //自定义json序列化器
    @JsonSerialize(using = Long2StringSerializer.class)
    private Long id;

在heima-leadnews-common项目自定义Long2StringSerializer序列化类

 1626756050257

package com.heima.common.json;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

import java.io.IOException;

/**
 * Long转换String的序列化器类
 *   泛型:源类型
 */
public class Long2StringSerializer extends JsonSerializer<Long> {
    @Override
    public void serialize(Long value, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
        jsonGenerator.writeString(value.toString());
    }
}


12、热点文章实时计算:计算定时计算与实时计算

image-20210730201509223

1657942815717

13、热点文章实时计算:实时流式计算概念

1)概念

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

1588517914652

流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

2)应用场景

  • 日志分析

    网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策

  • 大屏看板统计

    可以实时的查看网站注册数量,订单数量,购买数量,金额等。

  • 公交实时数据

    可以随时更新公交车方位,计算多久到达站牌等

  • 实时文章分值计算

    头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

流式计算应用场景:并发量高,实时要求高,准确度高

3)技术方案选型

​ OpenResty+Lua脚本(难点高)

  • Hadoop

    1588518932145

  • Apache Storm

    Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。

  • Kafka Stream (本项目使用)百万并发

    可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。

14、热点文章实时计算:KStream和KTable

1)概述

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

2)Kafka Streams的关键概念

(1)Stream处理拓扑

  • 是Kafka Stream提出的最重要的抽象概念:它表示一个无限的,不断更新的数据集。流是一个有序的,可重放(反复的使用),不可变的容错序列,数据记录的格式是键值对(key-value)。
  • 通过Kafka Streams编写一个或多个的计算逻辑的处理器拓扑。其中处理器拓扑是一个由流(边缘)连接的流处理(节点)的图。
  • 流处理器处理器拓扑中的一个节点;它表示一个处理的步骤,用来转换流中的数据(从拓扑中的上游处理器一次接受一个输入消息,并且随后产生一个或多个输出消息到其下游处理器中)。

(2)在拓扑中有两个特别的处理器:

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

1588520036121

1654324621434

2)KStream&KTable

(1)数据结构类似于map,如下图,key-value键值对

1588521104765

(2)KStream

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->(“” alice“,3)

如果您的流处理应用是要总结每个用户的价值,它将返回4alice。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据

(3)KTable

KTable传统数据库,包含了各种存储了大量状态(state)的表格。KTable负责抽象的,就是表状数据。每一次操作,都是更新插入(update)

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->(“” alice“,3)

如果您的流处理应用是要总结每个用户的价值,它将返回3alice。为什么?因为第二条数据记录将被视为先前记录的更新。

KStream - 每个新数据都包含了部分信息。

KTable - 每次更新都合并到原记录上。

1657944184027

15、热点文章实时计算:入门案例说明

需求:

每隔5秒统一1次消息中的单词数量

上游系统的消息格式:

key value

1001 hello kafka 10

1002 hello heima 10

下游系统接受的结果:

hello 20

kafka 10

heima 10

 1629427054845

1)引入依赖

在之前的kafka-demo工程的pom文件中引入

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.client.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

2)创建类

package com.heima.kafka.simple_stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SampleStream {

    public static void main(String[] args) {
        //设置参数
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.66.133:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"simpleStream");

        //创建StreamBuilder
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamProcess(streamsBuilder);

        //创建KafkaStreams对象
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),props);

        //启动程序
        kafkaStreams.start();
    }

    /**
     * 核心统计逻辑
     * @param streamsBuilder
     */
    private static void streamProcess(StreamsBuilder streamsBuilder) {

        //从上游接受原始数据
        KStream<String, String> kStream = streamsBuilder.stream("input_topic");

        //对原始数据分析统计(***)
        //1)对value数据进行处理(切割)
        //2)把value值覆盖key值
        //3) 对key进行分组
        //4) 对分组结果统计
        KTable<Windowed<Object>, Long> kTable = kStream.flatMapValues(new ValueMapper<String, Iterable<?>>() {
            @Override
            public Iterable<?> apply(String value) { //hello kafka
                String[] array = value.split(" ");
                return Arrays.asList(array);
            }
        }).map(new KeyValueMapper<String, Object, KeyValue<?, ?>>() {
            @Override
            public KeyValue<?, ?> apply(String key, Object value) {
                return new KeyValue<>(value, value);
            }
        }).groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("total"));

        //把统计结果发送给下游
        kTable.toStream()
                .map(new KeyValueMapper<Windowed<Object>, Long, KeyValue<?, ?>>() {
                    @Override
                    public KeyValue<?, ?> apply(Windowed<Object> windowed, Long value) {
                        return new KeyValue<>(windowed.key(),value.toString());
                    }
                })
                .to("output_topic");

    }
}


3)测试

准备

  • 使用生产者在topic为:input_topic中发送多条消息
  • 使用消费者接收topic为:out_topic

①生产者代码修改SampleStreamProducer

package com.heima.simple_stream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 消息生产者
 */
public class ProducerStream {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //准备参数
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.66.133:9092");//服务器连接地址
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//key序列化器
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//value序列化器
        props.put(ProducerConfig.RETRIES_CONFIG,"5");//生产者消息发送失败后的重试次数,

        //连接Kafka
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(props);

        //创建消息对象
        /**
         * ProducerRecord:封装消息的对象
         *   参数一:队列的主题名称
         *   参数二:key
         *   参数三:value
         */
        for(int i=1;i<=20;i++){
            if(i%2==0){
                ProducerRecord<String,String> record = new ProducerRecord<>("input_topic","hello kafka");
                kafkaProducer.send(record);
            }else{
                ProducerRecord<String,String> record = new ProducerRecord<>("input_topic","hello heima");
                kafkaProducer.send(record);
            }
        }

        //释放连接
        kafkaProducer.close();

    }
}


②消费者SampleStreamConsumer

package com.heima.simple_stream;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * 消息消费者者
 */
public class ConsumerStream {

    public static void main(String[] args) {
        //准备参数
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.66.133:9092");//服务器连接地址
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//key序列化器
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//value序列化器
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"demo_group");//消费组名称
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");//消费组名称

        /**
         * Kafka的消费者名称
         *   多个消费者在监听同一个主题的前提下。
         *    1) 如果消费组名不同,该主题下的所有消费者同时接收到消息
         *    2)如果消费组名称相同,该主题下的消费者只要一个被接收到消息
         */

        //连接Kafka
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);

        //监听主题
        kafkaConsumer.subscribe(Collections.singleton("output_topic"));

        while(true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(3000));
            for(ConsumerRecord<String,String> record:records){
                String key = record.key();
                String value = record.value();
                System.out.println("消息:"+key+"-----"+value);
            }
        }
    }
}


结果:

  • 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

16、课程总结

1)热点文章定时计算

​ 技术:XXL-Job定时任务

​ 搭建XXLJob服务端(调度中心,执行器,任务)

​ 客户端整合XXLJob(依赖,yml配置,Config配置类,@XXlJob注解)

​ 编写计算热点文章方法(查询近几天文章,数据排序,截取数据,存入redis)

2)文章点赞行为

​ 登录用户才可以做,包含点赞和取消点赞

标签:10,heima,文章,import,org,com,public
From: https://www.cnblogs.com/IsMhhla/p/16903767.html

相关文章

  • 关于VS2022 报错:MSB3027 无法将"xxx.dll"复制到"xxx"超出了重试计数 10 问题分析与解
    从标题我们不难看出,这个问题实际就是系统不能够将这个dll文件那过来放到其他目录去。其实我不知道大家是一个什么思路哈,我一开始就是无脑的去网上查找解决方案,但是在网上......
  • Win10系统安装U盘,安装纯净版Win10的通用教程
    安装前准备:1、准备8G或8G以上U盘(32G以内)。2、制作U盘会格式化U盘,U盘内的重要文件也要事先备份好。操作步骤:1、打开微软下载WIN10网址:(如果网址无法打开,可以下载解压运行附件......
  • win10 怎样隐藏某个指定盘?比如隐藏D盘?
     1、ctrl+R,输入 gpedit.msc2、看图操作  3、  4、比如隐藏所有盘符选择如下,选择“已启用”,再挑选选项,点击"应用"后,ctrl+E后,再进入"我的电脑",所有盘符都......
  • DTOJ-2022-11-10-测试-题解
    题目链接ABCA这个套路已经出现了很多次了就是两条线之间的网格图路径数,做法呢就是容斥题意求满足以下条件的\(n\timesm\)的矩阵的个数对\(10^9+7\)取模对于......
  • win10安装cuda、cuDNN和pytorch笔记
    特别注意:由于自己安装时没有做记录,所以下面大部分安装步骤图片都是参考的网络图,但不影响阅读,每一步都讲得很详细1.安装CUDA1.1查看自己显卡最高支持的CUDA版本在桌面......
  • win10安装LLVM
    1、缺少fastBPE在https://github.com/glample/fastBPE下载并拷贝到文件夹下2、安装libclangwin10下安装llvm和clang前提条件:Windows10环境下VS2015已安装,WindowsSDK已......
  • MBR10200FAC-ASEMI塑封肖特基二极管MBR10200FAC
    编辑:llMBR10200FAC-ASEMI塑封肖特基二极管MBR10200FAC型号:MBR10200FAC品牌:ASEMI封装:ITO-220AC特性:肖特基二极管正向电流:10A反向耐压:200V恢复时间:5ns引脚数量:2芯片个数:1芯片......
  • MBR10200FAC-ASEMI塑封肖特基二极管MBR10200FAC
    编辑:llMBR10200FAC-ASEMI塑封肖特基二极管MBR10200FAC型号:MBR10200FAC品牌:ASEMI封装:ITO-220AC特性:肖特基二极管正向电流:10A反向耐压:200V恢复时间:5ns引脚数量:2芯......
  • Java新特性(2):Java 10以后
    您好,我是湘王,这是我的博客园,欢迎您来,欢迎您再来~ 虽然到目前为止Java的版本更新还没有什么惊天动地的改变,但总是会冒出一些有趣的小玩意。前面列举了Java9和Java10的一些......
  • 【2022.11.17】N5105安装PVE系统,关联proxmox
    下载、安装PVE系统先去PVE官网下载新版的ISO文件:ProxmoxVE7.2ISOInstaller写入磁盘后直接进入BIOS,选择U盘启动选择同意选择磁盘如果有网络的话,不用选择,没网络的......