01、学习目标
- 能够搭建XXL-Job环境搭建
- 能够完成热点文章定时缓存
02、热点文章定时计算:查询所有频道Feign接口
计算完成新热数据后,需要给每个频道缓存一份数据,所以需要查询所有频道信息
在heima-leadnews-api定义远程接口
package com.heima.wemedia.feign;
import com.heima.common.dtos.ResponseResult;
import com.heima.model.wemedia.pojos.WmChannel;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import java.util.List;
@FeignClient(name = "leadnews-wemedia")
public interface WemediaFeign {
/**
* 查询所有频道
*/
@GetMapping("/api/v1/channel/channels")
public ResponseResult<List<WmChannel>> channels();
}
heima-leadnews-wemedia端提供接口(已提供)
package com.heima.wemedia.controller.v1;
import com.heima.common.dtos.ResponseResult;
import com.heima.wemedia.service.WmChannelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/v1/channel")
public class WmchannelController {
@Autowired
private WmChannelService wmChannelService;
@GetMapping("/channels")
public ResponseResult findAll(){
return wmChannelService.findAll();
}
}
03、热点文章定时计算:功能实现(1)-查询前5天文章
1)Mapper提供查询前5天文章
修改ApArticleMapper类
package com.heima.article.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.article.dtos.ArticleHomeDto;
import com.heima.model.article.pojos.ApArticle;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.simpleframework.xml.Path;
import java.util.Date;
import java.util.List;
public interface ApArticleMapper extends BaseMapper<ApArticle> {
List<ApArticle> loadApArticle(@Param("dto") ArticleHomeDto dto, @Param("type") int type);
List<ApArticle> findArticleListByLastDays(@Param("lastDays") Date lastDays);
}
在ApArticleMapper.xml新增方法
<!-- 查询最近发布的文章 -->
<select id="findArticleListByLastDays" resultType="com.heima.model.article.pojos.ApArticle">
SELECT
aa.*
FROM
ap_article aa
INNER JOIN ap_article_config ac
ON ac.`article_id` = aa.`id`
WHERE
ac.`is_down` = 0
AND ac.`is_delete` = 0
AND aa.`publish_time` >= #{lastDays}
</select>
2)编写业务逻辑
新建业务接口
package com.heima.article.service;
/**
* 热点文章业务
*/
public interface HotArticleService {
/**
* 定时计算热点文章,进行缓存
*/
public void computeHotArticle();
}
实现
package com.heima.article.service.impl;
import com.heima.article.mapper.ApArticleMapper;
import com.heima.article.service.HotArticleService;
import com.heima.model.article.pojos.ApArticle;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
@Service
public class HotArticleServiceImpl implements HotArticleService {
@Autowired
private ApArticleMapper apArticleMapper;
@Override
public void computeHotArticle() {
//查询最近5天的文章(所有频道的文章)
//获取前第5天的时间
Date lastDay = DateTime.now().minusDays(5).toDate();
List<ApArticle> articleList = apArticleMapper.findArticleListByLastDays(lastDay);
//统计所有文章的分值(阅读,点赞,收藏,评论)
//按照不同频道筛选出文章,对文章进行分值倒序,截取前30条,存入redis数据库
}
}
04、热点文章定时计算:功能实现(2)-计算文章分值
1)Vo对象
定义Vo对象,用于存储带分值的文章
package com.heima.model.article.dtos;
import com.heima.model.article.pojos.ApArticle;
import lombok.Data;
@Data
public class HotArticleVo extends ApArticle {
/**
* 文章分值
*/
private Integer score;
}
定义文章分值权限常量
package com.heima.common.constants;
public class ArticleConstants {
public static final Integer HOT_ARTICLE_LIKE_WEIGHT = 3;
public static final Integer HOT_ARTICLE_COMMENT_WEIGHT = 5;
public static final Integer HOT_ARTICLE_COLLECTION_WEIGHT = 8;
}
2)计算分值逻辑
package com.heima.article.service.impl;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.heima.article.mapper.ApArticleMapper;
import com.heima.article.service.HotArticleService;
import com.heima.common.constants.ArticleConstants;
import com.heima.model.article.dtos.HotArticleVo;
import com.heima.model.article.pojos.ApArticle;
import com.heima.utils.common.BeanHelper;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Service
public class HotArticleServiceImpl implements HotArticleService {
@Autowired
private ApArticleMapper apArticleMapper;
@Override
public void computeHotArticle() {
//查询最近5天的文章(所有频道的文章)
//获取前第5天的时间
Date lastDay = DateTime.now().minusDays(5).toDate();
List<ApArticle> articleList = apArticleMapper.findArticleListByLastDays(lastDay);
//统计所有文章的分值(阅读,点赞,收藏,评论)
List<HotArticleVo> hotArticleVoList = computeHotArticleScore(articleList);
//按照不同频道筛选出文章,对文章进行分值倒序,截取前30条,存入redis数据库
}
/**
* 统计所有文章的分值
* @param articleList
* @return
*/
private List<HotArticleVo> computeHotArticleScore(List<ApArticle> articleList) {
List<HotArticleVo> hotArticleVoList = new ArrayList<>();
if(CollectionUtils.isNotEmpty(articleList)){
for(ApArticle apArticle:articleList){
HotArticleVo hotArticleVo = BeanHelper.copyProperties(apArticle,HotArticleVo.class);
//计算分值
int score = computeScore(apArticle);
hotArticleVo.setScore(score);
hotArticleVoList.add(hotArticleVo);
}
}
return hotArticleVoList;
}
/**
* 计算一篇文章的分值
* @param apArticle
* @return
*/
private int computeScore(ApArticle apArticle) {
int score = 0;
//阅读分值
if(apArticle.getViews()!=null){
score+=apArticle.getViews();
}
//点赞
if(apArticle.getLikes()!=null){
score+=apArticle.getLikes()* ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT; //加权
}
//评论
if(apArticle.getComment()!=null){
score+=apArticle.getComment()* ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT; //加权
}
//收藏
if(apArticle.getCollection()!=null){
score+=apArticle.getCollection()* ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT; //加权
}
return score;
}
}
05、热点文章定时计算:功能实现(3)-缓存文章到Redis
1)article微服务整合Redis
依赖导入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
添加配置(Nacos中)
spring:
redis:
host: 192.168.66.133
password: leadnews
port: 6379
database: 1
2)article微服务开启Feign
依赖导入
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>com.heima</groupId>
<artifactId>heima-leadnews-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
开启Feign
3)编写业务逻辑
Redis常量类:
package com.heima.common.constants;
public abstract class RedisConstants {
public static String TASK_TOPIC_PREFIX = "task_topic_";
public static final String HOT_ARTICLE_FIRST_PAGE = "hot_article_first_page_";
public static final String DEFAULT_TAG="__all__";
}
package com.heima.article.service.impl;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.heima.article.mapper.ApArticleMapper;
import com.heima.article.service.HotArticleService;
import com.heima.common.constants.ArticleConstants;
import com.heima.common.constants.RedisConstants;
import com.heima.common.dtos.ResponseResult;
import com.heima.model.article.dtos.HotArticleVo;
import com.heima.model.article.pojos.ApArticle;
import com.heima.model.wemedia.pojos.WmChannel;
import com.heima.utils.common.BeanHelper;
import com.heima.utils.common.JsonUtils;
import com.heima.wemedia.feign.WemediaFeign;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
@Service
public class HotArticleServiceImpl implements HotArticleService {
@Autowired
private ApArticleMapper apArticleMapper;
@Autowired
private WemediaFeign wemediaFeign;
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public void computeHotArticle() {
//查询最近5天的文章(所有频道的文章)
//获取前第5天的时间
Date lastDay = DateTime.now().minusDays(5).toDate();
List<ApArticle> articleList = apArticleMapper.findArticleListByLastDays(lastDay);
//统计所有文章的分值(阅读,点赞,收藏,评论)
List<HotArticleVo> hotArticleVoList = computeHotArticleScore(articleList);
//按照不同频道筛选出文章,对文章进行分值倒序,截取前30条,存入redis数据库
//查询所有频道
ResponseResult<List<WmChannel>> responseResult = wemediaFeign.channels();
if(responseResult.getCode().equals(200)){
List<WmChannel> channelList = responseResult.getData();
if(CollectionUtils.isNotEmpty(channelList)){
//指定频道(有频道)
for(WmChannel channel:channelList){
/* List<HotArticleVo> currentChannelHotArticles = new ArrayList<>();
for(HotArticleVo vo:hotArticleVoList){
if(vo.getChannelId().equals(channel.getId())){
currentChannelHotArticles.add(vo);
}
}*/
List<HotArticleVo> currentChannelHotArticles = hotArticleVoList.stream()
.filter(vo->vo.getChannelId().equals(channel.getId()))
.collect(Collectors.toList());
String key = RedisConstants.HOT_ARTICLE_FIRST_PAGE+channel.getId();
sortAndCacheHotArticle(key,currentChannelHotArticles);
}
//推荐频道(没有选择频道)
String key = RedisConstants.HOT_ARTICLE_FIRST_PAGE+RedisConstants.DEFAULT_TAG;
sortAndCacheHotArticle(key,hotArticleVoList);
}
}
}
/**
* 对文章分值倒序,截取前30条,存入Redis
* @param key
* @param hotArticleVoList
*/
private void sortAndCacheHotArticle(String key, List<HotArticleVo> hotArticleVoList) {
//对文章按分值倒序
//方式一:使用Collection的sort方法
/**
* 升序:o1放前面
* 降序:o2放前面
*/
/*hotArticleVoList.sort(new Comparator<HotArticleVo>() {
@Override
public int compare(HotArticleVo o1, HotArticleVo o2) {
return o2.getScore().compareTo(o1.getScore());
}
});*/
//方式二:使用Collection的stream方法
/**
* 升序:默认 .sorted(Comparator.comparing(HotArticleVo::getScore))
* 降序:添加reversed()反转方法 .sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
*/
hotArticleVoList = hotArticleVoList.stream()
.sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
.collect(Collectors.toList());
//截取前30条
if(hotArticleVoList.size()>30){
hotArticleVoList = hotArticleVoList.subList(0,30);
}
//存入Redis
redisTemplate.opsForValue().set(key, JsonUtils.toString(hotArticleVoList));
}
/**
* 统计所有文章的分值
* @param articleList
* @return
*/
private List<HotArticleVo> computeHotArticleScore(List<ApArticle> articleList) {
List<HotArticleVo> hotArticleVoList = new ArrayList<>();
if(CollectionUtils.isNotEmpty(articleList)){
for(ApArticle apArticle:articleList){
HotArticleVo hotArticleVo = BeanHelper.copyProperties(apArticle,HotArticleVo.class);
//计算分值
int score = computeScore(apArticle);
hotArticleVo.setScore(score);
hotArticleVoList.add(hotArticleVo);
}
}
return hotArticleVoList;
}
/**
* 计算一篇文章的分值
* @param apArticle
* @return
*/
private int computeScore(ApArticle apArticle) {
int score = 0;
//阅读分值
if(apArticle.getViews()!=null){
score+=apArticle.getViews();
}
//点赞
if(apArticle.getLikes()!=null){
score+=apArticle.getLikes()* ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT; //加权
}
//评论
if(apArticle.getComment()!=null){
score+=apArticle.getComment()* ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT; //加权
}
//收藏
if(apArticle.getCollection()!=null){
score+=apArticle.getCollection()* ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT; //加权
}
return score;
}
}
06、热点文章定时计算:定时更新热点文章
1)创建任务
新建执行器:leadnews-hot-article-executor
新建任务:路由策略为轮询,Cron表达式:0 0 2 * * ?
2)article微服务整合XXL-Job
在article微服务导入依赖
<!--xxl-job-->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
在nacos配置新增配置
xxl:
job:
admin:
addresses: http://localhost:8888/xxl-job-admin
executor:
appname: leadnews-hot-article-executor
port: 9999
XxlJobConfig
package com.heima.article.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.port}")
private int port;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setPort(port);
return xxlJobSpringExecutor;
}
}
3)编写定时任务类
package com.heima.article.job;
import com.heima.article.service.HotArticleService;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 定时更新首页热点文章
*/
@Component
@Slf4j
public class ComputeHotArticleJob {
@Autowired
private HotArticleService hotArticleService;
@XxlJob("hotArticleComputeJob")
public void hotArticleComputeJob(){
log.info("定时更新首页热点文章");
hotArticleService.computeHotArticle();
}
}
执行后,效果如下:
07、热点文章定时计算:查询文章接口改造
1)思路分析
2)功能实现
接口:
public interface ApArticleService extends IService<ApArticle> {
ResponseResult<List<ApArticle>> loadApArticles(ArticleHomeDto dto, int type);
ResponseResult saveArticle(ArticleDto dto);
/**
* 加载文章列表
*/
public ResponseResult loadV2(ArticleHomeDto dto,Integer type);
}
实现:
@Override
public ResponseResult loadApArticleFromCache(ArticleDto dto, int type) {
//从缓存查询热点文章
String key = RedisConstants.HOT_ARTICLE_FIRST_PAGE+dto.getTag();
String redisData = redisTemplate.opsForValue().get(key);
List<ApArticle> articleList = null;
if(StringUtils.isNotEmpty(redisData)){
List<HotArticleVo> hotArticleVoList = JsonUtils.toList(redisData, HotArticleVo.class);
articleList = BeanHelper.copyWithCollection(hotArticleVoList,ApArticle.class);
return ResponseResult.okResult(articleList);
}else{
return loadApArticle(dto,type);
}
}
修改Controller
/**
* 加载首页
*/
@PostMapping("/load")
public ResponseResult load(@RequestBody ArticleHomeDto dto){
//return apArticleService.loadApArticle(dto,1);
return apArticleService.loadApArticleV2(dto,1);
}
08、文章点赞行为:需求分析
1)需求分析
- 当前登录的用户点击了”赞“,就要保存当前行为数据(游客不能点赞)
- 可以取消点赞
点赞需要存储的数据:
1)绑定用户ID
2)绑定文章ID
一篇文章可以被多个用户点赞(文章ID和用户ID是一对多)
Hash结构:
< 文章ID, <用户ID,点赞记录数据> >
1 1 xxxx
2 xxxx
3 xxxx
电商的购物车:用户ID,商品ID,用户ID和商品ID是一对多
Hash结构:
< 用户ID, <商品ID,购买数据对象> >
get(用户ID).get(商品ID)
List结构
< 用户ID, <购买数据对象(包含商品ID)> >
购买数据包括:商品购买数量,购买时价格,商品名称
注意:需要记录这个赞谁点的?
注意:点赞数据存在Redis/MongoDB中
2)点赞的数据结构
我们可以使用Redis的Hash结构(双层Map结构)存储点赞数据
Key:文章ID
Value:这里的value就使用hash结构,key为userId,value为Dto对象(Dto为接受点赞请求的对象)10
1001:文章ID
1: xx
2:xxx
3:xxxx
09、文章点赞行为:搭建行为微服务
1)创建项目和导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>heima-leadnews-service</artifactId>
<groupId>com.heima</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>heima-leadnews-behavior</artifactId>
<dependencies>
<dependency>
<groupId>com.heima</groupId>
<artifactId>heima-leadnews-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.heima</groupId>
<artifactId>heima-leadnews-model</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.heima</groupId>
<artifactId>heima-leadnews-utils</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
</project>
2)启动类
package com.heima.behavior;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
/**
* 行为微服务
*/
@SpringBootApplication
@EnableDiscoveryClient
public class BehaviorApplication {
public static void main(String[] args) {
SpringApplication.run(BehaviorApplication.class,args);
}
}
3)配置bootstrap.yml
server:
port: 9006
spring:
application:
name: leadnews-behavior
cloud:
nacos:
discovery:
server-addr: 192.168.66.133:8848
config:
server-addr: 192.168.66.133:8848
file-extension: yml
4)在Nacos添加行为微服务配置
spring:
redis:
host: 192.168.66.133
password: leadnews
port: 6379
database: 2
5)在App网关添加路由
# 行为管理
- id: behavior
uri: lb://leadnews-behavior
predicates:
- Path=/behavior/**
filters:
- StripPrefix= 1
10、文章点赞行为:功能实现
1)过滤器获取登录用户信息
package com.heima.behavior.filter;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.common.ThreadLocalUtils;
import org.springframework.stereotype.Component;
import javax.servlet.*;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
/**
* App端用户token获取过滤器
* filterName: 过滤器名称
*/
@Component
@WebFilter(filterName = "AppTokenFilter",urlPatterns = "/*")
public class AppTokenFilter extends GenericFilter {
/**
* 过滤方法
* @param servletRequest
* @param servletResponse
* @param filterChain
* @throws IOException
* @throws ServletException
*/
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest)servletRequest;
HttpServletResponse response = (HttpServletResponse) servletResponse;
//从请求头中取出userId
String userId = request.getHeader("userId");
//判断userId是否存在
if(StringUtils.isNotEmpty(userId) && !userId.equals("0")){ //只有登录用户才存入ThreadLocal
//存入ThreadLocal中
ApUser user = new ApUser();
user.setId(Integer.valueOf(userId));
ThreadLocalUtils.set(user);
}
try {
//放行
filterChain.doFilter(request,response);
} finally {
//手动移除ThreadLocal内存数据,防止内存溢出
ThreadLocalUtils.remove();
}
}
}
2)Dto对象
package com.heima.model.behavior.dtos;
import lombok.Data;
@Data
public class LikesBehaviorDto {
// 文章、动态、评论等ID
Long articleId;
/**
* 喜欢内容类型
* 0文章
* 1动态
* 2评论
*/
Short type;
/**
* 喜欢操作方式
* 0 点赞
* 1 取消点赞
*/
Short operation;
}
3)Controller
package com.heima.behavior.controller.v1;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.dtos.ResponseResult;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 点赞
*/
@RestController
public class ApLikesBehaviorController {
@Autowired
private ApLikesBehaviorService apLikesBehaviorService;
/**
* 用户文章点赞
*/
@PostMapping("/api/v1/likes_behavior")
public ResponseResult likesBehavior(@RequestBody LikesBehaviorDto dto){
return apLikesBehaviorService.likesBehavior(dto);
}
}
4)Service
Redis常量
package com.heima.common.constants;
public abstract class RedisConstants {
public static String TASK_TOPIC_PREFIX = "task_topic_";
public static final String HOT_ARTICLE_FIRST_PAGE = "hot_article_first_page_";//热点文章缓存key
public static final String DEFAULT_TAG="__all__";//推荐频道
public static final String LIKE_BEHAVIOR = "likes_behavior_";//点赞行为key
}
package com.heima.behavior.service.impl;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.RedisConstants;
import com.heima.common.dtos.AppHttpCodeEnum;
import com.heima.common.dtos.ResponseResult;
import com.heima.common.exception.LeadNewsException;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.common.JsonUtils;
import com.heima.utils.common.ThreadLocalUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Service
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public ResponseResult likesBehavior(LikesBehaviorDto dto) {
//判断用户是否登录
ApUser apUser = (ApUser)ThreadLocalUtils.get();
if(apUser==null){
//为游客
throw new LeadNewsException(AppHttpCodeEnum.NEED_LOGIN);
}
/**
* 点赞记录hash结果
* key: articleId
* value:
* hashKey: userId
* hashValue: 时间
*/
String key = RedisConstants.LIKE_BEHAVIOR+dto.getArticleId().toString();
String hashKey = apUser.getId().toString();
//判断点赞还是取消点赞
if(dto.getOperation()==0){
//点赞
//添加redis记录
Map<String,Object> value = new HashMap<>();
value.put("time",new Date());
redisTemplate.opsForHash().put(key,hashKey, JsonUtils.toString(value));
}else{
//取消点赞
//删除redis记录
redisTemplate.opsForHash().delete(key,hashKey);
}
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
}
11、文章点赞行为:文章ID精度丢失问题
1)解决方案分析
方案一:将文章的id的由long类型手动改为String类型,可以解决此问题。(需要修改表结构)
方案二:可以使用自定义jackson序列化解决,自行把Long转换为String(本项目采用这种方案)
2)jackson序列化
在ApArticle实体上添加@JsonSerialize注解,指定自定义的序列化器
@TableId(value = "id", type = IdType.ID_WORKER)
//自定义json序列化器
@JsonSerialize(using = Long2StringSerializer.class)
private Long id;
在heima-leadnews-common项目自定义Long2StringSerializer序列化类
package com.heima.common.json;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import java.io.IOException;
/**
* Long转换String的序列化器类
* 泛型:源类型
*/
public class Long2StringSerializer extends JsonSerializer<Long> {
@Override
public void serialize(Long value, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeString(value.toString());
}
}
12、热点文章实时计算:计算定时计算与实时计算
13、热点文章实时计算:实时流式计算概念
1)概念
一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。
流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。
2)应用场景
-
日志分析
网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
-
大屏看板统计
可以实时的查看网站注册数量,订单数量,购买数量,金额等。
-
公交实时数据
可以随时更新公交车方位,计算多久到达站牌等
-
实时文章分值计算
头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。
流式计算应用场景:并发量高,实时要求高,准确度高
3)技术方案选型
OpenResty+Lua脚本(难点高)
-
Hadoop
-
Apache Storm
Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。
-
Kafka Stream (本项目使用)百万并发
可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。
14、热点文章实时计算:KStream和KTable
1)概述
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
Kafka Stream的特点如下:
- Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
- 除了Kafka外,无任何外部依赖
- 充分利用Kafka分区机制实现水平扩展和顺序性保证
- 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
- 支持正好一次处理语义
- 提供记录级的处理能力,从而实现毫秒级的低延迟
- 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
- 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)
2)Kafka Streams的关键概念
(1)Stream处理拓扑
- 流是Kafka Stream提出的最重要的抽象概念:它表示一个无限的,不断更新的数据集。流是一个有序的,可重放(反复的使用),不可变的容错序列,数据记录的格式是键值对(key-value)。
- 通过Kafka Streams编写一个或多个的计算逻辑的处理器拓扑。其中处理器拓扑是一个由流(边缘)连接的流处理(节点)的图。
- 流处理器是
处理器拓扑
中的一个节点;它表示一个处理的步骤,用来转换流中的数据(从拓扑中的上游处理器一次接受一个输入消息,并且随后产生一个或多个输出消息到其下游处理器中)。
(2)在拓扑中有两个特别的处理器:
- 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
- Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。
2)KStream&KTable
(1)数据结构类似于map,如下图,key-value键值对
(2)KStream
KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。
KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。
为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:
(“ alice”,1)->(“” alice“,3)
如果您的流处理应用是要总结每个用户的价值,它将返回4
了alice
。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据
(3)KTable
KTable传统数据库,包含了各种存储了大量状态(state)的表格。KTable负责抽象的,就是表状数据。每一次操作,都是更新插入(update)
为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:
(“ alice”,1)->(“” alice“,3)
如果您的流处理应用是要总结每个用户的价值,它将返回3
了alice
。为什么?因为第二条数据记录将被视为先前记录的更新。
KStream - 每个新数据都包含了部分信息。
KTable - 每次更新都合并到原记录上。
15、热点文章实时计算:入门案例说明
需求:
每隔5秒统一1次消息中的单词数量
上游系统的消息格式:
key value
1001 hello kafka 10
1002 hello heima 10
下游系统接受的结果:
hello 20
kafka 10
heima 10
1)引入依赖
在之前的kafka-demo工程的pom文件中引入
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.client.version}</version>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
2)创建类
package com.heima.kafka.simple_stream;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class SampleStream {
public static void main(String[] args) {
//设置参数
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.66.133:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"simpleStream");
//创建StreamBuilder
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamProcess(streamsBuilder);
//创建KafkaStreams对象
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),props);
//启动程序
kafkaStreams.start();
}
/**
* 核心统计逻辑
* @param streamsBuilder
*/
private static void streamProcess(StreamsBuilder streamsBuilder) {
//从上游接受原始数据
KStream<String, String> kStream = streamsBuilder.stream("input_topic");
//对原始数据分析统计(***)
//1)对value数据进行处理(切割)
//2)把value值覆盖key值
//3) 对key进行分组
//4) 对分组结果统计
KTable<Windowed<Object>, Long> kTable = kStream.flatMapValues(new ValueMapper<String, Iterable<?>>() {
@Override
public Iterable<?> apply(String value) { //hello kafka
String[] array = value.split(" ");
return Arrays.asList(array);
}
}).map(new KeyValueMapper<String, Object, KeyValue<?, ?>>() {
@Override
public KeyValue<?, ?> apply(String key, Object value) {
return new KeyValue<>(value, value);
}
}).groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.as("total"));
//把统计结果发送给下游
kTable.toStream()
.map(new KeyValueMapper<Windowed<Object>, Long, KeyValue<?, ?>>() {
@Override
public KeyValue<?, ?> apply(Windowed<Object> windowed, Long value) {
return new KeyValue<>(windowed.key(),value.toString());
}
})
.to("output_topic");
}
}
3)测试
准备
- 使用生产者在topic为:
input_topic
中发送多条消息 - 使用消费者接收topic为:
out_topic
①生产者代码修改SampleStreamProducer
package com.heima.simple_stream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* 消息生产者
*/
public class ProducerStream {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//准备参数
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.66.133:9092");//服务器连接地址
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//key序列化器
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//value序列化器
props.put(ProducerConfig.RETRIES_CONFIG,"5");//生产者消息发送失败后的重试次数,
//连接Kafka
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(props);
//创建消息对象
/**
* ProducerRecord:封装消息的对象
* 参数一:队列的主题名称
* 参数二:key
* 参数三:value
*/
for(int i=1;i<=20;i++){
if(i%2==0){
ProducerRecord<String,String> record = new ProducerRecord<>("input_topic","hello kafka");
kafkaProducer.send(record);
}else{
ProducerRecord<String,String> record = new ProducerRecord<>("input_topic","hello heima");
kafkaProducer.send(record);
}
}
//释放连接
kafkaProducer.close();
}
}
②消费者SampleStreamConsumer
package com.heima.simple_stream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 消息消费者者
*/
public class ConsumerStream {
public static void main(String[] args) {
//准备参数
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.66.133:9092");//服务器连接地址
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//key序列化器
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//value序列化器
props.put(ConsumerConfig.GROUP_ID_CONFIG,"demo_group");//消费组名称
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");//消费组名称
/**
* Kafka的消费者名称
* 多个消费者在监听同一个主题的前提下。
* 1) 如果消费组名不同,该主题下的所有消费者同时接收到消息
* 2)如果消费组名称相同,该主题下的消费者只要一个被接收到消息
*/
//连接Kafka
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
//监听主题
kafkaConsumer.subscribe(Collections.singleton("output_topic"));
while(true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(3000));
for(ConsumerRecord<String,String> record:records){
String key = record.key();
String value = record.value();
System.out.println("消息:"+key+"-----"+value);
}
}
}
}
结果:
- 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出
16、课程总结
1)热点文章定时计算
技术:XXL-Job定时任务
搭建XXLJob服务端(调度中心,执行器,任务)
客户端整合XXLJob(依赖,yml配置,Config配置类,@XXlJob注解)
编写计算热点文章方法(查询近几天文章,数据排序,截取数据,存入redis)
2)文章点赞行为
登录用户才可以做,包含点赞和取消点赞
标签:10,heima,文章,import,org,com,public From: https://www.cnblogs.com/IsMhhla/p/16903767.html